编程语言
首页 > 编程语言 > 网络编程IO多路复用-服务端代码

网络编程IO多路复用-服务端代码

作者:互联网

使用Java NIO完成服务端代码的编写,代码写得不完善,本文主要想体现多路复用的几种编程模型和思想。


一、单线程版本

使用单线程+NIO完成服务端代码的编写,并且使用一个Selector注册器。在一个线程中处理ServerSocketChannel的accept、SocketChannel的read、write。

创建ServerSocketChannel,并将其注册到Selector中。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded NIO server.
 *
 * <p>One {@link Selector} multiplexes the accept, read and write events of every
 * connection, and all handlers run on the thread that calls {@link #start()}.
 */
public class Server {

    private final Selector selector;

    private final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector and a non-blocking listening channel bound to {@code port},
     * then registers the channel for accept events.
     *
     * @param port TCP port to listen on
     * @throws Exception if the selector or channel cannot be opened, bound or registered
     */
    public Server(int port) throws Exception{
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the event loop on the calling thread until the thread is interrupted or the
     * selector fails with an {@link IOException}.
     */
    public void start() {
        while (!Thread.interrupted()) {
            try {
                if (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        // Remove first so the key never lingers in the selected set,
                        // even if the handler throws.
                        iterator.remove();
                        dispatch(selectionKey);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    selector.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
                // The selector is closed now; looping again would only make select()
                // throw ClosedSelectorException, so stop the loop here.
                return;
            }
        }
    }

    /**
     * Routes a selected key to the handler matching its ready operation.
     *
     * @param selectionKey key taken from the selector's selected-key set
     */
    private void dispatch(SelectionKey selectionKey) {
        if (!selectionKey.isValid()) {
            // The is*able() checks throw CancelledKeyException on a cancelled key.
            return;
        }
        if (selectionKey.isAcceptable()) {
            new Acceptor(serverSocketChannel, selector).accept();
        } else if (selectionKey.isReadable()) {
            new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey).read();
        } else if (selectionKey.isWritable()) {
            new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey).write();
        }
    }

}

接收客户端的连接。

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Accepts a pending client connection and registers it for read events.
 */
public class Acceptor {

    private final ServerSocketChannel serverSocketChannel;

    private final Selector selector;

    /**
     * @param serverSocketChannel listening channel to accept from
     * @param selector selector the accepted connection is registered with
     */
    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    /**
     * Accepts one connection, switches it to non-blocking mode and registers it with
     * the selector for {@link SelectionKey#OP_READ}.
     *
     * <p>Does nothing when no connection is pending. Failures are printed and the
     * half-initialised connection is closed; nothing is thrown to the caller.
     */
    public void accept() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            // If the selector is blocked in select() on another thread, wake it so the
            // new key takes part in selection right away. Harmless otherwise.
            selector.wakeup();
        } catch (Exception e) {
            closeQuietly(socketChannel);
            e.printStackTrace();
        }
    }

    /** Closes a channel that could not be registered so its socket is not leaked. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }

}

接收客户端发送的内容。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * Reads data sent by a client.
 */
public class ReadHandler {

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client connection to read from
     * @param selectionKey key under which {@code socketChannel} is registered
     */
    public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Reads up to 1024 bytes from the client, prints them decoded as UTF-8, and then
     * switches the key to {@link SelectionKey#OP_WRITE} so a reply is sent.
     *
     * <p>On end-of-stream or on any failure the key is cancelled and the channel is
     * closed. Nothing is thrown to the caller.
     */
    public void read() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int bytesRead = socketChannel.read(byteBuffer);
            if (bytesRead < 0) {
                // -1 means the client closed the connection; without this the key
                // would keep firing as readable forever.
                closeConnection();
                return;
            }
            if (bytesRead == 0) {
                // Nothing arrived after all: keep waiting for data instead of replying.
                selectionKey.interestOps(SelectionKey.OP_READ);
                return;
            }
            // Decode only the bytes actually received, not the whole backing array.
            String text = new String(byteBuffer.array(), 0, bytesRead,
                    java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+text);
            // Switch to write interest so a reply is sent to the client.
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } catch (Exception e) {
            // The key must be cancelled on failure (e.g. the client dropped the
            // connection); otherwise the same error is reported on every select.
            closeConnection();
            e.printStackTrace();
        }
    }

    /** Cancels the key and closes the channel so the socket is released. */
    private void closeConnection() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }

}

