Solo  当前访客:5 开始使用


策略优先于代码

1. 背景

我目前主要负责从线下到线上导数据这一块。首先内部custom会把他们线下算出来的数据写到一张hive表里面。我们就从hive表里面读出数据,发送到kafka,线上的service再从kafka消费,写到live的Aerospike。

Pipeline流程图如下:
ScreenShot20191110at5.09.49PM.png

而在这整个过程中,数据经过了多个转化阶段,每个节点都可能因为数据格式或者程序的错误丢失一部分数据,如何衡量和监控整个pipeline数据的转化率成为一个比较关键的因素。
为此我们设计了一套metrics系统,记录每个阶段数据的成功转化率,并且把他们都发给了线上的日志系统进行记录。
我们的UI再从日志系统提供的API获得这些metrics的信息,以job历史的记录展示在页面上,方便监控最近job的运行状态。
然而最近读取日志系统API,返回给前端job历史的功能暴露出来很多问题。其中一个最主要的问题是返回的job有时候多,有时候少,让用户很困惑。另一个问题是我们给前端的API只能返回最近7天的job历史,而我们的PO大人要求一个月的历史。

基于这两点需求,我对我们从sherlock读取历史job返回给前端的API进行了重新设计和实现。

2. 不稳定的原因分析

2.1 技术背景介绍

首先这里先介绍一下我们的日志抓取API, API形式大致如下:

List<LogInfo> getLogInfo(LogType type, Long startTime, Long endTime)

它存在以下几个问题:

  1. 不稳定。基本上每10次call会有6次返回server error。
  2. sla太长,基本一次调用接近秒级。而我们前端能接受的最大时延只能是毫秒级别的。
  3. 如果时间范围超过7天,返回server error。因此对于返回一个月的情况,就需要多次调用。

而我们提供给前端的API,大致如下:

List<Job> getJobList(Long startTime, Long endTime)

它的要求:

  1. 稳定,每次必须返回正确的数据。
  2. sla必须毫秒级。
  3. 查询的时间范围要求可以达到一个月。

基于上述矛盾,自然而然的我们使用了cache, 它的基本架构如下。
ScreenShot20191110at5.29.50PM.png
而对于cache的构建,需要经过三个步骤:

  1. 通过日志API获取指定一段时间范围内所有的job id列表。
  2. 对于每一个job id,再次调用日志API获取job的summary信息。
  3. 由于job的还有一部分信息在另一个日志类型里面,因此还需要另一次call去返回job的entity相关的信息。

2.2 核心问题: cache策略的失误

通过观察发现,每次cache更新的时候,都是整个cache list的全替换。如果我们基于的log api是稳定的,那么这个不会是问题,而且每次同步都会得到实时最新的job metrics信息。然而问题任何一次API call都可能返回server error,这样每次获取的job就可能不太一样(因为失败是随机的)。

3. 新cache策略

3.1 基本的设计

  1. 建立30的window(这个N是可配置的),每个window是一天,在一天的范围内获取所有的job id,然后根据job id去抓取summary和entity信息构建job对象。
  2. 如果这一天的job列表已经成功获得了,那么下一次同步的时候,就可以不再从新获取,历史某一天跑了哪些job是不会变的。今天会做特殊处理,每次同步它都需要重新获取job历史。
  3. 对于一个job而言,如果在运行过程中,那么它的metrics是递增的。因此如果一个job的信息两次抓取(间隔5分钟)的metrics信息都是一样的,那么表明这个job已经完成了,标记完成,下一次同步的时候,这个job不再重新获取。
  4. 对于同一个job的每次更新,我们都会检测这次抓取是否成功,只有成功获取的metrics信息,才会更新到cache里面来。
  5. 如果一天内所有的job都已经完成了,那么当前这天的job在同步时候不再重新抓取。今天除外。
  6. 当检测到新的一天到来,会重新建一个新的一天的时间window,替换掉最老的一天。这里数据结构上,我选取了guava的EvictingQueue, 它的一个很好的特性是在queue满了的情况下,自动删掉最先加入queue的元素。
  7. 同一个job可能跨多天,但是它只能属于某一天的时间window。否则一个job不稳定,会造成连续M天的不稳定,如果M大于我们抓取的历史job的window K(3.3节第3小点),就会影响schedule job的效率。因此我采用了注册机制,每天的window在爬取当天有哪些job的时候,都会把今天的batch信息注册到job里面,如果没有,那么抢注成功。如果有了的话,就会去比较当前batch天是否比现已经注册的早,如果早,说明这天才是这个job开始的日期,就修改成当前batch。

3.2 启动的策略

在服务启动的时候,我们需要对cache进行初始化,而如果一次全部加载30天的数据,肯定是不行的。经过多次优化后的策略如下:

  1. 服务启动的时候,根据当前时间划分时间分片,构建dayJobBatch的list列表。
  2. 服务启动2分钟后,每一分钟内触发schedule的job刷新今天没有稳定的job信息。这个schedule的job会一直运行下去,保持同步最新的job。
  3. 服务启动2分钟后,每5分钟触发一次schedule的job从前(30天前)往后(到昨天),选择3天(K可配置)没有完成的window,抓取job信息。当所有的历史信息更新完成了,这个schedule的job会很清闲。当然对于那些超过一天的job还是有它用武的地方。

3.3 整个新cache策略实现的类图

SherlockCache2.jpg

4 潜在的bug和Fix

4.1 数据天然存在不完整的情况

问题:对于这种情况,解析获得的数据是null,因此会被认为爬取不成功,这样就会让这个job当天的batch一直重试。
解决方法:如果爬取成功了,但是数据本身有问题,那么会给它返回一个空的实体,而不是null。这样两次重复返回空的实体,也会检测到没有变化,而把这个job标记为Finish。而跳出重复爬取的怪圈。

4.2 长job 的问题

问题:如果一个job运行多天,目前会把它放在最早出现的那一天取检测运行(每天检测job的时候,如果发现在更早的一天出现,就把它移动到更早的一天)。为什么我们要知道它最早是那一天出现呢?因为我们的job数据是连续在一个时间段的一个aggragation的结果。如果不从最早出现的那一天算起,就会丢失数据。但是这样对于运行几天的job会造成一个问题,它最早出现的那天一直不稳定,会影响后面几天job的获取。
解决方法:掠夺。
如果一个job在今天出现了,那就把它掠夺到今天来。


标题:策略优先于代码
作者:wuhaifengdhu
地址:https://wudevin.cn/articles/2019/11/10/1573381584336.html
只有站在峰顶的人,才能看见星辰大海。
标签: ,
新一篇: 我在 GitHub 上的开源项目 旧一篇: 人的文化基因