Skip to content

network

maggie edited this page Sep 27, 2020 · 2 revisions

Network模块重构

  • 模块介绍

  • 重构任务

  • 重构设计

1. 模块介绍

已满足的职能:

  • 加载配置和证书
  • 初始化连接
  • 监听端口(As Server)
  • 连接配置节点(As Client)
  • 连接管理(nodeID/ip:port 映射)
  • 周期性重连
  • 连接/断连/类型回调注册
  • 消息发送(随机发送/全员广播)

缺陷:

  • 网络层的配置功能不合理
    • 需要知道GroupID才能配置网络。很多情况下用不上GroupID,另外,用户可能并不知道GroupID信息。
    • 配置出错不提示。当调用与Group相关接口时,如果发现GroupID有错,会导致调用失败。但在此之前配置错误并不提示。

新需求:

  • 消息发送(指定节点发送/其他策略注册)。

  • 能够优雅关闭。

  • 允许同时连接多个组。

2. 重构任务

重构任务 类型 解决方法
网络层的配置功能不合理 缺陷 见Channel模块重构点1
网络层与Channel层强耦合,
同时Channel层又与其它模块强耦合。
缺陷 重构点1: 模块划分更明确,通过接口暴露给别的模块使用。
重构点2:网络模块收到的消息,交给业务模块自行处理。
重构点3: 每次只初始化必要的消息处理逻辑。
重构点4: 根据netty的规则实现消息编解码
消息发送(指定节点发送/其他策略注册) 新需求 重构点5: 新增两种的消息传输方式
能够优雅关闭 新需求 重构点6: 优雅关闭
允许同时连接多个组 新需求 见Channel模块重构点2

3. 重构设计

重构点1: 模块划分更明确,通过接口暴露给别的模块使用。

重构前:Network、Channel、AMOP、EventPush等模块全部放在了Channel中,耦合严重,代码很乱。

重构工作:

  • RPC、AMOP等模块与Network模块完全解耦,Network模块通过Channel模块仅暴露两个接口给上层使用。
    • 提供给上层Channel层两个接口:MsgHandler提供配置消息处理逻辑的接口;NetworkService提供发送消息的接口;
    • 该模块依赖于Netty。

重构点2:网络模块收到的消息,交给业务模块自行处理。

重构前: Channel模块会对所有消息进行处理,使得Channel模块跟别的模块耦合太严重,不便维护。

ConnectionCallback.java

public void onMessage(ChannelHandlerContext ctx, ByteBuf message) {
      ...
        if (msg.getType() == ChannelMessageType.AMOP_REQUEST.getType()
            || msg.getType() == ChannelMessageType.AMOP_RESPONSE.getType()
            || msg.getType() == ChannelMessageType.AMOP_MULBROADCAST.getType()) {
          ChannelMessage2 channelMessage = new ChannelMessage2(msg);
          channelMessage.readExtra(message);
          channelService.onReceiveChannelMessage2(ctx, channelMessage);
        } else if (msg.getType() == ChannelMessageType.CHANNEL_RPC_REQUEST.getType()) {
          BcosMessage fiscoMessage = new BcosMessage(msg);
          fiscoMessage.readExtra(message);
          channelService.onReceiveEthereumMessage(ctx, fiscoMessage);
        } else if (msg.getType() == ChannelMessageType.CLIENT_HEARTBEAT.getType()) {
          msg.readExtra(message);
          channelService.onReceiveHeartbeat(ctx, msg);
        } else if (msg.getType() == ChannelMessageType.CLIENT_HANDSHAKE.getType()) {
          BcosMessage fiscoMessage = new BcosMessage(msg);
          fiscoMessage.readExtra(message);
          channelService.onReceiveEthereumMessage(ctx, fiscoMessage);
        } else if (msg.getType() == ChannelMessageType.CLIENT_REGISTER_EVENT_LOG.getType()) 				{
          ChannelMessage2 channelMessage = new ChannelMessage2(msg);
          channelMessage.readExtra(message);
          channelService.onReceiveRegisterEventResponse(ctx, channelMessage);
        } else if (msg.getType() == ChannelMessageType.TRANSACTION_NOTIFY.getType()) {
          BcosMessage fiscoMessage = new BcosMessage(msg);
          fiscoMessage.readExtra(message);
          channelService.onReceiveTransactionMessage(ctx, fiscoMessage);
        }
  ...
}

重构后:我们将Channel拆分, 业务消息交给业务模块自行处理。

主要代码:

public class MsgManager implements MsgHandler {
  map<MsgType,MsgHandler> msgHandlers; // 消息处理逻辑MsgHandler列表
  
  public void MsgManager(){...}
  ...
   // 注册一个MsgHandler
  public void addMsgHandler(MsgType type,MsgHandler handler){
    msgHandlers.put(type,handler);
  }
  
