-
Notifications
You must be signed in to change notification settings - Fork 62
network
-
模块介绍
-
重构任务
-
重构设计
已满足的职能:
- 加载配置和证书
- 初始化连接
- 监听端口(As Server)
- 连接配置节点(As Client)
- 连接管理(nodeID/ip:port 映射)
- 周期性重连
- 连接/断连/类型回调注册
- 消息发送(随机发送/全员广播)
缺陷:
- 网络层的配置功能不合理
- 需要知道GroupID才能配置网络。很多情况下用不上GroupID,另外,用户可能并不知道GroupID信息。
- 配置出错不提示。当调用与Group相关接口时,如果发现GroupID有错,会导致调用失败。但在此之前配置错误并不提示。
新需求:
-
消息发送(指定节点发送/其他策略注册)。
-
能够优雅关闭。
-
允许同时连接多个组。
| 重构任务 | 类型 | 解决方法 |
|---|---|---|
| 网络层的配置功能不合理 | 缺陷 | 见Channel模块重构点1 |
| 网络层与Channel层强耦合, 同时Channel层又与其它模块强耦合。 |
缺陷 | 重构点1: 模块划分更明确,通过接口暴露给别的模块使用。 重构点2:网络模块收到的消息,交给业务模块自行处理。 重构点3: 每次只初始化必要的消息处理逻辑。 重构点4: 根据netty的规则实现消息编解码 |
| 消息发送(指定节点发送/其他策略注册) | 新需求 | 重构点5: 新增两种的消息传输方式 |
| 能够优雅关闭 | 新需求 | 重构点6: 优雅关闭 |
| 允许同时连接多个组 | 新需求 | 见Channel模块重构点2 |
重构前:Network、Channel、AMOP、EventPush等模块全部放在了Channel中,耦合严重,代码很乱。
重构工作:
-
RPC、AMOP等模块与Network模块完全解耦,Network模块通过Channel模块仅暴露两个接口给上层使用。
- 提供给上层Channel层两个接口:MsgHandler提供配置消息处理逻辑的接口;NetworkService提供发送消息的接口;
- 该模块依赖于Netty。

重构前: Channel模块会对所有消息进行处理,使得Channel模块跟别的模块耦合太严重,不便维护。

ConnectionCallback.java
public void onMessage(ChannelHandlerContext ctx, ByteBuf message) {
...
if (msg.getType() == ChannelMessageType.AMOP_REQUEST.getType()
|| msg.getType() == ChannelMessageType.AMOP_RESPONSE.getType()
|| msg.getType() == ChannelMessageType.AMOP_MULBROADCAST.getType()) {
ChannelMessage2 channelMessage = new ChannelMessage2(msg);
channelMessage.readExtra(message);
channelService.onReceiveChannelMessage2(ctx, channelMessage);
} else if (msg.getType() == ChannelMessageType.CHANNEL_RPC_REQUEST.getType()) {
BcosMessage fiscoMessage = new BcosMessage(msg);
fiscoMessage.readExtra(message);
channelService.onReceiveEthereumMessage(ctx, fiscoMessage);
} else if (msg.getType() == ChannelMessageType.CLIENT_HEARTBEAT.getType()) {
msg.readExtra(message);
channelService.onReceiveHeartbeat(ctx, msg);
} else if (msg.getType() == ChannelMessageType.CLIENT_HANDSHAKE.getType()) {
BcosMessage fiscoMessage = new BcosMessage(msg);
fiscoMessage.readExtra(message);
channelService.onReceiveEthereumMessage(ctx, fiscoMessage);
} else if (msg.getType() == ChannelMessageType.CLIENT_REGISTER_EVENT_LOG.getType()) {
ChannelMessage2 channelMessage = new ChannelMessage2(msg);
channelMessage.readExtra(message);
channelService.onReceiveRegisterEventResponse(ctx, channelMessage);
} else if (msg.getType() == ChannelMessageType.TRANSACTION_NOTIFY.getType()) {
BcosMessage fiscoMessage = new BcosMessage(msg);
fiscoMessage.readExtra(message);
channelService.onReceiveTransactionMessage(ctx, fiscoMessage);
}
...
}重构后:我们将Channel拆分, 业务消息交给业务模块自行处理。
主要代码:
public class MsgManager implements MsgHandler {
map<MsgType,MsgHandler> msgHandlers; // 消息处理逻辑MsgHandler列表
public void MsgManager(){...}
...
// 注册一个MsgHandler
public void addMsgHandler(MsgType type,MsgHandler handler){
msgHandlers.put(type,handler);
}
// 移除一个MsgHandler
public void removeMsgHandler(MsgType type){
msgHandlers.remove(type);
}
// 设置整个MsgHandler列表
public void setMsgHandlers(MsgHandler[] handlers){
this.msgHandlers = handlers;
}
// 当收到消息时,根据消息类型,调用相应的业务模块中的MsgHandler处理该消息。
public void onMessage(ChannelHandlerContext ctx, ByteBuf message){
MsgType type = getMsgType(message);
if (msgHandlers.get(type) != null){
//调用对应模块的msgHandler
msgHandlers.get(type).onMsg(ctx,message);
}
}
}初始化过程:
-
启动网络模块时,只初始化必要的消息处理逻辑。

