SpringBoot集成Kafka实例,简单实现producer和consumer

JAVA学习网 2018-11-01 21:48:04

1、使用IDEA新建工程,创建工程 springboot-kafka-producer

2、在工程的 pom.xml 文件中添加如下依赖:

<!-- 添加 kafka 依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- 添加 gson 依赖 -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>
<!-- 添加 lombok 依赖 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.22</version>
    <scope>provided</scope>
</dependency>

3、创建kafka配置类,KafkaProducerConfig

 1 package com.miniooc.kafka.producer;
 2 
 3 import org.apache.kafka.clients.producer.ProducerConfig;
 4 import org.apache.kafka.common.serialization.StringSerializer;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.context.annotation.Bean;
 7 import org.springframework.context.annotation.Configuration;
 8 import org.springframework.kafka.annotation.EnableKafka;
 9 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
10 import org.springframework.kafka.core.KafkaTemplate;
11 import org.springframework.kafka.core.ProducerFactory;
12 
13 import java.util.HashMap;
14 import java.util.Map;
15 
16 /**
17  * kafka模板配置类
18  *
19  * @author 宋陆
20  * @version 1.0.0
21  */
22 @EnableKafka
23 @Configuration
24 public class KafkaProducerConfig {
25 
26     @Value("${kafka.bootstrap.servers}")
27     private String BOOTSTRAP_SERVERS_CONFIG;
28 
29     @Bean
30     public KafkaTemplate<String, String> kafkaTemplate() {
31         return new KafkaTemplate<>(producerFactory());
32     }
33 
34     public ProducerFactory<String, String> producerFactory() {
35         Map<String, Object> props = new HashMap<>();
36         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
37         props.put(ProducerConfig.RETRIES_CONFIG, 0);
38         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
39         props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
40         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
41         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
42         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
43         return new DefaultKafkaProducerFactory<>(props);
44     }
45 
46 }

4、创建kafka生产类,KafkaProducer

 1 package com.miniooc.kafka.producer;
 2 
 3 import com.google.gson.Gson;
 4 import com.google.gson.GsonBuilder;
 5 import com.miniooc.kafka.message.MessageBean;
 6 import lombok.extern.java.Log;
 7 import org.springframework.beans.factory.annotation.Value;
 8 import org.springframework.kafka.core.KafkaTemplate;
 9 import org.springframework.stereotype.Component;
10 
11 import javax.annotation.Resource;
12 
13 /**
14  * Kafka消息生产类
15  *
16  * @author 宋陆
17  * @version 1.0.0
18  */
19 @Log
20 @Component
21 public class KafkaProducer {
22 
23     @Resource
24     private KafkaTemplate<String, String> kafkaTemplate;
25 
26     @Value("${kafka.topic.order}")
27     private String topicOrder;
28 
29     /**
30      * 发送消息
31      *
32      * @param messageBean 消息实例
33      */
34     public void sendMessage(MessageBean messageBean) {
35         GsonBuilder builder = new GsonBuilder();
36         builder.setPrettyPrinting();
37         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
38         Gson gson = builder.create();
39         // 将消息实例序列化为json格式的字符串
40         String message = gson.toJson(messageBean);
41         // 发送消息
42         kafkaTemplate.send(topicOrder, message);
43         // 打印消息
44         log.info("\nminiooc send message:\n" + message);
45     }
46 }

5、创建消息实体类,MessageBean

 1 package com.miniooc.kafka.message;
 2 
 3 import lombok.Data;
 4 
 5 import java.io.Serializable;
 6 import java.util.Date;
 7 
 8 /**
 9  * 消息实体类
10  *
11  * @author 宋陆
12  * @version 1.0.0
13  */
14 @Data
15 public class MessageBean implements Serializable {
16 
17     /** uuid */
18     private String uuid;
19 
20     /** 时间  */
21     private Date date;
22 
23 }

6、创建消息控制器类,MessageController,主要是方便测试,非必须。也可以用其他方式创建消息实例

 1 package com.miniooc.kafka.controller;
 2 
 3 import com.miniooc.kafka.message.MessageBean;
 4 import com.miniooc.kafka.producer.KafkaProducer;
 5 import lombok.extern.java.Log;
 6 import org.springframework.stereotype.Controller;
 7 import org.springframework.web.bind.annotation.RequestMapping;
 8 import org.springframework.web.bind.annotation.ResponseBody;
 9 
10 import javax.annotation.Resource;
11 import java.util.Date;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.UUID;
15 
16 /**
17  * 消息控制器
18  *
19  * @author 宋陆
20  * @version 1.0.0
21  */
22 @Log
23 @Controller
24 @RequestMapping("/message")
25 public class MessageController {
26 
27     @Resource
28     private KafkaProducer kafkaProducer;
29 
30     /**
31      * 生成消息
32      *
33      * @return
34      */
35     @RequestMapping("create")
36     @ResponseBody
37     public Map<String, Object> create() {
38         // 创建消息
39         MessageBean messageBean = new MessageBean();
40         String uuid = UUID.randomUUID().toString();
41         messageBean.setUuid(uuid);
42         messageBean.setDate(new Date());
43         // 将消息发送到 kafka
44         kafkaProducer.sendMessage(messageBean);
45         Map<String, Object> model = new HashMap<>();
46         // 返回成功信息
47         model.put("resultCode", 1);
48         model.put("resultMsg", "success");
49         model.put("messageBean", messageBean);
50         return model;
51     }
52 
53 }

