您好、欢迎来到现金彩票网!
当前位置:秒速快三 > 数据流 >

数据流时代Teads如何做到每天赋予1000亿事件价值?

发布时间:2019-05-17 18:11 来源:未知 编辑:admin

  在这篇文章中,我们描述了如何协调Kafka,Dataflow和BigQuery共同采集和转换大数据流。当增加了模式和延时的约束时,调优和重新排序成了很大的挑战,下面展示了我们是如何解决它的。

  在这篇文章中,我们描述了如何协调Kafka,Dataflow和BigQuery共同采集和转换大数据流。当增加了模式和延时的约束时,调优和重新排序成了很大的挑战,下面展示了我们是如何解决它的。

  在数字广告中,日常运营产生了许多我们需要跟踪的事件,以便透明地报道活动的效益。这些事件来自:

  用户与广告互动,通过浏览器发送。这些事件被称为可以标准化的(开始、完成、暂停、回复等)跟踪事件,或者使用Teads Studio构建的具有互动创意的自定义事件。我们每天收到大约100亿个跟踪事件。

  来自我们的后端这些事件都是关于广告拍卖的大部分(实时出价流程)细节。在抽样之前我们每天产生的这些事件超过600亿,在2018年这个数字将翻一番。

  浏览器通过HTTP将跟踪数据发送到一个专用组件,其他的事情都列进了Kafka的topic中。Analytics是这些事件的服务对象之一。

  我们用一个Analytics小组,他们的任务是按照如下定义管理这些事件:

  为了完成这个任务,我们建立和维护了一系列处理工具和管道。由于公司的有机增长和新产品的需求,我们定期挑战我们的结构。

  数据的模式使它不可能存放在单一的Cassandra表中,这会妨碍高效的交叉查询,

  它是一个复杂的基础框架,在批处理和速度层都会出现代码复制,这阻碍了我们新功能的高效发布,

  这时候,我们有了几种可能的选择。首先,我们可以建立一个增强的lambda,但它只能推迟我们要面临的问题。

  我们考虑了几个有前景的替代品,像Druid何BigQuery。我们最终选择迁移到BiQuery,因为他有很多强大的功能。

  感谢flat-rate计划,我们高强度的用法(查询和存储方式)是具有高成本效益的。

  然而,我们的技术环境不适合BigQuery。我们想用它来存储和转换来自多个Kafka topic 的所有事件。我们无法让我们的Kafka群组移出AWS,也无法使用与Kafka托管等效的Pub/Sub,因为这些群集也被我们托管在AWS上的一些广告投放组件使用。因此,我们不得不处理来自运营的多云基础框架的挑战。

  今天,BigQuery是我们的数据仓库系统,用于我们的跟踪数据与其他的原始数据的协调核对。

  当处理追踪事件的时候,你面对的首要问题就是,你必须在不知道延迟的情况下无序地处理他们。

  事件实际发生的时间(事件触发时间,event time)和系统注意到这个事件的时间(处理时间,processing time)之间的时间间隔的范围涵盖了从毫秒级到小时级。这些巨大的延迟并不罕见,而且当用户在浏览会话的时间中间连接断开了或者开启了飞行模式,就会出现这种情况。

  如果要获取流数据处理遇到的问题相关更多信息,我们建议去看Google Cloud Next17 中Tyler Akidau(Google数据处理技术主管)和 Lo?c Jaures(Teads的共同创始人和技术部高级副总裁)讨论《批处理和流处理之间的来回转换》。本文就是受到这个讨论的启发。

  Dataflow是一个管理流系统,为了应对我们面对的事件的混乱本质的挑战而生。Dataflow有一个统一的流和批处理编程模型,流是它的主推特性。

  由于Dataflow的承诺和对流模式的大胆尝试,我们购买了它。不幸的是,在面对真实生产环境的数据传输,我们感到了惊骇:BigQuery的流插入代价。

  我们对压缩数据大小(即,通过网络的字节的实际数据卷)和非BigQuery的原始数据格式大小已经有了基本估算。幸运的是现在已经为每个数据类型提供了文档,因此你也可以做计算。

  那时候,我们低估了这个额外代价的100倍,这几乎是我们整个获取渠道(Dataflow + BigQuery)的两倍代价。我们也遇到了其他的局限,例如100,000 events/s 速率限制,这已经几乎接近我们在做的事情了。

  好消息是,有一种方法可以完全避免流插入限制:批量加载到BigQuery。

  理想情况下,我们希望在流模式中使用Dataflow,在批处理模式下使用BigQuery。在那个时候,Dataflow SDK中没有用于无限制数据流的BigQuery批处理接收器。

  然后我们考虑开发自己的自定义接收器。不幸的是,当时不可能在无限制的数据流中添加一个自定义的接收器(见Dataflow计划为在将来的版本中增加对编写无界数据的自定义接收器的支持现在这是有可能的,Beam是官方的Dataflow SDK)。

  我们别无选择,只能把我们的数据转换成批处理模式。由于Dataflow的统一模型,这仅仅是几行代码的问题。幸运的是,我们可以接收由切换到批处理模式所引入的额外数据处理延迟。

  继续向前推进,我们目前的接入架构是基于Scio,这是一个由Spotify提供的Dataflow开源的Scala API。如前所述,Dataflow原生支持Pub/Sub,但集成Kafka还不太成熟。我们必须扩展Scio以支持检查点持久性和有效的并行性。

  我们的结果处理架构是一个30个节点的Dataflow批处理作业的链,按顺序排列,读取Kafka topic,并使用加载作业来写入BigQuery。

  其中一个关键是找到理想的分批时间。我们发现在成本和读取性能之间有一个最佳的平衡点(因此延迟)。调整的变量是Kafka读取阶段的持续时间。

  要得到完整的批处理时间,您必须将写入操作添加到BigQuery阶段也算在里面(不是成比例增加的,而是与读操作时间密切相关),再加上一个常量,也就是启动和关闭消耗的时间。

  读取阶段太短会降低读取和非读取阶段之间的比例。在一个理想的情况下,1:1的比值意味着你必须能够以同样的速度进行读取和写入。在上面的例子中,我们有20分钟的读取阶段,对一个30分钟的批处理(比值为3:2)。这意味着我们必须能够在读取数据时比我们写入数据的速度快1.5倍。小的比值意味着需要更大的实例。

  过长的读取阶段将简单地增加事件的发生时刻与BigQuery中其可用的时刻之间的延迟。

  为简便以及更易于失败管理,数据流作业按顺序启动。这是我们愿意采取的延迟所做的折衷。如果某项作业失败了,我们只需返回上次所提交的Kafka偏移即可。

  我们必须修改我们的Kafka集群的拓扑结构,并增加分区的数量,以便能够更快地unstack消息。根据你在Dataflow中所进行的转换,受限的因素很可能是在处理能力或网络吞吐量上。为了实现高效的并行,你应该始终尝试保留大量CPU线程,这个数字是你所拥有的分区数量的一个因子(推论:Kafka分区的数量是多因子合数,这是很不错的)。

  在极少数的延迟情况下,我们可以用较长的读取序列对作业进行微调。通过使用更大的批处理,我们也能够以延迟为代价来赶上这类延迟。

  为了处理大部分情况,我们调整Dataflow使其读取速度以比实际速度快3倍。用单个n1-highcpu-16实例读取20分钟可以unstack 60分钟的消息。

  在我们的用例中,我们最终得到的锯齿式延迟,震荡范围在3分钟(Write BQ阶段的最小时长)和30分钟(作业的总时长)之间。

  原始数据是不可避免地体积庞大,我们有太多的事件,并照目前状态无法查询它们。我们需要汇总这些原始数据以保持较低的读取时间和紧凑的体积大小。以下是我们在BigQuery中的做法:

  与传统ETL过程中数据在加载之前进行转换不同的是,我们选择以原始格式首先存储它(ELT)。

  我们希望直接写入每天分区的原始事件表。我们不能因为Dataflow批处理就必须使用特定的目标(表或分区)来定义,并且可以包含针对不同分区的数据。我们通过将每个批装载到一个临时表中来解决这个问题,然后开始转换它。

  对于这些临时批处理表,我们运行一组转换,这些转换被具体化成SQL查询,输出到其他表。其中一个转换只是将所有数据附加到大型原始事件表,并在白天进行分区。

  另一个转换是rollup:给定一组维度数据的聚合。所有这些转换都是幂等的,可以在错误或需要进行数据再处理的情况下安全地重新运行。

  直接查询原始事件表是很好的调试,也有利于深入分析,但是直接查询原始表不可能达到可接受的性能,更不用说这种操作的成本了。

  为了给你一个想法,这个表格只保留了4个月,包含1万亿个事件,大小接近250TB。

  在上面的示例中,我们将事件计数设置为3个维度:小时、Ad ID、网站ID。事件也被旋转并转换为列。该示例显示了2.5x的减少,而实际情况则接近70x。

  在BigQuery大型并行上下文中,查询运行时不会受到太大影响,改进是根据使用的槽数来衡量的。

  Rollups还让我们将数据划分为小块:事件被分组到小的表中,每一个小时(事件时间的小时,而不是处理时间)。因此,如果您需要查询给定小时的数据,您将查询单个表(10M行,10GB)。

  Rollups是一种通用的聚合,我们可以更有效地查询所有事件,给定了大量的维度。还有一些其他的用例,我们希望对数据有专门的视图。它们每个都可以实现一组特定的转换,最终得到一个专门的和优化的表。

  BigQuery不允许查询具有不同模式(即使查询没使用不同的字段)的多个表。当我们需要添加一个字段,我们用一个脚本来做上百个表的批量更新。

  BigQuery不支持列删除。没什么大不了的,因为这对技术而言没什么负担。

  查询多个小时:BigQuery的表名支持通配符,但是性能非常差,我们生成查询的时候,需要使用UNION ALL来明确要查询的每张表。

  我们总是需要连接带有托管在其他数据库(例如,给事件提供更多的广告活动信息)上数据的这些事件,但是BigQuery也不支持这个。我们现在不得不定期把完整的表拷贝到BigQuery上,以便能在单个查询中做数据连接。

  通过在AWS中Teads的广告投放基础设施和Kafka群组来与其它组件共享,我们别无选择,只能在AWS和GCP云之间移动大量数据,当然这不容易,无疑也不会便宜。我们将Dataflow实例(这主要的是GCP的切入点)尽可能靠近放置在我们的AWS基础设施旁边。幸运的是,AWS和GCP之间的连接足够好,以至于我们可以简便的使用托管的VPN。

  虽然我们运行这些VPN遇到了一些不稳定性,但我们想办法整理出了一个简单的脚本,用来再一次的打开和关闭VPN。我们从未面对过一个足够巨大的问题来证明专用链路的成本。

  又一次,费用成了你不得不密切关注的事情,出口是令人担忧的,在你看到账单之前费用是难以估计的。为了压缩成本,你需要仔细选择压缩数据的方法。

  在BigQuery中所拥有的这些事件是不够的。为了给业务带来价值,数据必须与不同的规则和度量相结合。此外,BigQuery不适合实时用例。

  由于并发限制和不可压缩的查询延迟3到5秒(可接受和固有的设计),BigQuery必须与其他工具混合,以服务应用程序(指示板、web ui等)。

  这个任务由我们的分析服务来执行,它是一个Scala组件,它利用BigQuery来生成按需报告(电子表格)和定制的数据集市(每日或每小时更新)。

  我们选择了AWS Redshift来存储和服务我们的数据集市。尽管服务于面向用户的应用程序似乎不是一个清晰的选择,但Redshift对我们很适用,因为我们的并发用户数量是有限的。

  另外,使用键/值存储器需要更多的开发工作。通过保持中间的关系数据库,数据集市的消费变得更容易了。

  关于如何规划化地构建、维护和查询这些数据集市,这会有很多话题,但他们将成为另一篇文章的主题。

http://starnet-france.com/shujuliu/54.html
锟斤拷锟斤拷锟斤拷QQ微锟斤拷锟斤拷锟斤拷锟斤拷锟斤拷锟斤拷微锟斤拷
关于我们|联系我们|版权声明|网站地图|
Copyright © 2002-2019 现金彩票 版权所有