线程池基本参数解析
一、线程池构造方法参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
corePoolSize: 核心线程池数量
maximumPoolSize:最大线程池数量(包含核心线程池数量)
keepAliveTime: 线程执行完后的存活时间和 TimeUnit 联合使用
TimeUnit:线程执行完后的存活时间和 keepAliveTime 联合使用
BlockingQueue:任务队列,当新的任务到来,核心线程数已满,会加入任务队列
ThreadFactory: 线程工厂,生产线程
RejectedExecutionHandler:新的任务到来,如果已超过最大线程数 且任务队列已满,则会对该任务进行拒绝策略
二、keepAliveTime
作用在非核心线程,如果也需要作用在核心线程上,那需要调用
public void allowCoreThreadTimeOut(boolean value)
三、阻塞队列
1.ArrayBlockingQueue
存储方式:数组 final Object[] items;
构造方法两个参数:int capcity 数组大小,boolean fair是否是公平锁
ReentrantLock:公平锁和非公平锁 主要区别在获取锁的机制上
公平锁:获取时会检查队列中其他任务是否要获取锁,如果其他任务要获取锁,先让其他任务获取
非公平锁:获取时不管队列中是否有任务要获取锁,直接尝试获取
复制代码
2.LinkedBlockingDeque
存储方式:双向链表
构造方法参数:无参默认 int 最大容量,也可以传入容量值
3.PriorityBlockingQueue
存储方式:数组 private transient Object[] queue
构造函数:参数1初始容量 默认11,参数2 :比较器
4.SychronizeQueue
没有存储容量,必须找到执行线程,找不到就执行拒绝策略
5.DelayedWorkQueue
存储方式:数组 ,默认大小 16
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
复制代码
四、线程工厂
1.DefaultThreadFactory: 生成线程 组,线程编号,线程名字 线程优先级(默认是 5)
2.PrivilegedThreadFactory 继承 DefaultThreadFactory
五、拒绝策略
1.CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
复制代码
直接在调用者线程进行执行,前提是 线程池未关闭
2.AbortPolicy
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
复制代码
抛出异常
3.DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
复制代码
直接什么也不做 丢弃任务
4.DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
复制代码
从任务队列中删除最旧的,然后重新执行该任务,这里是个隐式循环,因为excute 可能会重新触发拒绝策略
六、ThreadPoolExecutor
1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
核心线程数和最大线程数相等机只有核心线程;任务队列大小无限制;DefaultThreadFactory(也可以传入定制);拒绝策略是AbortPolicy
2.CacheThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
复制代码
核心线程数是0 ,最大线程数是 MAX_VALUE,任务队列无容量,每来一个任务都会新开线程执行任务,执行完后存活一分钟 即可释放线程;DefaultThreadFactory(也可以传入定制);拒绝策略是AbortPolicy
3.SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
复制代码
核心线程和最大线程数量都是1,任务队列大小无限制,DefaultThreadFactory(也可以传入定制);拒绝策略是AbortPolicy
4.WorkStealingPool(java1.8)
Java8新增的创建线程池的方法,如果不主动设置它的并发数,那么这个方法就会以当前机器的CPU处理器个数为线程个数,这个线程池会并行处理任务,不能够保证任务执行的顺序
七、ThreadScheduledExecutor
1.SingleThreadScheduledExecutor
核心线程数是1,最大线程数无限制,非核心线程最大存活时间 是0 秒,执行完立即结束
2.ScheduledThreadPoolExecutor
核心线程数可传入,最大线程数无限制,非核心线程最大存活时间 是0 秒,执行完立即结束
八、基本执行与选择
cpu密集型任务,设置为CPU核心数+1; IO密集型任务,设置为CPU核心数*2;
CPU密集型任务指的是需要cpu进行大量计算的任务,提高CPU的利用率。核心线程数不宜设置过大,太多的线程会互相抢占cpu资源导致不断切换线程,反而浪费了cpu。最理想的情况是每个CPU都在进行计算,没有浪费,但很有可能其中的一个线程会突然挂起等待IO,此时额外的一个等待线程就可以马上进行工作,而不必等待挂起结束。
IO密集型任务指的是任务需要频繁进行IO操作,这些操作会导致线程长时间处于挂起状态,那么需要更多的线程来进行工作,不会让cpu都处于挂起状态,浪费资源。一般设置为cpu核心数的两倍即可
1.在线程数没有达到核心线程数时,每个新任务都会创建一个新的线程来执行任务。
2.当线程数达到核心线程数时,每个新任务会被放入到等待队列中等待被执行。
3.当等待队列已经满了之后,如果线程数没有到达总的线程数上限,那么会创建一个非核心线程来执行任务。
4.当线程数已经到达总的线程数限制时,新的任务会被拒绝策略者处理
九、三方使用的选择
1.Okhttp
核心线程数是 0 ,最大线程数是 Integer.MAX_VALUE,线程执行完后允许存活最大时间 60S,队列采用的是 SynchronousQueue,及无容量的队列,这里采用无容量的队列是因为 Dispatcher 自己有实现队列
//Dispatcher.java
//最大同时异步请求个数
private int maxRequests = 64;
//单个host的同时最大请求数
private int maxRequestsPerHost = 5;
//准备执行的异步队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在执行的异步队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在执行的同步队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
//加入准备队列 并判断能不能执行
void enqueue(AsyncCall call) {
synchronized (this) {
//加入准备队列
readyAsyncCalls.add(call);
}
//执行
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
//将可以执行的异步请求集合筛选出来 如果已经超过同时最大请求个数则直接跳出循环,否则如果超过最大host同时请求个数 继续下次循环
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//如果正在之子那个的
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
//执行筛选出来的可执行任务
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
复制代码
2.EventBus
核心线程数是0 ,最大线程数是 MAX_VALUE,任务队列无容量,每来一个任务都会新开线程执行任务,执行完后存活一分钟 即可释放线程;拒绝策略是AbortPolicy
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
}
作者:YDG
链接:https://juejin.cn/post/6952676047695380487
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。