Java解析JT808协议的实现代码

2020-03-08 10:01:02 于丽

2.3 和netty结合

2.3.1 netty处理器链

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.kkbc.tpms.tcp.service.TCPServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;

/**
 * Netty-based TCP server that frames inbound bytes on the {@code 0x7e} delimiter and hands each
 * frame to {@link TCPServerHandler}.
 *
 * <p>Lifecycle: {@link #startServer()} allocates the event loop groups and binds the listening
 * socket on a background thread; {@link #stopServer()} shuts the event loop groups down and
 * blocks until they have terminated. Both methods are {@code synchronized} on this instance.
 */
public class TCPServer2 {

  private final Logger log = LoggerFactory.getLogger(getClass());

  /** {@code true} between a successful {@link #startServer()} call and the server going down. */
  private volatile boolean isRunning = false;

  // Assigned in startServer() and read in stopServer(), both under this instance's monitor.
  private EventLoopGroup bossGroup = null;
  private EventLoopGroup workerGroup = null;
  private int port;

  /** Creates a server whose port is left at the {@code int} default of 0. */
  public TCPServer2() {
  }

  /**
   * Creates a server for the given port.
   *
   * @param port TCP port passed to {@code ServerBootstrap.bind(int)}
   */
  public TCPServer2(int port) {
    this();
    this.port = port;
  }

  /**
   * Configures the channel pipeline, binds the listening socket and then blocks until the server
   * channel is closed.
   *
   * @param boss event loop group used for the server (accepting) channel
   * @param worker event loop group used for accepted client channels
   * @throws Exception if binding fails or the calling thread is interrupted while waiting
   */
  private void bind(EventLoopGroup boss, EventLoopGroup worker) throws Exception {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(boss, worker)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) throws Exception {
            // Reader-idle timeout of 15 minutes. IdleStateHandler only fires an idle event;
            // NOTE(review): closing the idle connection is presumably done by TCPServerHandler
            // - confirm.
            ch.pipeline().addLast("idleStateHandler",
                new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
            // 1024 is the maximum length of a single frame: if the decoder has not found a
            // delimiter by then, it raises an exception.
            ch.pipeline().addLast(
                new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
                    Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e })));
            ch.pipeline().addLast(new TCPServerHandler());
          }
        }).option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true);

    ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
    // Report success only once the bind has actually completed.
    this.log.info("TCP服务启动完毕,port={}", this.port);

    channelFuture.channel().closeFuture().sync();
  }

  /**
   * Starts the server on a background thread named after {@link #getName()}.
   *
   * <p>The call returns immediately; a bind failure is logged by the background thread, which
   * then releases the event loop groups and marks the server as not running so that it can be
   * started again.
   *
   * @throws IllegalStateException if the server is already running
   */
  public synchronized void startServer() {
    if (this.isRunning) {
      throw new IllegalStateException(this.getName() + " is already started .");
    }

    // Create the groups here, under the lock, so that a stopServer() call issued right after
    // startServer() always sees them instead of racing with the background thread.
    final EventLoopGroup boss = new NioEventLoopGroup();
    final EventLoopGroup worker = new NioEventLoopGroup();
    this.bossGroup = boss;
    this.workerGroup = worker;
    this.isRunning = true;

    new Thread(() -> {
      try {
        this.bind(boss, worker);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        this.log.error("TCP服务启动出错:{}", e.getMessage(), e);
      } catch (Exception e) {
        this.log.error("TCP服务启动出错:{}", e.getMessage(), e);
      } finally {
        this.releaseAfterExit(boss, worker);
      }
    }, this.getName()).start();
  }

  /**
   * Cleans up after the background thread leaves {@link #bind(EventLoopGroup, EventLoopGroup)},
   * whether it returned normally or failed.
   */
  private void releaseAfterExit(EventLoopGroup boss, EventLoopGroup worker) {
    // Requesting shutdown again on groups that stopServer() already shut down is harmless.
    worker.shutdownGracefully();
    boss.shutdownGracefully();
    synchronized (this) {
      // Only reset the flag if no newer startServer() call has replaced the groups meanwhile.
      if (this.bossGroup == boss) {
        this.isRunning = false;
      }
    }
  }

  /**
   * Stops the server and blocks until both event loop groups have terminated.
   *
   * @throws IllegalStateException if the server is not running
   */
  public synchronized void stopServer() {
    if (!this.isRunning) {
      throw new IllegalStateException(this.getName() + " is not yet started .");
    }
    this.isRunning = false;

    try {
      this.awaitShutdown("workerGroup", this.workerGroup);
      this.awaitShutdown("bossGroup", this.bossGroup);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can react to it.
      Thread.currentThread().interrupt();
      this.log.error("TCP服务停止过程被中断", e);
    }

    this.log.info("TCP服务已经停止...");
  }

  /** Shuts one event loop group down, waits for completion and logs a failure, if any. */
  private void awaitShutdown(String groupName, EventLoopGroup group) throws InterruptedException {
    Future<?> future = group.shutdownGracefully().await();
    if (!future.isSuccess()) {
      Throwable cause = future.cause();
      // Pass the cause both as text (fills the placeholder) and as the trailing exception
      // argument (keeps the stack trace); a lone Throwable argument is handled inconsistently.
      this.log.error("{} 无法正常停止:{}", groupName, String.valueOf(cause), cause);
    }
  }

  /** Returns the name used for the server thread and in state-error messages. */
  private String getName() {
    return "TCP-Server";
  }

  /** Demo entry point: starts a server on port 20048 and leaves it running. */
  public static void main(String[] args) throws Exception {
    TCPServer2 server = new TCPServer2(20048);
    server.startServer();

    // To try out shutdown, wait a few seconds and then call server.stopServer().
  }

}