其他分享
首页 > 其他分享> > 网络IO-Reactor网络模型

网络IO-Reactor网络模型

作者:互联网

reactor=多路复用+线程池

将各项职责抽象为独立组件,符合单一职责原则

reactor组件负责请求的接收和分发

1 单reactor单线程模型

特点

1.1 reactor组件

package debug.io.model.reactor.singlereactorsinglethread;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-reactor, single-thread server: one selector loop receives both connection
 * and read events and runs the attached handler inline on the looping thread.
 *
 * <p>Drawback: because every handler runs on the loop thread, the server works on
 * only one client event at a time.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Reactor implements Runnable {

    // Listening server socket.
    ServerSocketChannel serverSocketChannel;
    // Multiplexer that the listening socket is registered with.
    Selector selector;

    /**
     * Opens a non-blocking listening socket on {@code port} and registers it for
     * {@code OP_ACCEPT}, attaching an {@link Acceptor} that handles new connections.
     *
     * @param port TCP port to listen on
     * @throws IllegalStateException if the socket or selector cannot be set up; the
     *         original cause is preserved
     */
    public Reactor(int port) {
        try {
            // Create, switch to non-blocking mode, bind.
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.bind(new InetSocketAddress(port));
            // Register with the multiplexer; the attachment handles accept events.
            this.selector = Selector.open();
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT, new Acceptor(this.serverSocketChannel, this.selector));
        } catch (Exception e) {
            // Fail fast: a half-initialised reactor would only spin on NullPointerException in run().
            this.closeQuietly();
            throw new IllegalStateException("failed to start reactor on port " + port, e);
        }
    }

    /**
     * Runs the event loop until the selector is closed or the thread is interrupted.
     *
     * <p>Every ready key is handed to {@link #dispatch(SelectionKey)}:
     * <ul>
     *     <li>connection events go to the acceptor attachment;</li>
     *     <li>read events go to the handler attachment of that connection.</li>
     * </ul>
     */
    @Override
    public void run() {
        // Checking isOpen()/isInterrupted() prevents an endless spin once select() can no longer block.
        while (this.selector.isOpen() && !Thread.currentThread().isInterrupted()) {
            try {
                this.selector.select();
                Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    // Selected keys are not cleared automatically; remove before handling.
                    it.remove();
                    this.dispatch(sk);
                }
            } catch (Exception e) {
                // Keep serving other connections, but make the failure visible.
                System.err.println("Reactor event loop error: " + e);
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to the key (acceptor or handler).
     * Keys that are no longer valid or carry no runnable attachment are skipped.
     */
    private void dispatch(SelectionKey sk) {
        Object attachment = sk.attachment();
        if (sk.isValid() && attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }

    /** Best-effort release of whatever the constructor managed to open. */
    private void closeQuietly() {
        try {
            if (this.selector != null) {
                this.selector.close();
            }
        } catch (java.io.IOException ignored) {
            // Already failing; nothing useful to do with a close error.
        }
        try {
            if (this.serverSocketChannel != null) {
                this.serverSocketChannel.close();
            }
        } catch (java.io.IOException ignored) {
            // Already failing; nothing useful to do with a close error.
        }
    }
}

1.2 acceptor组件

package debug.io.model.reactor.singlereactorsinglethread;

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Handles the connection events dispatched by {@link Reactor}: accepts the pending
 * connection and registers it for read events with a fresh {@link Handler}.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Acceptor implements Runnable {

    ServerSocketChannel serverSocketChannel;
    Selector selector;

    /**
     * @param serverSocketChannel listening channel to accept from
     * @param selector selector that accepted connections are registered with
     */
    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    /**
     * Accepts one pending connection, switches it to non-blocking mode and registers
     * it for {@code OP_READ}. If anything fails after the accept, the new channel is
     * closed so it does not leak.
     */
    @Override
    public void run() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = this.serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            System.out.println("->服务端 来自" + socketChannel.getRemoteAddress()+"的连接请求");
            // Non-blocking reads/writes, registered with the multiplexer.
            socketChannel.configureBlocking(false);
            socketChannel.register(this.selector, SelectionKey.OP_READ, new Handler(socketChannel));
        } catch (Exception e) {
            System.err.println("Acceptor failed to set up a connection: " + e);
            closeQuietly(socketChannel);
        }
    }

    /** Closes the channel if present, ignoring any close failure. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (java.io.IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }
}

1.3 handler组件

package debug.io.model.reactor.singlereactorsinglethread;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
 * Handles the read events dispatched by {@link Reactor} for one client connection:
 * reads the pending bytes, prints them and echoes an acknowledgement back.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Handler implements Runnable {

    SocketChannel socketChannel;

    /**
     * @param socketChannel the client connection this handler serves
     */
    public Handler(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    /**
     * Reads at most 1024 bytes from the client and writes back an acknowledgement.
     *
     * <p>The channel is closed when the peer has closed the connection (read returns
     * {@code -1}) or when an I/O error occurs; otherwise the key would stay readable
     * forever and the selector loop would spin.
     */
    @Override
    public void run() {
        ByteBuffer b = ByteBuffer.allocate(1024);
        try {
            // Client address, used in the log line below.
            SocketAddress remoteAddress = this.socketChannel.getRemoteAddress();
            int n = this.socketChannel.read(b);
            if (n < 0) {
                // End of stream: the client closed the connection.
                this.closeQuietly();
                return;
            }
            if (n == 0) {
                // Nothing available right now (possible in non-blocking mode).
                return;
            }
            // Decode only the bytes actually read, not the zero-filled rest of the array.
            // NOTE(review): a multi-byte character split across two reads is not reassembled.
            String msg = new String(b.array(), 0, n, StandardCharsets.UTF_8);
            // TODO: 2022/5/22 business processing
            System.out.println("->服务端 来自" + remoteAddress + "的一条消息=" + msg);
            // Echo back to the client.
            String echo = "->客户端 服务端已经收到了你的信息=" + msg;
            ByteBuffer out = ByteBuffer.wrap(echo.getBytes(StandardCharsets.UTF_8));
            // A single write may be partial; loop until the whole reply is sent.
            // Acceptable for this short reply; a production server would register OP_WRITE instead.
            while (out.hasRemaining()) {
                this.socketChannel.write(out);
            }
        } catch (Exception e) {
            System.err.println("Handler I/O failure, closing connection: " + e);
            this.closeQuietly();
        }
    }

    /** Closes the client channel (which also cancels its selection keys), ignoring close failures. */
    private void closeQuietly() {
        try {
            this.socketChannel.close();
        } catch (java.io.IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }
}

1.4 服务端api

package debug.io.model.reactor.singlereactorsinglethread;

import debug.io.model.reactor.ClientTest;

/**
 * Server entry point for the single-reactor, single-thread example.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class SingleReactorSingleThreadTest {

    /**
     * Builds a {@link Reactor} on {@code ClientTest.PORT} and runs its loop on the
     * calling (main) thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // run() is invoked directly rather than via a new Thread.
        new Reactor(ClientTest.PORT).run();
    }
}

2 单reactor多线程模型

特点

2.1 reactor组件

package debug.io.model.reactor.singlereactormultiplethread;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

/**
 * Single-reactor, multi-thread server.
 *
 * <p>Compared with {@link debug.io.model.reactor.singlereactorsinglethread.Reactor},
 * the only difference is that read requests are processed on a thread pool; the
 * selector loop itself is the same.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Reactor implements Runnable {

    // Listening server socket.
    ServerSocketChannel serverSocketChannel;
    // Multiplexer that the listening socket is registered with.
    Selector selector;

    /**
     * Binds a non-blocking listening socket to {@code port} and registers it for
     * {@code OP_ACCEPT}, attaching an {@link Acceptor}.
     *
     * @param port TCP port to listen on
     * @throws IllegalStateException if the socket or selector cannot be set up; the
     *         original cause is preserved
     */
    public Reactor(int port) {
        try {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.bind(new InetSocketAddress(port));
            this.serverSocketChannel.configureBlocking(false);

            this.selector = Selector.open();
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT, new Acceptor(this.serverSocketChannel, this.selector));
        } catch (Exception e) {
            // Fail fast: a half-initialised reactor would only spin on NullPointerException in run().
            this.closeQuietly();
            throw new IllegalStateException("failed to start reactor on port " + port, e);
        }
    }

    /**
     * Runs the event loop until the selector is closed or the thread is interrupted,
     * dispatching every ready key to its attachment.
     */
    @Override
    public void run() {
        // Checking isOpen()/isInterrupted() prevents an endless spin once select() can no longer block.
        while (this.selector.isOpen() && !Thread.currentThread().isInterrupted()) {
            try {
                this.selector.select();
                Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    // Selected keys are not cleared automatically; remove before handling.
                    it.remove();
                    this.dispatch(sk);
                }
            } catch (Exception e) {
                // Keep serving other connections, but make the failure visible.
                System.err.println("Reactor event loop error: " + e);
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to the key (acceptor or handler).
     * Keys that are no longer valid or carry no runnable attachment are skipped.
     */
    private void dispatch(SelectionKey sk) {
        Object attachment = sk.attachment();
        if (sk.isValid() && attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }

    /** Best-effort release of whatever the constructor managed to open. */
    private void closeQuietly() {
        try {
            if (this.selector != null) {
                this.selector.close();
            }
        } catch (java.io.IOException ignored) {
            // Already failing; nothing useful to do with a close error.
        }
        try {
            if (this.serverSocketChannel != null) {
                this.serverSocketChannel.close();
            }
        } catch (java.io.IOException ignored) {
            // Already failing; nothing useful to do with a close error.
        }
    }
}

2.2 acceptor组件

package debug.io.model.reactor.singlereactormultiplethread;

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Accepts a pending connection on the listening channel and registers it for read
 * events with a new {@link Handler}.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Acceptor implements Runnable {

    ServerSocketChannel ss;
    Selector sl;

    /**
     * @param ss listening channel to accept from
     * @param sl selector that accepted connections are registered with
     */
    public Acceptor(ServerSocketChannel ss, Selector sl) {
        this.ss = ss;
        this.sl = sl;
    }

    /**
     * Accepts one connection, makes it non-blocking and registers it for
     * {@code OP_READ}. The new channel is closed if any step after the accept fails.
     */
    @Override
    public void run() {
        SocketChannel s = null;
        try {
            s = this.ss.accept();
            if (s == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            s.configureBlocking(false);
            s.register(this.sl, SelectionKey.OP_READ, new Handler(s));
        } catch (Exception e) {
            System.err.println("Acceptor failed to set up a connection: " + e);
            closeQuietly(s);
        }
    }

    /** Closes the channel if present, ignoring any close failure. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (java.io.IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }
}

2.3 handler组件

package debug.io.model.reactor.singlereactormultiplethread;

import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Read handler for one client connection. The read itself happens on the thread
 * that invokes {@link #run()}; decoding, logging and the reply are handed to a
 * shared fixed-size thread pool.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Handler implements Runnable {

    // Shared worker pool for the processing step.
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(5);

    SocketChannel s;

    /**
     * @param s the client connection this handler serves
     */
    public Handler(SocketChannel s) {
        this.s = s;
    }

    /**
     * Reads at most 1024 bytes and submits them to the pool for processing.
     *
     * <p>The channel is closed on end-of-stream ({@code read} returns {@code -1}) or
     * on failure; otherwise the key would stay readable and a new task would be
     * submitted on every selector pass.
     */
    @Override
    public void run() {
        try {
            ByteBuffer b = ByteBuffer.allocate(1024);
            int n = this.s.read(b);
            if (n < 0) {
                // End of stream: the client closed the connection.
                closeQuietly(this.s);
                return;
            }
            if (n == 0) {
                // Nothing available right now (possible in non-blocking mode).
                return;
            }
            // Flip so the worker sees exactly the bytes that were read.
            b.flip();
            EXECUTOR.execute(new Process(this.s, b));
        } catch (Exception e) {
            System.err.println("Handler I/O failure, closing connection: " + e);
            closeQuietly(this.s);
        }
    }

    /** Closes the channel, ignoring any close failure. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (java.io.IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }

    /**
     * Pool task: decodes the received bytes, logs them and replies with the name of
     * the worker thread.
     */
    private static class Process implements Runnable {

        SocketChannel s;
        // Buffer in read mode: position..limit holds the received bytes.
        ByteBuffer b;

        public Process(SocketChannel s, ByteBuffer b) {
            this.s = s;
            this.b = b;
        }

        @Override
        public void run() {
            try {
                // Decode only the received bytes, not the zero-filled rest of the backing array.
                String msg = new String(this.b.array(), this.b.arrayOffset() + this.b.position(), this.b.remaining(), StandardCharsets.UTF_8);
                System.out.println("->服务端 收到来自客户端的消息=" + msg);
                String echo = "->客户端 服务端处理业务的线程=" + Thread.currentThread().getName();
                ByteBuffer out = ByteBuffer.wrap(echo.getBytes(StandardCharsets.UTF_8));
                // Serialise whole replies per connection so looped partial writes from
                // different workers cannot interleave.
                synchronized (this.s) {
                    while (out.hasRemaining()) {
                        this.s.write(out);
                    }
                }
            } catch (Exception e) {
                System.err.println("Process failure, closing connection: " + e);
                closeQuietly(this.s);
            }
        }
    }
}

2.4 服务端api

package debug.io.model.reactor.singlereactormultiplethread;

import debug.io.model.reactor.ClientTest;

/**
 * Server entry point for the single-reactor, multi-thread example.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class singleReactorMultipleThreadTest {

    /**
     * Builds a {@link Reactor} on {@code ClientTest.PORT} and runs its loop on the
     * calling (main) thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // run() is invoked directly rather than via a new Thread.
        new Reactor(ClientTest.PORT).run();
    }
}

3 主从reactor模型

特点

3.1 reactor组件

3.1.1 main

package debug.io.model.reactor.multiplereactormultiplethread;

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

/**
 * Main reactor of the main/sub reactor example: its selector watches only the
 * listening socket and hands every accept event to the attached {@link Acceptor}.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class MainReactor implements Runnable {

    // Listening server socket.
    ServerSocketChannel ss;
    // Selector that the listening socket is registered with.
    Selector sl;

    /**
     * Opens a non-blocking listening socket on {@code port} and registers it for
     * {@code OP_ACCEPT} with an {@link Acceptor} attachment.
     *
     * @param port TCP port to listen on
     * @throws IllegalStateException if the socket or selector cannot be set up; the
     *         original cause is preserved
     */
    public MainReactor(int port) {
        try {
            this.ss = ServerSocketChannel.open();
            this.sl = Selector.open();
            this.ss.configureBlocking(false);
            this.ss.bind(new InetSocketAddress(port));
            this.ss.register(this.sl, SelectionKey.OP_ACCEPT, new Acceptor(this.ss));
        } catch (Exception e) {
            // Fail fast: a half-initialised reactor would only spin on NullPointerException in run().
            this.closeQuietly();
            throw new IllegalStateException("failed to start main reactor on port " + port, e);
        }
    }

    /**
     * Runs the accept loop until the selector is closed or the thread is interrupted.
     */
    @Override
    public void run() {
        // Checking isOpen()/isInterrupted() prevents an endless spin once select() can no longer block.
        while (this.sl.isOpen() && !Thread.currentThread().isInterrupted()) {
            try {
                this.sl.select();
                Iterator<SelectionKey> it = this.sl.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    // Selected keys are not cleared automatically; remove before handling.
                    it.remove();
                    this.dispatch(sk);
                }
            } catch (Exception e) {
                // Keep accepting, but make the failure visible.
                System.err.println("MainReactor event loop error: " + e);
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to the key. Keys that are no longer valid
     * or carry no runnable attachment are skipped.
     */
    private void dispatch(SelectionKey sk) {
        Object attachment = sk.attachment();
        if (sk.isValid() && attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }

    /** Best-effort release of whatever the constructor managed to open. */
    private void closeQuietly() {
        try {
            if (this.sl != null) {
                this.sl.close();
            }
        } catch (java.io.IOException ignored) {
            // Already failing; nothing useful to do with a close error.
        }
        try {
            if (this.ss != null) {
                this.ss.close();
            }
        } catch (java.io.IOException ignored) {
            // Already failing; nothing useful to do with a close error.
        }
    }
}

3.1.2 sub

package debug.io.model.reactor.multiplereactormultiplethread;

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

/**
 * Sub reactor: runs a selector loop and executes the {@link Runnable} attached to
 * each ready key.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class SubReactor implements Runnable {

    // Sub reactor number; used only to label log output.
    int id;
    // Selector this sub reactor polls.
    Selector sl;

    /**
     * @param id number identifying this sub reactor
     * @param sl selector to poll
     */
    public SubReactor(int id, Selector sl) {
        this.id = id;
        this.sl = sl;
    }

    /**
     * Runs the event loop until the selector is closed or the thread is interrupted.
     * Failures inside one pass are reported and the loop continues.
     */
    @Override
    public void run() {
        // Without these checks a closed selector (ClosedSelectorException) or a pending
        // interrupt (select() returns immediately) would make the loop spin forever.
        while (this.sl.isOpen() && !Thread.currentThread().isInterrupted()) {
            try {
                this.sl.select();
                Iterator<SelectionKey> it = this.sl.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    // Selected keys are not cleared automatically; remove before handling.
                    it.remove();
                    this.dispatch(sk);
                }
            } catch (Exception e) {
                System.err.println("SubReactor-" + this.id + " event loop error: " + e);
            }
        }
    }

    /**
     * Runs the {@link Runnable} attached to the key. Keys that are no longer valid
     * or carry no runnable attachment are skipped.
     */
    private void dispatch(SelectionKey sk) {
        Object attachment = sk.attachment();
        if (sk.isValid() && attachment instanceof Runnable) {
            ((Runnable) attachment).run();
        }
    }
}

3.2 acceptor组件

package debug.io.model.reactor.multiplereactormultiplethread;

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Acceptor of the main/sub reactor example. On construction it opens one selector
 * per sub reactor and starts each {@link SubReactor} on its own thread; on every
 * accept event it hands the new connection to the next sub reactor in round-robin
 * order.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Acceptor implements Runnable {

    // Number of sub reactors (and selectors/threads) created by this acceptor.
    private static final int SUB_REACTOR_CNT = 5;

    ServerSocketChannel ss;

    // Index of the sub reactor that receives the next connection.
    private int idx;

    private final SubReactor[] subReactors = new SubReactor[SUB_REACTOR_CNT];
    private final Thread[] threads = new Thread[SUB_REACTOR_CNT];
    private final Selector[] selectors = new Selector[SUB_REACTOR_CNT];

    /**
     * Opens all selectors first and only then starts the sub reactor threads, so a
     * failure never leaves a thread running on a missing selector.
     *
     * @param ss listening channel to accept from
     * @throws IllegalStateException if a selector cannot be opened; selectors opened
     *         so far are closed and the cause is preserved
     */
    public Acceptor(ServerSocketChannel ss) {
        this.ss = ss;

        for (int i = 0; i < SUB_REACTOR_CNT; i++) {
            try {
                this.selectors[i] = Selector.open();
            } catch (Exception e) {
                this.closeSelectors(i);
                throw new IllegalStateException("failed to open selector for sub reactor " + i, e);
            }
        }
        for (int i = 0; i < SUB_REACTOR_CNT; i++) {
            this.subReactors[i] = new SubReactor(i, this.selectors[i]);
            this.threads[i] = new Thread(this.subReactors[i]);
            this.threads[i].start();
        }
    }

    /**
     * Handles one accept event dispatched by the main reactor: accepts the
     * connection and registers it for {@code OP_READ} on the next sub reactor's
     * selector. The new channel is closed if any step after the accept fails.
     */
    @Override
    public void run() {
        SocketChannel s = null;
        try {
            s = this.ss.accept();
            if (s == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            s.configureBlocking(false);
            Selector target = this.selectors[this.idx];
            // Wake the sub reactor first: on older JDKs register() may block while
            // another thread is inside select() on the same selector.
            target.wakeup();
            // Make the sub reactor's selector watch read events of this connection.
            s.register(target, SelectionKey.OP_READ, new Handler(s));
            // Wake it again: if the sub reactor re-entered select() before the
            // registration, that selection does not see the new key (JDK 11+ semantics);
            // this guarantees a fresh selection that does.
            target.wakeup();
            this.idx = (this.idx + 1) % SUB_REACTOR_CNT;
        } catch (Exception e) {
            System.err.println("Acceptor failed to hand over a connection: " + e);
            closeQuietly(s);
        }
    }

    /** Closes the first {@code count} selectors, ignoring close failures. */
    private void closeSelectors(int count) {
        for (int i = 0; i < count; i++) {
            try {
                this.selectors[i].close();
            } catch (java.io.IOException ignored) {
                // Already failing; nothing useful to do with a close error.
            }
        }
    }

    /** Closes the channel if present, ignoring any close failure. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (java.io.IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }
}

3.3 handler组件

package debug.io.model.reactor.multiplereactormultiplethread;

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Read handler for one client connection in the main/sub reactor example. The read
 * happens on the thread that invokes {@link #run()}; decoding, logging and the
 * reply are handed to a shared fixed-size thread pool.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class Handler implements Runnable {

    SocketChannel s;

    // Shared worker pool for the processing step.
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(3);

    /**
     * @param s the client connection this handler serves
     */
    public Handler(SocketChannel s) {
        this.s = s;
    }

    /**
     * Reads at most 1024 bytes and submits them to the pool for processing.
     *
     * <p>The channel is closed on end-of-stream ({@code read} returns {@code -1}) or
     * on failure; otherwise the key would stay readable and a new task would be
     * submitted on every selector pass.
     */
    @Override
    public void run() {
        try {
            // Read.
            ByteBuffer b = ByteBuffer.allocate(1024);
            int n = this.s.read(b);
            if (n < 0) {
                // End of stream: the client closed the connection.
                closeQuietly(this.s);
                return;
            }
            if (n == 0) {
                // Nothing available right now (possible in non-blocking mode).
                return;
            }
            // Flip so the worker sees exactly the bytes that were read.
            b.flip();
            // Wrap as a task and submit it to the pool.
            EXECUTOR.execute(new Process(this.s, b));
        } catch (Exception e) {
            System.err.println("Handler I/O failure, closing connection: " + e);
            closeQuietly(this.s);
        }
    }

    /** Closes the channel, ignoring any close failure. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (java.io.IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
    }

    /**
     * Pool task: decodes the received bytes, logs them with the client address and
     * writes a fixed reply.
     */
    private static class Process implements Runnable {
        SocketChannel s;
        // Buffer in read mode: position..limit holds the received bytes.
        ByteBuffer b;

        public Process(SocketChannel s, ByteBuffer b) {
            this.s = s;
            this.b = b;
        }

        @Override
        public void run() {
            try {
                // Decode only the received bytes, not the zero-filled rest of the backing array.
                String msg = new String(this.b.array(), this.b.arrayOffset() + this.b.position(), this.b.remaining(), StandardCharsets.UTF_8);
                System.out.println("handler-> 收到来自" + this.s.getRemoteAddress() + "的消息=" + msg);
                // Write.
                ByteBuffer out = ByteBuffer.wrap("handler-> 来自handler的回写".getBytes(StandardCharsets.UTF_8));
                // Serialise whole replies per connection so looped partial writes from
                // different workers cannot interleave.
                synchronized (this.s) {
                    while (out.hasRemaining()) {
                        this.s.write(out);
                    }
                }
            } catch (Exception e) {
                System.err.println("Process failure, closing connection: " + e);
                closeQuietly(this.s);
            }
        }
    }
}

3.4 服务端api

package debug.io.model.reactor.multiplereactormultiplethread;

import debug.io.model.reactor.ClientTest;

/**
 * Server entry point for the main/sub reactor example.
 *
 * @since 2022/5/22
 * @author dingrui
 */
public class MultipleReactorMultipleThreadTest {

    /**
     * Builds a {@link MainReactor} on {@code ClientTest.PORT} and runs its loop on
     * the calling (main) thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // run() is invoked directly rather than via a new Thread.
        new MainReactor(ClientTest.PORT).run();
    }
}

标签:java,Reactor,nio,网络,channels,IO,import,public,reactor
来源: https://www.cnblogs.com/miss-u/p/16297624.html