场景
提交任务由单线程提交到线程池多线程处理,在线程池达到处理上线时可以在提交的线程阻塞等待。
方案
1.最常见的方案就是直接设置线程池的拒绝策略为 CallerRunsPolicy,当触发拒绝策略时,会将该任务直接在提交任务所在的线程直接运行该任务。这个方案有个小问题,当如果提交的任务负载很重导致提交任务的线程长时间阻塞,就会造成线程池的饥饿。
2.较可行但是有点恶心的方案,自定义阻塞策略在触发拒绝时获取任务队列阻塞提交。
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("interrupted", e); } }
这个方案也有问题:
- coreSize=0时,有可能出现死锁,需要设置存活时间(关于这个我没看懂,可能跟源码实现有关)
- API强烈不建议getQueue()直接修改任务队列
如果你只有一个线程提交任务,而且任务的执行时间不可控,这个方案是我找到的算靠谱的了。
千万别这样用,你永远不知道别人会咋用你的线程池,一不小心就上当了!!!
3.较一般可行不那么恶心方案,自定义任务队列,直接让offer、and方法也阻塞
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
这个方案其实挺完美的,但是唯一的问题就是线程池永远只会有coreSize个线程,在任务队列达到上限时就直接阻塞了=.=丧失了线程池的伸缩能力。
4.优雅比较可行的方案,自定义实现线程池,用信号量控制同时进入的线程,这个方案代码很完美,但是操蛋的是无法精确控制线程数量。
public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } } //这个方案也类似 他们问题也是一样的,就是任务执行完后线程并不是立马可用的,但semaphore释放了 class BlockingExecutor implements Executor { final Semaphore semaphore; final Executor delegate; private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) { semaphore = new Semaphore(concurrentTasksLimit); this.delegate = delegate; } @Override public void execute(final Runnable command) { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); return; } final Runnable wrapped = () -> { try { command.run(); } finally { semaphore.release(); } }; delegate.execute(wrapped); } }
这个方案的问题就是
- afterExecute这个方法不是线程执行完任务最后做的事情,也就是说线程执行完afterExecute还会要执行一些任务才能返回线程池,但是这个时候我们已经执行了semaphore.release(),任务进来以后发现没有线程可用又得创建一个线程!!
- 这个方案只能实现coreSize==maxSize,如果你尝试将任务队列长度修改和信号量长度修改,你会发现由于问题1,你总会莫名其妙的就触发了拒绝策略了
- 线程池里的线程总会比bound要多,而且如果你的任务很快完成(非常快那种),有可能创建非常多的线程。