大数据 flume

关于大数据体系,将数据从磁盘上读出后送到kafka在由flume消费的的脚本配置文件。

在脚本配置文件的书写时,在Kafka 集群的地址后面空格加注释后莫名奇妙就会报地址错误的问题,同样在kafka的话题配置后面加上同样的注释也是会出现同样的问题。

以下是Flume的agent1的脚本配置代码:

# 定义代理名称
agent1.sources = spoolSource
agent1.channels = memoryChannel
agent1.sinks = kafkaSink

# 配置 SpoolDirectorySource,从磁盘读取数据
agent1.sources.spoolSource.type = TAILDIR
agent1.sources.spoolSource.filegroups = f1
agent1.sources.spoolSource.filegroups.f1 = /usr/local/bigData/flume/DS/test.txt
agent1.sources.spoolSource.positionFile = /usr/local/bigData/flume/taildir_position.json

# 配置内存 Channel
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 10000
agent1.channels.memoryChannel.transactionCapacity = 1000

# 配置 Kafka Sink,将数据发送到 Kafka
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.kafka.topic = topic_ais_signal
agent1.sinks.kafkaSink.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092,localhost:9092
agent1.sinks.kafkaSink.kafka.flumeBatchSize = 1
agent1.sinks.kafkaSink.kafka.producer.acks = all
agent1.sinks.kafkaSink.kafka.producer.max.block.ms = 60000

# 配置源与通道的连接
agent1.sources.spoolSource.channels = memoryChannel
agent1.sinks.kafkaSink.channel = memoryChannel

注意:这里面的磁盘地址使用通配符*代表里面的全部文件也出现了报错问题。同时要注意,每行代码后面最好都不要加注释,不然都会存在报错的风险,而每段代码的上面的那些注释不会出现这种问题。

这一段的Flume配置脚本是从一个文件里面读取内容后传到kafka的。

下面是消费kafka里面信息的agent2的配置脚本:

# 定义代理名称
agent2.sources = kafkaSource
agent2.channels = memoryChannel
agent2.sinks = loggerSink

# 配置 Kafka Source,从 Kafka 消费数据
agent2.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent2.sources.kafkaSource.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
agent2.sources.kafkaSource.kafka.consumer.group.id = console-consumer-22174
agent2.sources.kafkaSource.kafka.topics = topic_ais_signal

# 配置内存 Channel
agent2.channels.memoryChannel.type = memory
agent2.channels.memoryChannel.capacity = 10000
agent2.channels.memoryChannel.transactionCapacity = 1000

# 配置 Sink,将数据输出到控制台(你可以根据需要改为其他 Sink,如 HDFS)
agent2.sinks.loggerSink.type = logger
agent2.sinks.loggerSink.channel = memoryChannel

# 配置源与通道的连接
agent2.sources.kafkaSource.channels = memoryChannel
agent2.sinks.loggerSink.channel = memoryChannel

配置脚本的启动命令:bin/flume-ng agent \
  –conf conf \
  –conf-file conf/flume-kafka-source.conf \
  –name agent1 \
 注:后面的name要注意区分不同,同样不同的配置脚本名字的不同也需要注意

要想在这过程中打印出日志信息,或者说处于调试过程可以在后面加上:
  -Dflume.root.logger=DEBUG,console

加上这一小段命令之后会在终端控制台打印出相关日志。

注:在消费端加上这个日志打印命令会出现不断地打印日志的情况,很可能会导致你会错过想要得到的运行结果,所以要留意所打印的日志信息,要想跟清楚地得到运行传输的数据可以将数据传到磁盘里面的某个目录文件下进行保存处理,由于本人后面还需对数据处理,接入到MinIO里面,所以这里直接打印到了控制台,也导致了以为没有成功而浪费了很多时间。

在撰写配置脚本的过程可以使用kafka的工具来监测数据的改变以及传输情况:

kafka-console-consumer –bootstrap-server :9092 –topic –from-beginning

通过这个命令,可以直观地观察到信息由磁盘到kafka的一个传输情况,也就能很好地测试第一个agent的配置成功与否的情况。

版权声明

   站内部分内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供网络资源分享服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请 联系我们 一经核实,立即删除。并对发布账号进行永久封禁处理。在为用户提供最好的产品同时,保证优秀的服务质量。


本站仅提供信息存储空间,不拥有所有权,不承担相关法律责任。

给TA打赏
共{{data.count}}人
人已打赏
大数据

es-elasticsearch 一些记录

2025-2-25 15:19:17

大数据

kafka为什么吞吐量大(kafuka和rocketmq)

2025-2-25 15:19:19

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索