Java实现非阻塞式服务器的示例代码

作者:低吟不作语 时间:2022-01-29 19:29:11 

1.创建阻塞的服务器

当 ServerSocketChannel 与 SockelChannel 采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多线程

public class EchoServer {
private int port = 8000;
   private ServerSocketChannel serverSocketChannel = null;
   private ExecutorService executorService; //线程池
   private static final int POOL_MULTIPLE = 4; //线程池中工作线程的数目
   public EchoServer() throws IOException {
       //创建一个线程池
       executorService = Executors.newFixedThreadPool(
           Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
       //创建一个ServerSocketChannel对象
       serverSocketChannel = ServerSocketChannel.open();
       //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时,可以顺利绑定相同的端口
       serverSocketChannel.socket().setReuseAddress(true);
       //把服务器进程与一个本地端口绑定
       serverSocketChannel.socket().bind(new InetSocketAddress(port));
       System.out.println("服务器启动");
   }
   public void service() {
       while (true) {
           SocketChannel socketChannel = null;
           try {
               socketChannel = serverSocketChannel.accept();
               //处理客户连接
               executorService.execute(new Handler(socketChannel));
           } catch(IOException e) {
               e.printStackTrace();
           }
       }
   }
   public static void main(String args[])throws IOException {
       new EchoServer().service();
   }
   //处理客户连按
   class Handler implements Runnable {
       private SocketChannel socketChannel;
       public Handler(SocketChannel socketChannel) {
           this.socketChannel = socketChannel;
       }
       public void run() {
           handle(socketChannel);
       }
       public void handle(SocketChannel socketChannel) {
           try {
               //获得与socketChannel关联的Socket对象
               Socket socket = socketChannel.socket();
               System.out.println("接收到客户连接,来自:" + socket.getInetAddress() + ":" + socket.getPort());
               BufferedReader br = getReader(socket);
               PrintWriter pw = getWriter(socket);
               String msg = null;
               while ((msg = br.readLine()) != null) {
                   System.out.println(msg);
                   pw.println(echo(msg));
                   if (msg.equals("bye")) {
                       break;
                   }
               }
           } catch (IOException e) {
               e.printStackTrace();
           } finally {
               try {
                   if(socketChannel != null) {
                       socketChannel.close();
                   } catch (IOException e) {
                       e.printStackTrace();
                   }
               }
           }
       }
   }
   private PrintWriter getWriter(Socket socket) throws IOException {
       OutputStream socketOut = socket.getOutputStream();
       return new PrintWriter(socketOut,true);
   }
   private BufferedReader getReader(Socket socket) throws IOException {
       InputStream socketIn = socket.getInputStream();
       return new BufferedReader(new InputStreamReader(socketIn));
   }
   public String echo(String msg) {
       return "echo:" + msg;
   }
}

2.创建非阻塞的服务器

在非阻塞模式下,EchoServer 只需要启动一个主线程,就能同时处理三件事:

  • 接收客户的连接

  • 接收客户发送的数据

  • 向客户发回响应数据

EchoServer 委托 Selector 来负责监控接收连接就绪事件、读就绪事件和写就绪事件如果有特定事件发生,就处理该事件

// 创建一个Selector对象
selector = Selector.open();
//创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
//可以顺利绑定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服务器进程与一个本地端口绑定
serverSocketChannelsocket().bind(new InetSocketAddress(port));

EchoServer 类的 service() 方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:

public void service() throws IOException {
   serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
   //第1层while循环
   while(selector.select() > 0) {
       //获得Selector的selected-keys集合
       Set readyKeys = selector.selectedKeys();
       Iterator it = readyKeys.iterator();
       //第2层while循环
       while (it.hasNext()) {
           SelectionKey key = null;
           //处理SelectionKey
           try {
               //取出一个SelectionKey
               key = (SelectionKey) it.next();
               //把 SelectionKey从Selector 的selected-key 集合中删除
               it.remove();
               1f (key.isAcceptable()) { 处理接收连接就绪事件; }
               if (key.isReadable()) { 处理读就绪水件; }
               if (key.isWritable()) { 处理写就绪事件; }
           } catch(IOException e) {
               e.printStackTrace();
               try {
                   if(key != null) {
                       //使这个SelectionKey失效
                       key.cancel();
                       //关闭与这个SelectionKey关联的SocketChannel
                       key.channel().close();
                   }
               } catch(Exception ex) {
                   e.printStackTrace();
               }
           }
       }
   }
}
  • 首先由 ServerSocketChannel 向 Selector 注册接收连接就绪事件,如果 Selector 监控到该事件发生,就会把相应的 SelectionKey 对象加入 selected-keys 集合