  // 移除一个MsgHandler
  public void removeMsgHandler(MsgType type){
    msgHandlers.remove(type);
  }
  // 设置整个MsgHandler列表
  public void setMsgHandlers(MsgHandler[] handlers){
    this.msgHandlers = handlers;
  }
  // 当收到消息时,根据消息类型,调用相应的业务模块中的MsgHandler处理该消息。
  public void onMessage(ChannelHandlerContext ctx, ByteBuf message){
    MsgType type = getMsgType(message);
    if (msgHandlers.get(type) != null){
      //调用对应模块的msgHandler
      msgHandlers.get(type).onMsg(ctx,message); 
    }
  }
}

重构点3: 每次只初始化必要的消息处理逻辑。

初始化过程:

  1. 启动网络模块时,只初始化必要的消息处理逻辑。

    关键代码:

    class ChannelImp implements Channel{
      private ChannelMsgHandler msgHandler;
      ...
        public void addConnectHandler(MsgHandler handler) {
            msgHandler.addConnectHandler(handler);
        }
    
        public void addEstablishHandler(MsgHandler handler) {
            msgHandler.addEstablishHandler(handler);
        }
    
        public void addMessageHandler(MsgType type, MsgHandler handler) {
            msgHandler.addMessageHandler(type, handler);
        }
        public void addDisconnectHandler(MsgHandler handler) {
            msgHandler.addDisconnectHandler(handler);
        }
    
    }
    
    public class ChannelMsgHandler implements MsgHandler {
      	private List<MsgHandler> msgConnectHandlerList = new CopyOnWriteArrayList<MsgHandler>();
        private List<MsgHandler> msgDisconnectHandleList = new CopyOnWriteArrayList<MsgHandler>();
        private Map<Integer, MsgHandler> msgHandlers = new ConcurrentHashMap<>();
        private List<MsgHandler> msgEstablishHandlerList = new CopyOnWriteArrayList<MsgHandler>();
      	...
      	public void addConnectHandler(MsgHandler handler) {
            msgConnectHandlerList.add(handler);
        }
    
        public void addEstablishHandler(MsgHandler handler) {
            msgEstablishHandlerList.add(handler);
        }
    
        public void addMessageHandler(MsgType type, MsgHandler handler) {
            msgHandlers.put(type.getType(), handler);
        }
    
        public void addDisconnectHandler(MsgHandler handler) {
            msgDisconnectHandleList.add(handler);
        }
      	...
        @Override
        public void onMessage(ChannelHandlerContext ctx, Message msg) {
            ...
            MsgHandler msgHandler = msgHandlers.get(msg.getType().intValue());
          	if (msgHandler != null) {
            	msgHandler.onMessage(ctx, msg);
          	}
            ....
        }
    }
  2. 启动其它模块时,初始化相关的消息处理逻辑。

    当启动RPC模块、AMOP模块、EventPush等模块时,初始化对应的消息处理逻辑MsgHandler,并注册到MsgManager中。

    关键代码:

    class AMOP{
      ...
      public void AMOP(Channel channel, ConfigOption config){
        ...
        
        //注册AMOP模块对应的消息处理逻辑
        channel.addMsgHandler(MsgType.AMOP_REQUEST,amopMsgHandler);
        channel.addMsgHandler(MsgType.AMOP_RESPONSE,amopMsgHandler);
        channel.addMsgHandler(MsgType.AMOP_MULBROADCAST,amopMsgHandler); 
        ...
      }
    }
     

    重构点4: 根据netty的规则实现消息编解码

    重构前:Channel模块自行解决编解码问题,到netty层是ByteBuf类型。

    s

    重构后:

    • 三个子类差别不大,只用Message一种数据结构即可。
  • 继承netty的Encoder和Decoder,在网络层完成编解码。

    关键代码:

public void startConnect() throws SSLException { ... // 装配Handler流水线 bootstrap.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( sslHandler, // SSlHandler new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, -4, 0), //心跳检测,空闲超时则创建空闲超时事件并传递到channelPipeline中,我们这里会主动断开连接。 new IdleStateHandler( idleTimeout, //读空闲 idleTimeout, //写空闲 idleTimeout, //读写空闲 TimeUnit.MILLISECONDS),//时间单位

         //此处加入编解码Handler
         new MessageEncoder(),//在流水线中加入Decoder,实现ChannelOutboundHandler,处理发送的请求
         new MessageDecoder(), //在流水线中加入Decoder,实现ChannelInboundHandler,处理收到的请求
         new ChannelHandler());//处理业务逻辑的Handler,实现ChannelInboundHandler,处理收到的请求
     }
   }

}


#### 重构点5: 新增两种的消息传输方式

**重构前:支持两种交易传输方式,1. 随机地传给一个节点。2. 广播给所有节点**

