返回顶部
首页 > 资讯 > 服务器 >基于Java NIO的即时聊天服务器模型怎么实现
  • 226
分享到

基于Java NIO的即时聊天服务器模型怎么实现

2023-06-17 12:06:21 226人浏览 薄情痞子
摘要

这篇文章主要讲解了“基于Java NIO的即时聊天服务器模型怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“基于Java Nio的即时聊天服务器模型怎么实现”吧!废话不多说,关于NIO

这篇文章主要讲解了“基于Java NIO的即时聊天服务器模型怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“基于Java Nio的即时聊天服务器模型怎么实现”吧!

废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码:

JSONParser

json的解析类,随便封装了下,使用的最近比较火的fastjson

public class JsonParser {          private static JSONObject mJson;          public synchronized static String get(String json,String key) {         mJson = JSON.parseObject(json);         return mJson.getString(key);     } }

Main

入口,不解释

public class Main {      public static void main(String... args) {         new SeekServer().start();     } }

Log

public class Log {      public static void i(Object obj) {         System.out.println(obj);     }     public static void e(Object e) {         System.err.println(e);     } }

SeekServer:

服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的。

public class SeekServer extends Thread{     private final int ACCPET_PORT = 55555;     private final int TIME_OUT = 1000;     private Selector mSelector = null;     private ServerSocketChannel mSocketChannel = null;     private ServerSocket mServerSocket = null;     private InetSocketAddress mAddress = null;          public SeekServer() {         long sign = System.currentTimeMillis();         try {             mSocketChannel = ServerSocketChannel.open();             if(mSocketChannel == null) {                 System.out.println("can't open server socket channel");             }             mServerSocket = mSocketChannel.socket();             mAddress = new InetSocketAddress(ACCPET_PORT);             mServerSocket.bind(mAddress);             Log.i("server bind port is " + ACCPET_PORT);             mSelector = Selector.open();             mSocketChannel.configureBlocking(false);             SelectionKey key = mSocketChannel.reGISter(mSelector, SelectionKey.OP_ACCEPT);             key.attach(new Acceptor());                          //检测Session状态             Looper.getInstance().loop();                          //开始处理Session             SessionProcessor.start();                          Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!");         } catch (ClosedChannelException e) {             Log.e(e.getMessage());         } catch (IOException e) {             Log.e(e.getMessage());         }      }          public void run() {         Log.i("server is listening...");         while(!Thread.interrupted()) {             try {                 if(mSelector.select(TIME_OUT) > 0) {                     Set<SelectionKey> keys = mSelector.selectedKeys();                     Iterator<SelectionKey> iterator = keys.iterator();                     SelectionKey key = null;                     while(iterator.hasNext()) {                         key = iterator.next();                         Handler at = (Handler) key.attachment();                         if(at != null) {                             at.exec();                         }                         iterator.remove();                     }                 }             } catch (IOException e) {                 Log.e(e.getMessage());             }         }     }      class Acceptor extends Handler{          public void exec(){             try {                 SocketChannel sc = mSocketChannel.accept();                 new Session(sc, mSelector);             } catch (ClosedChannelException e) {                 Log.e(e);             } catch (IOException e) {                 Log.e(e);             }         }     } }

Handler:

只有一个抽象方法exec,Session将会继承它。

public abstract class Handler {          public abstract void exec(); }

Session:

封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。

public class Session extends Handler{      private SocketChannel mChannel;     private SelectionKey  mKey;     private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);       private Charset charset = Charset.forName("UTF-8");     private CharsetDecoder mDecoder = charset.newDecoder();     private CharsetEncoder mEncoder = charset.newEncoder();     private long lastPant;//最后活动时间     private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间     private String key;          private String sendData = "";     private String receiveData = null;          public static final int READING = 0,SENDING = 1;     int mState = READING;          public Session(SocketChannel socket, Selector selector) throws IOException {         this.mChannel = socket;         mChannel = socket;         mChannel.configureBlocking(false);         mKey = mChannel.register(selector, 0);         mKey.attach(this);         mKey.interestOps(SelectionKey.OP_READ);         selector.wakeup();         lastPant = Calendar.getInstance().getTimeInMillis();     }          public String getReceiveData() {         return receiveData;     }          public void clear() {         receiveData = null;     }      public void setSendData(String sendData) {         mState = SENDING;         mKey.interestOps(SelectionKey.OP_WRITE);         this.sendData = sendData + "\n";     }      public boolean isKeekAlive() {         return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();     }          public void setAlive() {         lastPant = Calendar.getInstance().getTimeInMillis();     }               public void distroy() {         try {             mChannel.close();             mKey.cancel();         } catch (IOException e) {}     }          @Override     public synchronized void exec() {         try {             if(mState == READING) {                 read();             }else if(mState == SENDING) {                 write();             }         } catch (IOException e) {             SessionManager.remove(key);             try {                 mChannel.close();             } catch (IOException e1) {                 Log.e(e1);             }             mKey.cancel();         }     }          public void read() throws IOException{         mRreceiveBuffer.clear();         int sign = mChannel.read(mRreceiveBuffer);         if(sign == -1) { //客户端连接关闭             mChannel.close();             mKey.cancel();         }         if(sign > 0) {             mRreceiveBuffer.flip();             receiveData = mDecoder.decode(mRreceiveBuffer).toString();             setAlive();             setSign();             SessionManager.addSession(key, this);         }     }          private void setSign() {         //设置当前Session的Key         key = JsonParser.get(receiveData,"imei");         //检测消息类型是否为心跳包 //        String type = jo.getString("type"); //        if(type.equals("HEART_BEAT")) { //            setAlive(); //        }     }                    public void write() {         try {             mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));             sendData = null;             mState = READING;             mKey.interestOps(SelectionKey.OP_READ);         } catch (CharactercodingException e) {             e.printStackTrace();         } catch (IOException e) {             try {                 mChannel.close();             } catch (IOException e1) {                 Log.e(e1);             }         }     } }

SessionManager:

将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程,
封装了一些操作Session的方法例如get,remove等。

public class SessionManager {      private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();          public static void addSession(String key,Session session) {         sessions.put(key, session);     }         public static Session getSession(String key) {         return sessions.get(key);     }          public static Set<String> getSessionKeys() {         return sessions.keySet();     }          public static int getSessionCount() {         return sessions.size();     }          public static void remove(String[] keys) {         for(String key:keys) {             if(sessions.containsKey(key)) {                 sessions.get(key).distroy();                 sessions.remove(key);             }         }     }     public static void remove(String key) {         if(sessions.containsKey(key)) {             sessions.get(key).distroy();             sessions.remove(key);         }     } }

SessionProcessor

里面使用了jdk自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,javaweb里到处都是request和response)。

public class SessionProcessor implements Runnable{          private static Runnable processor = new SessionProcessor();     private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());     public static void start() {         new Thread(processor).start();     }          @Override     public void run() {         while(true) {             Session tmp = null;             for(String key:SessionManager.getSessionKeys()) {                 tmp = SessionManager.getSession(key);                 //处理Session未处理的请求                 if(tmp.getReceiveData() != null) {                     pool.execute(new Process(tmp));                 }             }             try {                 Thread.sleep(10);             } catch (InterruptedException e) {                 Log.e(e);             }         }     }          class Process implements Runnable {          private SocketRequest request;         private SocketResponse response;                  public Process(Session session) {             //将Session封装成Request和Response             request = new SocketRequest(session);             response = new SocketResponse(session);         }                  @Override         public void run() {             new RequestTransfORM().transfer(request, response);         }     }  }

RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)

public class RequestTransform {      public void transfer(SocketRequest request,SocketResponse response) {         String action = request.getValue("action");         String handlerName = request.getValue("handler");         //根据Session的请求类型,让不同的类方法去处理         try {             Class<?> c= Class.forName("com.seek.server.handler." + handlerName);             Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};             Method method=c.getMethod(action,arg);             method.invoke(c.newInstance(), new Object[]{request,response});         } catch (Exception e) {             e.printStackTrace();         }     } }

SocketRequest和SocketResponse

public class SocketRequest {      private Session mSession;     private String  mReceive;          public SocketRequest(Session session) {         mSession = session;         mReceive = session.getReceiveData();         mSession.clear();     }          public String getValue(String key) {         return JsonParser.get(mReceive, key);     }          public String getQueryString() {         return mReceive;     } }
public class SocketResponse {        private Session mSession;      public SocketResponse(Session session) {          mSession = session;      }            public void write(String msg) {          mSession.setSendData(msg);      }  }

最后则是两个处理请求的Handler

public class UserHandler {      public void login(SocketRequest request,SocketResponse response) {         System.out.println(request.getQueryString());         //TODO: 处理用户登录         response.write("你肯定收到消息了");     } }
public class MessageHandler {     public void send(SocketRequest request,SocketResponse response) {         System.out.println(request.getQueryString());         //消息发送         String key = request.getValue("imei");         Session session = SessionManager.getSession(key);         new SocketResponse(session).write(request.getValue("sms"));     } }

还有个监测是否超时的类Looper,定期去删除Session

public class Looper extends Thread{     private static Looper looper = new Looper();     private static boolean isStart = false;     private final int INTERVAL = 1000 * 60 * 5;     private Looper(){}     public static Looper getInstance() {         return looper;     }          public void loop() {         if(!isStart) {             isStart = true;             this.start();         }     }          public void run() {         Task task = new Task();         while(true) {             //Session过期检测             task.checkState();             //心跳包检测             //task.sendAck();             try {                 Thread.sleep(INTERVAL);             } catch (InterruptedException e) {                 Log.e(e);             }         }     } }
public class Task {     public void checkState() {         Set<String> keys = SessionManager.getSessionKeys();         if(keys.size() == 0) {             return;         }         List<String> removes = new ArrayList<String>();         Iterator<String> iterator = keys.iterator();         String key = null;         while(iterator.hasNext()) {             key = iterator.next();             if(!SessionManager.getSession(key).isKeekAlive()) {                 removes.add(key);             }        }         if(removes.size() > 0) {             Log.i("sessions is time out,remove " + removes.size() + "session");         }         SessionManager.remove(removes.toArray(new String[removes.size()]));     }          public void sendAck() {         Set<String> keys = SessionManager.getSessionKeys();         if(keys.size() == 0) {             return;         }         Iterator<String> iterator = keys.iterator();         while(iterator.hasNext()) {             iterator.next();             //TODO 发送心跳包         }     } }

注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value,因为一直在JavaWEB上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试- -!

客户端请求时的数据内容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},这些约定就自己来定了。

感谢各位的阅读,以上就是“基于Java NIO的即时聊天服务器模型怎么实现”的内容了,经过本文的学习后,相信大家对基于Java NIO的即时聊天服务器模型怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

--结束END--

本文标题: 基于Java NIO的即时聊天服务器模型怎么实现

本文链接: https://lsjlt.com/news/288807.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 基于Java NIO的即时聊天服务器模型怎么实现
    这篇文章主要讲解了“基于Java NIO的即时聊天服务器模型怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“基于Java NIO的即时聊天服务器模型怎么实现”吧!废话不多说,关于NIO...
    99+
    2023-06-17
  • Java基于NIO怎么实现聊天室功能
    Java基于NIO怎么实现聊天室功能,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。Sever端package com.qst.one;import java...
    99+
    2023-06-21
  • java怎么实现即时聊天
    在Java中,可以使用Socket来实现即时聊天功能。具体步骤如下:1. 创建一个ServerSocket对象来监听客户端连接请求。...
    99+
    2023-10-09
    java
  • 基于websocket的聊天功能怎么实现
    本篇内容主要讲解“基于websocket的聊天功能怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“基于websocket的聊天功能怎么实现”吧!   一...
    99+
    2024-04-02
  • Python基于Google Bard怎么实现交互式聊天机器人
    这篇文章主要介绍“Python基于Google Bard怎么实现交互式聊天机器人”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Python基于Google Bard怎么实现交互式...
    99+
    2023-07-05
  • 怎么基于 Knative Serverless 技术实现天气服务
    本篇文章为大家展示了怎么基于 Knative Serverless 技术实现天气服务,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。提到天气预报服务,我们第一反应是很简单的一个服务啊,目前网上有大把的...
    99+
    2023-06-02
  • 基于ChatGPT+SpringBoot实现智能聊天AI机器人接口并上线至服务器的方法
    目录最终接口效果演示ChatGPT介绍SpringBoot介绍构建SpringBoot项目Post请求解析接口控制类打包发布接口到服务器 ChatGPT是最近很热门的AI智能聊天机器...
    99+
    2023-02-16
    ChatGPT SpringBoot打造智能聊天AI机器人接口 ChatGPT聊天AI机器人
  • 怎么用Python实现多任务版的udp聊天器
    本篇内容主要讲解“怎么用Python实现多任务版的udp聊天器”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么用Python实现多任务版的udp聊天器”吧!一、案例示例二、案例说明编写一个有2...
    99+
    2023-06-20
  • 怎么使用Python实现多任务版的udp聊天器
    这篇“怎么使用Python实现多任务版的udp聊天器”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“怎么使用Python实现多...
    99+
    2023-06-28
  • 基于Python怎么实现云服务器的CDN域名远程鉴权配置
    今天小编给大家分享一下基于Python怎么实现云服务器的CDN域名远程鉴权配置的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。...
    99+
    2023-06-30
  • 基于CRF序列标注的中文依存句法分析器的Java实现是怎么样的
    这篇文章给大家介绍基于CRF序列标注的中文依存句法分析器的Java实现是怎么样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。这是一个基于CRF的中文依存句法分析器,内部CRF模型的特征函数采用 双数组Trie树(Do...
    99+
    2023-06-02
  • Java中的多线程回显服务器怎么利用Socket实现
    Java中的多线程回显服务器怎么利用Socket实现?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。具体如下:需要两个类,一个是EchoServer,代表服务器。另外一个是Ech...
    99+
    2023-05-31
    java socket 多线程
  • 关于设置sql server 2008服务器属性时出现的无法加载xplog70.dll该怎么办
    这篇文章将为大家详细讲解有关关于设置sql server 2008服务器属性时出现的无法加载xplog70.dll该怎么办,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作