编程语言
首页 > 编程语言> > Java NIO多线程服务器实现

Java NIO多线程服务器实现

作者:互联网

模型原理图

BOSS线程(ServerSocketChannel)专门负责建立链接,然后将accept到的SocketChannel分发给多个Worker线程。Worker线程有多个,可以分摊来自多个Client的SocketChannel。Worker线程专门负责read和write。

NIO多线程服务器实现

server端实现
package niomultithreadserver;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;


public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        // 设置boss线程的名称为"boss"
        Thread.currentThread().setName("boss");
        // 创建ServerSocketChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        /**
         * 1. ServerSocketChannel配置为非阻塞模式
         * 2. selector通常都是和非阻塞channel进行搭配
         * 非阻塞channel一旦感兴趣的事件,则可以通过selector.select()方法将事件追加到selector的selectedKeys中
         */
        ssc.configureBlocking(false);
        // 创建boss线程的selector,该selector专门监听accept事件
        Selector boss = Selector.open();
        // 将创建好的ServerSocketChannel和关注的accept事件注册到该selector
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        // ServerSocketChannel和端口进行绑定
        ssc.bind(new InetSocketAddress(8088));
        // 创建一定数量的worker
        int cpuNum = Runtime.getRuntime().availableProcessors();
        System.out.println("cpuNum:" + cpuNum);
        Worker[] workers = new Worker[cpuNum];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
            workers[i].initWorker();
        }

        AtomicInteger count = new AtomicInteger();
        while (true) {
            /**
             * 轮训check查询的时候是否就绪,如果未就绪则select不会返回,只有监听的事件发生select()方法才返回
             * 每个selector中有两个集合:
             * 集合1: interested keys集合,存储的是当前selector感兴趣的channel + 事件类型(accept, connect, read, write)
             * 集合2:selected keys集合,存储的是selector.select()方法调用之后扫描到的新发生的事件
             * 注意每次select()方法调用时,可以理解为是往"selected keys集合"中追加本次select()新扫描的事件,上次select()并且加入的事件不会自动清除。
             * 因此后面迭代器迭代"selected keys集合"时需要及时remove掉,否则下次循环会重复消费处理
             */
            boss.select();
            Iterator<SelectionKey> bossSelectedKeysIter = boss.selectedKeys().iterator();
            while (bossSelectedKeysIter.hasNext()) {
                SelectionKey key = bossSelectedKeysIter.next();
                /**
                 * 注意及时remove掉已经监听到并且马上就要处理的事件
                 * 因为selector的selectedKeys集合不会自动将key清除掉,这会导致下次循环重复处理
                 */
                bossSelectedKeysIter.remove();
                if (key.isAcceptable()) {
                    SelectableChannel keyChannel = key.channel();
                    ServerSocketChannel sscFromSelector = (ServerSocketChannel) keyChannel;
                    System.out.println("sscFromSelector == ssc?" + (sscFromSelector == ssc));

                    SocketChannel sc = sscFromSelector.accept();
                    sc.configureBlocking(false);
                    // 建立链接打印日志
                    System.out.println("线程:" + Thread.currentThread().getName() + ":" + "connected --- " + sc.getRemoteAddress());
                    // 分配worker
                    System.out.println("线程:" + Thread.currentThread().getName() + ":" + "before register --- " + sc.getRemoteAddress());
                    workers[count.incrementAndGet() % workers.length].registerChannel(sc);
                    System.out.println("线程:" + Thread.currentThread().getName() + ":" + "after register --- " + sc.getRemoteAddress());
                }
            }
        }
    }

    static class Worker implements Runnable {
        private Thread thread;
        private Selector workerSelector;
        private String name;
        ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();

        public Worker(String name) {
            this.name = name;
        }

        public void initWorker() throws IOException {
            thread = new Thread(this, name);
            workerSelector = Selector.open();
            thread.start();
        }

        /**
         * 将一个SocketChannel分配给当前worker
         *
         * @param sc
         */
        public void registerChannel(SocketChannel sc) {
            queue.add(() -> {
                try {
                    sc.register(workerSelector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            // 注册到任务队列之后唤醒一下
            workerSelector.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    workerSelector.select();
                    /**
                     * 注册分配的channel
                     */
                    Runnable scRegisterTask = queue.poll();
                    if (scRegisterTask != null) {
                        scRegisterTask.run();
                    }

                    Iterator<SelectionKey> workerSelectedKeysIter = workerSelector.selectedKeys().iterator();
                    while (workerSelectedKeysIter.hasNext()) {
                        SelectionKey key = workerSelectedKeysIter.next();
                        workerSelectedKeysIter.remove();
                        if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(16);
                            /**
                             * 将channel中的数据写入buffer
                             * 注意:两种情况需要做好处理:
                             * 1. 如果客户端正常关闭了socket,则read返回的是-1,这时需要将客户端对应的channel从boss selector中cancel
                             * 2. 如果客户端非正常关闭了socket,则需要捕获read方法,并将客户端对应的channel从boss selector中cancel
                             */
                            int readCnt = channel.read(byteBuffer);
                            if (readCnt == -1) {
                                System.out.println("线程:" + Thread.currentThread().getName() + ":" + "客户端关闭链接,取消channel监听...");
                                key.cancel();
                            }
                            System.out.println("线程:" + Thread.currentThread().getName() + ":" + "read data --- " + channel.getRemoteAddress());
                            // 切换buffer为读模式
                            byteBuffer.flip();
                            // 读取
                            System.out.println(Charset.forName("UTF-8").decode(byteBuffer));
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

client端实现
package niomultithreadserver;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8088));
        sc.write(Charset.forName("UTF-8").encode("123abc"));
        System.in.read();
    }
}

标签:Java,NIO,java,selector,线程,sc,import,多线程,channel
来源: https://www.cnblogs.com/iamswf/p/16271353.html