关键代码:
class ChannelImp implements Channel{ private ChannelMsgHandler msgHandler; ... public void addConnectHandler(MsgHandler handler) { msgHandler.addConnectHandler(handler); } public void addEstablishHandler(MsgHandler handler) { msgHandler.addEstablishHandler(handler); } public void addMessageHandler(MsgType type, MsgHandler handler) { msgHandler.addMessageHandler(type, handler); } public void addDisconnectHandler(MsgHandler handler) { msgHandler.addDisconnectHandler(handler); } } public class ChannelMsgHandler implements MsgHandler { private List<MsgHandler> msgConnectHandlerList = new CopyOnWriteArrayList<MsgHandler>(); private List<MsgHandler> msgDisconnectHandleList = new CopyOnWriteArrayList<MsgHandler>(); private Map<Integer, MsgHandler> msgHandlers = new ConcurrentHashMap<>(); private List<MsgHandler> msgEstablishHandlerList = new CopyOnWriteArrayList<MsgHandler>(); ... public void addConnectHandler(MsgHandler handler) { msgConnectHandlerList.add(handler); } public void addEstablishHandler(MsgHandler handler) { msgEstablishHandlerList.add(handler); } public void addMessageHandler(MsgType type, MsgHandler handler) { msgHandlers.put(type.getType(), handler); } public void addDisconnectHandler(MsgHandler handler) { msgDisconnectHandleList.add(handler); } ... @Override public void onMessage(ChannelHandlerContext ctx, Message msg) { ... MsgHandler msgHandler = msgHandlers.get(msg.getType().intValue()); if (msgHandler != null) { msgHandler.onMessage(ctx, msg); } .... } }
-
启动其它模块时,初始化相关的消息处理逻辑。
当启动RPC模块、AMOP模块、EventPush等模块时,初始化对应的消息处理逻辑MsgHandler,并注册到MsgManager中。

