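This is the first version of the exercise program for Chapter 4 of the 极客时间 (Geek Time) course 《Netty源码剖析与实战》 (Netty Source Code Analysis and Practice).
This example uses PlantUML to generate its diagrams. To install the plugin in IntelliJ IDEA, see PlantUML Plugin Installation.
The Message format used for communication between the Client and the Server is as follows:
The class diagram of Message is as follows:
The Server's childHandler pipeline is as follows: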
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        pipeline.addLast(new OrderServerProcessHandler());
    }
});
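To make the traversal of this pipeline concrete, here is a minimal stand-alone simulation that does not use Netty. It relies on two Netty rules: inbound events travel from the head of the pipeline to the tail, and a write issued through `ctx.write(...)` travels from the issuing handler back toward the head. The `PipelineOrderSketch` class and the direction assigned to each handler are assumptions for illustration, inferred from the handler names, and the sketch assumes `OrderServerProcessHandler` replies through its `ChannelHandlerContext`.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone simulation of handler ordering; hypothetical, not part of the project.
class PipelineOrderSketch {
    enum Direction { INBOUND, OUTBOUND, DUPLEX }

    static final class Handler {
        final String name;
        final Direction direction;

        Handler(String name, Direction direction) {
            this.name = name;
            this.direction = direction;
        }
    }

    // Same order as the addLast calls of the Server pipeline.
    // The directions are assumptions inferred from the class names.
    static List<Handler> serverPipeline() {
        List<Handler> p = new ArrayList<>();
        p.add(new Handler("OrderFrameDecoder", Direction.INBOUND));
        p.add(new Handler("OrderFrameEncoder", Direction.OUTBOUND));
        p.add(new Handler("OrderProtocolDecoder", Direction.INBOUND));
        p.add(new Handler("OrderProtocolEncoder", Direction.OUTBOUND));
        p.add(new Handler("LoggingHandler", Direction.DUPLEX));
        p.add(new Handler("OrderServerProcessHandler", Direction.INBOUND));
        return p;
    }

    // Inbound events go head -> tail and skip outbound-only handlers.
    static List<String> inboundPath(List<Handler> pipeline) {
        List<String> path = new ArrayList<>();
        for (Handler h : pipeline) {
            if (h.direction != Direction.OUTBOUND) {
                path.add(h.name);
            }
        }
        return path;
    }

    // A write issued by the handler at fromIndex goes toward the head and
    // visits only the outbound-capable handlers located before it.
    static List<String> outboundPath(List<Handler> pipeline, int fromIndex) {
        List<String> path = new ArrayList<>();
        for (int i = fromIndex - 1; i >= 0; i--) {
            Handler h = pipeline.get(i);
            if (h.direction != Direction.INBOUND) {
                path.add(h.name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        List<Handler> p = serverPipeline();
        System.out.println("inbound : " + inboundPath(p));
        System.out.println("outbound: " + outboundPath(p, p.size() - 1));
    }
}
```

Under these assumptions a request passes through OrderFrameDecoder, OrderProtocolDecoder, LoggingHandler, and OrderServerProcessHandler, and the response passes through LoggingHandler, OrderProtocolEncoder, and OrderFrameEncoder.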
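The class diagram of the Server's childHandler is as follows:
Note: the packages shown are not the real ones. The diagram below only groups the classes to make the distinction among Inbound, Outbound, Duplex, and Message easier to see.
The sequence diagram of how the Server parses a request is as follows (note: these are only rough steps, meant to explain the logic):
From the pipeline class diagram and the sequence diagram above, we can derive the ordering rules for a pipeline. These rules apply to both the client and the server:
The differences among the three Clients are described in detail below.
ClientV0 is simple and crude: it can send a RequestMessage to the Server, but it cannot receive the ResponseMessage that the Server returns.
ClientV0 uses the following pipeline: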
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    }
});
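The source of OrderFrameDecoder and OrderFrameEncoder is not shown here. Assuming they form a length-prefixed frame codec (a common choice with Netty's LengthFieldBasedFrameDecoder and LengthFieldPrepender), the idea can be sketched with plain `java.nio`. The 2-byte length field and the `LengthFrameSketch` class are assumptions for illustration, not the project's actual settings.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch of length-prefixed framing; no Netty involved.
class LengthFrameSketch {
    // Encoder side: prepend a 2-byte big-endian length to the payload.
    static byte[] encodeFrame(byte[] payload) {
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("payload too large for a 2-byte length field");
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort((short) payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Decoder side: extract every complete frame from a buffer in read mode
    // and leave a trailing partial frame unread until more bytes arrive.
    static List<byte[]> decodeFrames(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 2) {
            in.mark();
            int length = in.getShort() & 0xFFFF;
            if (in.remaining() < length) {
                in.reset(); // incomplete frame: rewind to the length field
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }
}
```

Framing of this kind is what lets the protocol decoder further down the pipeline receive one whole message at a time, even when TCP delivers the bytes split or merged.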
Send a RequestMessage to the Server:
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
channelFuture.channel().writeAndFlush(requestMessage);
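The implementation of `IdUtil.nextId()` is not shown in this document. A minimal sketch, assuming it only needs to hand out unique, increasing ids within one client process, could use an `AtomicLong`. The class name `IdUtilSketch` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for IdUtil; the real implementation may differ.
class IdUtilSketch {
    private static final AtomicLong NEXT = new AtomicLong();

    private IdUtilSketch() {
    }

    // Each call returns a value one greater than the previous call.
    // incrementAndGet is atomic, so concurrent callers never get the same id.
    static long nextId() {
        return NEXT.incrementAndGet();
    }
}
```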
ClientV1 can send a RequestMessage to the Server, but it still cannot receive the ResponseMessage that the Server returns.
ClientV1 uses the following pipeline. Compared with ClientV0, it adds an OperationToRequestMessageEncoder:
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OperationToRequestMessageEncoder());
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    }
});
With OperationToRequestMessageEncoder, the client can send an OrderOperation directly to the Server:
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
channelFuture.channel().writeAndFlush(orderOperation);
Note: ClientV1 can still send a RequestMessage to the Server as well:
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
channelFuture.channel().writeAndFlush(requestMessage);
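OperationToRequestMessageEncoder extends **MessageToMessageEncoder**. In the decompiled excerpt of MessageToMessageEncoder below, a message that acceptOutboundMessage rejects is passed on unchanged via ctx.write(msg, promise), which is why ClientV1 can still send a RequestMessage: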
if (this.acceptOutboundMessage(msg)) {
    // The message type matches: encode it into one or more output messages.
    out = CodecOutputList.newInstance();
    Object cast = msg;
    try {
        this.encode(ctx, cast, out);
    } finally {
        ReferenceCountUtil.release(msg);
    }
    if (out.isEmpty()) {
        throw new EncoderException(StringUtil.simpleClassName(this) + " must produce at least one message.");
    }
} else {
    // The message type does not match: pass it on unchanged to the next outbound handler.
    ctx.write(msg, promise);
}
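The accept-or-forward behavior in the excerpt above can be mimicked without Netty. The sketch below is only an illustration: `PassThroughEncoderSketch`, the fields of its simplified `OrderOperation` and `RequestMessage` classes, and the way the stream id is generated are all assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone model of an encoder that converts one message type
// and forwards every other type unchanged.
class PassThroughEncoderSketch {
    static final class OrderOperation {
        final int tableId;  // assumed field name
        final String dish;  // assumed field name

        OrderOperation(int tableId, String dish) {
            this.tableId = tableId;
            this.dish = dish;
        }
    }

    static final class RequestMessage {
        final long streamId;
        final OrderOperation body;

        RequestMessage(long streamId, OrderOperation body) {
            this.streamId = streamId;
            this.body = body;
        }
    }

    private static final AtomicLong STREAM_IDS = new AtomicLong();

    // Plays the role of acceptOutboundMessage: only OrderOperation is accepted.
    static boolean accept(Object msg) {
        return msg instanceof OrderOperation;
    }

    // Plays the role of the if/else in the excerpt: accepted messages are
    // wrapped in a RequestMessage, everything else is returned as-is.
    static Object write(Object msg) {
        if (accept(msg)) {
            return new RequestMessage(STREAM_IDS.incrementAndGet(), (OrderOperation) msg);
        }
        return msg;
    }
}
```

Passing an `OrderOperation` to `write` yields a new `RequestMessage`, while passing an existing `RequestMessage` returns the same object, which matches the two ways of sending shown for ClientV1.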
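Through the following three classes in the io.netty.example.study.client.handler.dispatcher package, ClientV2 implements response dispatching and can receive the ResponseMessage that the Server returns.
ClientV2 uses the following pipeline. Compared with ClientV1, it adds a ResponseDispatcherHandler: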
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
        pipeline.addLast(new OperationToRequestMessageEncoder());
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    }
});
ClientV2 sends a RequestMessage to the Server:
long streamId = IdUtil.nextId();
RequestMessage requestMessage = new RequestMessage(streamId, new OrderOperation(1001, "tudou"));
OperationResultFuture operationResultFuture = new OperationResultFuture();
requestPendingCenter.add(streamId, operationResultFuture);
channelFuture.channel().writeAndFlush(requestMessage);
Alongside the send, requestPendingCenter records the streamId together with its OperationResultFuture:
public class RequestPendingCenter {
    private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();

    public void add(Long streamId, OperationResultFuture future) {
        this.map.put(streamId, future);
    }
}
Then ClientV2 can receive the OperationResult returned by the Server:
OperationResult operationResult = operationResultFuture.get();
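A plain `get()` blocks until the future is completed, so the caller waits indefinitely if no response ever arrives. One possible refinement is to bound the wait. In the sketch below `CompletableFuture<String>` stands in for OperationResultFuture, whose definition is not shown here, and `BoundedWaitSketch` and its fallback value are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: wait for a result, but only for a limited time.
class BoundedWaitSketch {
    // Returns the result if it arrives within timeoutMillis, otherwise the fallback.
    static String awaitResult(CompletableFuture<String> future, long timeoutMillis, String fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback; // no response in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        }
    }
}
```

If a timeout is used, the entry registered for that streamId should also be removed from the pending map, otherwise it stays there for the lifetime of the client.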
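requestPendingCenter uses the streamId to look up the corresponding OperationResultFuture in the map and completes it with the operationResult, which delivers the result to the correct ClientV2 (you can run several ClientV2 instances to test this):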
public class RequestPendingCenter {
    private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();

    public void set(Long streamId, OperationResult operationResult) {
        OperationResultFuture operationResultFuture = this.map.get(streamId);
        if (operationResultFuture != null) {
            operationResultFuture.setSuccess(operationResult);
            this.map.remove(streamId);
        }
    }
}
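The add/set pair above is a request/response correlation by id. The following stand-alone sketch shows the same idea with only the JDK: `CompletableFuture<String>` replaces OperationResultFuture and OperationResult, and `PendingCenterSketch` is a hypothetical name, so treat it as an illustration of the pattern and not as the project's class.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical JDK-only model of a pending-request registry keyed by streamId.
class PendingCenterSketch {
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register a future before the request is written to the channel.
    CompletableFuture<String> add(long streamId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(streamId, future);
        return future;
    }

    // Complete and forget the future that matches the response's streamId.
    // Returns false when no request with that id is pending.
    boolean set(long streamId, String result) {
        CompletableFuture<String> future = pending.remove(streamId);
        if (future == null) {
            return false;
        }
        return future.complete(result);
    }

    // Number of requests still waiting for a response.
    int pendingCount() {
        return pending.size();
    }
}
```

Responses may arrive in any order: each one completes only the future registered under its own streamId. The sketch takes the entry out with a single `remove` call, a small variation on the get-then-remove sequence above that looks up and removes the entry in one step.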