关于大数据体系,将数据从磁盘上读出后送到kafka在由flume消费的的脚本配置文件。
在脚本配置文件的书写时,在Kafka 集群的地址后面空格加注释后莫名奇妙就会报地址错误的问题,同样在kafka的话题配置后面加上同样的注释也是会出现同样的问题。
以下是Flume的agent1的脚本配置代码:
# 定义代理名称
agent1.sources = spoolSource
agent1.channels = memoryChannel
agent1.sinks = kafkaSink
# 配置 SpoolDirectorySource,从磁盘读取数据
agent1.sources.spoolSource.type = TAILDIR
agent1.sources.spoolSource.filegroups = f1
agent1.sources.spoolSource.filegroups.f1 = /usr/local/bigData/flume/DS/test.txt
agent1.sources.spoolSource.positionFile = /usr/local/bigData/flume/taildir_position.json
# 配置内存 Channel
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 10000
agent1.channels.memoryChannel.transactionCapacity = 1000
# 配置 Kafka Sink,将数据发送到 Kafka
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.kafka.topic = topic_ais_signal
agent1.sinks.kafkaSink.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092,localhost:9092
agent1.sinks.kafkaSink.kafka.flumeBatchSize = 1
agent1.sinks.kafkaSink.kafka.producer.acks = all
agent1.sinks.kafkaSink.kafka.producer.max.block.ms = 60000
# 配置源与通道的连接
agent1.sources.spoolSource.channels = memoryChannel
agent1.sinks.kafkaSink.channel = memoryChannel
注意:这里面的磁盘地址使用通配符*代表里面的全部文件也出现了报错问题。同时要注意,每行代码后面最好都不要加注释,不然都会存在报错的风险,而每段代码的上面的那些注释不会出现这种问题。
这一段的Flume配置脚本是从一个文件里面读取内容后传到kafka的。
下面是消费kafka里面信息的agent2的配置脚本:
# 定义代理名称
agent2.sources = kafkaSource
agent2.channels = memoryChannel
agent2.sinks = loggerSink
# 配置 Kafka Source,从 Kafka 消费数据
agent2.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent2.sources.kafkaSource.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
agent2.sources.kafkaSource.kafka.consumer.group.id = console-consumer-22174
agent2.sources.kafkaSource.kafka.topics = topic_ais_signal
# 配置内存 Channel
agent2.channels.memoryChannel.type = memory
agent2.channels.memoryChannel.capacity = 10000
agent2.channels.memoryChannel.transactionCapacity = 1000
# 配置 Sink,将数据输出到控制台(你可以根据需要改为其他 Sink,如 HDFS)
agent2.sinks.loggerSink.type = logger
agent2.sinks.loggerSink.channel = memoryChannel
# 配置源与通道的连接
agent2.sources.kafkaSource.channels = memoryChannel
agent2.sinks.loggerSink.channel = memoryChannel
配置脚本的启动命令:bin/flume-ng agent \
–conf conf \
–conf-file conf/flume-kafka-source.conf \
–name agent1 \
注:后面的name要注意区分不同,同样不同的配置脚本名字的不同也需要注意
要想在这过程中打印出日志信息,或者说处于调试过程可以在后面加上:
-Dflume.root.logger=DEBUG,console
加上这一小段命令之后会在终端控制台打印出相关日志。
注:在消费端加上这个日志打印命令会出现不断地打印日志的情况,很可能会导致你会错过想要得到的运行结果,所以要留意所打印的日志信息,要想跟清楚地得到运行传输的数据可以将数据传到磁盘里面的某个目录文件下进行保存处理,由于本人后面还需对数据处理,接入到MinIO里面,所以这里直接打印到了控制台,也导致了以为没有成功而浪费了很多时间。
在撰写配置脚本的过程可以使用kafka的工具来监测数据的改变以及传输情况:
kafka-console-consumer –bootstrap-server :9092 –topic –from-beginning
通过这个命令,可以直观地观察到信息由磁盘到kafka的一个传输情况,也就能很好地测试第一个agent的配置成功与否的情况。