redisson消息队列_消息队列实例

redisson消息队列_消息队列实例在很多时候,一个系统可能就那么一个两个场景需要使用到MQ,特意为此引入如 rabbitMQ、rocketMQ 等消息中间件,不但提高了运维成本、机器成本,还增加了系统复杂度,显得很不划算。通过 redis 的 list 数据结构,可以很方便的实现 MQ 的功能。我们通过最常见的…

前言

在很多时候,一个系统可能就那么一个两个场景需要使用到MQ,特意为此引入如 rabbitMQ、rocketMQ 等消息中间件,不但提高了运维成本、机器成本,还增加了系统复杂度,显得很不划算。通过 redis 的 list 数据结构,可以很方便的实现 MQ 的功能。我们通过最常见的三个 MQ 使用场景来学习如何用 redis 来实现简单的 MQ 功能。

单服务单节点接收

消息发往单个服务,且一条消息仅被一个节点接收。这是MQ使用当中最常见的场景。

单服务单节点接收

采用轮询的方式,不断的从 list 拿消息,达到MQ的效果。因为 redis 命令都是原子操作,所以可以不用考虑并发同步的问题。
接收端关键代码:

@PostConstruct
public void init() {
    new Thread(this).start();
}

@Override
public void run() {
    while (!shutdown) {
        Object obj = redisTemplate.boundListOps("OSON_LIST_MQ_KEY").rightPop(1, TimeUnit.SECONDS);
        if (Objects.nonNull(obj)) {
            System.out.printf("%s get message : %s \n", Thread.currentThread().getName(), obj);
        }
    }
}

发送端代码:

BoundListOperations<Object, Object> listOperations = redisTemplate.boundListOps("OSON_LIST_MQ_KEY");
for (int i = 0; i < 10; i++) {
    listOperations.leftPush("oson " + i);
    System.out.printf("send message %d \n", i);
}

接收端启动一条线程,不断的轮询 key 为 OSON_LIST_MQ_KEY 的 redis list, rightPop 方法会把最右边的消息移除并返回。
发送端则往 OSON_LIST_MQ_KEY 的左边写入消息,最终实现MQ效果。

多服务单节点接收

消息发往多个服务,在每个服务内一条消息仅被一个节点接收。

多服务单节点接收

实现原理和单服务接收一致,但需要发送方同时往两条不同 list 发送消息,每个服务各轮询一个 list ,以此来达到效果。
接收端代码和单服务一致。
发送端代码:

BoundListOperations<Object, Object> listOperationsA = redisTemplate.boundListOps("MSON_LIST_MQ_SERVICE_A_KEY");
BoundListOperations<Object, Object> listOperationsB = redisTemplate.boundListOps("MSON_LIST_MQ_SERVICE_B_KEY");
for (int i = 0; i < 10; i++) {
    String message = "mson " + i;
    listOperationsA.leftPush(message);
    listOperationsB.leftPush(message);
    System.out.printf("send message %d \n", i);
}

广播

消息被所有节点接收,例如本地缓存的更新。

广播

在redis中实现广播功能是通过 发布/订阅 来实现的。接收方监听指定 Channel ,发送方往 Channel 发送消息。

接收端关键代码:

@Bean
public MessageListener messageListener(RedisTemplate<String, String> redisTemplate, RedisMessageListenerContainer container) {
    MessageListener listener = (message, pattern) -> {
        String value = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
        System.out.println(value);
    };

    container.addMessageListener(listener, new ChannelTopic("BROADCASTING_PUBSUB_MQ_KEY"));
    return listener;
}

@Bean
public RedisMessageListenerContainer messageListenerContainer(RedisConnectionFactory factory) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(factory);
    return container;
}

发送端代码:

for (int i = 0; i < 10; i++) {
    String message = "BroadCastingPubsub " + i;
    redisTemplate.convertAndSend("BROADCASTING_PUBSUB_MQ_KEY", message);
    System.out.printf("send message BroadCastingPubsub %d \n", i);
}

接收方除了指定 channel 名称外,还可以使用模糊匹配的方式监听多个 channel。

docker 安装 redis

在学习各种中间件时,环境安装是个很头疼的问题,但有了 docker 之后一切都变的简单了,所以,学技术的人一定得在自己电脑上安装 docker。
用 docker 运行 redis :

docker pull redis:5.0.5
docker run -d --name redis5.0.5 -p 6379:6379 redis:5.0.5

简单两个命令即完成了 redis 环境,完全可以满足开发、学习的需要。

在本地模拟多个节点部署

有时候我们需要多节点部署的环境,但不是每个人都有多台机器可供折腾,开虚拟机或 docker 是个办法,但还可以用更加简单的办法来满足需求。

    1. 执行 maven install 完成打包,得到可运行的 jar 包 (Spring boot)。
    1. IDE 运行项目,得到一个节点。
    1. 进入 jar 包所在目录,打开cmd 执行 java -jar 命令 用 -Dserver.port 指定不同的端口,启动得到节点。

java 启动命令示例:

java -jar "-Dserver.port=18080" ./redis-mq-oson-list-1.0.jar

pom依赖

RedisTemplate 需要添加依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

结语

用 list 来实现 MQ 功能有一些缺陷,简单使用还可以,应对复杂场景就优点力不从心了。幸好 Redis 5.0 带来了全新的功能 Redis Stream ,可以完整的实现甚至是替换 MQ 中间件的功能。


示例中完整的源码: mq-learning

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/13429.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注