  • 第一层 while 循环,不断询问 Selector 已经发生的事件,select() 方法返回当前相关事件已经发生的 SelectionKey 的个数,如果当前没有任何事件发生,该方法会阻塞下去,直到至少有一个事件发生。Selector 的 selectedKeys() 方法返回 selected-keys 集合,它存放了相关事件已经发生的 SelectionKey 对象

  • 第二层 while 循环,从 selected-keys 集合中依次取出每个 SelectionKey 对象并从集合中删除,,然后调用 isAcceptable()isReadable() 和 isWritable() 方法判断到底是哪种事件发生了,从而做出相应的处理

2.1处理接收连接就绪事件

if (key.isAcceptable()) {
   //获得与SelectionKey关联的ServerSocketChannel
   ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
   //获得与客户连接的SocketChannel
   SocketChannel socketChannel = (SocketChannel) ssc.accept();
   //把Socketchannel设置为非阻塞模式
   socketChannel.configureBlocking(false);
   //创建一个用于存放用户发送来的数据的级冲区
   ByteBuffer buffer = ByteBuffer.allocate(1024);
   //Socketchannel向Selector注册读就绪事件和写就绪事件
   socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}

2.2处理读就绪事件

public void receive(SelectionKey key) throws IOException {
   //获得与SelectionKey关联的附件
   ByteBuffer buffer = (ByteBuffer) key.attachment();
   //获得与SelectionKey关联的Socketchannel
   SocketChannel socketChannel = (SocketChannel)key.channel();
   //创建一个ByteBuffer用于存放读到的数据
   ByteBuffer readBuff = ByteBuffer.allocate(32);
   socketChannel.read(readBuff);
   readBuff.flip();
   //把buffer的极限设为容量
   buffer.limit(buffer.capacity());
   //把readBuff中的内容拷贝到buffer
   buffer.put(readBuff);
}

2.3处理写就绪事件

public void send(SelectionKey key) throws IOException {
   //获得与SelectionKey关联的ByteBuffer
   ByteBuffer buffer = (ByteBuffer) key.attachment();
   //获得与SelectionKey关联的SocketChannel
   SocketChannel socketChannel = (SocketChannel) key.channel();
   buffer.flip();
   //按照GBK编码把buffer中的字节转换为字符串
   String data = decode(buffer);
   //如果还没有读到一行数据就返回
   if(data.indexOf("\r\n") == -1)
       return;
   //截取一行数据
   String outputData = data.substring(0, data.indexOf("\n") + 1);
   //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
   ByteBuffer outputBuffer = encode("echo:" + outputData);
   //输出outputBuffer的所有字节
   while(outputBuffer,hasRemaining())
       socketChannel.write(outputBuffer);
   //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
   ByteBuffer temp = encode(outputData);
   //把buffer的位置设为temp的极限
   buffer.position(temp.limit()):
   //删除buffer已经处理的数据
   buffer.compact();
   //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
   if(outputData.equals("bye\r\n")) {
       key.cancel();
       socketChannel.close();
   }
}

完整代码如下:

public class EchoServer {
private int port = 8000;
   private ServerSocketChannel serverSocketChannel = null;
   private Selector selector;
   private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
       // 创建一个Selector对象
       selector = Selector.open();
       //创建一个ServerSocketChannel对象
       serverSocketChannel = ServerSocketChannel.open();
       //使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时
       //可以顺利绑定到相同的端口
       serverSocketChannel.socket().setReuseAddress(true);
       //使ServerSocketChannel工作于非阻塞模式
       serverSocketChannel.configureBlocking(false):
       //把服务器进程与一个本地端口绑定
       serverSocketChannelsocket().bind(new InetSocketAddress(port));
   }
   public void service() throws IOException {
       serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
       //第1层while循环
       while(selector.select() > 0) {
           //获得Selector的selected-keys集合
           Set readyKeys = selector.selectedKeys();
           Iterator it = readyKeys.iterator();
           //第2层while循环
           while (it.hasNext()) {
               SelectionKey key = null;
               //处理SelectionKey
               try {
                   //取出一个SelectionKey
                   key = (SelectionKey) it.next();
                   //把 SelectionKey从Selector 的selected-key 集合中删除
                   it.remove();
                   1f (key.isAcceptable()) {
                        //获得与SelectionKey关联的ServerSocketChannel
                       ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                       //获得与客户连接的SocketChannel
                       SocketChannel socketChannel = (SocketChannel) ssc.accept();
                       //把Socketchannel设置为非阻塞模式
                       socketChannel.configureBlocking(false);
                       //创建一个用于存放用户发送来的数据的级冲区
                       ByteBuffer buffer = ByteBuffer.allocate(1024);
                       //Socketchannel向Selector注册读就绪事件和写就绪事件
                       socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                   }
                   if (key.isReadable()) { receive(key); }
                   if (key.isWritable()) { send(key); }
               } catch(IOException e) {
                   e.printStackTrace();
                   try {
                       if(key != null) {
                           //使这个SelectionKey失效
                           key.cancel();
                           //关闭与这个SelectionKey关联的SocketChannel
                           key.channel().close();
                       }
                   } catch(Exception ex) {
                       e.printStackTrace();
                   }
               }
           }
       }
   }
   public void receive(SelectionKey key) throws IOException {
       //获得与SelectionKey关联的附件
       ByteBuffer buffer = (ByteBuffer) key.attachment();
       //获得与SelectionKey关联的Socketchannel
       SocketChannel socketChannel = (SocketChannel)key.channel();
       //创建一个ByteBuffer用于存放读到的数据
       ByteBuffer readBuff = ByteBuffer.allocate(32);
       socketChannel.read(readBuff);
       readBuff.flip();
       //把buffer的极限设为容量
       buffer.limit(buffer.capacity());
       //把readBuff中的内容拷贝到buffer
       buffer.put(readBuff);
   }
   public void send(SelectionKey key) throws IOException {
       //获得与SelectionKey关联的ByteBuffer
       ByteBuffer buffer = (ByteBuffer) key.attachment();
       //获得与SelectionKey关联的SocketChannel
       SocketChannel socketChannel = (SocketChannel) key.channel();
       buffer.flip();
       //按照GBK编码把buffer中的字节转换为字符串
       String data = decode(buffer);
       //如果还没有读到一行数据就返回
       if(data.indexOf("\r\n") == -1)
           return;
       //截取一行数据
       String outputData = data.substring(0, data.indexOf("\n") + 1);
       //把输出的字符串按照GBK编码转换为字节,把它放在outputBuffer中
       ByteBuffer outputBuffer = encode("echo:" + outputData);
       //输出outputBuffer的所有字节
       while(outputBuffer,hasRemaining())
           socketChannel.write(outputBuffer);
       //把outputData字符审按照GBK编码,转换为字节,把它放在ByteBuffer
       ByteBuffer temp = encode(outputData);
       //把buffer的位置设为temp的极限
       buffer.position(temp.limit()):
       //删除buffer已经处理的数据
       buffer.compact();
       //如果已经输出了字符串“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel
       if(outputData.equals("bye\r\n")) {
           key.cancel();
           socketChannel.close();
       }
   }
   //解码
   public String decode(ByteBuffer buffer) {
       CharBuffer charBuffer = charset.decode(buffer);
       return charBuffer.toStrinq();
   }
   //编码
   public ByteBuffer encode(String str) {
       return charset.encode(str);
   }
   public static void main(String args[])throws Exception {
       EchoServer server = new EchoServer();
       server.service();
   }
}

