2.3.2 Netty 针对 JT808 的消息处理器
package cn.hylexus.jt808.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.hylexus.jt808.server.SessionManager;
import cn.hylexus.jt808.service.codec.MsgDecoder;
import cn.hylexus.jt808.vo.PackageData;
import cn.hylexus.jt808.vo.Session;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
/**
 * Netty inbound handler for JT808 client connections.
 *
 * <p>When a channel becomes active a {@link Session} is built for it and stored in the
 * {@link SessionManager}. Every inbound {@link ByteBuf} is copied into a byte array, decoded
 * into a {@link PackageData} and dispatched by message ID. The session is removed again when
 * the channel becomes inactive or when a reader-idle event is received.
 */
public class TCPServerHandler extends ChannelInboundHandlerAdapter { // (1)

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // Keeps track of the sessions of the connected clients.
    private final SessionManager sessionManager;

    // Turns raw message bytes into PackageData objects.
    private final MsgDecoder decoder = new MsgDecoder();

    /** Creates a handler bound to the instance returned by {@link SessionManager#getInstance()}. */
    public TCPServerHandler() {
        this.sessionManager = SessionManager.getInstance();
    }

    /**
     * Decodes one inbound buffer and hands the decoded message to {@link #processClientMsg}.
     *
     * <p>{@code msg} is cast to {@link ByteBuf}; an empty buffer is dropped without decoding.
     * The message is released in the {@code finally} block on every path, including the
     * early return and any exception thrown while decoding or processing.
     *
     * @param ctx the channel handler context (not used by this method)
     * @param msg the inbound message, expected to be a {@link ByteBuf}
     * @throws InterruptedException declared by the signature; the source of it is not visible
     *         in this class (presumably the decoder - confirm)
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // (2)
        try {
            ByteBuf buf = (ByteBuf) msg;
            int length = buf.readableBytes();
            if (length <= 0) {
                return;
            }
            // Copy the readable bytes out so the buffer can be released afterwards.
            byte[] bs = new byte[length];
            buf.readBytes(bs);
            PackageData jt808Msg = this.decoder.queueElement2PackageData(bs);
            // Process the message sent by the client.
            this.processClientMsg(jt808Msg);
        } finally {
            release(msg);
        }
    }

    /**
     * Dispatches a decoded message according to the message ID found in its header.
     *
     * <p>The individual branches are placeholders for business logic; IDs without a branch
     * are logged as unknown.
     *
     * @param jt808Msg the decoded message; a {@code null} message or header is logged and
     *        skipped (whether the decoder can produce one is not visible here)
     */
    private void processClientMsg(PackageData jt808Msg) {
        if (jt808Msg == null || jt808Msg.getMsgHeader() == null) {
            logger.warn("Skipping message without a decoded header: {}", jt808Msg);
            return;
        }
        // TODO implement the business logic for each message ID
        if (jt808Msg.getMsgHeader().getMsgId() == 0x900) {
            // TODO ...
        } else if (jt808Msg.getMsgHeader().getMsgId() == 0x9001) {
            // TODO ...
        }
        // further "else if" branches for other message IDs go here
        else {
            logger.error("未知消息,消息ID={}", jt808Msg.getMsgHeader().getMsgId());
        }
    }

    /**
     * Logs an exception raised in the pipeline, including its stack trace.
     *
     * @param ctx the channel handler context (not used by this method)
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // The trailing Throwable makes the logger print the stack trace as well.
        logger.error("发生异常:{}", cause.getMessage(), cause);
    }

    /**
     * Builds a session for the newly active channel and stores it under the session's id.
     *
     * @param ctx the context whose channel became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Session session = Session.buildSession(ctx.channel());
        sessionManager.put(session.getId(), session);
        logger.debug("终端连接:{}", session);
    }

    /**
     * Removes the session of a channel that became inactive and closes the channel.
     *
     * @param ctx the context whose channel became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // NOTE(review): the lookup key is the channel's long-text id, whereas the other
        // callbacks obtain the key from Session; presumably both agree - confirm in Session.
        final String sessionId = ctx.channel().id().asLongText();
        Session session = sessionManager.findBySessionId(sessionId);
        this.sessionManager.removeBySessionId(sessionId);
        logger.debug("终端断开连接:{}", session);
        ctx.channel().close();
    }

    /**
     * Closes the connection when a reader-idle event arrives.
     *
     * <p>Only {@link IdleStateEvent}s whose state is {@link IdleState#READER_IDLE} are acted
     * upon: the session is removed, the disconnect is logged and the context is closed.
     * All other events are ignored and not forwarded.
     *
     * @param ctx the channel handler context
     * @param evt the user event
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
                logger.error("服务器主动断开连接:{}", session);
                ctx.close();
            }
        }
    }

    /**
     * Releases a message through {@link ReferenceCountUtil#release(Object)}; a failure is
     * logged instead of being propagated to the caller.
     *
     * @param msg the message to release
     */
    private void release(Object msg) {
        try {
            ReferenceCountUtil.release(msg);
        } catch (Exception e) {
            logger.warn("Failed to release inbound message", e);
        }
    }
}










