SpringBootdisruptor高性能队列使用

2023-02-03 09:47:52
目录
1、Disruptor简介2、Disruptor概念3、springboot+disruptor实例4、小结

Disruptor是一个高性能队列,常见的还有kafka、rabbitmq等,下面体验一下~

1、Disruptor简介

Disruptor>

其特点简单总结如下:

    开源的java框架,用于生产者-消费者场景;高吞吐量和低延迟;有界队列;

    disruptor在github网址为:https://github.com/LMAX-Exchange/disruptor

    2、Disruptor概念

      Ring>Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理;Sequencer:Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法;Sequence Barrier:用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用;Wait Strategy:定义 Consumer 如何进行等待下一个事件的策略;Event:在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定;EventProcessor:EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop);EventHandler:定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现;Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型;

      3、springboot+disruptor实例

      在pom.xml文件中添加依赖

      		<dependency>
                  <groupId>com.lmax</groupId>
                  <artifactId>disruptor</artifactId>
                  <version>3.3.4</version>
              </dependency>
      

      消息体Model

      @Data
      public class MessageModel {
          private String message;
      }
      

      构造EventFactory

      public class HelloEventFactory implements EventFactory<MessageModel> {
          @Override
          public MessageModel newInstance() {
              return new MessageModel();
          }
      }
      

      构造消费者

      @Slf4j
      public class HelloEventHandler implements EventHandler<MessageModel> {
          @Override
          public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
              try {
                  //这里停止1000ms是为了确定消费消息是异步的
                  Thread.sleep(1000);
                  log.info("消费者处理消息开始");
                  if (event != null) {
                      log.info("消费者消费的信息是:{}",event);
                  }
              } catch (Exception e) {
                  log.info("消费者处理消息失败");
              }
              log.info("消费者处理消息结束");
          }
      }
      

      构造MQManager

      @Configuration
      public class MqManager {
          @Bean("messageModel")
          public RingBuffer<MessageModel> messageModelRingBuffer() {
              //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
              ExecutorService executor = Executors.newFixedThreadPool(2);
              //指定事件工厂
              HelloEventFactory factory = new HelloEventFactory();
              //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
              int bufferSize = 1024 * 256;
              //单线程模式,获取额外的性能
              Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
              //设置事件业务处理器---消费者
              disruptor.handleEventsWith(new HelloEventHandler());
              //启动disruptor线程
              disruptor.start();
              //获取ringbuffer环,用于接取生产者生产的事件
              RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
              return ringBuffer;
          }
      }
      

      构造生产者

      @Configuration
      public class MqManager {
          @Bean("messageModel")
          public RingBuffer<MessageModel> messageModelRingBuffer() {
              //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
              ExecutorService executor = Executors.newFixedThreadPool(2);
              //指定事件工厂
              HelloEventFactory factory = new HelloEventFactory();
              //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
              int bufferSize = 1024 * 256;
              //单线程模式,获取额外的性能
              Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
              //设置事件业务处理器---消费者
              disruptor.handleEventsWith(new HelloEventHandler());
              //启动disruptor线程
              disruptor.start();
              //获取ringbuffer环,用于接取生产者生产的事件
              RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
              return ringBuffer;
          }
      }
      

      测试

      	/**
           * 项目内部使用Disruptor做消息队列
           * @throws Exception
           */
          @Test
          public void sayHelloMqTest() throws Exception{
              helloEventProducer.sayHelloMq("Hello world!");
              log.info("消息队列已发送完毕");
              //这里停止2000ms是为了确定是处理消息是异步的
              Thread.sleep(2000);
          }
      

      运行结果如下

      4、小结

      引用disruptor作为内部的高性能队列,应用于生产者-消费者模式中还是非常nice的,后面若有工程需求可以尝试一下。

      到此这篇关于SpringBoot>