其他分享
首页 > 其他分享 > channel通信简单示例

channel通信简单示例

作者:互联网

服务端:接收客户端发送的消息,并进行转发。

package socket.demo2;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;


/**
 * Chat server: accepts client connections and relays the messages they send.
 *
 * <p>Everything runs on the thread that calls {@link #listen()}, driven by a single
 * {@link Selector}. A client message is one of:
 * <ul>
 *   <li>{@code nickname} - register a nickname;</li>
 *   <li>{@code name###text} - broadcast {@code text} to every connected client;</li>
 *   <li>{@code name###text###recipient} - private message to {@code recipient}.</li>
 * </ul>
 * There is no framing: whatever a single read returns is treated as one message.
 *
 * @author 一池春水倾半城
 * @date 2019/10/22
 */
public class Server {
    /** Separator between the fields of a client message. */
    private static final String FIELD_SEPARATOR = "###";

    private Selector selector;
    // Nickname registered by each connected client; its size is the online-user count.
    // Keyed by channel so a client can still be looked up after its socket has failed.
    private final Map<SocketChannel, String> users = new HashMap<>();
    // Scratch space for incoming data only; outgoing messages get their own buffer in write().
    ByteBuffer buffer = ByteBuffer.allocate(2048);


    /**
     * Opens a non-blocking server channel on {@code port} and registers it for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened, bound or registered
     */
    public Server(int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port));
        server.configureBlocking(false);
        selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务端启动...");
    }


    /**
     * Runs the selector loop forever, dispatching every ready key to {@link #handle}.
     *
     * @throws IOException if the selector itself fails or a new connection cannot be accepted
     */
    public void listen() throws IOException {
        while (true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // The selected-key set is never cleared by the selector; remove before handling.
                iterator.remove();
                handle(key);
            }
        }
    }


    /**
     * Handles one ready key: accepts a new client or reads a message from an existing one.
     *
     * @param key a key taken from the selector's selected-key set
     * @throws IOException if accepting a connection fails
     */
    private void handle(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            // Its channel was closed while an earlier key of the same batch was being handled.
            return;
        }
        if (key.isAcceptable()) {
            accept((ServerSocketChannel) key.channel());
        } else if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            try {
                String text = rec(client);
                if (text == null) {
                    disconnect(client);
                } else if (!text.isEmpty()) {
                    dispatch(text, client);
                }
            } catch (IOException e) {
                // Only a failure on this client's own socket ends up here.
                disconnect(client);
            }
        }
    }


    /**
     * Accepts a pending connection, registers it for reads and greets the new client.
     *
     * @param server the listening channel that reported an accept event
     * @throws IOException if the accept call itself fails
     */
    private void accept(ServerSocketChannel server) throws IOException {
        SocketChannel client = server.accept();
        if (client == null) {
            // A non-blocking accept returns null when no connection is actually pending.
            return;
        }
        try {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            client.close();
            return;
        }
        System.out.println("接收到来自 " + describe(client) + " 的新连接!");
        boardMsg("当前在线人数:" + users.size());
        send("\n欢迎来到本聊天室,请输入昵称:", client);
    }


    /**
     * Interprets one client message according to its number of {@code ###}-separated fields.
     *
     * @param text   the raw message, never empty
     * @param client the channel the message was read from
     */
    private void dispatch(String text, SocketChannel client) {
        String[] msg = text.split(FIELD_SEPARATOR);
        if (msg.length == 1) {
            // A single field is a nickname registration.
            if (users.containsValue(msg[0])) {
                send("昵称重复,请重新输入!", client);
            } else {
                users.put(client, msg[0]);
                send("hello " + msg[0], client);
            }
        } else if (msg.length == 2) {
            System.out.println(describe(client) + " named " + msg[0] + " said to all: " + msg[1]);
            boardMsg(msg[0] + "说:" + msg[1]);
        } else if (msg.length == 3) {
            System.out.println(describe(client) + " named " + msg[0] + " said to " + msg[2] + ": " + msg[1]);
            p2pChat(msg[0] + "说:" + msg[1], msg[2], client);
        }
    }


    /**
     * Closes a client, forgets its nickname and announces the departure to everyone else.
     * Calling it again for a client that is already gone does nothing.
     *
     * @param client the channel to drop
     */
    private void disconnect(SocketChannel client) {
        boolean wasOpen = client.isOpen();
        // The remote address is no longer available once the channel is closed.
        String address = describe(client);
        try {
            client.close();
        } catch (IOException ignored) {
            // Nothing useful can be done about a failed close; the client is dropped regardless.
        }
        String name = users.remove(client);
        if (!wasOpen && name == null) {
            return;
        }
        System.out.println(address + " 断开了连接!");
        if (name != null) {
            // Clients that never registered are not counted, so their leaving is not announced.
            boardMsg("用户 " + name + " 离开了!当前在线人数:" + users.size());
        }
    }


    /**
     * Returns the remote address of a channel for log output.
     *
     * @param channel the channel to describe
     * @return the address text, or {@code "<unknown>"} if it cannot be obtained
     */
    private static String describe(SocketChannel channel) {
        try {
            return String.valueOf(channel.getRemoteAddress());
        } catch (IOException e) {
            return "<unknown>";
        }
    }


    /**
     * Reads whatever is currently available from a channel and decodes it as UTF-8.
     *
     * <p>A multi-byte character split across two reads is not reassembled.
     *
     * @param channel the channel to read from
     * @return the decoded text (possibly empty), or {@code null} if the peer closed the connection
     * @throws IOException if the read fails
     */
    private String rec(SocketChannel channel) throws IOException {
        buffer.clear();
        int count = channel.read(buffer);
        if (count < 0) {
            return null;
        }
        return new String(buffer.array(), 0, count, StandardCharsets.UTF_8);
    }


    /**
     * Writes a complete message to a channel.
     *
     * <p>The channel is non-blocking, so a write may accept only part of the data. A peer whose
     * send buffer is full is reported as a failure rather than stalling the selector thread.
     *
     * @param msg     the text to send, encoded as UTF-8
     * @param channel the destination channel
     * @throws IOException if the channel is closed, fails, or cannot accept the whole message
     */
    private void write(String msg, SocketChannel channel) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        while (out.hasRemaining()) {
            if (channel.write(out) == 0) {
                throw new IOException("send buffer full for " + describe(channel));
            }
        }
    }


    /**
     * Sends a message and drops the recipient if the write fails, so that one broken
     * connection affects neither the sender of the message nor the server loop.
     *
     * @param msg     the text to send
     * @param channel the destination channel
     */
    private void send(String msg, SocketChannel channel) {
        try {
            write(msg, channel);
        } catch (IOException e) {
            disconnect(channel);
        }
    }


    /**
     * Broadcasts a message to every open client channel registered with the selector.
     *
     * @param msg the text to send
     */
    private void boardMsg(String msg) {
        for (SelectionKey key : selector.keys()) {
            Channel target = key.channel();
            if (target.isOpen() && target instanceof SocketChannel) {
                send(msg, (SocketChannel) target);
            }
        }
    }


    /**
     * Delivers a private message to the client registered under {@code targetName} and echoes
     * it back to the sender; tells the sender when no such client exists.
     *
     * @param msg        the text to deliver
     * @param targetName nickname of the recipient
     * @param source     channel of the sender
     */
    private void p2pChat(String msg, String targetName, SocketChannel source) {
        // Pick the recipient first: sending may remove entries from the map being iterated.
        SocketChannel target = null;
        for (Map.Entry<SocketChannel, String> entry : users.entrySet()) {
            if (entry.getValue().equals(targetName) && entry.getKey().isOpen()) {
                target = entry.getKey();
                break;
            }
        }
        if (target == null) {
            send("找不到该用户!", source);
            return;
        }
        send(msg, target);
        send(msg, source);
    }


    /**
     * Starts the server on port 7777.
     *
     * @param args unused
     * @throws IOException if the server cannot start or the selector loop fails
     */
    public static void main(String[] args) throws IOException {
        Server server = new Server(7777);
        server.listen();
    }


}

