博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能
阅读量:5936 次
发布时间:2019-06-19

本文共 15947 字,大约阅读时间需要 53 分钟。

本文转载自链接:

最近使用redis的c接口——hiredis,使客户端与redis服务器通信,实现消息订阅和发布(PUB/SUB)的功能,我把遇到的一些问题和解决方法列出来供大家学习。
       废话不多说,先贴代码。
redis_publisher.h
/*************************************************************************    > File Name: redis_publisher.h    > Author: chenzengba    > Mail: chenzengba@gmail.com     > Created Time: Sat 23 Apr 2016 10:15:09 PM CST    > Description: 封装hiredis,实现消息发布给redis功能 ************************************************************************/#ifndef REDIS_PUBLISHER_H#define REDIS_PUBLISHER_H#include 
#include
#include
#include
#include
#include
#include
#include
#include
class CRedisPublisher{public: CRedisPublisher(); ~CRedisPublisher(); bool init(); bool uninit(); bool connect(); bool disconnect(); bool publish(const std::string &channel_name, const std::string &message);private: // 下面三个回调函数供redis服务调用 // 连接回调 static void connect_callback(const redisAsyncContext *redis_context, int status); // 断开连接的回调 static void disconnect_callback(const redisAsyncContext *redis_context, int status); // 执行命令回调 static void command_callback(redisAsyncContext *redis_context, void *reply, void *privdata); // 事件分发线程函数 static void *event_thread(void *data); void *event_proc();private: // libevent事件对象 event_base *_event_base; // 事件线程ID pthread_t _event_thread; // 事件线程的信号量 sem_t _event_sem; // hiredis异步对象 redisAsyncContext *_redis_context;};#endif
redis_publisher.cpp
/*************************************************************************    > File Name: redis_publisher.cpp    > Author: chenzengba    > Mail: chenzengba@gmail.com     > Created Time: Sat 23 Apr 2016 10:15:09 PM CST    > Description:  ************************************************************************/ #include 
#include
#include
#include "redis_publisher.h"CRedisPublisher::CRedisPublisher():_event_base(0), _event_thread(0),_redis_context(0){}CRedisPublisher::~CRedisPublisher(){}bool CRedisPublisher::init(){ // initialize the event _event_base = event_base_new(); // 创建libevent对象 if (NULL == _event_base) { printf(": Create redis event failed.\n"); return false; } memset(&_event_sem, 0, sizeof(_event_sem)); int ret = sem_init(&_event_sem, 0, 0); if (ret != 0) { printf(": Init sem failed.\n"); return false; } return true;}bool CRedisPublisher::uninit(){ _event_base = NULL; sem_destroy(&_event_sem); return true;}bool CRedisPublisher::connect(){ // connect redis _redis_context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到redis服务器上,使用默认端口 if (NULL == _redis_context) { printf(": Connect redis failed.\n"); return false; } if (_redis_context->err) { printf(": Connect redis error: %d, %s\n", _redis_context->err, _redis_context->errstr); // 输出错误信息 return false; } // attach the event redisLibeventAttach(_redis_context, _event_base); // 将事件绑定到redis context上,使设置给redis的回调跟事件关联 // 创建事件处理线程 int ret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread, this); if (ret != 0) { printf(": create event thread failed.\n"); disconnect(); return false; } // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态 redisAsyncSetConnectCallback(_redis_context, &CRedisPublisher::connect_callback); // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连 redisAsyncSetDisconnectCallback(_redis_context, &CRedisPublisher::disconnect_callback); // 启动事件线程 sem_post(&_event_sem); return true;}bool CRedisPublisher::disconnect(){ if (_redis_context) { redisAsyncDisconnect(_redis_context); redisAsyncFree(_redis_context); _redis_context = NULL; } return true;}bool CRedisPublisher::publish(const std::string &channel_name, const std::string &message){ int ret = redisAsyncCommand(_redis_context, &CRedisPublisher::command_callback, this, "PUBLISH %s %s", channel_name.c_str(), message.c_str()); if (REDIS_ERR == ret) { printf("Publish command failed: %d\n", ret); return false; } return true;}void CRedisPublisher::connect_callback(const redisAsyncContext *redis_context, int status){ if (status != REDIS_OK) { printf(": Error: %s\n", redis_context->errstr); } else { printf(": Redis connected!\n"); }}void CRedisPublisher::disconnect_callback( const redisAsyncContext *redis_context, int status){ if (status != REDIS_OK) { // 这里异常退出,可以尝试重连 printf(": Error: %s\n", redis_context->errstr); }}// 消息接收回调函数void CRedisPublisher::command_callback(redisAsyncContext *redis_context, void *reply, void *privdata){ printf("command callback.\n"); // 这里不执行任何操作}void *CRedisPublisher::event_thread(void *data){ if (NULL == data) { printf(": Error!\n"); assert(false); return NULL; } CRedisPublisher *self_this = reinterpret_cast
(data); return self_this->event_proc();}void *CRedisPublisher::event_proc(){ sem_wait(&_event_sem); // 开启事件分发,event_base_dispatch会阻塞 event_base_dispatch(_event_base); return NULL;}
redis_subscriber.h
/*************************************************************************    > File Name: redis_subscriber.h    > Author: chenzengba    > Mail: chenzengba@gmail.com     > Created Time: Sat 23 Apr 2016 10:15:09 PM CST    > Description: 封装hiredis,实现消息订阅redis功能 ************************************************************************/#ifndef REDIS_SUBSCRIBER_H#define REDIS_SUBSCRIBER_H#include 
#include
#include
#include
#include
#include
#include
#include
#include
class CRedisSubscriber{public: typedef std::tr1::function
NotifyMessageFn; // 回调函数对象类型,当接收到消息后调用回调把消息发送出去 CRedisSubscriber(); ~CRedisSubscriber(); bool init(const NotifyMessageFn &fn); // 传入回调对象 bool uninit(); bool connect(); bool disconnect(); // 可以多次调用,订阅多个频道 bool subscribe(const std::string &channel_name); private: // 下面三个回调函数供redis服务调用 // 连接回调 static void connect_callback(const redisAsyncContext *redis_context, int status); // 断开连接的回调 static void disconnect_callback(const redisAsyncContext *redis_context, int status); // 执行命令回调 static void command_callback(redisAsyncContext *redis_context, void *reply, void *privdata); // 事件分发线程函数 static void *event_thread(void *data); void *event_proc(); private: // libevent事件对象 event_base *_event_base; // 事件线程ID pthread_t _event_thread; // 事件线程的信号量 sem_t _event_sem; // hiredis异步对象 redisAsyncContext *_redis_context; // 通知外层的回调函数对象 NotifyMessageFn _notify_message_fn;};#endif
redis_subscriber.cpp:
/*************************************************************************    > File Name: redis_subscriber.cpp    > Author: chenzengba    > Mail: chenzengba@gmail.com     > Created Time: Sat 23 Apr 2016 10:15:09 PM CST    > Description:  ************************************************************************/ #include 
#include
#include
#include "redis_subscriber.h"CRedisSubscriber::CRedisSubscriber():_event_base(0), _event_thread(0),_redis_context(0){}CRedisSubscriber::~CRedisSubscriber(){}bool CRedisSubscriber::init(const NotifyMessageFn &fn){ // initialize the event _notify_message_fn = fn; _event_base = event_base_new(); // 创建libevent对象 if (NULL == _event_base) { printf(": Create redis event failed.\n"); return false; } memset(&_event_sem, 0, sizeof(_event_sem)); int ret = sem_init(&_event_sem, 0, 0); if (ret != 0) { printf(": Init sem failed.\n"); return false; } return true;}bool CRedisSubscriber::uninit(){ _event_base = NULL; sem_destroy(&_event_sem); return true;}bool CRedisSubscriber::connect(){ // connect redis _redis_context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到redis服务器上,使用默认端口 if (NULL == _redis_context) { printf(": Connect redis failed.\n"); return false; } if (_redis_context->err) { printf(": Connect redis error: %d, %s\n", _redis_context->err, _redis_context->errstr); // 输出错误信息 return false; } // attach the event redisLibeventAttach(_redis_context, _event_base); // 将事件绑定到redis context上,使设置给redis的回调跟事件关联 // 创建事件处理线程 int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this); if (ret != 0) { printf(": create event thread failed.\n"); disconnect(); return false; } // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态 redisAsyncSetConnectCallback(_redis_context, &CRedisSubscriber::connect_callback); // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连 redisAsyncSetDisconnectCallback(_redis_context, &CRedisSubscriber::disconnect_callback); // 启动事件线程 sem_post(&_event_sem); return true;}bool CRedisSubscriber::disconnect(){ if (_redis_context) { redisAsyncDisconnect(_redis_context); redisAsyncFree(_redis_context); _redis_context = NULL; } return true;}bool CRedisSubscriber::subscribe(const std::string &channel_name){ int ret = redisAsyncCommand(_redis_context, &CRedisSubscriber::command_callback, this, "SUBSCRIBE %s", channel_name.c_str()); if (REDIS_ERR == ret) { printf("Subscribe command failed: %d\n", ret); return false; } printf(": Subscribe success: %s\n", channel_name.c_str()); return true;}void CRedisSubscriber::connect_callback(const redisAsyncContext *redis_context, int status){ if (status != REDIS_OK) { printf(": Error: %s\n", redis_context->errstr); } else { printf(": Redis connected!"); }}void CRedisSubscriber::disconnect_callback( const redisAsyncContext *redis_context, int status){ if (status != REDIS_OK) { // 这里异常退出,可以尝试重连 printf(": Error: %s\n", redis_context->errstr); }}// 消息接收回调函数void CRedisSubscriber::command_callback(redisAsyncContext *redis_context, void *reply, void *privdata){ if (NULL == reply || NULL == privdata) { return ; } // 静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问 CRedisSubscriber *self_this = reinterpret_cast
(privdata); redisReply *redis_reply = reinterpret_cast
(reply); // 订阅接收到的消息是一个带三元素的数组 if (redis_reply->type == REDIS_REPLY_ARRAY && redis_reply->elements == 3) { printf(": Recieve message:%s:%d:%s:%d:%s:%d\n", redis_reply->element[0]->str, redis_reply->element[0]->len, redis_reply->element[1]->str, redis_reply->element[1]->len, redis_reply->element[2]->str, redis_reply->element[2]->len); // 调用函数对象把消息通知给外层 self_this->_notify_message_fn(redis_reply->element[1]->str, redis_reply->element[2]->str, redis_reply->element[2]->len); }}void *CRedisSubscriber::event_thread(void *data){ if (NULL == data) { printf(": Error!\n"); assert(false); return NULL; } CRedisSubscriber *self_this = reinterpret_cast
(data); return self_this->event_proc();}void *CRedisSubscriber::event_proc(){ sem_wait(&_event_sem); // 开启事件分发,event_base_dispatch会阻塞 event_base_dispatch(_event_base); return NULL;}

问题1:hiredis官网没有异步接口的实现例子。

        hiredis提供了几个异步通信的API,一开始根据API名字的理解,我们实现了跟redis服务器建立连接、订阅和发布的功能,可在实际使用的时候,程序并没有像我们预想的那样,除了能够建立连接外,任何事情都没发生。
        网上查了很多资料,原来hiredis的异步实现是通过事件来分发redis发送过来的消息的,hiredis可以使用libae、libev、libuv和libevent中的任何一个实现事件的分发,网上的资料提示使用libae、libev和libuv可能发生其他问题,这里为了方便就选用libevent。
hireds官网并没有对libevent做任何介绍,也没用说明使用异步机制需要引入事件的接口,所以一开始走了很多弯路。
        关于libevent的使用这里就不再赘述,详情可以见libevent官网。
libevent官网:http://libevent.org/
libevent api文档:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae
CRedisPublisher和CRedisSubscriber的初始化过程:
初始化事件处理,并获得事件处理的实例:
_event_base = event_base_new();
在获得redisAsyncContext *之后,调用
redisLibeventAttach(_redis_context, _event_base);
这样就将事件处理和redis关联起来,最后在另一个线程调用
event_base_dispatch(_event_base);
启动事件的分发,这是一个阻塞函数,因此,创建了一个新的线程处理事件分发,值得注意的是,这里用信号灯_event_sem控制线程的启动,意在程序调用
redisAsyncSetConnectCallback(_redis_context,         &CRedisSubscriber::connect_callback);    redisAsyncSetDisconnectCallback(_redis_context,        &CRedisSubscriber::disconnect_callback);
之后,能够完全捕捉到这两个回调。

问题2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’错误

        有些人会觉得这两个类设计有点冗余,我们发现CRedisPublisher和CRedisSubscriber很多逻辑是一样的,为什么不把他们整合到一起成一个类,既能够发布消息也能够订阅消息。其实一开始我就是这么干的,在使用的时候发现,
用同个redisAsynContex *对象进行消息订阅和发布,与redis服务连接会自动断开,disconnect_callback回调会被调用,并且返回奇怪的错误:ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此,不能使用同个redisAsyncContext *对象实现发布和订阅。这里为了减少设计的复杂性,就将两个类的逻辑分开了。
        当然,你也可以将相同的逻辑抽象到一个基类里,并实现publish和subscribe接口。

问题3 相关依赖的库

        编译之前,需要安装hiredis、libevent和boost库,我是用的是Ubuntu x64系统。
hiredis官网:https://github.com/redis/hiredis
下载源码解压,进入解压目录,执行make && make install命令。
libevent官网:http://libevent.org/下载最新的稳定版
解压后进入解压目录,执行命令
./configure -prefix=/usr
sudo make && make install
boost库:直接执行安装:sudo apt-get install libboost-dev
如果你不是用std::tr1::function的函数对象来给外层通知消息,就不需要boost库。你可以用接口的形式实现回调,把接口传给CRedisSubscribe类,让它在接收到消息后调用接口回调,通知外层。

问题4 如何使用

        最后贴出例子代码。
publisher.cpp,实现发布消息:
/*************************************************************************    > File Name: publisher.cpp    > Author: chenzengba    > Mail: chenzengba@gmail.com     > Created Time: Sat 23 Apr 2016 12:13:24 PM CST ************************************************************************/#include "redis_publisher.h"int main(int argc, char *argv[]){    CRedisPublisher publisher;    bool ret = publisher.init();    if (!ret)     {        printf("Init failed.\n");        return 0;    }    ret = publisher.connect();    if (!ret)    {        printf("connect failed.");        return 0;    }    while (true)    {        publisher.publish("test-channel", "Test message");        sleep(1);    }    publisher.disconnect();    publisher.uninit();    return 0;}
subscriber.cpp实现订阅消息:
/*************************************************************************    > File Name: subscriber.cpp    > Author: chenzengba    > Mail: chenzengba@gmail.com     > Created Time: Sat 23 Apr 2016 12:26:42 PM CST ************************************************************************/#include "redis_subscriber.h"void recieve_message(const char *channel_name,    const char *message, int len){    printf("Recieve message:\n    channel name: %s\n    message: %s\n",        channel_name, message);}int main(int argc, char *argv[]){    CRedisSubscriber subscriber;    CRedisSubscriber::NotifyMessageFn fn =         bind(recieve_message, std::tr1::placeholders::_1,        std::tr1::placeholders::_2, std::tr1::placeholders::_3);    bool ret = subscriber.init(fn);    if (!ret)    {        printf("Init failed.\n");        return 0;    }    ret = subscriber.connect();    if (!ret)    {        printf("Connect failed.\n");        return 0;    }    subscriber.subscribe("test-channel");    while (true)    {        sleep(1);    }    subscriber.disconnect();    subscriber.uninit();    return 0;}
关于编译的问题:在g++中编译,
注意要加上-lhiredis -levent参数,下面是一个简单的Makefile:
EXE=server_main client_mainCC=g++FLAG=-lhiredis -leventOBJ=redis_publisher.o publisher.o redis_subscriber.o subscriber.oall:$(EXE)$(EXE):$(OBJ)	$(CC) -o publisher redis_publisher.o publisher.o $(FLAG)	$(CC) -o subscriber redis_subscriber.o subscriber.o $(FLAG)redis_publisher.o:redis_publisher.hredis_subscriber.o:redis_subscriber.hpublisher.o:publisher.cpp	$(CC) -c publisher.cppsubscriber.o:subscriber.cpp	$(CC) -c subscriber.cppclean:	rm publisher subscriber *.o
致谢:
redis异步API使用libevent:http://www.tuicool.com/articles/N73uuu

你可能感兴趣的文章
【推荐】jquery开发的大型web应用—H5编辑器工具
查看>>
CentOS安装Python3
查看>>
redis常用命令
查看>>
【Vue实例】
查看>>
20170808 - 跨浏览器的事件兼容处理程序
查看>>
JavaScript this 从此不再疑惑
查看>>
【阅读笔记】Web安全深度剖析
查看>>
微信小程序中使用emoji表情相关
查看>>
浏览器端Event loop简介
查看>>
在 Node.js 中用 pipe 处理数组的实现
查看>>
Web开发中常见的认证机制
查看>>
[译] 设计一个现代的缓存
查看>>
nodejs微信公众号开发——0.起手式
查看>>
Learn Spring - Spring DAO
查看>>
移动APP测试之安全性测试策略分析
查看>>
解读 2018 之运维篇:我们离高效智能的运维还有多远
查看>>
深入redux技术栈
查看>>
Scrum Guides 2017年最新修改
查看>>
Java永久代去哪儿了
查看>>
Microsoft将持续交付功能添加到Visual Studio、Azure
查看>>