Java 线程池 阻塞提交任务

场景

提交任务由单线程提交到线程池多线程处理,在线程池达到处理上线时可以在提交的线程阻塞等待。

方案

1.最常见的方案就是直接设置线程池的拒绝策略为 CallerRunsPolicy,当触发拒绝策略时,会将该任务直接在提交任务所在的线程直接运行该任务。这个方案有个小问题,当如果提交的任务负载很重导致提交任务的线程长时间阻塞,就会造成线程池的饥饿。

2.较可行但是有点恶心的方案,自定义阻塞策略在触发拒绝时获取任务队列阻塞提交。

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("interrupted", e);
    }
}

这个方案也有问题:

如果你只有一个线程提交任务,而且任务的执行时间不可控,这个方案是我找到的算靠谱的了。

千万别这样用,你永远不知道别人会咋用你的线程池,一不小心就上当了!!!

3.较一般可行不那么恶心方案,自定义任务队列,直接让offer、and方法也阻塞

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

这个方案其实挺完美的,但是唯一的问题就是线程池永远只会有coreSize个线程,在任务队列达到上限时就直接阻塞了=.=丧失了线程池的伸缩能力。

4.优雅比较可行的方案,自定义实现线程池,用信号量控制同时进入的线程,这个方案代码很完美,但是操蛋的是无法精确控制线程数量。

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
//这个方案也类似 他们问题也是一样的,就是任务执行完后线程并不是立马可用的,但semaphore释放了
class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

这个方案的问题就是

  • afterExecute这个方法不是线程执行完任务最后做的事情,也就是说线程执行完afterExecute还会要执行一些任务才能返回线程池,但是这个时候我们已经执行了semaphore.release(),任务进来以后发现没有线程可用又得创建一个线程!!
  • 这个方案只能实现coreSize==maxSize,如果你尝试将任务队列长度修改和信号量长度修改,你会发现由于问题1,你总会莫名其妙的就触发了拒绝策略了
  • 线程池里的线程总会比bound要多,而且如果你的任务很快完成(非常快那种),有可能创建非常多的线程。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注