[消息队列]beanstalkd源码详解

目录

1.消息队列简介

2.beanstalkd基本知识

2.1beanstalkd简介

2.2beanstalkd命令

3.beanstalkd源码分析

3.1数据结构

3.1.1基础结构体

3.1.2 管道tube

3.1.3任务job

3.14套接字socket

3.15服务器server

3.1.6客户端链接conn

3.2 服务器启动过程

3.2.1 epoll简介

3.2.2 beanstalkd使用epoll

3.2.3服务器启动

3.3 服务器与客户端的数据交互

3.4 命令的处理过程

3.4.1查找命令

3.4.2命令1——发布任务

3.4.3 命令2——获取任务reserve

总结


1.消息队列简介

计算机软件发展的一个重要目标是降低软件耦合性;

网站架构中,系统解耦合的重要手段就是异步,业务之间的消息传递不是同步调用,而是将一个业务操作分为多个阶段,每个阶段之间通过共享数据的方式异步执行;

在分布式系统中,多个服务器集群通过分布式消息队列实现异步;分布式消息队列可以看作是内存队列的分布式部署;

分布式消息队列架构图通常如下所示:

[消息队列]beanstalkd源码详解

消息队列是典型的生产者消费者模式,两者不存在直接调用,只要保持数据结构不变,彼此功能实现可以随意改变而不互相影响;异步消息队列还有以下特点:

  • 提高系统可用性:消费者服务器发生故障时,生产者服务器可以继续处理业务请求,系统整体表现无故障;此时数据会在消息队列服务器堆积,待消费者服务器恢复后,可以继续处理消息队列中的数据;
  • 加快网站相应速度:业务处理前端的生产者服务器在处理完业务请求后,将数据写入消息队列,不需要等待消费者服务器处理就可以返回,减少响应延迟;
  • 消除并发访问高峰:用户访问是随机的,存在高峰和低谷;可以使用消息队列将突然增加的访问请求数据放入消息队列中,等待消费者服务器依次处理;

消费者消费消息时,通常有两种模式可以选择:拉模型与推模型。

  • 拉模型是由消息的消费者发起的,主动权把握在消费者手中,它会根据自己的情况对生产者发起调用;
  • 推模式消费者只会被动接受消息,消息队列一旦发现消息进入,就会通知消费者执行对消息的处理;

2.beanstalkd基本知识

2.1beanstalkd简介

beanstalkd是一个轻量级的消息队列;主要有一下特点:

  • 拉模式,消费者需要主动从服务器拉取消息数据;
  • tube:类似于消息主题topic,一个beanstalkd中可以支持多个tube,每个tube都有自己的producer和consumer;多个生产者可以往同一个tube生产job,多个消费者也能监听同一个tube获取job;
  • job:代替了传统的message,与消息最大的区别是,job有多种状态;
  • conn:代表一个客户端链接;
  • 优先级:job可以有0~2^32个优先级,0代表最高优先级,beanstalkd使用堆处理job的优先级排序,因此reserve命令的时间复杂度是O(logN);
  • 延时:生产者发布任务时可以指定延时,到达延迟时间后,job才能被消费者消费;
  • 超时机制:消费者从beanstalkd获取一个job后,必须在预设的 TTR (time-to-run) 时间内处理完任务,并发送 delete / release/ bury 命令改变任务状态;否则 Beanstalkd 会认为消息消费失败,重置job状态,使其可以被其他消费者消费。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从重新计时TTR;
  • 暂停:pause命令可以暂停当前tube,暂停时期内所有job都不能够被消费者消费;

job有一下几种状态:

  • READY,需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;
  • DELAYED,延迟执行的任务,;
  • RESERVED,已经被消费者获取, 正在执行的任务,Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;
  • BURIED,保留的任务: 任务不会被执行,也不会消失,除非有人将他修改为其他状态;
  • DELETED,消息被彻底删除。Beanstalkd 不再维持这些消息。

状态之间的转移图如下所示:

[消息队列]beanstalkd源码详解

思考:

  • beanstalkd如何维护job的状态ube有3个集合delay、ready和 buried分别存放对应状态的job,conn的reserved_jobs集合存储状态为reserved的job(消费者获取一个job后,job的状态才会改变为reserved,因此这个集合由conn维护);
  • delay状态的job怎么修改为readyelay集合是一个按照时间排序的最小堆,beanstalkd不定时循环从堆根节点获取job,校验是否需要改变其状态未ready;
  • 如何实现优先级有ready状态的job才能被消费者获取消费,ready集合是一个按照优先级排序的最小堆,根节点始终是优先级最高得job;
  • 拉模式实现费者使用reserve命令获取job,beanstalkd检查消费者监听的所有tube,查找到ready的job即返回,否则阻塞消费者知道有ready状态的job产生为止;

2.2beanstalkd命令

beanstalkd支持以下命令:

命令

含义

命令

含义

use 使用指定tube
watch  监听指定tube
ignore  取消监听指定tube
list-tubes 列出所有的tube
list-tube-used 列出当前客户端使用的tube
list-tubes-watched 列出当前客户端监听的所有tube
pause-tube  暂停指定tube,暂停期间所有job都不能再被消费者消费

put rn

rn

生产者发布job

reserve

 

RESERVED rn

rn

获取job,如果客户端监视的所有tube都没有ready状态的tube,阻塞客户端;否则返回job
reserve-with-timeout  通reserve,只是设置最大阻塞时间
peek  返回id对应的job
peek-ready  返回下一个ready的job
peek-delay  返回剩余时间最短的delay状态的job
peek-buried  返回下一个buried列表中的job
release    将一个reserved状态的job重置为ready状态
bury  将job的状态设置为buried
kick  将buried状态的job迁移为ready,或将delay状态的job迁移为ready
kick-job  将一个job的状态迁移为ready
delete 删除一个job
touch  客户端reserve获取job后,发现没有足够时间处理此job,发送touch命令,放服务器重新开始计时TTR
stats-job  查询job状态
stats-tube  查询tube状态
stats  查询服务器统计信息
quit  退出

3.beanstalkd源码分析

3.1数据结构

3.1.1基础结构体

3.1.2 管道tube

创建tube的代码如下:


  1. //按照优先级比较
  2. int job_pri_less(void *ax, void *bx)
  3. {
  4. job a = ax, b = bx;
  5. if (a->r.pri r.pri) return 1;
  6. if (a->r.pri > b->r.pri) return 0;
  7. return a->r.id r.id;
  8. }
  9. //按照过期时间比较
  10. int job_delay_less(void *ax, void *bx)
  11. {
  12. job a = ax, b = bx;
  13. if (a->r.deadline_at r.deadline_at) return 1;
  14. 来源:艾 尼 路

    声明:本站部分文章及图片转载于互联网,内容版权归原作者所有,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2018年8月22日
下一篇 2018年8月22日

相关推荐