1 FDBus简介 FDBus 基于 Socket (tcp 和 Unix domain) 之上的IPC机制, 采用 Google protobuf 做序列化和反序列化。 FDBus还支持字符串形
FDBus 基于 Socket (tcp 和 Unix domain) 之上的IPC机制, 采用 Google protobuf 做序列化和反序列化。 FDBus还支持字符串形式的名字作为server地址。通过 name server 自动为 server 分配Unix domain 地址和 TCP 端口号, 实现 client 和server 之间用服务名字寻址。一句话描述:FDBus (Fast Distributed Bus) 是一种 IPC 机制, 用于进程间通信。
特点:
分布式:基于TCP socket和Unix Domain socket(UDS),既可用于本地IPC,也支持网络主机之间的IPC;
跨平台:目前已在windows,linux和QNX上验证;
高性能:点对点直接通信,不通过中央Hub或Broker转发;
安全性:能够为server的方法调用也事件广播配置不同等级的访问权限,只有权限足够高的client才能特点方法和接收特定事件;
服务名解析:server地址用名字标识,通过name server注册服务和解析名字,从而server可以在全网络任意部署。
我们知道车载Android设备不止一台,如果我们要在中控和仪表盘这两个Android系统设备之间要进行通信怎么实现呢?同学们思考一下
不同Android设备间的通信已经无法通过Binder、Broadcast、Service来完成,所以我们是否可以参考FDBus通过Socket来进行不同Android设备的通讯呢?下面我们来看看大致思路:
通过Socket+动态代理
实现简单的跨设备通讯
:
YFDBus库中分为IPCNameManager类:实现客户端Socket的管理,动态代理的相关的逻辑;NameServerManager类:服务端的Socket管理。
2.Server模块为服务端,依赖YFDBus库,初始化时服务注册到IPCNameManager中,提供自定义的服务。接收到客户端Socket发送过来的消息后,将该消息解析做出响应返回对应的数据给客户端。
3.Client模块为客户端,依赖YFDBus库,通过Socket实现对Server端的服务发现,确实是数据传输后使用动态代理解析来自Server的响应数据,得到的服务对象。然后通过该对象进行服务调用,确实就是Socket发送数据到Server做处理然后给出响应,来实现服务的发现和调用。
最简单的理解就是:
1.通过Socket客户端向服务端发送消息,实现服务注册
、服务发现
、服务调用
。
2.通过动态代理的方式获取到服务端的类及方法在客户端进行调用。
下面我们来看看关键的代码:
客户端INameServer.java,同服务端的INameServer
目的是两边保持一致,在对应模块直接调用。
package com.yvan.nameserver.interfaces;import com.yvan.yfdbus.annotion.ClassId;@ClassId("com.yvan.nameserver.NameServer")public interface INameServer { SpeedState getSpeed(String userId, String token);}
客户端SpeedState.java类,是传输的数据类
package com.yvan.nameserver.interfaces;public class SpeedState { int speed; int energy; public SpeedState() { } public SpeedState(int speed, int energy) { this.speed = speed; this.energy = energy; } public int getSpeed() { return speed; } public void setSpeed(int speed) { this.speed = speed; } public int getEnergy() { return energy; } public void setEnergy(int energy) { this.energy = energy; }}
客户端页面展示ClientActivity.java,主要有:连接服务端
、服务发现
、服务调用
package com.yvan.client;import androidx.appcompat.app.AppCompatActivity;import android.os.Bundle;import android.util.Log;import android.view.View;import com.yvan.clent.R;import com.yvan.nameserver.interfaces.INameServer;import com.yvan.nameserver.interfaces.SpeedState;import com.yvan.yfdbus.IPCNameManager;import com.yvan.yfdbus.NameConfig;public class ClientActivity extends AppCompatActivity { private static final String TAG = ClientActivity.class.getSimpleName(); private INameServer iNameServer; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); IPCNameManager.getInstance().init(this); } public void connect(View view) { IPCNameManager.getInstance().connect(NameConfig.IP); } public void findServer(View view) { iNameServer = IPCNameManager.getInstance().getInstance(INameServer.class, "小明的车", "token666"); Log.i(TAG, "findServer: 服务发现 发现远端设备的服务 "); } public void invekeServer(View view) { Log.i(TAG, "invekeServer: 服务调用 " + iNameServer); new Thread() { @Override public void run() { SpeedState speedState = iNameServer.getSpeed("小明的车", "token666"); Log.i(TAG, "invekeServer: 结果远端设备的速度: " + speedState.getSpeed() + " 远端设备的电池容量: " + speedState.getEnergy()); } }.start(); }}
INameServer.java、SpeedState.java同客户端,这里不重复贴出来了。
服务端NameServer.java,主要是对INameServer的实现类
。
package com.yvan.nameserver;import android.util.Log;import com.yvan.nameserver.interfaces.INameServer;import com.yvan.nameserver.interfaces.SpeedState;public class NameServer implements INameServer { private final static String TAG = NameServer.class.getSimpleName(); private static NameServer sInstance = null; public static synchronized NameServer getInstance(String userId, String token) { Log.i(TAG, "getInstance:userId " + userId + " token " + token); if (sInstance == null) { sInstance = new NameServer(); } return sInstance; } @Override public SpeedState getSpeed(String userId, String token) { return new SpeedState(180, 55); }}
服务端页面展示ServerActivity.java,用于管理服务的启动
、服务注册
package com.yvan;import androidx.appcompat.app.AppCompatActivity;import android.content.Intent;import android.os.Bundle;import com.yvan.nameserver.NameServer;import com.yvan.nameserver.R;import com.yvan.yfdbus.IPCNameManager;import com.yvan.yfdbus.NameServerManager;public class ServerActivity extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // 管理服务的启动 Intent intent = new Intent(this, NameServerManager.class); startService(intent); // 服务注册 IPCNameManager.getInstance().reGISter(NameServer.class); }}
重点类IPCNameManager.java用于IPC通讯,服务注册、服务发现、服务调用的入口,管理着服务及动态代理。
package com.yvan.yfdbus;import android.content.Context;import android.text.TextUtils;import android.util.Log;import com.google.gson.Gson;import com.yvan.yfdbus.annotion.ClassId;import com.yvan.yfdbus.request.RequestBean;import com.yvan.yfdbus.request.RequestParamter;import org.java_websocket.client.WEBSocketClient;import org.java_websocket.handshake.ServerHandshake;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.URI;import java.NIO.ByteBuffer;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class IPCNameManager { private static final String TAG = IPCNameManager.class.getSimpleName(); private NameCenter nameCenter = new NameCenter(); private static final Gson GSON = new Gson(); private static final IPCNameManager ourInstance = new IPCNameManager(); private Context sContext; public static IPCNameManager getInstance() { return ourInstance; } public NameCenter getNameCenter() { return nameCenter; } public final Lock lock = new ReentrantLock(); private String responce = null; public void init(Context context) { sContext = context.getApplicationContext(); } public void register(Class<?> clazz) { nameCenter.register(clazz); } MyWebSocketClient myWebSocketClient; public void connect(String ip) { new Thread() { @Override public void run() { connectSocketServer(ip); } }.start(); } public <T> T getInstance(Class<T> clazz, Object... parameters) { // 实例化 服务发现 sendRequest(clazz, null, parameters, NameServerManager.TYPE_GET); return getProxy(clazz); } public <T> String sendRequest(Class<T> clazz, Method method, Object[] parameters, int type) { // socket 协议 RequestParamter[] requestParamters = null; if (parameters != null && parameters.length > 0) { requestParamters = new RequestParamter[parameters.length]; for (int i = 0; i < parameters.length; i++) { Object parameter = parameters[i]; String parameterClassName = parameter.getClass().getName(); String parameterValue = GSON.toJSON(parameter); RequestParamter requestParamter = new RequestParamter(parameterClassName, parameterValue); requestParamters[i] = requestParamter; } } String className = clazz.getAnnotation(ClassId.class).value(); String methodName = method == null ? "getInstance" : method.getName(); RequestBean requestBean = new RequestBean(type, className, methodName, requestParamters); String request = GSON.tojson(requestBean); synchronized (lock) { try { // 客户端给服务端发消息 myWebSocketClient.send(request.getBytes()); Log.i(TAG, "sendRequest: 锁住线程:" + Thread.currentThread().getName()); lock.wait(); Log.i(TAG, "sendRequest: 锁住成功"); } catch (Exception e) { e.printStackTrace(); } } Log.i(TAG, "sendRequest: 唤醒线程"); if (!TextUtils.isEmpty(responce)) { String data1 = responce; responce = null; return data1; } return null; } private <T> T getProxy(Class<T> clazz) { ClassLoader classLoader = sContext.getClassLoader(); return (T) Proxy.newProxyInstance(classLoader, new Class<?>[]{clazz}, new NameServerInvokeHandler(clazz)); } private void connectSocketServer(String ip) { try { URI url = new URI(ip + NameConfig.IP_PORT); myWebSocketClient = new MyWebSocketClient(url); myWebSocketClient.connect(); Log.i(TAG, "connect: 连接 "); } catch (Exception e) { e.printStackTrace(); } } class MyWebSocketClient extends WebSocketClient { public MyWebSocketClient(URI serverUri) { super(serverUri); Log.i(TAG, "MyWebSocketClient create serverUri:" + serverUri); } @Override public void onOpen(ServerHandshake handshakedata) { Log.i(TAG, "onOpen: "); } @Override public void onMessage(String message) { Log.i(TAG, "onMessage: " + message); } public void onMessage(ByteBuffer message) { byte[] buf = new byte[message.remaining()]; message.get(buf); String data = new String(buf); Log.i(TAG, "客户端收到信息 onMessage: " + data); responce = data; synchronized (lock) { try { lock.notify(); } catch (Exception e) { Log.e(TAG, "onMessage: 锁异常: " + e.getMessage()); } } } @Override public void onClose(int code, String reason, boolean remote) { Log.i(TAG, "onClose: code: " + code + ", reason: " + reason + ", remote: " + remote); } @Override public void onError(Exception ex) { Log.e(TAG, "onError: " + ex.getMessage()); } }}
动态代理类NameServerInvokeHandler.java
package com.yvan.yfdbus;import android.text.TextUtils;import android.util.Log;import com.google.gson.Gson;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;public class NameServerInvokeHandler implements InvocationHandler { private static final String TAG = NameServerInvokeHandler.class.getSimpleName(); private static final Gson GSON = new Gson(); private Class clazz; public NameServerInvokeHandler(Class clazz) { this.clazz = clazz; } @Override public Object invoke(Object o, Method method, Object[] objects) throws Throwable { if (method.getName().contains("toString")) { return "proxy"; } Log.i(TAG, "invoke: " + method.getName()); // 发送请求 String data = IPCNameManager.getInstance().sendRequest(clazz, method, objects, NameServerManager.TYPE_INVOKE); Log.i(TAG, "invoke: data " + data); if (!TextUtils.isEmpty(data)) { Object object = GSON.fromJson(data, method.getReturnType()); return object; } return null; }}
服务管理类NameServerManager.java,用于处理服务发现
和处理服务调用
给客户端响应
package com.yvan.yfdbus;import android.app.Service;import android.content.Intent;import android.os.IBinder;import android.util.Log;import androidx.annotation.Nullable;import com.google.gson.Gson;import com.yvan.yfdbus.request.RequestBean;import com.yvan.yfdbus.request.RequestParamter;import org.java_websocket.WebSocket;import org.java_websocket.handshake.ClientHandshake;import org.java_websocket.server.WebSocketServer;import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.nio.ByteBuffer;public class NameServerManager extends Service { private static final String TAG = NameServerManager.class.getSimpleName(); private final Gson gson = new Gson(); //服务发现 public static final int TYPE_GET = 1; //服务调用 public static final int TYPE_INVOKE = 2; @Nullable @Override public IBinder onBind(Intent intent) { return null; } @Override public void onCreate() { super.onCreate(); Log.i(TAG, "onCreate: 开始 "); new Thread(new TcpServer()).start(); } private class TcpServer implements Runnable { @Override public void run() { // 服务端开启SocketServer,等待客户端的请求 SocketServer socketServer = new SocketServer(); socketServer.start(); } } class SocketServer extends WebSocketServer { public SocketServer() { super(new InetSocketAddress(NameConfig.IP_PORT)); } @Override public void onOpen(WebSocket conn, ClientHandshake handshake) { Log.i(TAG, "onOpen: " + conn.getRemoteSocketAddress().getAddress().getHostAddress()); } @Override public void onClose(WebSocket conn, int code, String reason, boolean remote) { Log.i(TAG, "onClose: " + conn.getRemoteSocketAddress().getAddress().getHostAddress() + ", code:" + code + ", reason:" + reason + ", remote:" + remote); } @Override public void onMessage(WebSocket conn, String message) { } @Override public void onMessage(WebSocket conn, ByteBuffer message) { Log.i(TAG, "onMessage: " + conn.getRemoteSocketAddress().getAddress().getHostAddress()); byte[] buf = new byte[message.remaining()]; message.get(buf); String request = new String(buf); Log.i(TAG, "onMessage 接收到客户端消息: " + request); String responce = dealRequest(request); Log.i(TAG, "onMessage 向客户端发送数据: " + responce); // 服务端 向 客户端 发送响应 conn.send(responce.getBytes()); } @Override public void onError(WebSocket conn, Exception ex) { Log.e(TAG, "onError: " + conn.getRemoteSocketAddress().getAddress().getHostAddress() + ", ex:" + ex); } @Override public void onStart() { Log.i(TAG, "onStart: "); } } String dealRequest(String request) { RequestBean requestBean = gson.fromJson(request, RequestBean.class); int type = requestBean.getType(); //服务发现, 服务初始化 switch (type) { case TYPE_INVOKE: Object object = IPCNameManager.getInstance().getNameCenter().getObject(requestBean.getClassName()); Method tempMethod = IPCNameManager.getInstance().getNameCenter().getMethod(requestBean); Object[] mParameters = makeParameterObject(requestBean); try { Object result = tempMethod.invoke(object, mParameters); String data = gson.toJson(result); return data; } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } break; case TYPE_GET: Method method = IPCNameManager.getInstance().getNameCenter().getMethod(requestBean); Object[] parameters = makeParameterObject(requestBean); try { Object object1 = method.invoke(null, parameters); IPCNameManager.getInstance().getNameCenter().putObject(requestBean.getClassName(), object1); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } return "success"; } return null; } private Object[] makeParameterObject(RequestBean requestBean) { Object[] mParameters = null; RequestParamter[] requestParamters = requestBean.getRequestParamters(); if (requestParamters != null && requestParamters.length > 0) { mParameters = new Object[requestBean.getRequestParamters().length]; for (int i = 0; i < requestParamters.length; i++) { RequestParamter requestParamter = requestParamters[i]; Class<?> clazz = IPCNameManager.getInstance().getNameCenter().getClassType(requestParamter.getParameterClassName()); mParameters[i] = gson.fromJson(requestParamter.getParameterValue(), clazz); } } else { mParameters = new Object[0]; } return mParameters; }}
NameCenter.java类个方法管理中心
package com.yvan.yfdbus;import com.yvan.yfdbus.request.RequestBean;import com.yvan.yfdbus.request.RequestParamter;import java.lang.reflect.Method;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ConcurrentHashMap;public class NameCenter { private ConcurrentHashMap<String, Class<?>> mClaSSMap; private ConcurrentHashMap<String, ConcurrentHashMap<String, Method>> mAllMethodMap; private final ConcurrentHashMap<String, Object> mInstanceObjectMap; public NameCenter() { mClassMap = new ConcurrentHashMap<String, Class<?>>(); mAllMethodMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, Method>>(); mInstanceObjectMap = new ConcurrentHashMap<String, Object>(); } public void register(Class clazz) { mClassMap.put(clazz.getName(), clazz); Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { ConcurrentHashMap<String, Method> map = mAllMethodMap.get(clazz.getName()); if (map == null) { map = new ConcurrentHashMap<String, Method>(); mAllMethodMap.put(clazz.getName(), map); } // java重载 方法名+参数 String key = getMethodParameters(method); map.put(key, method); } } public void putObject(String className, Object instance) { mInstanceObjectMap.put(className, instance); } public Object getObject(String className) { return mInstanceObjectMap.get(className); } public Method getMethod(RequestBean requestBean) { ConcurrentHashMap<String, Method> map = mAllMethodMap.get(requestBean.getClassName()); if (map != null) { String key = getMethodParameters(requestBean); return map.get(key); } return null; } public Class<?> getClassType(String parameterClassName) { try { Class clazz = Class.forName(parameterClassName); return clazz; } catch (ClassNotFoundException e) { e.printStackTrace(); } return null; } public static String getMethodParameters(RequestBean requestBean) { List<String> parameterClassName = new ArrayList<>(); for (RequestParamter c : requestBean.getRequestParamters()) { parameterClassName.add(c.getParameterClassName()); } return getMethodParameters(requestBean.getMethodName(), parameterClassName); } public static String getMethodParameters(Method method) { List<String> parameterClassName = new ArrayList<>(); for (Class<?> c : method.getParameterTypes()) { parameterClassName.add(c.getName()); } return getMethodParameters(method.getName(), parameterClassName); } public static String getMethodParameters(String methodName, List<String> parameterClassName) { // 方法签名 StringBuilder result = new StringBuilder(); result.append(methodName); int size = parameterClassName.size(); if (size == 0) { return result.toString(); } for (int i = 0; i < size; ++i) { result.append("-").append(parameterClassName.get(i)); } return result.toString(); }}
项目实现目的:跨设备通信
项目实现方案:Socket实现IPC通讯
、动态代理实现类方法的调用
如果对您有帮助,请点赞收藏加关注,别忘了GitHub上给个Star喔。
来源地址:https://blog.csdn.net/u010687761/article/details/130230661
--结束END--
本文标题: 【Android车载系列】第9章 车载通信-Socket实现IPC通信机制(实现仿FDBus效果)
本文链接: https://lsjlt.com/news/391492.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-01-21
2023-10-28
2023-10-28
2023-10-27
2023-10-27
2023-10-27
2023-10-27
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0