beanstalkd是一个使用还算广泛的开源消息队列,由c语言编写。因为本身功能简单,代码也不多,所以阅读可以帮助我快速复习一下网络编程,并且麻雀虽小,五脏俱全,一个c语言项目的方方面面它也都包含了。

---入口

万事先从main函数开始:

int  
main(int argc, char **argv)  
{
    //用来存储服务端生成的socket的fd
    int r;
    //job的链表
    struct job list = {};

    //程序名字
    progname = argv[0];
    //设置为行buffer模式,具体参见man手册,遇到换行即发送到目标
    setlinebuf(stdout);
    //解析输入参数
    optparse(&srv, argv+1);
    //常见的-v输出更多显示信息
    if (verbose) {
        printf("pid %d\n", getpid());
    }

    //生成server端的socket,里面的写法和一般的网络编程没有区别
    r = make_server_socket(srv.addr, srv.port);
    if (r == -1) twarnx("make_server_socket()"), exit(111);
    srv.sock.fd = r;
    //一些初始化工作,如果没有default的tube,那么就创建一个
    prot_init();

    //指定了运行时用户的话,切换到那个用户
    if (srv.user) su(srv.user);
    //设置信号处理函数
    set_sig_handlers();

    //使用-b参数时,会将srv.wal.use设置为1,其实就是消息需要持久化
    if (srv.wal.use) {
        // We want to make sure that only one beanstalkd tries
        // to use the wal directory at a time. So acquire a lock
        // now and never release it.
        // 文件锁
        if (!waldirlock(&srv.wal)) {
            twarnx("failed to lock wal dir %s", srv.wal.dir);
            exit(10);
        }

        list.prev = list.next = &list;

        //用binlog.X文件们来初始化全局的tube list和job list
        walinit(&srv.wal, &list);
        //重新遍历全局的job list,并把job塞入对应的tube的ready/delay等的堆中
        r = prot_replay(&srv, &list);
        if (!r) {
            twarnx("failed to replay log");
            return 1;
        }
    }

    //网络编程的事件循环就在这里面了,包括accept和epoll/kqueue的事件处理
    srvserve(&srv);
    return 0;
}

---binlog文件中的内容存储结构

|文件开始|
|4个字节|=>表示文件协议版本,v5和v7两种版本,具体对应到jobrec上的结构大小不一样


===========================================
|4个字节|=>表示tubename的长度namelen
|namelen长度个字节|=>表示一个tubename
|sizeof(Jobrec)个字节|=>表示该tube里的一个job
|msgbody| => 大小从Jobrec->r->body_size里得到
===========================================
上面的内容*N

---数据结构

在beanstalkd里大概有下面这些比较重要的数据结构:

heap

beanstalkd里自己封装的heap,直接带上了容量和目前的长度,看起来“封装”的数据结构一般都会带上额外的信息,将获取这些的时间复杂度尽量降低,redis里也有大量类似的做法

struct Heap {  
    int     cap;
    int     len;
    void    **data;
    Less    less; //是一个函数指针,比较两个堆结点的内容大小,由使用者提供具体比较方法
    Record  rec;
};

tube

封装的tube链表
struct ms {  
    size_t used, cap, last; //
    void **items; //这里面存的才是真正的tube们
    ms_event_fn oninsert, onremove; //插入和删除job时的回调函数
} tubes;

tube本体  
struct tube {  
    uint refs; //引用数
    char name[MAX_TUBE_NAME_LEN]; //tubename
    Heap ready; //ready状态的job堆
    Heap delay; //delay状态的job堆
    struct ms waiting; /* set of conns */
    struct stats stat;
    uint using_ct;
    uint watching_ct;
    int64 pause;
    int64 deadline_at;
    struct job buried;
};

job

struct job {  
    Jobrec r; // persistent fields; these get written to the wal,这个是job对应到Wal文件里的内容,不是把job这个struct写进去的

    /* bookeeping fields; these are in-memory only */
    //英文注释说的比较明白了orz
    char pad[6];
    tube tube;
    job prev, next; /* linked list of jobs */
    job ht_next; /* Next job in a hash table list */
    size_t heap_index; /* where is this job in its current heap */
    File *file;
    job  fnext;
    job  fprev;
    void *reserver;
    int walresv;
    int walused;

    char body[]; // written separately to the wal
};

file

beanstalkd对linux的file做了一次封装,其实就是做了一个File的链表

struct File {  
    File *next;
    uint refs;
    int  seq;
    int  iswopen; // is open for writing
    int  fd;
    //文件内的空闲空间
    int  free;
    //文件内已经用掉的空间
    int  resv;
    //存储路径
    char *path;
    Wal  *w;

    struct job jlist; // jobs written in this file
};

Jobrec

Jobrec是直接写入到wal文件里的内容,成员命名都比较好懂,不再赘述

// if you modify this struct, you must increment Walver above
struct Jobrec {  
    uint64 id;
    uint32 pri;
    int64  delay;
    int64  ttr;
    int32  body_size;
    int64  created_at;
    int64  deadline_at;
    uint32 reserve_ct;
    uint32 timeout_ct;
    uint32 release_ct;
    uint32 bury_ct;
    uint32 kick_ct;
    byte   state;
};