关键代码:
class AMOP{ ... public void AMOP(Channel channel, ConfigOption config){ ... //注册AMOP模块对应的消息处理逻辑 channel.addMsgHandler(MsgType.AMOP_REQUEST,amopMsgHandler); channel.addMsgHandler(MsgType.AMOP_RESPONSE,amopMsgHandler); channel.addMsgHandler(MsgType.AMOP_MULBROADCAST,amopMsgHandler); ... } }
重构前:Channel模块自行解决编解码问题,到netty层是ByteBuf类型。

重构后:
- 三个子类差别不大,只用Message一种数据结构即可。
-
继承netty的Encoder和Decoder,在网络层完成编解码。

关键代码:
public void startConnect() throws SSLException { ... // 装配Handler流水线 bootstrap.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( sslHandler, // SSlHandler new LengthFieldBasedFrameDecoder( Integer.MAX_VALUE, 0, 4, -4, 0), //心跳检测,空闲超时则创建空闲超时事件并传递到channelPipeline中,我们这里会主动断开连接。 new IdleStateHandler( idleTimeout, //读空闲 idleTimeout, //写空闲 idleTimeout, //读写空闲 TimeUnit.MILLISECONDS),//时间单位
//此处加入编解码Handler
new MessageEncoder(),//在流水线中加入Decoder,实现ChannelOutboundHandler,处理发送的请求
new MessageDecoder(), //在流水线中加入Decoder,实现ChannelInboundHandler,处理收到的请求
new ChannelHandler());//处理业务逻辑的Handler,实现ChannelInboundHandler,处理收到的请求
}
}
}
#### 重构点5: 新增两种的消息传输方式
**重构前:支持两种交易传输方式,1. 随机地传给一个节点。2. 广播给所有节点**
**重构后:新增两种消息传输方式**
3. **传给一个指定节点。**
* Channel新增接口sendToPeerByID(Message,nodeId, responseCallBack)
4. **以自定义的某种规则选择要传输的节点。**
* Channel新增接口sendToPeerByRule(Message, PeerSelectRule, responseCallBack)
sendToPeerByRule接口实现关键代码:
```java
public Response sendToPeerByRuleWithTimeOut(Message out, PeerSelectRule rule, Options options) {
Callback callback = new Callback();
asyncSendToPeerByRule(out, rule, callback, options);
waitResponse(callback, options);
return callback.retResponse;
}
public void asyncSendToPeerByRule(
Message out, PeerSelectRule rule, ResponseCallback callback, Options options) {
String target = rule.select(getConnectionInfo());
asyncSendToPeer(out, target, callback, options);
}
自定义规则,如选择有块高最大的节点:
Class MySelectRule implements PeerSelectRule{
public String select(List<ConnectionInfo> allConnInfo){
ConnectInfo heighest; //块高最大的节点
for (ConnectInfo connInfo: allConnInfo){
if(heighest == null){
heighest = connInfo;
continue;
}
if (allConnInfo.blockHeight > heighest.blockHeight){
heighest = allConnInfo
}
}
return heighest.nodeId
}
}在Network的实现中,新增stop方法。
class NetworkImp implements Network{
private ConnectionConfig allConnections;
...
public void stop(){
for (ChannelConnection channelConnection:allConnections.getAllChannelConnections()){
channelConnection.stopListen();
}
}
}通过调用EventLoopGroup的shutdownGracefully方法,让Netty优雅退出。Netty优雅退出会做三件事情:
- 把NIO线程的状态位设置成ST_SHUTTING_DOWN状态,不再处理新的消息(不允许再对外发送消息);
- 退出前的预处理操作:把发送队列中尚未发送或者正在发送的消息发送完、把已经到期或者在退出超时之前到期的定时任务执行完成、把用户注册到NIO线程的退出Hook任务执行完成;
- 资源的释放操作:所有Channel的释放、多路复用器的去注册和关闭、所有队列和定时任务的清空取消,最后是NIO线程的退出。
class ChannelConnection{
private EventLoopGroup bossGroup; //负责处理TCP/IP连接
private EventLoopGroup workerGroup;//负责处理Channel的I/O事件
...
public void stopListen(){
if (running) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
running = false;
return;
}
}
}模块主要通过两个接口对外暴露功能:
- 处理消息:MsgHandler接口
public interface MsgHandler{
// 收到消息时处理逻辑
public void onMessage(ChannelHandlerContext chx, Message msg);
// SSL握手成功的处理逻辑
public void onConnect(ChannelHandlerContext chx);
// 段连时的处理逻辑
public void onDisconnect(ChannelHandlerContext chx);
}2. 发送消息:Network接口
public interface Network{
// 随机发送给几个节点。Message,要发送的消息; numOfPeer,要发送的节点数量,ResponseCallback得到返回值后调用的回调函数
public void sendToRandomPeers(Message out, int numOfPeer, ResponseCallback responseCallback);
// 发送给指定节点。NodeId,要发送的节点ID
public void sendToPeerById(Message out, String peerIpPort, ResponseCallback responseCallback);
// 广播给所有连接的节点
public void broadcast(Message out,ResponseCallback responseCallback);
// 根据某种规则发给指定的一个节点
public void sendToPeerWithRule(Message out,PeerSelectRule rule,ResponseCallback responseCallback);
// 获取当前的连接信息
public List<ConnectionInfo> getAllConnectionInfo();
}此外还有一个接口,和一个类在处理消息和发送消息中需要用到:
- 自定义节点选择规则:PeerSelectRule
public interface PeerSelectRule{
// 节点选择逻辑
public String select(List<ConnectionInfo> allConnInfo);
}- 自定义节点请求响应的回调:ResponseCallBack
public interface ResponseCallBack{
private Timeout timeout;
public abstract void onResponse(Response response);
public void onTimeout() {
Response response = new Response();
response.setErrorCode(ChannelMessageError.MESSAGE_TIMEOUT.getError());
response.setErrorMessage("Processing bcos message timeout");
response.setContent("");
onResponse(response);
}
public Timeout getTimeout() { return timeout; }
public void setTimeout(Timeout timeout) { this.timeout = timeout;}
}