7、编辑资源文件

server.port=9526
spring.application.name=kafka-producer
kafka.bootstrap.servers=192.168.88.202:9092
kafka.topic.order=topic-order
kafka.group.id=group-order

8、启动工程,在浏览器中访问:http://localhost:9526/message/create

工程控制台输出以“miniooc send message:”开头的日志及消息的 JSON 内容(即原文截图中红框所示内容),说明消息已发送!

 

9、使用IDEA新建工程,创建工程 springboot-kafka-consumer

工程pom.xml文件添加如下依赖:

<!-- 添加 kafka 依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- 添加 gson 依赖 -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>
<!-- 添加 lombok 依赖 -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.22</version>
    <scope>provided</scope>
</dependency>

10、创建kafka配置类,KafkaConsumerConfig

 1 package com.miniooc.kafka.consumer;
 2 
 3 import org.apache.kafka.clients.consumer.ConsumerConfig;
 4 import org.apache.kafka.common.serialization.StringDeserializer;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.context.annotation.Bean;
 7 import org.springframework.context.annotation.Configuration;
 8 import org.springframework.kafka.annotation.EnableKafka;
 9 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
10 import org.springframework.kafka.config.KafkaListenerContainerFactory;
11 import org.springframework.kafka.core.ConsumerFactory;
12 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
13 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
14 
15 import java.util.HashMap;
16 import java.util.Map;
17 
18 /**
19  * kafka配置类
20  *
21  * @author 宋陆
22  * @version 1.0.0
23  */
24 @EnableKafka
25 @Configuration
26 public class KafkaConsumerConfig {
27     @Value("${kafka.bootstrap.servers}")
28     private String BOOTSTRAP_SERVERS_CONFIG;
29 
30     @Value("${kafka.group.id}")
31     private String GROUP_ID_CONFIG;
32 
33     @Bean
34     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
35         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
36         factory.setConsumerFactory(consumerFactory());
37         factory.setConcurrency(10);
38         factory.getContainerProperties().setPollTimeout(3000);
39         return factory;
40     }
41 
42     public ConsumerFactory<String, String> consumerFactory() {
43         Map<String, Object> propsMap = new HashMap<>();
44         propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
45         propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交
46         propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
47         propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
48         propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
49         propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
50         propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
51         propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
52         propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
53         return new DefaultKafkaConsumerFactory<>(propsMap);
54     }
55 }

11、创建kafka消费类,KafkaConsumer

 1 package com.miniooc.kafka.consumer;
 2 
 3 import com.google.gson.Gson;
 4 import com.google.gson.GsonBuilder;
 5 import com.google.gson.reflect.TypeToken;
 6 import com.miniooc.kafka.message.MessageBean;
 7 import lombok.extern.java.Log;
 8 import org.springframework.kafka.annotation.KafkaListener;
 9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
11 
12 /**
13  * Kafka消息消费类
14  *
15  * @author 宋陆
16  * @version 1.0.0
17  */
18 @Log
19 @Component
20 public class KafkaConsumer {
21 
22     @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
23     public void consume(@Payload String message) {
24         GsonBuilder builder = new GsonBuilder();
25         builder.setPrettyPrinting();
26         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
27         Gson gson = builder.create();
28         // 将接收到的消息反序列化消息实例
29         MessageBean messageBean = gson.fromJson(message, new TypeToken<MessageBean>() {
30         }.getType());
31         // 将消息实例序列化为json格式的字符串
32         String json = gson.toJson(messageBean);
33         // 打印消息
34         log.info("\nminiooc receive message:\n" + json);
35     }
36 }

12、创建消息实体类,MessageBean(需与生产者工程中的 MessageBean 保持一致,消费者才能正确反序列化消息)

package com.miniooc.kafka.message;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * Message payload exchanged through Kafka.
 *
 * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}.
 *
 * @author 宋陆
 * @version 1.0.0
 */
@Data
public class MessageBean implements Serializable {

    /**
     * Explicit serialization version, so the serialized form does not depend on a
     * compiler-computed default. Being static, it is not part of the JSON output.
     */
    private static final long serialVersionUID = 1L;

    /** Unique identifier of the message. */
    private String uuid;

    /** Time attached to the message. */
    private Date date;

}

13、编辑资源文件

server.port=9527
spring.application.name=kafka-consumer
kafka.bootstrap.servers=192.168.88.202:9092
kafka.topic.order=topic-order
kafka.group.id=group-order

14、启动工程

 

工程控制台输出以“miniooc receive message:”开头的日志及消息的 JSON 内容(即原文截图中红框所示内容),说明消息接收消费成功!

 

SpringBoot Kafka 整合完成! 

 

 

 

 

有需要源码的,私信我

微信号: songlu2011

QQ号: 13637818

阅读(2042) 评论(0)