客户端:发送消息和读取消息

package socket.demo2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * Console chat client: a background thread prints whatever the server sends while the
 * main thread forwards lines typed on standard input.
 *
 * @author 一池春水倾半城
 * @date 2019/10/22
 */
public class Client {
    /** Prefix of the server reply that confirms a nickname. */
    private static final String GREETING = "hello ";

    // Used by rec() only, i.e. by the reader thread; write() allocates its own buffer so the
    // two threads never share one.
    static ByteBuffer buffer = ByteBuffer.allocate(1024);
    // Whether the server has confirmed the nickname.
    volatile static boolean success = false;
    // Nickname confirmed by the server.
    volatile static String name = "sxh";

    /**
     * Reads whatever is currently available from a channel and decodes it as UTF-8.
     *
     * @param channel the channel to read from
     * @return the decoded text (possibly empty), or {@code null} if the server closed the connection
     * @throws IOException if the read fails
     */
    private static String rec(SocketChannel channel) throws IOException {
        buffer.clear();
        int count = channel.read(buffer);
        if (count < 0) {
            return null;
        }
        return new String(buffer.array(), 0, count, StandardCharsets.UTF_8);
    }

    /**
     * Writes a complete message to a channel.
     *
     * @param msg     the text to send, encoded as UTF-8
     * @param channel the destination channel
     * @throws IOException if the channel is closed or the write fails
     */
    private static void write(String msg, SocketChannel channel) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        // The channel is non-blocking, so one call may accept only part of the data.
        while (out.hasRemaining()) {
            channel.write(out);
        }
    }

    /**
     * Prints every message the server sends until the connection ends, and records the
     * nickname once the server confirms it.
     *
     * @param selector the selector the server connection is registered with
     */
    private static void readLoop(Selector selector) {
        SocketChannel client = null;
        try {
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (!key.isValid() || !key.isReadable()) {
                        continue;
                    }
                    client = (SocketChannel) key.channel();
                    String msg = rec(client);
                    if (msg == null) {
                        System.out.println("与服务器的连接已断开");
                        client.close();
                        return;
                    }
                    if (!success) {
                        // Only the first greeting counts; later chat text containing the same
                        // words must not change the nickname.
                        int at = msg.indexOf(GREETING);
                        if (at >= 0) {
                            // Publish the name before the flag so the main thread never
                            // pairs success == true with the previous name.
                            name = msg.substring(at + GREETING.length());
                            success = true;
                        }
                    }
                    System.out.println(msg);
                }
            }
        } catch (IOException e) {
            System.out.println("与服务器的连接已断开");
            if (client != null) {
                try {
                    client.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
    }

    /**
     * Connects to the server on 127.0.0.1:7777, starts the reader thread and forwards console
     * input until standard input or the connection ends.
     *
     * @param args unused
     * @throws IOException if the connection cannot be established or a send fails
     */
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",7777));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);

        // Daemon thread: it must not keep the JVM alive once the main thread is done.
        Thread reader = new Thread(() -> readLoop(selector), "chat-reader");
        reader.setDaemon(true);
        reader.start();

        Scanner scanner = new Scanner(System.in);
        try {
            while (scanner.hasNextLine()) {
                String tmp = scanner.nextLine();
                if (!socketChannel.isOpen()) {
                    // The reader thread closed the channel after the server went away.
                    break;
                }
                if (success) {
                    // Private chat is typed as [message]@[recipient]; "@" becomes the separator.
                    write(name + "###" + tmp.replace("@", "###"), socketChannel);
                } else {
                    // No confirmed nickname yet: the line is sent as a nickname request.
                    write(tmp, socketChannel);
                }
            }
        } finally {
            socketChannel.close();
        }
    }
}

标签:java,通信,client,IOException,msg,import,示例,throws,channel
来源: https://www.cnblogs.com/www9527/p/16089589.html