3.阻塞模式与非阻塞模式混合使用

使用非阻塞模式时,ServerSocketChannel 以及 SocketChannel 都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer 采用一个线程同时完成这些操作

假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能

负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向 Selector 注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作

public class EchoServer {
private int port = 8000;
   private ServerSocketChannel serverSocketChannel = null;
   private Selector selector = null;
   private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
       selector = Selector.open();
       serverSocketChannel = ServerSocketChannel.open();
       serverSocketChannel.socket().setReuseAddress(true);
       serverSocketChannelsocket().bind(new InetSocketAddress(port));
   }
   public void accept() {
       while(true) {
           try {
               SocketChannel socketChannel = serverSocketChannel.accept();
               socketChannel.configureBlocking(false);
               ByteBuffer buffer = ByteBuffer.allocate(1024);
               synchronized(gate) {
                   selector.wakeup();
                   socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
               }
           } catch(IOException e) {
               e.printStackTrace();
           }
       }
   }
   private Object gate=new Object();
   public void service() throws IOException {
       while(true) {
           synchronized(gate){}
           int n = selector.select();
           if(n == 0) continue;
           Set readyKeys = selector.selectedKeys();
           Iterator it = readyKeys.iterator();
           while (it.hasNext()) {
               SelectionKey key = null;
               try {
   it.remove();
                   if (key.isReadable()) {
                       receive(key);
                   }
                   if (key.isWritable()) {
                       send(key);
                   }
               } catch(IOException e) {
                   e.printStackTrace();
                   try {
                       if(key != null) {
                           key.cancel();
                           key.channel().close();
                       }
                   } catch(Exception ex) { e.printStackTrace(); }
               }
           }
       }
   }
   public void receive(SelectionKey key) throws IOException {
       ...
   }
   public void send(SelectionKey key) throws IOException {
       ...
   }
   public String decode(ByteBuffer buffer) {
       ...
   }
   public ByteBuffer encode(String str) {
       ...
   }
   public static void main(String args[])throws Exception {
       final EchoServer server = new EchoServer();
       Thread accept = new Thread() {
           public void run() {
               server.accept();
           }
       };
       accept.start();
server.service();
   }
}

