Elasticsearch环境搭建和river数据导入(三)

很多事情真是说起来容易做起来难,就比如说起来很简单的:

把mysql的数据同步到elasticsearch里

听着很简单吧,我们有各种现成的工具,之前调研了半天,选择了elasticsearch jdbc这种类似river的工具,根据数据表里的update_time来做数据同步。将lastexecutionstart作为时间参数传到sql里,构成了下面这样的sql

select *, id as _id from [业务表] where update_time > ?
? = lastexecutionstart

看着是不是挺天衣无缝的?

如果真的这么顺利就好了。。。。

实际操作的时候发现了很多问题:

首先,我们的代码里没有在创建数据条目的时候填update_time,我&$@#$$(@#)*$。

而这种”显然我就不遵守既定的常识“的事情在我现在参与的这个系统里尤其的多。这种我觉得还是规范的问题,boss所谓的”现在还没有到我们遵守规范啊流程啊的阶段“根本就是悖论。为了省这么一点事情,当更优秀的人介入开发,引入其它的技术手段对系统进行改造的时候会显得困难重重。系统写成这样,连基本的业务文档都不写,当初的老人走的时候后面的人还能维护得动?我要对着你的代码来猜你的业务吗?

扯远了,为了解决这个问题,我们可以给在每一次业务系统更新时将update_time时间戳生成并写入。听着也挺简单吧。但是在这个系统里写入根本不!收!束!也就是说,你根本就不能确定某个业务表是不是只在有限的几个文件中被修改。指不定在哪里有人会直接裸写一个sql来修改你的数据库呢。。。被逼无奈,只能在表的update_time字段上加on update current_timestamp()了。这样至少可以保证mysql在数据更新的时候自动帮我们生成时间戳。事实证明我还是太年轻了。。mysql在5.6.3版本之前,不支持在一张表上有两个字段默认值使用current_timestamp()函数,即使一个在default current_timestamp(),一个在on update current_timestamp()也不行。

所以最终将创建数据部分的create_time进行了补全,之后使用了数据库的on update current_timestamp()才算是暂时解决了问题。

这之后本以为可以一帆风顺,结果又遇到了mysql和elasticsearch数据不同步的问题。首先我们分析一下elasticsearch jdbc的运行过程:

1.设置初始lastexecutionstart = [你给定的时间]

||interval

2.查询数据库select * from [业务表] where update_time > lastexecutionstart,记录最新的lastexecutionstart

||interval

3.重复2

分析一下上面的过程有什么问题呢?

可以大概推测出几个问题,首先,update_time > [lastexecutionstart]这条,如果恰巧会在lastexecutionstart时刻会创建多条数据,很有可能在查询结束后的同一时刻又有一些数据进入数据库,而因为mysql的时间戳只精确到秒,所以当tps比较高的时候,显然这个策略是有一些问题的。那么我们这里先把这个改成>=

跑了一段时间之后,发现还是会丢数据。。。

再来想一想,如果不会漏掉某一个时刻的数据的话,那么这个逻辑可能出现问题的情况就是“一个时刻创建了在这个时刻之前”的数据。那么在我们这种数据库两主两从的架构下,怎么才会出现这种情况呢?那就是主从同步延迟!我司的数据库都是做了读写分离的,数据写入在主,读取在从。一般情况下由db_proxy来干这件事情,其会根据你传给数据库vip的sql来判断该去哪台db机器去执行该条sql,我们的集群不指定router,且语句是select的话,一般就会直接去从库里读取了。

主从同步一般情况下延迟不会太大,但是在主库的压力比较大的时候,比如一次性写入非常多,那么同步就会有明显的延迟。实际业务中甚至可能有同步延迟十多分钟的情况(当然dba侧一般会在同步延迟很大的时候触发报警)。

那么这种情况就会导致我们的数据同步出问题了。跟dba协商后决定先把增量数据同步迁移到主库来做。看起来这个问题就可以解决了~

想得挺美。

还是有丢数据的问题。这一次分析就麻烦得多了,我们打开了elasticsearch jdbc的trace日志来跟踪问题,发现那些丢掉的数据在查询的时候还是没有被查到。

这次有点山穷水尽的感觉。。剩下的可能的原因只有es机器和db机器时间戳不同步、或者真的是在业务系统里会去创建过去的工单的情况了。

然后想起来知乎上曾经的一个段子,某程序员为了让一个写入任务成功,会for(int i=0; i<100; i++) write();听着很不靠谱。。但是为了解决问题也顾不得手段了。。

这里我们改造了一下之前的sql:

select * from [业务表] where update_time > date_sub(now(), 10 minute);

也就是说,每次运行都会去查询最近10分钟的所有数据
=> 一条数据最多会被写10次。。

从结果来看,效果还不错,至少数据是对得上了。

不过老实说,这个方案并不靠谱,那么什么是更靠谱的数据同步方案呢?下一次说吧。