讲解一下Redis中的订阅发布模式

发布与订阅模型在许多编程语言中都有实现,也就是我们经常说的设计模式中的一种–观察者模式。在一些应用场合,例如发送方并不是以固定频率发送消息,如果接收方频繁去咨询发送方,这种操作无疑是很麻烦并且不友好的。

为什么做订阅分布?

随着业务复杂, 业务的项目依赖关系增强, 使用消息队列帮助系统降低耦合度.

  • 订阅分布本身也是一种生产者消费者模式, 订阅者是消费者, 发布者是生产者.
  • 订阅发布模式, 发布者发布消息后, 只要有订阅方, 则多个订阅方会收到同样的消息
  • 生产者消费者模式, 生产者往队列里放入消息, 由多个消费者对一条消息进行抢占.
  • 订阅分布模式可以将一些不着急完成的工作放到其他进程或者线程中进行离线处理.

Redis中的订阅发布

Redis中的订阅发布模式, 当没有订阅者时, 消息会被直接丢弃(Redis不会持久化保存消息)

Redis生产者消费者

生产者使用Redis中的list数据结构进行实现, 将待处理的消息塞入到消息队列中.

class Producer(object):

def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"
self.value = "test_value_{id}"

def produce(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.lpush(self.key, msg)

消费者使用redis中brpop进行实现, brpop会从list头部消息, 并能够设置超时等待时间.

class Consumer(object):

def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = "test_key"

def consume(self, timeout=0):
# timeout=0 表示会无线阻塞, 直到获得消息
while True:
msg = self._conn.brpop(self.key, timeout=timeout)
process(msg)


def process(msg):
print msg

if __name__ == '__main__':
consumer = Consumer()
consumer.consume()
# 输出结果
('test_key''test_value_1')
('test_key''test_value_2')
('test_key''test_value_3')
('test_key''test_value_4')
('test_key''test_value_5')

Redis中订阅发布

在Redis Pubsub中, 一个频道(channel)相当于一个消息队列

class Publisher(object):

def __init__(self, host, port):
self._conn = redis.StrictRedis(host=host, port=port)
self.channel = "test_channel"
self.value = "test_value_{id}"

def pub(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.publish(self.channel, msg)

其中get_message使用了select IO多路复用来检查socket连接是否是否可读.

class Subscriber(object):

def __init__(self, host="localhost", port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self._pubsub = self._conn.pubsub() # 生成pubsub对象
self.channel = "test_channel"
self._pubsub.subscribe(self.channel)

def sub(self):
while True:
msg = self._pubsub.get_message()
if msg and isinstance(msg.get("data"), basestring):
process(msg.get("data"))

def close(self):
self._pubsub.close()

# 输出结果
test_value_1
test_value_2
test_value_3
test_value_4
test_value_

Java Jedis踩过的坑

在Jedis中订阅方处理是采用同步的方式, 看源码中PubSub模块的process函数

do-while循环中, 会等到当前消息处理完毕才能够处理下一条消息, 这样会导致当入队列消息量过大的时候, redis链接被强制关闭.

解决方案: 将整个处理函数改为异步的方式.

原创文章,作者:晴川运维,如若转载,请注明出处:https://baike.qcidc.com/16705.html

(0)
晴川运维晴川运维
上一篇 2025年10月24日
下一篇 2025年10月24日

相关推荐

  • Linux下使用free命令查看内存使用情况

    free命令可以显示当前系统未使用的和已使用的内存数目,还可以显示被内核使用的内存缓冲区,下面为大家详细讲解一下 free命令使用实例。 显示系统内存 不带任何选项运行 free …

    Linux系统 2025年6月8日
  • Github 访问慢的若干解决方法

    我们经常要用到github,但是速度很慢,有时会有超时现象,十分地影响工作效率,可以考虑采取如下措施 1 使用 Github Mirror 下载 直接在 GitHub 仓库前面拼接…

    Linux系统 2025年6月12日
  • Linux下查看GPU信息命令

    现在不管是分布式还是其他服务器还是深度学习训练模型等,都离不开GPU,因此了解一下GPU常见的参数命令以及性能指标是十分重要的,本篇文章重点为大家讲解一下Linux下查看GPU信息…

    Linux系统 2025年7月1日
  • 详解Swift 可选(Optionals)类型

    Swift是一种适用于iOS和OS X应用的全新编程语言,它建立在最好的C和Objective-C语言之上,并且没有C语言的兼容性限制。Swift采用安全的编程模式,增加了现代功能…

    Linux系统 2025年10月6日
  • Linux下常见端口转发

    在一些实际的场景里,我们需要通过利用一些端口转发工具,比如系统自带的命令行工具或第三方小软件,来绕过网络访问限制触及目标系统,本篇文章重点为大家分享一下Linux下常见端口转发。 …

    Linux系统 2025年6月8日
  • 详解cron与anacron

    cron 是 linux 中运行例行性工作的一个服务,守护进程是crond,是一个用于运行计划任务如系统备份、更新等的守护进程。它适合在那些 7×24 小时不间断运行的机器如服务器…

    Linux系统 2025年6月27日
  • Linux下ulimit具体使用方法

    ulimit 是一个计算机命令,用于shell启动进程所占用的资源,可用于修改系统资源限制,本篇文章重点为大家分享一下Linux下ulimit具体使用方法,有需要的小伙伴可以参考一…

    Linux系统 2025年7月3日
  • 详解Bash提取子字符串具体方法

    所谓“子字符串”就是出现在其它字符串内的字符串。 比如 “3382” 就是 “this is a 3382 test” 的子字符串。 我们有多种方法可以从中把数字或指定部分字符串抽…

    Linux系统 2025年10月21日
  • Nginx的主要应用场景

    本文只针对Nginx在不加载第三方模块的情况能处理哪些事情,由于第三方模块太多所以也介绍不完,当然本文本身也可能介绍的不完整,毕竟只是我个人使用过和了解到过得。所以还请见谅,同时欢…

    Linux系统 2025年10月11日
  • 最受欢迎的10款Python开源框架总结

    本篇文章为大家讲解一下10个在GitHub等开源网站中最欢迎的Python开源框架,为大家做详细介绍和优劣势总结。这些框架包括Web开发,高性能网络通信,测试,爬虫等。 1.Dja…

    Linux系统 2025年6月29日

发表回复

登录后才能评论