场景
提交任务由单线程提交到线程池多线程处理,在线程池达到处理上线时可以在提交的线程阻塞等待。
方案
1.最常见的方案就是直接设置线程池的拒绝策略为 CallerRunsPolicy,当触发拒绝策略时,会将该任务直接在提交任务所在的线程直接运行该任务。这个方案有个小问题,当如果提交的任务负载很重导致提交任务的线程长时间阻塞,就会造成线程池的饥饿。
2.较可行但是有点恶心的方案,自定义阻塞策略在触发拒绝时获取任务队列阻塞提交。
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
if (!executor.isShutdown()) {
executor.getQueue().put(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("interrupted", e);
}
}
这个方案也有问题:
- coreSize=0时,有可能出现死锁,需要设置存活时间(关于这个我没看懂,可能跟源码实现有关)
- API强烈不建议getQueue()直接修改任务队列
如果你只有一个线程提交任务,而且任务的执行时间不可控,这个方案是我找到的算靠谱的了。
千万别这样用,你永远不知道别人会咋用你的线程池,一不小心就上当了!!!
3.较一般可行不那么恶心方案,自定义任务队列,直接让offer、and方法也阻塞
public class LimitedQueue<E> extends LinkedBlockingQueue<E>
{
public LimitedQueue(int maxSize)
{
super(maxSize);
}
@Override
public boolean offer(E e)
{
// turn offer() and add() into a blocking calls (unless interrupted)
try {
put(e);
return true;
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
}
这个方案其实挺完美的,但是唯一的问题就是线程池永远只会有coreSize个线程,在任务队列达到上限时就直接阻塞了=.=丧失了线程池的伸缩能力。
4.优雅比较可行的方案,自定义实现线程池,用信号量控制同时进入的线程,这个方案代码很完美,但是操蛋的是无法精确控制线程数量。
public class BoundedExecutor extends ThreadPoolExecutor{
private final Semaphore semaphore;
public BoundedExecutor(int bound) {
super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
semaphore = new Semaphore(bound);
}
/**Submits task to execution pool, but blocks while number of running threads
* has reached the bound limit
*/
public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{
semaphore.acquire();
return submit(task);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
semaphore.release();
}
}
//这个方案也类似 他们问题也是一样的,就是任务执行完后线程并不是立马可用的,但semaphore释放了
class BlockingExecutor implements Executor {
final Semaphore semaphore;
final Executor delegate;
private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
semaphore = new Semaphore(concurrentTasksLimit);
this.delegate = delegate;
}
@Override
public void execute(final Runnable command) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
final Runnable wrapped = () -> {
try {
command.run();
} finally {
semaphore.release();
}
};
delegate.execute(wrapped);
}
}
这个方案的问题就是
- afterExecute这个方法不是线程执行完任务最后做的事情,也就是说线程执行完afterExecute还会要执行一些任务才能返回线程池,但是这个时候我们已经执行了semaphore.release(),任务进来以后发现没有线程可用又得创建一个线程!!
- 这个方案只能实现coreSize==maxSize,如果你尝试将任务队列长度修改和信号量长度修改,你会发现由于问题1,你总会莫名其妙的就触发了拒绝策略了
- 线程池里的线程总会比bound要多,而且如果你的任务很快完成(非常快那种),有可能创建非常多的线程。
