`
kane_xie
  • 浏览: 143051 次
社区版块
存档分类
最新评论

Kafka Connect

阅读更多

Kafka Connect是Kafka0.9新增的模块。可以从名称看出,它可以和外部系统、数据集建立一个数据流的连接,实现数据的输入、输出。有以下特性:

 

  • 使用了一个通用的框架,可以在这个框架上非常方面的开发、管理Kafka Connect接口
  • 支持分布式模式或单机模式进行运行
  • 支持REST接口,可以通过REST API提交、管理 Kafka Connect集群
  • offset自动管理

 

在大部分Kafka应用场景中,我们常常需要从某一数据源导入数据到Kafka中,或者将Kafka中的数据导出(准确说是被取出)到其他系统中。为了实现这一功能,我们一般需要在上游系统中创建一个Kafka Producer,或在下游系统中创建Kafka Consumer。

 

在Kafka0.9中新增的Kafka Connect模块实现了以上功能,这样我们就可以省掉了一些通用交互代码,而只需要做简单的配置就行了。这和Logstash的思路很相似,Kafka提供了connector的接口,通过实现该接口来,我们可以创建各种各样的Input和Output插件。当然Kafka Connect才刚刚推出,插件远没有Logstash丰富,不过相信随着Kafka0.9的普及,这一功能将变得更加实用。

 

以下介绍Kafka Connect的一个简单的实现,FileInput和FileOutput,启动方式如下:

 
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

 

执行connect-standalone.sh会在本地单机启动connector(s)。我们必须声明至少两个参数,第一个是Kafka Connect的配置文件,包含一些基本配置例如kafka server地址,序列化格式等等;其他的配置文件用于配置connector(s),每个配置文件创建一个connector。我们来看一下这两个配置文件:

 

connect-file-source.properties 
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test-src.txt
topic=test

 

该配置会启动一个名叫local-file-source的connector,这个connector会从/tmp/test-src.txt中读取数据,然后写入名叫test的topic中。

 

connect-file-sink.properties 
name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test-dest.txt
topics=test

 

该配置会启动一个名叫local-file-sink的connector,这个connector会从Kafka的test topic中读取数据,然后写入/tmp/test-dest.tx文件中。

 

所以我们看到,运行以上命令实际上是启动了一个程序不断地读取/tmp/test-src.txt,并通过Kafka Producer发送到Kafka 的test这个topic中,同时启动一个Kafka Consumer从这个topic中读取数据并写入/tmp/test-dest.txt,启动之后我们可以测试一下。

 
echo "hello world" >> /tmp/test-src.txt

 

我们往/tmp/test-src.txt中写入几个字符,然后我们查看/tmp/test-dest.txt,发现“hello world”已经被写入/tmp/test-dest.txt中。

 

多说一句,数据并不是以文本的形式存入Kafka,而是Json。它的格式类似于:

 
{"schema":{"type":"string","optional":false},"payload":"hello world"}

 

所以本文中两个connector实际上还做了数据的封装和Json解析的工作。

 

 

 

1
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics