使用redis实现消息队列_redis作为消息队列

使用redis实现消息队列_redis作为消息队列  众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,Z…

手把手教你用redis实现一个简单的mq消息队列(java)

  众所周知,消息队列是应用系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ.

  但是如果你不想为你的系统引入一个重量级(相对 redis 来说)的 mq,但是想要享受解耦、异步消息等特性,通过本文你就 get 到了,通过 redis 实现一个简单版的 mq。

  为什么是 redis

  redis 通常作为缓存服务引入,因此大部分系统都会有 redis

  redis 本身的资源消耗是极小的,符合我们的轻量要求

  redis 速度很快,几乎不会出现速度瓶颈

  redis 有持久化方案,调整配置项可以在数据安全和速度间进行取舍(参考这篇)[https://segmentfault.com/a/1190000002906345]

  如何实现

  利用 redis 的队列结构来实现消息队列。redis 单个队列最多支持 2*32-1 条数据,对于大部分应用是完全够用的。

  简单来说就是:

  每个 topic 对应一条队列

  从队列一段写入数据,从另一端读取数据

  消费失败,重新将消息放入队列

  注意:代码仅供个人尝鲜使用,请勿用于真实生产环境

  代码仅可在 springboot 环境中使用

  首先定义注解和接口类

  注解代码如下:

  @Target(ElementType.TYPE)

  @Retention(RetentionPolicy.RUNTIME)

  @Component

  public @interface MqConsumer {

  /**

  * 队列主题

  */

  String topic() default “default_es_topic”;

  }

  被该注解修饰的类,将会接收 topic 下的消息。

  接口代码如下:

  public interface RedisConsumer {

  /**

  * 功能描述: 消费方法,消费者类必须继承此方法

  *

  * @param message 数据载体

  * @author 123

  * @date 2020/3/28 22:41

  */

  void deal(String message);

  }

  本接口用于定于接受消息的处理方法。

  扫描注解修饰类

  本部分为核心代码,首先需要获取代码中被注解修饰的类,然后建立一个循环从 redis 队列中取数据,最后调用类对象的 deal 方法消费消息,如果 deal 方法抛出错误,认为消费失败,重新将该数据放入队列中。

  扫描部分代码如下:

  /**

  * MqConfiguration.java

  */

  @Override

  public void run(ApplicationArguments args) {

  Map map = context.getBeansWithAnnotation(MqConsumer.class);

  map.values().forEach(item -> {

  if (!(item instanceof RedisConsumer)) {

  log.warn(“注意检测到被@EsConsumer注解的类{}未实现RedisConsumer接口”, item.getClass().getCanonicalName());

  return;

  }

  MqConsumer[] annotations = item.getClass().getAnnotationsByType(MqConsumer.class);

  MqConsumer annotation = annotations[0];

  String topic = annotation.topic();

  if (topicMap.containsKey(topic)) {

  log.error(“多个消费者{},消费同一个消息:{},已忽略”, item.getClass().getCanonicalName(), topic);

  } else {

  topicMap.put(topic, (RedisConsumer) item);

  }

  });

  log.info(“redis订阅信息汇总完毕!!!!!!”);

  //由一个线程始终循环获取es队列数据

  threadPoolExecutor.execute(loop());

  }

  run 方法在 spring 扫描完毕后调用,通过实现ApplicationRunner接口实现,通过 spring 的方法来获取所有被MqConsumer接口注解的类(否则需要自己写类加载器)。数据汇总完毕后使用一个线程来进行无线循环从 redis 队列中取数据。

  执行线程部分代码如下:

  private Runnable loop() {

  return () -> {

  while (true) {

  AtomicInteger count = new AtomicInteger(0);

  topicMap.forEach((k, v) -> {

  try {

  String message = mqUtil.getRedisTemplate().opsForList().rightPop(k);

  if (message == null) {

  count.getAndIncrement();

  } else {

  pushTask(v, message, k);

  }

  } catch (RedisConnectionFailureException connException) {

  log.error(“redis无法连接,10s后重试”, connException);

  sleep(10);

  } catch (Exception e) {

  log.error(“redis消息队列异常”, e);

  }

  });

  if (count.get() == topicMap.keySet().size()) {

  //当所有的队列都为空时休眠1s

  sleep(1);

  }

  }

  };

  }

  private void pushTask(RedisConsumer item, String value, String key) {

  threadPoolExecutor.execute(() -> {

  try {

  item.deal(value);

  } catch (Exception e) {

  log.error(“执行消费任务出错”, e);

  //非广播消息进行数据回补

  mqUtil.getRedisTemplate().opsForList().rightPush(key, value);  郑州妇科医院哪家好:http://www.zztjfk.com/郑州看妇科哪家医院好:http://www.zztjfk.com/郑州哪里的妇科医院好:http://www.zztjfk.com/

  }

  });

  }

  loop 方法无限循环根据 topic 从 redis 中取数据,如果取到数据,调用 pushTask 方法执行,如果执行报错将会进行数据回补。

  完整代码见本文结尾

  测试

  运行项目后调用,MainController中的接口即可测试。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/8001.html

(0)
上一篇 2023-03-09
下一篇 2023-03-09

相关推荐

  • 蓝牙耳机什么牌子好?荣耀FlyPods3唯一心动妙不可言

    蓝牙耳机什么牌子好?荣耀FlyPods3唯一心动妙不可言     由于使用方便、携带便捷,蓝牙耳机逐渐成为了人们日常除智能手机以外随身必备的产品。同时在现代生活中,我们又随时都被日常通勤途中的人流、广告以及来往的机动车,还有公司里的键盘产生的噪音所包围着…

    2023-03-09
    154
  • python如何开发gui程序(python做GUI)

    python如何开发gui程序(python做GUI)在网页中下载python3解释器,我下载时候,版本是选择的是3.3,因为python3和python2语法上有些区别,所以大家就跟我一样都用python3吧,或许有朋友对于“语法”这个词不明白,没关系,后面小编会给大家讲到的。下好后,文件名为python-3.3.3.msi,当然因为我们下载时间不同,可能文件有所不同,比如你下的可能是python-3.4.0.msi,因为这个软件也是在不断升级中的

    2023-11-27
    124
  • 使用 Apache Superset 可视化 ClickHouse 数据[通俗易懂]

    使用 Apache Superset 可视化 ClickHouse 数据[通俗易懂]Apache Superset是一个强大的BI工具,它提供了查看和探索数据的方法。它在 ClickHouse 用户中也越来越受欢迎。 我们将介绍安装 Superset 的 2 种方法,然后展示如何从

    2023-04-17
    156
  • MySQL数据库:group分组

    MySQL数据库:group分组group by:分组 GroupBy语句从英文的字面意义上理解就是“根据(by)一定的规则进行分组(Group)”。它的作用是通过一定的规则将一个数据集划分成若干个小的区域,然后针对若干个小区域进行

    2022-12-21
    156
  • Python工程师

    Python工程师Python是一种简单易学、功能强大的编程语言,在人工智能、数据分析、Web开发等领域有着广泛的应用。Python工程师是指掌握Python语言,具有一定计算机编程基础,能够运用Python语言进行软件开发、数据处理等工作的工程师。

    2024-06-11
    62
  • ambari-server_ubuntu改主机名

    ambari-server_ubuntu改主机名版本:ambari 2.7.3 ,其他版本应该也差不多是一样的 一、背景说明 时不时就有小伙伴微信里面问我有没有做过,为已有的 ambari 集群修改主机名?之前是有修改过 ip 的,主机名还真没修改

    2023-04-25
    165
  • 论数据库项目的代码组织[通俗易懂]

    论数据库项目的代码组织[通俗易懂]数据库项目的代码组织大致有两种形式:增量式与快照式。 Entity Framework (Core)的数据库迁移工程就是经典的增量式组织形式:有专门的迁移历史表标识增量版本,不同版本之间的升级、降级由

    2023-05-15
    138
  • 用Python的pandasdatareader获取金融市场数据

    用Python的pandasdatareader获取金融市场数据pandasdatareader是一个使用Python语言获取数据的库,主要通过pandas库对Yahoo Finance、Google Finance、World Bank等数据源进行操作。pandasdatareader支持多个金融数据源,可以使用一个API来获取多个数据源的数据信息,使得数据的获取更加方便。

    2024-08-11
    31

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注