socket

自己定义了一个socket,里面带上了socket的fd和对应的处理函数

struct Socket {  
    int    fd;
    Handle f;
    void   *x;
    int    added;
};

conn

对客户端和服务端连接的简单封装

struct Conn {  
    Server *srv;
    Socket sock;
    char   state;
    char   type;
    Conn   *next;
    tube   use;
    int64  tickat;      // time at which to do more work
    int    tickpos;     // position in srv->conns
    job    soonest_job; // memoization of the soonest job
    int    rw;          // currently want: 'r', 'w', or 'h'
    int    pending_timeout;
    char   halfclosed;

    char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated
    int  cmd_len;
    int  cmd_read;

    char *reply;
    int  reply_len;
    int  reply_sent;
    char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated

    // How many bytes of in_job->body have been read so far. If in_job is NULL
    // while in_job_read is nonzero, we are in bit bucket mode and
    // in_job_read's meaning is inverted -- then it counts the bytes that
    // remain to be thrown away.
    int in_job_read;
    job in_job; // a job to be read from the client

    job out_job;
    int out_job_sent;

    struct ms  watch;
    struct job reserved_jobs; // linked list header
};

server

对server做了简单封装,具体看下面吧

struct Server {  
    char *port;
    char *addr;
    char *user;

    Wal    wal; //write ahead log结构,内含文件链表
    Socket sock; //整个server对应的监听socket
    Heap   conns; //管理连接的堆
};

---job生命周期

 put with delay               release with delay
----------------> [DELAYED] <------------.
                      |                   |
               kick   | (time passes)     |
                      |                   |
 put                  v     reserve       |       delete
-----------------> [READY] ---------> [RESERVED] --------> *poof*
                     ^  ^                |  |
                     |   \  release      |  |
                     |    `-------------'   |
                     |                      |
                     | kick                 |
                     |                      |
                     |       bury           |
                  [BURIED] <---------------'
                     |
                     |  delete
                      `--------> *poof*

---事件处理

beanstalkd其实也是一个网络程序,所以它的做法和其它的网络程序差不多。

因为本身是一个单线程的程序,所以也没有用到pthread库之类的麻烦的多线程库。不过出于性能考虑,也用上了各个操作系统上的io multiplexing,比如在freebsd上用kqueue来实现多路复用,在linux上用epoll来实现。这一点和redis差不多,自己实现起来也不是很麻烦。

---使用方面

如何做分布式

因为beanstalkd非常的简单,所以本身没有提供cluster之类的功能,所以要做分布式的话和一般的分布式没什么两样,大概有这么两种选择:

proxy

smart client  

proxy如果你见过mysql proxy或者redis对应的codis的话,那么就明白这里的proxy是啥意思了。拿我们公司的proxy来举例,实际上proxy层做的是jobid的二次封装和tcp层的请求转发。做起来并不是很难,proxy层维护的jobid实际上是serverid+jobid(这个jobid是beanstalkd里的原装id),这里我们姑且把这个中间的jobid叫mid-jobid。

客户端发job到proxy之后,proxy会先根据哈希规则(round robin/random/consistent hash等)来把job发往对应的机器,然后在proxy层用golang的map来维护mid-jobid和机器的对应关系。为什么要在proxy层维护这个对应关系?那显然是因为后端的server结点存在挂掉的可能性啊,所以每次计算并不一定会映射到相同的机器上。也就是说proxy层是要负责出问题的机器的动态摘除的。

相信你肯定也发现了这个proxy的问题,因为mid-jobid和后端的server一一对应,所以如果在消息发送到了对应的server之后,这个server挂掉了,那么这条消息也就丢失了。开发者也比较有自知之明,在声明中只写了这个proxy只保证producer的高可用,也就是说,如果某台机器挂了,我们依然是能够把消息发送到对应的机器上。至于消息丢了,那这个proxy是不管的。

如果想要改进的话怎么办?那么一条消息就不能只发给一台机器了,但是因为消息系统集群本身是希望能够使消息系统能够横向扩展,又没有办法像配置系统那样做所有结点的副本冗余。如果要参考其它多副本系统(比如hdfs或者kafka)的话,那么会使proxy的逻辑变的更复杂。然后你可能就会觉得,为什么我不去用kafka?

事实确实是这样的。与其把这种proxy搞得越来越复杂,不如在kafka上去做二次开发。

当然了,像rabbitmq那种消息在所有结点上冗余的可以实现结点挂不影响消息,但这个其实更多是为了HA,而不是为了横向扩展。所以rabbitmq也会遇到消息暴涨的时候所有机器都会受影响而难以横向扩展的问题。

做分布式的另一种思路是smart client。也就是客户端的负载均衡,这个和memcached的客户端一致性哈希(ketama hash之类的)做法差不多,缺点也很明显,smart client必须有能力感知结点是否存活,而这种smart client实际上对客户端提出了更多的要求。像php一类的“用后即焚”的语言是难以实现这种功能的(现在也有一些trick的方式来实现,这里就不展开了)。