基于SpringBoot 使用 Flink 收发Kafka消息()

  本篇文章为你整理了基于SpringBoot 使用 Flink 收发Kafka消息()的详细内容,包含有 基于SpringBoot 使用 Flink 收发Kafka消息,希望能帮助你了解 基于SpringBoot 使用 Flink 收发Kafka消息。

  这周学习下Flink相关的知识,学习到一个读写Kafka消息的示例, 自己动手实践了一下,别人示例使用的是普通的Java Main方法,没有用到spring boot. 我们在实际工作中会使用spring boot。 因此我做了些加强, 把流程打通了,过程记录下来。

  首先我们通过docker安装一个kafka服务,参照Kafka的官方指导文档
 

  https://developer.confluent.io/tutorials/kafka-console-consumer-producer-basics/kafka.html
 

  主要的是有个docker-compose.yml文件

  

---

 

  version: 2

  services:

   zookeeper:

   image: confluentinc/cp-zookeeper:7.3.0

   hostname: zookeeper

   container_name: zookeeper

   ports:

   - "2181:2181"

   environment:

   ZOOKEEPER_CLIENT_PORT: 2181

   ZOOKEEPER_TICK_TIME: 2000

   broker:

   image: confluentinc/cp-kafka:7.3.0

   hostname: broker

   container_name: broker

   depends_on:

   - zookeeper

   ports:

   - "29092:29092"

   environment:

   KAFKA_BROKER_ID: 1

   KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

   KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

   KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092

   KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

   KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  
就可以把kafka docker 环境搭起来,
 

  使用以下命令,创建一个flink.kafka.streaming.source的topic
 

  docker exec -t broker kafka-topics --create --topic flink.kafka.streaming.source --bootstrap-server broker:9092
 

  然后使用命令,就可以进入到kafka机器的命令行
 

  docker exec -it broker bash
 

  官方文档示例中没有-it, 运行后没有进入broker的命令行,加上来才可以。这里说明下

  Flink我们打算直接采用开发工具运行,暂时未搭环境,以体验为主。

  首先需要引入的包POM文件

  

 properties 

 

   jdk.version 1.8 /jdk.version

   maven.compiler.source 8 /maven.compiler.source

   maven.compiler.target 8 /maven.compiler.target

   project.build.sourceEncoding UTF-8 /project.build.sourceEncoding

   spring-boot.version 2.7.7 /spring-boot.version

   flink.version 1.16.0 /flink.version

   /properties

   dependencyManagement

   dependencies

   dependency

   groupId org.springframework.boot /groupId

   artifactId spring-boot-dependencies /artifactId

   version ${spring-boot.version} /version

   type pom /type

   scope import /scope

   /dependency

   /dependencies

   /dependencyManagement

   dependencies

   dependency

   groupId org.springframework.boot /groupId

   artifactId spring-boot-starter /artifactId

   /dependency

   dependency

   groupId org.projectlombok /groupId

   artifactId lombok /artifactId

   optional true /optional

   /dependency

   dependency

   groupId org.apache.flink /groupId

   artifactId flink-java /artifactId

   version ${flink.version} /version

   scope provided /scope

   /dependency

   dependency

   groupId org.apache.flink /groupId

   artifactId flink-clients /artifactId

   version ${flink.version} /version

   scope provided /scope

   /dependency

   dependency

   groupId org.apache.flink /groupId

   artifactId flink-streaming-java /artifactId

   version ${flink.version} /version

   scope provided /scope

   /dependency

   dependency

   groupId org.apache.flink /groupId

   artifactId flink-connector-kafka /artifactId

   version ${flink.version} /version

   scope provided /scope

   /dependency

   /dependencies

  

 

  这里我们使用Java8, 本来想使用Spring Boot 3的,但是Spring Boot 3 最低需要Java17了, 目前Flink支持Java8和Java11,所以我们使用Spring Boot 2, Java 8来开发。

  spring-boot-starter 我们就一个命令行程序,所以用这个就够了
 

  lombok 用来定义model
 

  flink-java, flink-clients, flink-streaming-java 是使用基本组件, 缺少flink-clients编译阶段不会报错,运行的时候会报java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
 

  flink-connector-kafka 是连接kafka用
 

  我们这里把provided, 打包的时候不用打包flink相关组件,由运行环境提供。但是IDEA运行的时候会报java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema,
 

  在运行的configuration上面勾选上“add dependencies with provided scope to classpath”可以解决这个问题。

  

@Component

 

  @Slf4j

  public class KafkaRunner implements ApplicationRunner

   @Override

   public void run(ApplicationArguments args) throws Exception {

   try{

   /****************************************************************************

   * Setup Flink environment.

   ****************************************************************************/

   // Set up the streaming execution environment

   final StreamExecutionEnvironment streamEnv

   = StreamExecutionEnvironment.getExecutionEnvironment();

   /****************************************************************************

   * Read Kafka Topic Stream into a DataStream.

   ****************************************************************************/

   //Set connection properties to Kafka Cluster

   Properties properties = new Properties();

   properties.setProperty("bootstrap.servers", "localhost:29092");

   properties.setProperty("group.id", "flink.learn.realtime");

   //Setup a Kafka Consumer on Flnk

   FlinkKafkaConsumer String kafkaConsumer =

   new FlinkKafkaConsumer

   ("flink.kafka.streaming.source", //topic

   new SimpleStringSchema(), //Schema for data

   properties); //connection properties

   //Setup to receive only new messages

   kafkaConsumer.setStartFromLatest();

   //Create the data stream

   DataStream String auditTrailStr = streamEnv

   .addSource(kafkaConsumer);

   //Convert each record to an Object

   DataStream Tuple2 String, Integer userCounts

   = auditTrailStr

   .map(new MapFunction String,Tuple2 String,Integer () {

   @Override

   public Tuple2 String,Integer map(String auditStr) {

   System.out.println("--- Received Record : " + auditStr);

   AuditTrail at = new AuditTrail(auditStr);

   return new Tuple2 String,Integer (at.getUser(),at.getDuration());

   .keyBy(0) //By user name

   .reduce((x,y) - new Tuple2 String,Integer ( x.f0, x.f1 + y.f1));

   //Print User and Durations.

   userCounts.print();

   /****************************************************************************

   * Setup data source and execute the Flink pipeline

   ****************************************************************************/

   //Start the Kafka Stream generator on a separate thread

   System.out.println("Starting Kafka Data Generator...");

   Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());

   kafkaThread.start();

   // execute the streaming pipeline

   streamEnv.execute("Flink Windowing Example");

   catch(Exception e) {

   e.printStackTrace();

  

 

  简单说明下程序
 

  DataStream auditTrailStr = streamEnv
 

  .addSource(kafkaConsumer);
 

  就是接通了Kafka Source

  

 Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());

 

   kafkaThread.start();

  

 

  这段代码是另外开一个线程往kafka里面去发送文本消息
 

  我们在这个示例中就是一个线程发,然后flink就读出来,然后统计出每个用户的操作时间。
 

  auditTrailStr.map 就是来进行统计操作。

  可以看到Kafka一边发送,然后我们就一边读出来,然后就统计出了每个用户的时间。

  本文只是简单的打通了几个环节,对于flink的知识没有涉及太多,算是一个环境入门。后面学习更多的以后我们再深入些来记录flink. 示例代码会放到 https://github.com/dengkun39/redisdemo.git spring-boot-flink 文件夹。

  以上就是基于SpringBoot 使用 Flink 收发Kafka消息()的详细内容,想要了解更多 基于SpringBoot 使用 Flink 收发Kafka消息的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: