Elasticsearch环境搭建和river数据导入(四)

其实有了那篇服务开发总结,这篇看起来没什么必要了,不过作为纯技术的总结还是发出来吧。

在前几篇中说到了es的jdbc工具,这个工具确实挺好,可以满足一些不想要开发量的小公司的要求。

但从我们使用的过程中也看出了这个工具的一些缺陷:

1.方案强依赖于数据库表的时间戳。
不靠谱,你不能要求你的业务RD或者老的遗留系统能够满足你的这些规范方面的要求(刚从dc出来的时候感觉很多规范方面的事情是应该做的,但根据工作经历来看,设计技术方案最好还是要考虑到最坏的情况)。

2.这个工具每一个脚本只能执行一条sql语句。
这是一个比较重大的缺陷,因为工具本身是java写的,这就意味着每个脚本都是在独立的jvm里跑。而jvm,你懂的。基本一台64GB的机器跑个七八个脚本,再加上es本身,内存就快要爆掉了。而实际上这件事情本身不应该需要这么大的硬件成本。

3.读写分离问题导致的数据延迟问题。
这条其实是1衍生出来的,同样考虑最坏的情况,主从延迟如果到了一定程度,那么在脚本运行期间内会导致时间戳前进之后,有旧数据入到从库。那么脚本的逻辑就会产生数据丢失。
为了解决这个问题你需要把目标指向主库,显然这是不合理的,读写分离就是为了分离主库的压力,结果一个高频率的同步操作还要去读主库,不靠谱。

所以对于大一些的公司,或者允许自己拿出时间来开发的小公司来说,应该寻求更合理的解决方案。

先来看看我们公司现在在用的解决方案:

mysql binlog -> alibaba canal server -> kafka -> consumer -> other storage

由mysql产生row format的binlog,然后通过alibaba的canal server解析,按照我们的要求把内容塞到kafka里,因为kafka里的消息不会丢失,所以后续的消费者可以解析消息,并把数据同步到任何的异构存储当中去。

这里面也同时有一些问题:

1.增量同步,如果是+x -y的更新,怎么保证数据的一致性?
好解决,把更新后的数据全量存储在消息体中,消费者拿到消息后相当于把数据进行全量覆盖,所以可以保证数据的最终一致性。也就是相当于把非幂等的消息改造成了幂等的消息。(当然了这个说法不太严密,意会即可)

2.消息量大了怎么办,kafka会不会抗不住?
第一,kafka在消息队列产品中几乎是高吞吐和可靠的代表性产品了。其设计在很多方面具有先进性,比如对sendfile和zerocopy的运用,replica的设计等等。
第二,如果消息量太大,目前我们的方案是可以按照主键来分区放到kafka的多个partition中的。这样同一个consumer-group的消费者也可以分开partition来消费,提高消费速度,同时也可以保证有些业务要求的“同一键/索引”的消息需要保持顺序。

3.如何计算消息的延迟?
消息在进入canal server的时候会打上时间戳,canal到kafka是很快的,基本可以认为没有延迟。然后从kafka到消费者的时候又可以打上时间戳,这两个时间戳之间的差值就可以认为是最终同步到异构存之前的时间差。
看起来时间会很长,其实实际上都是秒级的。对业务来说可能都感知不到这个延迟的。

这些问题解决了其实已经没有什么大问题了。

下面来讨论一下这个方案的缺陷:

1.canal server和kafka的repilca集群可能对于小公司来说成本有点太高了,所以这套方案可能不是那么具有借鉴性。
解决方案:理解了思路的话,自己开发一部分组件就好了。

2.row format的binlog在数据表发生schema变动(如增加字段、过期数据删除)时会有大量没有用的binlog,这个会影响到消费者的处理能力。
解决方案:屏蔽掉这些binlog,需要在canal那层去做这件事情。如果没有办法屏蔽的话,为了提高消费者的速度,例如消费者是golang写的话,那么应该让binlog的格式在外层就可以看到这个事件的类型,尽量别在完全json decode之后才能判断出来。因为json decode的字符串处理本身是比较耗时的操作。这样,可以在不解析内容的前提下提前获知这条消息是否需要处理。但目前我们的binlog看起来设计不太科学,也没辙了。

3.比起jdbc的方案依赖项更加多了,在任何一个环节出问题都可能导致整个流程出问题。
解决方案:这个好像没什么好办法,只能通过加监控,例如消息延迟突然变大、或者消费者一段时间拿到的消息数为0,来及时发现问题了。

4.join类型的index/type需要单独去处理。之前用jdbc工具的时候有人图省事,直接在sql里写join来创建es里的索引。
解决方案:解析binlog的时候去查询数据库,来组成新的文档,或者使用es的nested文档特性。不过哪个都不是特别靠谱。

如果小公司也想要这么一套方案的话,canal的部分可能需要自己去开发。可能还有一些人不太喜欢kafka这类偏java栈(其实是scala)的东西,想要换掉其中的消息队列。那么也应该尽量注意消息队列本身的可靠性,毕竟数据同步这种事情,丢一条数据都不是闹着玩的。不过说起来,如果要换掉消息队列的话又有什么可以选呢?nuts?还是nsq呢?

不要有技术栈迷信为好~

附上从kafka到es的消费者程序代码,欢迎吐槽:

https://github.com/cch123/binlog-river-es

Xargin

Xargin

If you don't keep moving, you'll quickly fall behind
Beijing