JAVA-000

返回主页

netty 初级

对应极客时间的课程 《Netty源码剖析与实战》第四章,练习程序的第一版。

PlantUML

本示例使用 PlantUML 生成图片,请参照 PlantUML Plugin Installation 在 IntelliJ IDEA 上安装相关插件。

Message Format

Client 和 Server 交流所使用的 Message 格式如下:

Message Format

Message Class Diagram

Message 的类图如下:

Message Family Class Diagram

Server Child Handler Pipeline

Server 的 childHandler pipeline 如下:

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());

        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderProtocolEncoder());

        pipeline.addLast(new LoggingHandler(LogLevel.INFO));

        pipeline.addLast(new OrderServerProcessHandler());
    }
});

Server Child Handler Pipeline - Class Diagram

Server 的 childHandler 的类图如下:

:package 不正确,下图只是为了比较形象的区分Inbound、Outbound、Duplex和Message。

Server Child Pipeline Class Diagram

Server Child Handler Pipeline - Sequence Diagram

Server解析request的时序图如下(:这只是粗略步骤,用于解释逻辑):

Interpret Request Sequence Diagarm

Pipeline Order

根据上面的pipeline类图和时序图,可以得出pipeline的排序原则,此原则同时适用于client和server:

Pipeline Order

Client

下面详细介绍下三个Client的区别。

ClientV0

ClientV0比较简单粗糙,它可以发送RequestMessage给Server,但是不能接收Server返回的ResponseMessage

ClientV0使用如下pipeline:

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());

        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderProtocolEncoder());

        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    }
});

发送 RequestMessage至Server端。

RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
channelFuture.channel().writeAndFlush(requestMessage);

ClientV1

ClientV1它可以发送RequestMessage给Server,但是它仍然不能接收Server返回的ResponseMessage

ClientV1使用如下pipeline,相对ClientV0,它多了一个OperationToRequestMessageEncoder:

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());

        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast(new OrderProtocolDecoder());

        pipeline.addLast(new OperationToRequestMessageEncoder());

        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    }
});

利用OperationToRequestMessageEncoder,可以发送OrderOperation至Server端。

OrderOperation orderOperation = new OrderOperation(1001, "tudou");
channelFuture.channel().writeAndFlush(orderOperation);

注意:其实ClientV1也支持发送RequestMessage至Server端。

RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
channelFuture.channel().writeAndFlush(requestMessage);

OperationToRequestMessageEncoder 继承自 **MessageToMessageEncoder**,**MessageToMessageEncoder**的**write**方法中,会通过**acceptOutboundMessage**检查msg是否为**Operation**:

if (this.acceptOutboundMessage(msg)) {
    out = CodecOutputList.newInstance();
    Object cast = msg;

    try {
        this.encode(ctx, cast, out);
    } finally {
        ReferenceCountUtil.release(msg);
    }

    if (out.isEmpty()) {
        throw new EncoderException(StringUtil.simpleClassName(this) + " must produce at least one message.");
    }

    var29 = false;
} else {
    ctx.write(msg, promise);
    var29 = false;
}

ClientV2

通过io.netty.example.study.client.handler.dispatcher package的如下三个class,ClientV2实现了响应分发,可以接收Server返回的ResponseMessage

ClientV2使用如下pipeline,相对ClientV1,它多了一个ResponseDispatcherHandler:

bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new OrderFrameDecoder());
        pipeline.addLast(new OrderFrameEncoder());

        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast(new OrderProtocolDecoder());

        pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));

        pipeline.addLast(new OperationToRequestMessageEncoder());

        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    }
});

ClientV2发送RequestMessage给Server端:

long streamId = IdUtil.nextId();
RequestMessage requestMessage = new RequestMessage(streamId, new OrderOperation(1001, "tudou"));
OperationResultFuture operationResultFuture = new OperationResultFuture();

requestPendingCenter.add(streamId, operationResultFuture);
channelFuture.channel().writeAndFlush(requestMessage);

requestPendingCenter同时记录了streamId

public class RequestPendingCenter {
    private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();

    public void add(Long streamId, OperationResultFuture future) {
        this.map.put(streamId, future);
    }
}

然后,ClientV2可以接收到Server返回的OperationResult

OperationResult operationResult = operationResultFuture.get();

requestPendingCenter会通过streamId去map中找到相对应的operationResult,并且返回给正确的ClientV2 (可以跑多个ClientV2进行测试):

public class RequestPendingCenter {
    private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();

    public void set(Long streamId, OperationResult operationResult) {
        OperationResultFuture operationResultFuture = this.map.get(streamId);
        if (operationResultFuture != null) {
            operationResultFuture.setSuccess(operationResult);
            this.map.remove(streamId);
        }
    }
}