注意一点:主线程的 selector select() 方法和 Accept 线程的 register(...) 方法都会造成阻塞,因为他们都会操作 Selector 对象的共享资源 all-keys 集合,这有可能会导致死锁

导致死锁的具体情形是:Selector 中尚没有任何注册的事件,即 all-keys 集合为空,主线程执行 selector.select() 方法时将进入阻塞状态,只有当 Accept 线程向 Selector 注册了事件,并且该事件发生后,主线程才会从 selector.select() 方法返回。然而,由于主线程正在 selector.select() 方法中阻塞,这使得 Acccept 线程也在 register() 方法中阻塞。Accept 线程无法向 Selector 注册事件,而主线程没有任何事件可以监控,所以这两个线程将永远阻塞下去

为了避免对共享资源的竞争,同步机制使得一个线程执行 register() 时,不允许另一个线程同时执行 select() 方法,反之亦然

来源:https://www.cnblogs.com/Yee-Q/p/17416992.html

标签:Java,非阻塞式,服务器
0
投稿

猜你喜欢

  • Spring Boot接口幂等插件用法示例解析

    2022-04-29 16:47:11
  • 浅谈JVM内存溢出原因和解决思路

    2023-11-23 12:24:15
  • 详解SpringCloud微服务之Rest

    2023-10-20 00:49:36
  • c#委托详解和和示例分享

    2022-10-26 12:29:41
  • Java 中解决Unsupported major.minor version 51.0的问题

    2022-07-22 03:53:08
  • 详解使用Spring Cloud Consul实现服务的注册和发现

    2023-06-08 03:46:23
  • JMeter中的后端监听器的实现

    2022-07-24 17:58:35
  • IDEA连接Mysql数据库的详细图文教程

    2023-10-09 09:51:24
  • 基于java 线程的几种状态(详解)

    2022-08-31 19:51:47
  • Unity3D实现扭动挤压浏览效果

    2022-04-23 22:23:16
  • Android中使用Matrix控制图形变换和制作倒影效果的方法

    2022-11-23 07:35:23
  • java获取当前时间的四种方法代码实例

    2023-11-28 19:22:53
  • spring boot整合RabbitMQ(Direct模式)

    2021-12-16 10:50:22
  • 详解Spring Boot Oauth2缓存UserDetails到Ehcache

    2023-02-26 21:57:12
  • Maven添加Tomcat插件实现热部署代码实例

    2021-12-09 02:03:49
  • 如何将Object类转换为实体类

    2021-11-05 04:45:11
  • C++实现经典24点纸牌益智游戏

    2023-04-22 01:05:02
  • 深入了解Java设计模式之策略模式

    2021-06-24 22:45:56
  • SpringBoot微信消息接口配置详解

    2023-08-23 09:51:21
  • mybatis源码解读之executor包懒加载功能 

    2022-09-17 00:28:05
  • asp之家 软件编程 m.aspxhome.com