java解析JT808协议的实现代码

2020-03-08 10:01:02于丽

2.3.2 netty针对于JT808的消息处理器

package cn.hylexus.jt808.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.hylexus.jt808.server.SessionManager;
import cn.hylexus.jt808.service.codec.MsgDecoder;
import cn.hylexus.jt808.vo.PackageData;
import cn.hylexus.jt808.vo.Session;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;

public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)

  private final Logger logger = LoggerFactory.getLogger(getClass());

  // 一个维护客户端连接的类
  private final SessionManager sessionManager;
  private MsgDecoder decoder = new MsgDecoder();

  public TCPServerHandler() {
    this.sessionManager = SessionManager.getInstance();
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2)
    try {
      ByteBuf buf = (ByteBuf) msg;
      if (buf.readableBytes() <= 0) {
        // ReferenceCountUtil.safeRelease(msg);
        return;
      }

      byte[] bs = new byte[buf.readableBytes()];
      buf.readBytes(bs);

      PackageData jt808Msg = this.decoder.queueElement2PackageData(bs);
      // 处理客户端消息
      this.processClientMsg(jt808Msg);
    } finally {
      release(msg);
    }
  }

  private void processClientMsg(PackageData jt808Msg) {
    // TODO 更加消息ID的不同,分别实现自己的业务逻辑
    if (jt808Msg.getMsgHeader().getMsgId() == 0x900) {
      // TODO ...
    } else if (jt808Msg.getMsgHeader().getMsgId() == 0x9001) {
      // TODO ...
    }
    // else if(){}
    // else if(){}
    // else if(){}
    // else if(){}
    // ...
    else {
      logger.error("位置消息,消息ID={}", jt808Msg.getMsgHeader().getMsgId());
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
    logger.error("发生异常:{}", cause.getMessage());
    cause.printStackTrace();
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Session session = Session.buildSession(ctx.channel());
    sessionManager.put(session.getId(), session);
    logger.debug("终端连接:{}", session);
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    final String sessionId = ctx.channel().id().asLongText();
    Session session = sessionManager.findBySessionId(sessionId);
    this.sessionManager.removeBySessionId(sessionId);
    logger.debug("终端断开连接:{}", session);
    ctx.channel().close();
    // ctx.close();
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
      IdleStateEvent event = (IdleStateEvent) evt;
      if (event.state() == IdleState.READER_IDLE) {
        Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
        logger.error("服务器主动断开连接:{}", session);
        ctx.close();
      }
    }
  }

  private void release(Object msg) {
    try {
      ReferenceCountUtil.release(msg);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}