跳至主要內容

消息处理器

hylexus约 634 字大约 2 分钟

消息处理器

MsgHandler 负责处理经过 RequestMsgBodyConverter 转换之后的 请求体消息实体类

对于请求消息的处理完全是由 MsgHandler 接口来实现的。

public interface MsgHandler<T extends RequestMsgBody> extends OrderedComponent {

    default Set<MsgType> getSupportedMsgTypes() {
        return Sets.newHashSet();
    }

    void handleMsg(RequestMsgMetadata metadata, T body, Session session) throws IOException, InterruptedException;

}

MsgHandler

  • 自定义的消息处理器 必须 实现 MsgHandler 这个泛型接口
  • 当然,这种实现接口并手动注册的方式显得非常繁琐,你完全可以 参考这里 使用 基于注解 的方式来实现 MsgHandler 的功能。

消息处理完成后对客户端的响应也是一个 byte[] ,可以通过 handleMsg() 方法的 Session 参数中拿到 NettyChannel,然后通过 Channel 发送给客户端。

手动实现并注册

实现

  • 所以一般的消息处理步骤应该像下面这样:
@Slf4j
public class SampleMsgHandler implements MsgHandler<AuthRequestMsgBody> {

    byte SUCCESS = 0;
    byte AUTH_CODE_ERROR = 2;

    @Override
    public Set<MsgType> getSupportedMsgTypes() {
        return Collections.singleton(BuiltinJt808MsgType.CLIENT_AUTH);
    }

    @Override
    public void handleMsg(RequestMsgMetadata metadata, AuthRequestMsgBody body, Session session) throws IOException, InterruptedException {
        final String authCode = body.getAuthCode();

        // 鉴权逻辑
        final byte result = isValidAuthCode(authCode, session.getTerminalId())
                ? SUCCESS
                : AUTH_CODE_ERROR;

        // 组装响应消息的字节数组(别忘了转义)
        final byte[] respMsgBody = this.encodeMsgBody(result, metadata, session);

        // 发送给客户端
        this.send2Client(session.getChannel(), respMsgBody);
    }

    private byte[] encodeMsgBody(byte result, RequestMsgMetadata metadata, Session session) {
        // ...
        // 按文档格式组装字节数组
        return new byte[0];
    }

    private boolean isValidAuthCode(String authCode, String terminalId) {
        // ...
        // 具体鉴权逻辑
        return true;
    }

    protected void send2Client(Channel channel, byte[] bytes) throws InterruptedException {
        final ChannelFuture future = channel.writeAndFlush(Unpooled.copiedBuffer(bytes)).sync();
        if (!future.isSuccess()) {
            log.error("ERROR : 'send data to client:'", future.cause());
        }
    }
}
  • 又或者像这样
    • 继承 AbstractMsgHandler
    • 这里的 RespMsgBody.toBytes() 不用转义
@Slf4j
@BuiltinComponent
public class AuthMsgHandler extends AbstractMsgHandler<AuthRequestMsgBody> {

    @Override
    public Set<MsgType> getSupportedMsgTypes() {
        return newHashSet(BuiltinJt808MsgType.CLIENT_AUTH);
    }

    @Override
    protected Optional<RespMsgBody> doProcess(RequestMsgMetadata metadata, AuthRequestMsgBody body, Session session) {
        log.debug("receive AuthMsg : {}", body);
        boolean valid = authCodeValidator.validateAuthCode(session, metadata, body);
        if (valid) {
            return of(commonSuccessReply(metadata, BuiltinJt808MsgType.CLIENT_AUTH));
        }
        return of(CommonReplyMsgBody.of(AUTH_CODE_ERROR, metadata.getHeader().getFlowId(), BuiltinJt808MsgType.CLIENT_AUTH));
    }
}
  • 以下为示例性的处理位置消息的 MsgHandler
@Slf4j
public class LocationInfoUploadMsgHandler extends AbstractMsgHandler<LocationUploadMsgBody> {

    @Override
    protected Optional<RespMsgBody> doProcess(RequestMsgMetadata metadata, LocationUploadMsgBody body, Session session) {

        log.info("{}", body);
        return Optional.of(commonSuccessReply(metadata, BuiltinJt808MsgType.CLIENT_LOCATION_INFO_UPLOAD));
    }
}

注册自定义MsgHandler

public class Jt808Config extends Jt808ServerConfigurationSupport { 
    @Override
    public void configureMsgHandlerMapping(MsgHandlerMapping mapping) {
        super.configureMsgHandlerMapping(mapping);
        // 如果你在这里注册了自定义的鉴权消息处理器,那么AuthCodeValidator也无需提供了
        // 此处也可以从Spring容器中获取bean来注入,不一定要手动new一个Handler注册
        mapping.registerConverter(Jt808MsgType.CLIENT_LOCATION_INFO_UPLOAD, new LocationInfoUploadMsgHandler());
    }
}

基于注解实现

传送门

基于注解来实现MsgHandler的功能 请移步这里


传送门

本小节示例可以在 samples/jt-808-server-sample-customizedopen in new window 找到相关代码。