**重构后:新增两种消息传输方式**

3. **传给一个指定节点。**
* Channel新增接口sendToPeerByID(Message,nodeId, responseCallBack)
4. **以自定义的某种规则选择要传输的节点。**
* Channel新增接口sendToPeerByRule(Message, PeerSelectRule, responseCallBack)

sendToPeerByRule接口实现关键代码:

```java
public Response sendToPeerByRuleWithTimeOut(Message out, PeerSelectRule rule, Options options) {
  Callback callback = new Callback();
asyncSendToPeerByRule(out, rule, callback, options);
  waitResponse(callback, options);
return callback.retResponse;
}
public void asyncSendToPeerByRule(
            Message out, PeerSelectRule rule, ResponseCallback callback, Options options) {
  String target = rule.select(getConnectionInfo());
  asyncSendToPeer(out, target, callback, options);
}

自定义规则,如选择有块高最大的节点:

Class MySelectRule implements PeerSelectRule{
  public String select(List<ConnectionInfo> allConnInfo){
    ConnectInfo heighest; //块高最大的节点
    for (ConnectInfo connInfo: allConnInfo){
      if(heighest == null){
     heighest = connInfo;
				continue;
   }
      if (allConnInfo.blockHeight > heighest.blockHeight){
        heighest = allConnInfo
      }
    }
    return heighest.nodeId
  }
}

重构点6: 优雅关闭

在Network的实现中,新增stop方法。

class NetworkImp implements Network{
  private ConnectionConfig allConnections;
  ...
public void stop(){
    for (ChannelConnection channelConnection:allConnections.getAllChannelConnections()){
         channelConnection.stopListen();
        }
}
}

通过调用EventLoopGroup的shutdownGracefully方法,让Netty优雅退出。Netty优雅退出会做三件事情:

  1. 把NIO线程的状态位设置成ST_SHUTTING_DOWN状态,不再处理新的消息(不允许再对外发送消息);
  2. 退出前的预处理操作:把发送队列中尚未发送或者正在发送的消息发送完、把已经到期或者在退出超时之前到期的定时任务执行完成、把用户注册到NIO线程的退出Hook任务执行完成;
  3. 资源的释放操作:所有Channel的释放、多路复用器的去注册和关闭、所有队列和定时任务的清空取消,最后是NIO线程的退出。
class ChannelConnection{
private EventLoopGroup bossGroup; //负责处理TCP/IP连接
  private EventLoopGroup workerGroup;//负责处理Channel的I/O事件
...
  public void stopListen(){
    if (running) {
      bossGroup.shutdownGracefully();
   workerGroup.shutdownGracefully();
      running = falsereturn;
    }
  }
}

6.4 接口说明

模块主要通过两个接口对外暴露功能:

  1. 处理消息:MsgHandler接口
public interface MsgHandler{
  // 收到消息时处理逻辑
  public void onMessage(ChannelHandlerContext chx, Message msg);
  // SSL握手成功的处理逻辑
  public void onConnect(ChannelHandlerContext chx);
  // 段连时的处理逻辑
  public void onDisconnect(ChannelHandlerContext chx);
}

2. 发送消息:Network接口

public interface Network{
  // 随机发送给几个节点。Message,要发送的消息; numOfPeer,要发送的节点数量,ResponseCallback得到返回值后调用的回调函数
	public void sendToRandomPeers(Message out, int numOfPeer, ResponseCallback responseCallback);
  // 发送给指定节点。NodeId,要发送的节点ID
  public void sendToPeerById(Message out, String peerIpPort, ResponseCallback responseCallback);
  // 广播给所有连接的节点
	public void broadcast(Message out,ResponseCallback responseCallback);
  // 根据某种规则发给指定的一个节点
	public void sendToPeerWithRule(Message out,PeerSelectRule rule,ResponseCallback responseCallback);
  // 获取当前的连接信息
	public List<ConnectionInfo> getAllConnectionInfo();
}

此外还有一个接口,和一个类在处理消息和发送消息中需要用到:

  1. 自定义节点选择规则:PeerSelectRule
public interface PeerSelectRule{
  // 节点选择逻辑
  public String select(List<ConnectionInfo> allConnInfo);
}
  1. 自定义节点请求响应的回调:ResponseCallBack
public interface ResponseCallBack{
  private Timeout timeout;
  
  public abstract void onResponse(Response response);
  public void onTimeout() {
        Response response = new Response();
        response.setErrorCode(ChannelMessageError.MESSAGE_TIMEOUT.getError());
        response.setErrorMessage("Processing bcos message timeout");
        response.setContent("");
        onResponse(response);
    }
  public Timeout getTimeout() { return timeout; }

  public void setTimeout(Timeout timeout) { this.timeout = timeout;}
}

Clone this wiki locally