其他分享
首页 > 其他分享> > 线程池提交任务方法

线程池提交任务方法

作者:互联网

excute方法:  源码

submit方法通过提交参数构造FutrueTask,然后执行excute(FutrueTask)方法,返回一个future对象

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);//new FutureTask<T>(runnable, null)
    execute(ftask);
    return ftask;
}
submit(Runnable task)
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);//new FutureTask<T>(runnable, value)
    execute(ftask);
    return ftask;
}
submit(Runnable task, T result)
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
submit(Callable task)

任务批量提交

超时时间:执行invokeAll或者invokeAny的时间

会等待所有任务完成,如果异常终止(包括超时停止),取消所有任务

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            //构造FutureTask,添加进futures集合里
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);//执行任务
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {//任务已经完成
                try {
                    f.get();//这里只是起到阻塞的作用,如果线程中断,则跳到finally
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {//不处理任务执行抛出
                }
            }
        }
        //如果没走到这里,就说明抛出了异常,可能是线程中断异常或其他异常。
        done = true;
        return futures;
    } finally {
        //未正常结束,取消所有任务,通过future get时抛出CancellationException(
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
invokeAll(Collection<? extends Callable> tasks)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // Interleave time checks and calls to execute in case
        // executor doesn't have any/much parallelism.
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)//超时返回
                return futures;
        }

        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            //如果任务未完成,则进去阻塞到任务完成
            if (!f.isDone()) {
                //超时返回
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;//get超时返回
                }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;//未走到这里,标识抛出异常了,在finally里取消所有任务
        return futures;
    } finally {
        //进而取消所有任务,取消的任务不能再执行了
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)

只要有任意一个任务完成,就会返回其结果值,如果超时,则抛出 TimeoutException异常,最终都会取消所有任务

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}
invokeAny(Collection<? extends Callable> tasks)
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit)

真正执行invokeAny的方法

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        //可以优先获取到已经完成的任务
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            //任务执行时抛出的异常
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;//正在执行的任务数量

            for (;;) {
                //获取已完成的任务, 这里是通过ExecutorCompletionService的方法来获取
                Future<T> f = ecs.poll();
                //没有已经完成的任务
                if (f == null) {
                    if (ntasks > 0) {//当前未提交的任务
                        --ntasks;//未提交任务-1
                        futures.add(ecs.submit(it.next()));//提交任务
                        ++active;
                    }
                    else if (active == 0) //如果没有任务正在执行
                        break;
                    else if (timed) {//如果设置了超时时间
                        //阻塞获取
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        //获取不到,就抛出超时异常
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();//阻塞获取
                }
                if (f != null) {//有任务完成了
                    --active;//任务正在执行数量-1
                    try {
                        return f.get();//直接返回结果值
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            //最终取消所有任务
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
doInvokeAny

 

标签:tasks,futures,任务,task,线程,提交,new,null,size
来源: https://www.cnblogs.com/shuiyingyuan/p/15262979.html