1、到官网查询所在版本的依赖,导入pom.xml(在此用Flink1.13) 官网->教程->connectors->datastream->kafka
网址:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.6</version>
</dependency>
2.在此页面找到Kafka source 示例代码,将此代码填充至类中并将其具体参数修改即可
//如果方法在返回值的位置声明了泛型,此时在调用这个方法时,需要在方法名前补充泛型KafkaSource.<String>builder()
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092") //集群地址,写一个也行,多个也行
.setTopics("first")//消费的主题
.setGroupId("my-group")//消费者组id
/*设置起始偏移量有以下几种情况:
1.从指定的位置消费:OffsetsInitializer.offsets(Map< TopicPartition, Long> offsets)
2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
5.新的组,从来没有提交过,再指定一个消费方式: OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
*/
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))//设置起始偏移量,也就是从哪里消费
//由于大多数情况下key列没有值,所以只设置value的反序列化器即可
.setValueOnlyDeserializer(new SimpleStringSchema()) //消费必须设置的Key-value的反序列化器
.build();
//用设置好的组件获取source
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
需注意!
1、flink的kafkaSource默认是把消费的offsets提交到当前Task的状态中,并不会主动提交到kafka的——consumer_offsets中
所以,上述代码无论运行多少次消费的都是一样的内容,想要达到这次消费起始位置是上次消费的最后一条的情况
需要手动设置,把offsets提交到kafka一份
//设置额外的消费者参数
.setProperty("enable.auto.commit","true")//允许consumer自动提交offsets
.setProperty("auto.commit.interval.ms","1000")//每次提交的时间间隔
2、Job重启时,如果开启了Checjpoint,默认从哪Checkpoint中获取之前提交的offsets
原文地址:http://www.cnblogs.com/CYan521/p/16816514.html
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,请务用于商业用途!
3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员!
8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载
声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性