您现在的位置是:首页 > 正文

springboot+disruptor再体验

2024-04-01 00:48:05阅读 0

Disruptor是一个高性能队列,常见的还有kafka、rabbitmq等,下面体验一下~

1、Disruptor简介
Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。

其特点简单总结如下:

  • 开源的java框架,用于生产者-消费者场景;
  • 高吞吐量和低延迟;
  • 有界队列;

disruptor在github网址为:https://github.com/LMAX-Exchange/disruptor
在这里插入图片描述
2、Disruptor概念

  • Ring Buffer:环形的缓冲区,环形数组中的元素采用覆盖方式,避免了jvm的GC;
  • Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理;
  • Sequencer:Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法;
  • Sequence Barrier:用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用;
  • Wait Strategy:定义 Consumer 如何进行等待下一个事件的策略;
  • Event:在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定;
  • EventProcessor:EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop);
  • EventHandler:定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现;
  • Producer:生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型;

3、springboot+disruptor实例

在pom.xml文件中添加依赖

		<dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>

消息体Model

@Data
public class MessageModel {
    private String message;
}

构造EventFactory

public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override
    public MessageModel newInstance() {
        return new MessageModel();
    }
}

构造消费者

@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
        try {
            //这里停止1000ms是为了确定消费消息是异步的
            Thread.sleep(1000);
            log.info("消费者处理消息开始");
            if (event != null) {
                log.info("消费者消费的信息是:{}",event);
            }
        } catch (Exception e) {
            log.info("消费者处理消息失败");
        }
        log.info("消费者处理消息结束");
    }
}

构造MQManager

@Configuration
public class MqManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //指定事件工厂
        HelloEventFactory factory = new HelloEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new HelloEventHandler());

        //启动disruptor线程
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }

}

构造生产者

@Slf4j
@Component
public class HelloEventProducer {

    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;

    public void sayHelloMq(String message) {
        log.info("生产消息: {}",message);
        //获取下一个Event槽的下标
        long sequence = messageModelRingBuffer.next();
        try {
            //给Event填充数据
            MessageModel event = messageModelRingBuffer.get(sequence);
            event.setMessage(message);
            log.info("往消息队列中添加消息:{}", event);
        } catch (Exception e) {
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
        } finally {
            //发布Event,激活观察者去消费,将sequence传递给改消费者
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
            messageModelRingBuffer.publish(sequence);
        }
    }
}

测试

	/**
     * 项目内部使用Disruptor做消息队列
     * @throws Exception
     */
    @Test
    public void sayHelloMqTest() throws Exception{
        helloEventProducer.sayHelloMq("Hello world!");
        log.info("消息队列已发送完毕");
        //这里停止2000ms是为了确定是处理消息是异步的
        Thread.sleep(2000);
    }

运行结果如下
在这里插入图片描述
4、小结
引用disruptor作为内部的高性能队列,应用于生产者-消费者模式中还是非常nice的,后面若有工程需求可以尝试一下。

网站文章

  • mysql max_allowed_packet查询和修改

    mysql max_allowed_packet查询和修改

    2)也可以直接创建 /etc/my.cnf, 或者从你安装的mysql的相关目录中(可能是/usr/include/mysql或/usr/share/mysql)找一个my.cnf 或 my-smal...

    2024-04-01 00:48:01
  • Dialog:手动输入信息

    Dialog:手动输入信息

    模仿"ofo"手动输入自行车ID,界面可能丑了了点,大致功能:1.如果客户输入的id号超过12位提示用户并且不可以再输入2.当字符串为空时,”x“消失,客户点击”x“能清空editText里面的内容3...

    2024-04-01 00:47:54
  • 解决pip安装时出现SSLError的问题

    错误代码:C:\Python27\lib\site-packages\pip\_vendor\urllib3\util\ssl_.py:150: InsecurePlatformWarning: A true SSLContext object is not available. This prevents urllib3 from configuring SSL appropriately...

    2024-04-01 00:47:22
  • 晶体管放大电路与Multisim仿真学习笔记

    晶体管放大电路与Multisim仿真学习笔记

    共射极放大电路的设计与Multisim仿真。主要讲参数的计算

    2024-04-01 00:47:12
  • 【设计模式】详解观察者模式

    【设计模式】详解观察者模式

    观察者模式是一种行为型设计模式,它定义了一种一对多的依赖关系,当一个对象的状态发生改变时,它的所有依赖者都会收到通知并自动更新。(MQ和它有点像)当对象间存在一对多关系时,则使用观察者模式(Obser...

    2024-04-01 00:47:05
  • ap计算机科学 容错率,2020年 AP CSA(计算机科学)考试真题考点分析及答题思路讲解-2020年AP CSA考情回顾...

    ap计算机科学 容错率,2020年 AP CSA(计算机科学)考试真题考点分析及答题思路讲解-2020年AP CSA考情回顾...

    嗨同学们大家好呀,在北京时间5.16号凌晨四点开考CSA,亲爱的TD小伙伴们,你们考的怎么样?由于为了防止有考生作弊,CB这次也是拼尽全力,准备了多套试卷,如果大家遇到了本篇推送中没有cover到的题...

    2024-04-01 00:46:40
  • -XX:NewSize=20m -XX:MaxNewSize=40m,-Xmn30m,-XX:NewRatio=5

    -XX:NewSize=20m -XX:MaxNewSize=40m,-Xmn30m,-XX:NewRatio=5

    【代码】-XX:NewSize=20m -XX:MaxNewSize=40m,-Xmn30m,-XX:NewRatio=5。

    2024-04-01 00:46:33
  • 年薪30万才能算码农,你顶多就是码畜...

    年薪30万才能算码农,你顶多就是码畜...

    图灵、香农、冯诺依曼等人。:AT&T 贝尔实验室里那几个开创了计算机世界的研究员,Thompson、里奇、伯纳斯李、Bjarne Stroustrup 等人。:仙童公司八叛逆天才 ( 罗伯特 · 诺伊...

    2024-04-01 00:46:25
  • NoSQL Manager for MongoDB 教程(基础篇)

    NoSQL Manager for MongoDB 教程(基础篇)

    前段时间,学习了一下mongodb,在客户端工具方面,个人认为NoSQL Manager for MongoDB 是体验比较好的一个,功能也较齐全。可惜在找教程的时候,发现很难找到比较详细的教程,也没...

    2024-04-01 00:46:00
  • 金蝶s-HR远程调试拒绝

    金蝶s-HR远程调试拒绝

    该配置的都配置了,jar包也引入无报错了。元数据也发布了。端口也改成了其他未占用的端口

    2024-04-01 00:45:54