ThreadPoolExecutor提供了四个构造方法:
1 2 3 4 5 6 7 8 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
1 2 3 4 5 6 7 8 9 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
1 2 3 4 5 6 7 8 9 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
参数说明:
序号
名称
类型
含义
1
corePoolSize
int
核心线程池大小
2
maximumPoolSize
int
最大线程池大小
3
keepAliveTime
long
线程最大空闲时间
4
unit
TimeUnit
时间单位
5
workQueue
BlockingQueue
线程等待队列
6
threadFactory
ThreadFactory
线程创建工厂
7
handler
RejectedExecutionHandler
拒绝策略
线程池的构建 一、预定义线程池
FixedThreadPool
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
FixedThreadPool的任务执行是无序的;
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
CachedThreadPool
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
keepAliveTime = 60s,线程空闲60s后自动结束。
workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。
SingleThreadExecutor
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
咋一瞅,不就是newFixedThreadPool(1)吗?定眼一看,这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢,写个dome来解释一下:
**
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1 ); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService; System.out.println(threadPoolExecutor.getMaximumPoolSize()); threadPoolExecutor.setCorePoolSize(8 ); ExecutorService singleExecutorService = Executors.newSingleThreadExecutor(); }
对比可以看出,FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
ScheduledThreadPool
1 2 3 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor (corePoolSize); }
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
1 2 3 4 public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }
对于ScheduledThreadPool本文不做描述,其特性请关注后续篇章。
二、自定义线程池 以下是自定义线程池,使用了有界队列,自定义ThreadFactory和拒绝策略的demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class ThreadTest { public static void main (String[] args) throws InterruptedException, IOException { int corePoolSize = 2 ; int maximumPoolSize = 4 ; long keepAliveTime = 10 ; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue <>(2 ); ThreadFactory threadFactory = new NameTreadFactory (); RejectedExecutionHandler handler = new MyIgnorePolicy (); ThreadPoolExecutor executor = new ThreadPoolExecutor (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); executor.prestartAllCoreThreads(); for (int i = 1 ; i <= 10 ; i++) { MyTask task = new MyTask (String.valueOf(i)); executor.execute(task); } System.in.read(); } static class NameTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger (1 ); @Override public Thread newThread (Runnable r) { Thread t = new Thread (r, "my-thread-" + mThreadNum.getAndIncrement()); System.out.println(t.getName() + " has been created" ); return t; } } public static class MyIgnorePolicy implements RejectedExecutionHandler { public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { doLog(r, e); } private void doLog (Runnable r, ThreadPoolExecutor e) { System.err.println( r.toString() + " rejected" ); } } static class MyTask implements Runnable { private String name; public MyTask (String name) { this .name = name; } @Override public void run () { try { System.out.println(this .toString() + " is running!" ); Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } } public String getName () { return name; } @Override public String toString () { return "MyTask [name=" + name + "]" ; } } }
结果:
由于线程预启动,首先创建了1,2号线程,然后task1,task2被执行;
但任务提交没有结束,此时任务task3,task6到达发现核心线程已经满了,进入等待队列;
等待队列满后创建任务线程3,4执行任务task3,task6,同时task4,task5进入队列;
此时创建线程数(4)等于最大线程数,且队列已满,所以7,8,9,10任务被拒绝;
任务执行完毕后回头来执行task4,task5,队列清空。
总结,通过自定义线程池,我们可以更好的让线程池为我们所用,更加适应我的实际场景。