在4月21日的Kafka Beijing Meetup第四场活动上,DataPipeline CTO陈肃分享了DataPipeline是如何基于Kafka Connect框架构建实时数据集成平台的应用实践。以下内容是基于现场录音整理的文字,供大家参考。
什么是数据集成?最简单的应用场景就是:一个数据源,一个数据目的地,数据目的地可以是个数据仓库,把关系型数据库的数据同步到数据仓库里,就形成了一次数据集成。
企业数据集成面临的4个挑战
我们先来看一个真实的数据集成案例。
G公司是DataPipeline的一个典型客户,拥有近千个数据源,类型主要包括Oracle、SQL Server、MySQL等。根据业务的需要和现有的基础设施情况,这些数据源分别需要同步到不同的目的端,类型主要包括MySQL、HDFS、Kafka等。基于以上背景,G公司的具体要求如下:
1. 需要支持约5TB日新增数据量的同步,今年将增长5-10倍。
2. 这些数据一部分数据源要求实时同步,另一部分可接受定时同步。
3. 缺乏强大的运维人才,现有数据源的业务承载压力有限,对压力非常的敏感,要求进行限流。
4. 从这些数据源到目的地的同步都是Kettle写脚本实现的,管理起来比较混乱,要求通过一个管理平台对任务进行集中化的配置和管理。
5. 上游的数据源和下游的数据目的都不稳定,随时可能出现各种问题,要求通过一个高可用的平台以减少数据传输中断的影响。
6. 当数据同步任务被随机的暂停/恢复时,要求可以保证数据的完整性。
7. 当数据源和目的地随机出现故障和过载时,要求可以保证数据的完整性。
8. 当数据源Schema发生变化时,要求可以根据业务需求灵活配置目的地策略。
G公司的案例只是当前企业数据集成需求的一个典型应用场景。事实上,无论是互联网企业还是传统企业,在面临数据集成的时候都会遇到以下4个挑战:
1. 数据源的异构性:传统ETL方案中,从数据源到目的地的同步都是脚本实现的,异构数据源就意味着企业要做大量的适配工作。
2. 数据源的动态性:在数据集成时,上游的数据源端经常会发生各种变化,有些数据源可能被删掉一些结构,这可能会影响到后续数据分析的结果。
3. 任务的可伸缩性:当数据集成只有几个数据源,系统压力的问题不太突出。当数据集成面临的是成百上千个数据源时,多任务并行就需要进行一些限速与缓冲的调度,让读写速度相互匹配。
4. 任务的容错性:当数据在传输过程中出现问题的时候,是否可以实现断点重传,且不产生重复的数据。
以上也是DataPipeline要为企业数据集成过程中解决的最关键的4个问题。
为什么选择Kafka Connect作为底层框架
Kafka Connect是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具,可以更快捷和简单地将大量数据集合移入和移出Kafka的连接器。Kafka Connect为DataPipeline提供了一个相对成熟稳定的基础框架,还提供了一些开箱即用的工具,大大地降低研发的投入和提升应用的质量。
下面,我们看一看Kafka Connect的具体优势。
首先,Kafka Connect提供的是以数据管道为中心的业务抽象。在Kafka Connect里有两个核心概念:Source和Sink。Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。比如Source Connector,Sink Connector,其实就是提供了数据读取和写入的高度业务抽象,可以简化很多生命周期的管理工作。
当然,Source Connector会去初始化Source Task,Sink Connector会去初始化Sink Task。这些都是标准的封装。对于数据方面,通过Source & Sink Record把数据的结构进行标准化的抽象。另外,企业客户在做数据集成的时候,数据在很多应用场景下都要求有一定的格式,所以在Kafka Connect里用Schema Registry & Projector来解决数据格式验证和兼容性的问题。当数据源产生变化的时候,会生成新的Schema版本,通过不同的处理策略用Projector来完成对数据格式的兼容。
第二,Kafka Connect具有良好的可伸缩性、与容错性。这些特性是与Kafka是一脉相承的。在流式处理和批量处理模式里,更多取决于Source端如何去读取数据,Kafka Connect天然支持流式处理和批量传输方式。单节点和集群水平扩展功能都是由Kafka Connect框架直接支持。而任务恢复和状态保持方面,目的端任务的写入进度信息通过Kafka Connect框架自动管理、源端任务可以根据需要往Kafka里面放读取进度信息,节省很多精力去管理任务重启后的进度。
对于数据集成这样一个通用的应用场景里,大家肯定都不希望重复发明轮子。目前,在Kafka Connect生态系统下,拥有可以直接使用的Connector共84个,绝大部分都是开源的。其中,一部分是Kafka官方提供的,另外一些是Confluent认证的,还有一些是第三方提供的。根据需求适当裁剪后,这些Connector都可以应用到自己的系统平台中。
DataPipeline解决哪些数据集成的核心难题
基于Kafka Connect 框架,DataPipeline已经完成了很多优化和提升工作,可以很好地解决当前企业数据集成面临的很多核心难题。
1. 任务的独立性与全局性。
从Kafka设计之初,就遵从从源端到目的的解耦性。下游可以有很多个Consumer,如果不是具有这种解耦性,消费端很难扩展。企业做数据集成任务的时候,需要源端到目的端的协同性,因为企业最终希望把握的是从源端到目的端的数据同步拥有一个可控的周期,并能够持续保持增量同步。在这个过程中,源端和目的端相互独立的话,会带来一个问题,源端和目的端速度不匹配,一快一慢,造成数据堆积现象严重。所以,企业用户在建立一个数据任务之后,我们希望对任务进行缓冲的控制,避免数据丢失。
2. 任务并行化的方式。
如果企业客户有1000张数据表需要建立数据集成的任务,就要考虑用什么方式进行任务切分最佳。其中一种方式是把1000张表切分成若干个任务。这种情况下,Source Task的负载很难做到均衡,Sink Task可以消费多个Topics,依然存在负载不均的问题,每个任务负载多少张表其实是很难均衡的。每增加一个任务都会触发Rebalance机制。可以想象,每一张表都通过Source Connector和Sink Connector初始化一个源端和目的端任务,会大大增加Rebalance的开销。
3. 异构数据的映射。
在给企业客户做数据集成的时候,50%几率都会遇到一些脏活累活——异构数据源的映射(Mapping)。这个映射对很多互联网公司来说不是那么严重什么事儿,因为数据库设计的都比较符合规范,对字段的命名方式等都会比较“优雅”(统一)。但是在传统企业里,由于很多业务系统都会外包,还有一些意识的原因,导致数据库设计的没有那么规范和统一。用Kafka Connect做数据集成的时候,需要尽可能做到异构数据精准的还原,尤其金融行业客户对此要求比较高。另外,当确实遇到数据之间不匹配的情况时,可以在业务数据之间进行比较合理的映射。
另外,源端的Source Record包含了每一列的基本数据类型(INT16、STRING等)以及可选的meta信息(例如“name”)。目的端处理Sink Record的时候,需要依据基本数据类型以及meta信息决定映射关系。
4. Schema变化的处理策略。
给企业做数据集成的时候,需要根据数据源Schema的变化给出对应的处理策略。基于Kafka Connect框架,我们提供了以下几种处理策略:
(1)Backward Compatibility:可使用最新的Schema一致访问所有数据,e.g. 删除列、添加具有默认值的列。
(2)Forward Compatibility:可使用最旧的Schema一致访问所有数据,e.g. 删除具有默认值的列。
(3)Full Compatibility:可任意使用新旧Schema访问所有数据。
Kafka Connect推荐使用Backward Compatibility,这也是Schema Registry的默认值。另外,企业用户还会提出源端删除列,目的端需要忽略,源端添加具有默认值列,目的端需要跟随等需求,都以Task为单位进行配置和实现。
DataPipeline基于Kafka Connect做了哪些提升
在不断满足当前企业客户数据集成需求的同时,DataPipeline也基于Kafka Connect 框架做了很多非常重要的提升。
1. 系统架构层面。
DataPipeline引入DataPipeline Manager的概念,主要用于优化Source和Sink的全局化生命周期管理。当任务出现异常时,可以实现对目的端和全局生命周期的管理。例如,处理源端到目的端读取速率不匹配以及暂停等状态的协同。
为了加强系统的健壮性,我们把Connector任务的参数保存在ZooKeeper中,方便任务重启后读取配置信息。
DataPipeline Connector通过JMX Client将统计信息上报Dashboard。在Connector中在技术上进行一些封装,把一些通用信息,比如说Connector历史读取信息,跟管理相关的信息都采集到Dashboard里面,提供给客户。
2. 任务并行模式。
DataPipeline在任务并行方面做了一些加强。我们在具体服务客户的时候也遇到这样的问题,需要同步数十张表。在DataPipeline Connector中,我们允许每个Task内部可以定义和维护一个线程池,通过控制线程并发数,并且每个Task允许设置行级别的IO控制。而对于JDBC类型的Task,我们额外允许配置连接池的大小,减少上游和下游资源的开销。
3. 规则引擎。
DataPipeline在基于Kafka Connect做应用时的基本定位是数据集成。数据集成过程中,不应当对数据进行大量的计算,但是又不可避免地要对一些字段进行过滤,所以在产品中我们也在考虑怎样提供一种融合性。
虽然Kafka Connect提供了一个Transformation接口可以与Source Connector和Sink Connector进行协同,对数据进行基本的转换。但这是以Connector为基本单位的,企业客户需要编译后部署到所有集群的节点,并且缺乏良好的可视化动态编译调试环境支持。
基于这种情况,DataPipeline产品提供了两种可视化配置环境:基本编码引擎(Basic Code Engine)和高级编码引擎(Advanced Code Engine)。前者提供包括字段过滤、字段替换和字段忽略等功能,后者基于Groovy可以更加灵活地对数据处理、并且校验处理结果的Schema一致性。对于高级编码引擎,DataPipeline还提供了数据采样和动态调试能力。
4. 错误队列机制。
我们在服务企业客户的过程中也看到,用户源端的数据永远不会很“干净”。不“干净”的数据可能来自几个方面,比如当文件类型数据源中的“脏记录”、规则引擎处理特定数据产生未预期的异常、因为目的端Schema不匹配导致某些值无法写入等各种原因。
面对这些情况,企业客户要么把任务停下来,要么把数据暂存到某处后续再处理。而DataPipeline采取的是第二种方式,通过产品中错误队列预警功能指定面对错误队列的策略,支持预警和中断策略的设置和实施等,比如错误队列达到某个百分比的时候任务会暂停,这样的设置可以保证任务不会因少量异常数据而中断,被完整记录下来的异常数据可以被管理员非常方便地进行追踪、排查和处理。企业客户认为,相比以前通过日志来筛查异常数据,这种错误队列可视化设置功能大大提升管理员的工作效率。
在做数据集成的过程中,确实不应该对原始数据本身做过多的变换和计算。传统ETL方案把数据进行大量的变换之后,虽然会产生比较高效的输出结果,但是当用户业务需求发生变化时,还需要重新建立一个数据管道再进行一次原始数据的传输。这种做法并不适应当前大数据分析的需求。
基于这种考虑,DataPipeline会建议客户先做少量的清洗,尽量保持数据的原貌。但是,这并不是说,我们不重视数据质量。未来的重要工作之一,DataPipeline将基于Kafka Streaming将流式计算用于数据质量管理,它不对数据最终输出的结果负责,而是从业务角度去分析数据在交换过程中是否发生了改变,通过滑动窗口去判断到底数据发生了什么问题,判断条件是是否超出一定比例历史均值的记录数,一旦达到这个条件将进一步触发告警并暂停同步任务。
总结一下,DataPipeline经过不断地努力,很好地解决了企业数据集成过程需要解决异构性、动态性、可伸缩性和容错性等方面的问题;基于Kafka Connect的良好基础支撑构建了成熟的企业级数据集成平台;基于Kafka Connect进行二次封装和扩展,优化了应用Kafka Connect时面临的挑战:包括Schema映射和演进,任务并行策略和全局化管理等。未来,Datapipeline将会基于流式计算进一步加强数据质量管理。