向客户端发送内容。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * Sends the reply to a client.
 */
public class WriteHandler {

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client connection to write to
     * @param selectionKey key under which {@code socketChannel} is registered
     */
    public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Writes the greeting (UTF-8 encoded) to the client and then switches the key back
     * to {@link SelectionKey#OP_READ}.
     *
     * <p>If the socket accepts only part of the message, the unsent remainder is stored
     * as the key's attachment and the key stays interested in
     * {@link SelectionKey#OP_WRITE}; the next call resumes from that attachment.
     * On any failure the key is cancelled and the channel is closed.
     */
    public void write() {
        try {
            Object attachment = selectionKey.attachment();
            // A ByteBuffer attachment is the unsent tail left by an earlier call.
            boolean resuming = attachment instanceof ByteBuffer;
            ByteBuffer buffer;
            if (resuming) {
                buffer = (ByteBuffer) attachment;
            } else {
                String msg = "你好,欢迎"+socketChannel.getRemoteAddress();
                buffer = ByteBuffer.wrap(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            socketChannel.write(buffer);
            if (buffer.hasRemaining()) {
                // A non-blocking write may send only part of the buffer.
                selectionKey.attach(buffer);
                selectionKey.interestOps(SelectionKey.OP_WRITE);
                return;
            }
            if (resuming) {
                selectionKey.attach(null);
            }
            // Switch back to read interest and wait for the client's next message.
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            closeConnection();
            e.printStackTrace();
        }
    }

    /** Cancels the key and closes the channel so the socket is released. */
    private void closeConnection() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }

}

运行服务端:

/**
 *
 * 启动server服务
 */
public class Main {

    public static void main(String[] args) throws Exception {
        Server server = new Server(8800);
        server.start();
    }
}

二、多线程线程池版本

使用线程池在不同的线程中处理ServerSocketChannel的accept、SocketChannel的read、write。仍然使用一个Selector。

下面代码的第43行让当前线程暂停500毫秒,目的是让dispatch方法逻辑执行完成之后再执行iterator.remove()将当前的SelectionKey从集合中移除。因为dispatch中使用了线程池异步处理,可能会存在代码先执行了iterator.remove(),后执行dispatch逻辑,这样会导致错误。(这种sleep方式处理是存在问题的)

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * NIO server with a single {@link Selector} whose read and write handlers run on a
 * thread pool.
 *
 * <p>The selector loop itself stays on the thread that calls {@link #start()}.
 */
public class Server {

    private final Selector selector;

    private final ServerSocketChannel serverSocketChannel;

    private final ExecutorService executorService = Executors.newFixedThreadPool(1024);

    /**
     * Opens the selector and a non-blocking listening channel bound to {@code port},
     * then registers the channel for accept events.
     *
     * @param port TCP port to listen on
     * @throws Exception if the selector or channel cannot be opened, bound or registered
     */
    public Server(int port) throws Exception{
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the selector loop on the calling thread until the thread is interrupted or
     * the selector fails with an {@link IOException}.
     */
    public void start() {
        while (!Thread.interrupted()) {
            try {
                if (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        // The key can leave the selected set immediately: dispatch()
                        // parks it before handing it to a worker, so no sleep is needed
                        // to keep the loop and the worker apart.
                        iterator.remove();
                        dispatch(selectionKey);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    selector.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
                // The selector is unusable now: stop the loop and let the pool's
                // threads finish so they do not keep the JVM alive.
                executorService.shutdown();
                return;
            }
        }
    }

    /**
     * Routes a selected key to its handler.
     *
     * <p>Accepting runs inline on the selector thread. Read and write handlers run on
     * the pool; the key's interest set is cleared first so the selector does not report
     * the same key again while a worker still owns it, and the selector is woken once
     * the worker is done so the interest set chosen by the handler takes effect.
     *
     * @param selectionKey key taken from the selector's selected-key set
     */
    private void dispatch(SelectionKey selectionKey) {
        try {
            if (!selectionKey.isValid()) {
                return;
            }
            if (selectionKey.isAcceptable()) {
                // Registering a channel from another thread while this thread sits in
                // select() is fragile, and accepting is cheap, so do it right here.
                new Acceptor(serverSocketChannel, selector).run();
                return;
            }
            final Runnable handler;
            if (selectionKey.isReadable()) {
                handler = new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey);
            } else if (selectionKey.isWritable()) {
                handler = new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey);
            } else {
                return;
            }
            // Park the key until the handler re-arms it.
            selectionKey.interestOps(0);
            executorService.execute(() -> {
                try {
                    handler.run();
                } finally {
                    selector.wakeup();
                }
            });
        } catch (java.nio.channels.CancelledKeyException e) {
            // The key was cancelled between selection and dispatch; nothing to do.
        }
    }

}

实现Runnable接口,重写run方法。

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Accepts a pending client connection and registers it for read events.
 */
public class Acceptor implements Runnable{

    private final ServerSocketChannel serverSocketChannel;

    private final Selector selector;

    /**
     * @param serverSocketChannel listening channel to accept from
     * @param selector selector the accepted connection is registered with
     */
    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    /**
     * Accepts one connection, switches it to non-blocking mode and registers it with
     * the selector for {@link SelectionKey#OP_READ}.
     *
     * <p>Does nothing when no connection is pending. Failures are printed and the
     * half-initialised connection is closed; nothing is thrown to the caller.
     */
    public void accept() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is pending,
                // e.g. when another task already took it.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            // This may run on a pool thread while the selector thread is blocked in
            // select(); wake it so the new key takes part in selection right away.
            selector.wakeup();
        } catch (Exception e) {
            closeQuietly(socketChannel);
            e.printStackTrace();
        }
    }

    /** Runs {@link #accept()}. */
    @Override
    public void run() {
        accept();
    }

    /** Closes a channel that could not be registered so its socket is not leaked. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}

实现Runnable接口,重写run方法。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * Reads data sent by a client.
 */
public class ReadHandler implements Runnable{

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client connection to read from
     * @param selectionKey key under which {@code socketChannel} is registered
     */
    public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Reads up to 1024 bytes from the client, prints them decoded as UTF-8, and then
     * switches the key to {@link SelectionKey#OP_WRITE} so a reply is sent.
     *
     * <p>On end-of-stream or on any failure the key is cancelled and the channel is
     * closed. Nothing is thrown to the caller.
     */
    public void read() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int bytesRead = socketChannel.read(byteBuffer);
            if (bytesRead < 0) {
                // -1 means the client closed the connection; without this the key
                // would keep firing as readable forever.
                closeConnection();
                return;
            }
            if (bytesRead == 0) {
                // Nothing arrived after all: keep waiting for data instead of replying.
                rearm(SelectionKey.OP_READ);
                return;
            }
            // Decode only the bytes actually received, not the whole backing array.
            String text = new String(byteBuffer.array(), 0, bytesRead,
                    java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+text);
            // Switch to write interest so a reply is sent to the client.
            rearm(SelectionKey.OP_WRITE);
        } catch (Exception e) {
            // The key must be cancelled on failure (e.g. the client dropped the
            // connection); otherwise the same error is reported on every select.
            closeConnection();
            e.printStackTrace();
        }
    }

    /** Runs {@link #read()}. */
    @Override
    public void run() {
        read();
    }

    /**
     * Sets the key's interest set and wakes its selector, because this handler may run
     * on a different thread than the one blocked in select().
     */
    private void rearm(int ops) {
        selectionKey.interestOps(ops);
        selectionKey.selector().wakeup();
    }

    /** Cancels the key and closes the channel so the socket is released. */
    private void closeConnection() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}

实现Runnable接口,重写run方法。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * Sends the reply to a client.
 */
public class WriteHandler implements Runnable{

    private final SocketChannel socketChannel;

    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client connection to write to
     * @param selectionKey key under which {@code socketChannel} is registered
     */
    public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Writes the greeting (UTF-8 encoded) to the client and then switches the key back
     * to {@link SelectionKey#OP_READ}.
     *
     * <p>If the socket accepts only part of the message, the unsent remainder is stored
     * as the key's attachment and the key stays interested in
     * {@link SelectionKey#OP_WRITE}; the next call resumes from that attachment.
     * On any failure the key is cancelled and the channel is closed.
     */
    public void write() {
        try {
            Object attachment = selectionKey.attachment();
            // A ByteBuffer attachment is the unsent tail left by an earlier call.
            boolean resuming = attachment instanceof ByteBuffer;
            ByteBuffer buffer;
            if (resuming) {
                buffer = (ByteBuffer) attachment;
            } else {
                String msg = "你好,欢迎"+socketChannel.getRemoteAddress();
                buffer = ByteBuffer.wrap(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            socketChannel.write(buffer);
            if (buffer.hasRemaining()) {
                // A non-blocking write may send only part of the buffer.
                selectionKey.attach(buffer);
                rearm(SelectionKey.OP_WRITE);
                return;
            }
            if (resuming) {
                selectionKey.attach(null);
            }
            // Switch back to read interest and wait for the client's next message.
            rearm(SelectionKey.OP_READ);
        } catch (Exception e) {
            closeConnection();
            e.printStackTrace();
        }
    }

    /** Runs {@link #write()}. */
    @Override
    public void run() {
        write();
    }

    /**
     * Sets the key's interest set and wakes its selector, because this handler may run
     * on a different thread than the one blocked in select().
     */
    private void rearm(int ops) {
        selectionKey.interestOps(ops);
        selectionKey.selector().wakeup();
    }

    /** Cancels the key and closes the channel so the socket is released. */
    private void closeConnection() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}

三、多Selector(主从)

一个Selector负责接收客户端的连接(ServerSocketChannel#accept),多个Selector负责客户端的读写数据(SocketChannel#read、SocketChannel#write)。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * NIO server with one master selector and several sub selectors.
 *
 * <p>The master selector only watches for accept events; every accepted connection is
 * registered with one of the sub selectors, which then handle its reads and writes.
 * Each selector runs its own loop on a separate pool thread.
 */
public class Server {

    /**
     * Master selector: watches for accept events and takes client connections.
     */
    private final Selector masterSelector;

    /**
     * Sub selectors that serve the accepted connections.
     */
    private final Selector[] selectors;

    private final ServerSocketChannel serverSocketChannel;

    private final ExecutorService executorService = Executors.newFixedThreadPool(1024);

    /** Picks the sub selector for each new connection; shared instead of re-created per accept. */
    private final Random random = new Random();

    /**
     * Opens the master selector, a non-blocking listening channel bound to
     * {@code port} and registered for accept events, and two sub selectors.
     *
     * @param port TCP port to listen on
     * @throws Exception if a selector or the channel cannot be opened, bound or registered
     */
    public Server(int port) throws Exception{
        masterSelector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(masterSelector, SelectionKey.OP_ACCEPT);

        // Create the two sub selectors.
        Selector subSelector1 = Selector.open();
        Selector subSelector2 = Selector.open();
        selectors = new Selector[]{subSelector1, subSelector2};

    }

    /**
     * Starts one loop per selector, each on its own pool thread, so that a blocking
     * select on one selector does not hold up the others. Returns immediately.
     */
    public void start() {
        executorService.execute(()->loop(masterSelector));
        for (Selector selector : selectors) {
            executorService.execute(()->loop(selector));
        }
    }

    /**
     * Event loop for a single selector. Runs until the executing thread is interrupted
     * or the selector fails with an {@link IOException}.
     *
     * @param selector selector served by this loop
     */
    private void loop(Selector selector) {
        while (!Thread.interrupted()) {
            try {
                // Block for at most one second, then return regardless.
                int select = selector.select(1000);
                if (select > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        dispatch(selectionKey);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    selector.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
                // This selector is closed now; looping again would only make select()
                // throw ClosedSelectorException, so end this loop.
                return;
            }
        }
    }

    /**
     * Routes a selected key to the handler matching its ready operation. Accepted
     * connections are assigned to a randomly chosen sub selector.
     *
     * @param selectionKey key taken from a selector's selected-key set
     */
    private void dispatch(SelectionKey selectionKey) {
        if (!selectionKey.isValid()) {
            // The is*able() checks throw CancelledKeyException on a cancelled key.
            return;
        }
        if (selectionKey.isAcceptable()) {
            // Pick one of the sub selectors at random for the new connection.
            Selector subSelector = selectors[random.nextInt(selectors.length)];
            new Acceptor(serverSocketChannel, subSelector).accept();
            // The sub selector is blocked in select() on another thread; wake it so
            // the new connection is served now rather than after the select timeout.
            subSelector.wakeup();
        } else if (selectionKey.isReadable()) {
            new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey).read();
        } else if (selectionKey.isWritable()) {
            new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey).write();
        }
    }

}

一主二从,一个主Selector负责accept客户端连接。两个从Selector负责与客户端read、write。

其他类的代码同单线程版本。

标签:selector,SelectionKey,java,多路复用,Selector,IO,import,selectionKey,服务端
来源: https://blog.csdn.net/weixin_50518271/article/details/116720184