redis的发布订阅模式pubsub

时间:2022-05-02
本文章向大家介绍redis的发布订阅模式pubsub,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

前言

redis支持发布订阅模式,在这个实现中,发送者(发送信息的客户端)不是将信息直接发送给特定的接收者(接收信息的客户端),而是将信息发送给频道(channel),然后由频道将信息转发给所有对这个频道感兴趣的订阅者。

发送者无须知道任何关于订阅者的信息,而订阅者也无须知道是那个客户端给它发送信息,它只要关注自己感兴趣的频道即可。

对发布者和订阅者进行解构(decoupling),可以极大地提高系统的扩展性(scalability),并得到一个更动态的网络拓扑(network topology)。

redis 发布订阅主要由三个entity组成:channel/subscriber/publisher。

channel:

频道有两种类型

明确型,news.sport,体育类新闻

模糊型,news.*,各种新闻

下面实现对于这两种是透明的。

# -*- coding:utf-8 -*-


class Channel(object):

    def __init__(self, channel=''):
        self.channel = channel

    def __str__(self):
        return self.channel


class ChannelFactory(object):

    def __init__(self, *channels):
        if isinstance(channels[0], (tuple, list)):
            self.channel_list = [Channel(channel) for channel in channels[0]]
        self.channel_list = [Channel(channel) for channel in channels]

    def get_channels(self):
        return self.channel_list

user:

主要有两类,订阅者subscriber和发布者publisher,他们都继承自Pubsub,由继承关系实现:

# -*- coding:utf-8 -*-
import redis


class Pubsub(object):

    def __init__(self, redis_config):
        pool = redis.ConnectionPool(
                host=redis_config.get('host'),
                port=redis_config.get('port'),
                db=redis_config.get('db')
        )
        self.redis = redis.StrictRedis(connection_pool=pool)

class Subscriber(Pubsub):

    def __init__(self, redis_config):
        Pubsub.__init__(self, redis_config=redis_config)
        self.pubsub = self.redis.pubsub()

    def subscribe(self, *channel):
        self.pubsub.subscribe(*channel)

    def psubscribe(self, *channel_pattern):
        self.pubsub.psubscribe(*channel_pattern)

    def listen(self):
        for item in self.pubsub.listen():
            yield item

    def unsubscribe(self, *channel):
        self.pubsub.unsubscribe(*channel)

    def punsubscribe(self, *channel_pattern):
        self.pubsub.unsubscribe(*channel_pattern)

class Publisher(Pubsub):
    def __init__(self, redis_config):
        Pubsub.__init__(self, redis_config=redis_config)

    def publish(self, channel, message):
        self.redis.publish(channel, message)

测试

分两部分,订阅进程和发布进程

订阅进程:

from config import redis as redis_config
from subscriber import Subscriber
from channel import ChannelFactory

if __name__ == '__main__':
    channel = ChannelFactory('news.*', 'movie.*').get_channels()
    sub = Subscriber(redis_config)
    sub.psubscribe(channel)
    for item in sub.listen():
        print item

发布进程:

from config import redis as redis_config
from publisher import Publisher
from channel import Channel

if __name__ == '__main__':
    channel = Channel('news.abc')
    pub = Publisher(redis_config)
    pub.publish(channel, 'aaaaaaaa')