其他分享
首页 > 其他分享> > 线程池 实现

线程池 实现

作者:互联网

线程池……

锁对象就是被生产和消费的共享数据对象(本例中为任务队列 jobs)。
wait 会释放已持有的锁,当前线程随即进入该对象的等待队列。

notify 将一个线程从等待队列移到阻塞队列,该线程重新竞争到锁之后才能继续执行。(相关模式:等待/通知、生产者/消费者;与订阅/发布的思路也类似。)

 

 

/**
 * Minimal contract for a pool of worker threads that executes submitted jobs.
 *
 * @param <Job> the kind of task the pool accepts; must be a {@link Runnable}
 */
public interface ThreadPool<Job extends Runnable> {

    // ---- job submission and inspection ----

    /**
     * Hands a job to the pool for execution.
     *
     * @param job the task to run
     */
    void execute(Job job);

    /**
     * Reports the pool's current job count.
     *
     * @return the number of jobs, as counted by the implementation
     */
    int getJobSize();

    // ---- worker management ----

    /**
     * Asks the pool to grow by the given number of workers.
     *
     * @param num how many workers to add
     */
    void addWorkers(int num);

    /**
     * Asks the pool to shrink by the given number of workers.
     *
     * @param num how many workers to remove
     */
    void removeWorker(int num);

    // ---- lifecycle ----

    /** Stops the pool's workers. */
    void shutdown();
}

  

 

public class DefaultThreadPool<Job extends Runnable>  implements ThreadPool<Job> {

    private static final  int MAX_WORKER_SIZE = 10 ;

    private static final  int DEFAULT_WORKER_SIZE = 5;

    private static final  int MIN_WORKER_SIZE = 1;

    private List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());

    private final LinkedList<Job> jobs = new LinkedList<Job>();

    private AtomicLong threadNum = new AtomicLong();

    private int workerNum = DEFAULT_WORKER_SIZE;

    public void execute(Job job) {
        if(job!=null){
            synchronized (jobs){
                jobs.add(job);
                jobs.notify();
            }
        }
    }

    public void shutdown() {
        for(Worker  worker:workers){
             worker.shutdown();
        }

    }

    public void addWorkers(int num) {
            synchronized (jobs){
                if(num + workerNum> MAX_WORKER_SIZE){
                    num = MAX_WORKER_SIZE- workerNum;
                }
                initializeWorkers(num);
                workerNum += num;
            }
    }

    public void removeWorker(int num) {
            synchronized (jobs){
                if(num > workerNum){
                    try {
                        throw new IllegalAccessException("");
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    }
                }
                int count =0  ;
                while (count< num ){
                    Worker worker = workers.get(count);
                    if(workers.remove(worker)){
                        worker.shutdown();
                        count++;
                    }
                }
                workerNum -= count;
            }
    }

    public int getJobSize() {
        return jobs.size();
    }

    public DefaultThreadPool(){
        initializeWorkers(DEFAULT_WORKER_SIZE);
    }

    public DefaultThreadPool(int num ){
        workerNum = num > MAX_WORKER_SIZE ?MAX_WORKER_SIZE:num< MIN_WORKER_SIZE?MIN_WORKER_SIZE:num ;
        initializeWorkers(workerNum);
    }

    private void initializeWorkers(int num){
        for(int i = 0 ; i < num; i ++ ){
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker,"ThreadPool-Worker-"+ threadNum.incrementAndGet() );
            thread.start();

        }

    }

    class Worker implements  Runnable{
        private  volatile  boolean running = false ;
        public void run() {
            while (running){
                Job job = null ;
                synchronized (jobs){
                    while(jobs.isEmpty()){
                        try {
                            jobs.wait();
                        }catch (Exception e){
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    job = jobs.removeFirst();
                }
                if(job != null ){
                    try {
                        job.run();
                    }catch (Exception e){

                    }
                }

            }
        }

        public void  shutdown(){
            running = false ;
        }
    }
}

  

标签:jobs,实现,void,int,num,线程,public,SIZE
来源: https://www.cnblogs.com/heshana/p/15973385.html