针对于不同场景的消费消息
代码结构如下
- consumerListener
package com.adaspace.kafka.consumer;
import com.adaspace.kafka.handler.HandlerContext;
import com.adaspace.kafka.handler.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 公共kafka消费者监听器,使用策略模式来进行不同场景的消息处理
*
* @Author: Frost
* @Date: 2021/11/12 23:23
*/
@Slf4j
@Component
public class ConsumerListener {
@Resource
private HandlerContext handlerContext;
@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", groupId = "${kafka.listener.group-id}")
public void listen(ConsumerRecord record) {
log.info("监听kafka消息,topic={},partition={},offset={}", record.topic(), record.partition(), record.offset());
String topic = record.topic();
try {
MessageHandler handler = handlerContext.getHandler(topic);
String message = String.valueOf(record.value());
handler.handle(message);
} catch (Exception e) {
log.error("该topic消息策略不存在");
}
}
}
- handler
package com.adaspace.kafka.handler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author: Frost
* @Date: 2021/11/12 23:24
*/
@Component
public class HandlerContext {
@Autowired
public final Map<string, messagehandler=""> map = new ConcurrentHashMap<>();
/**
* 放入对应的策略
*
* @param map {key: topicName value:MessageHandler}
*/
public HandlerContext(Map<string, messagehandler=""> map) {
map.forEach(this.map::put);
}
/**
* 不同的topic进行MessageHandler的策略获取,通过公共kafka 监听器来触发不同的handler
*
* @param handler
* @return
*/
public MessageHandler getHandler(String handler) {
MessageHandler messageHandler = map.get(handler);
if (messageHandler == null) {
throw new RuntimeException();
}
return messageHandler;
}
}
</string,></string,>
- MessageHandler接口
package com.adaspace.kafka.handler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* @Author: Frost
* @Date: 2021/11/12 23:18
*/
public interface MessageHandler {
/**
* 处理器
* @param message 消费消息体
*/
void handle(String message);
}
- 示例handler 代码
通过Component 将topic 名注入spring,不同topic 进行不同策略实现
package com.adaspace.mp.order.handler;
import com.adaspace.kafka.handler.MessageHandler;
import com.adaspace.mp.order.domain.*;
import com.adaspace.mp.order.dto.event.AIKafkaRespMessage;
import com.adaspace.mp.order.dto.event.KafkaMessageTypeEnum;
import com.adaspace.mp.order.gatewayimpl.impl.OrderGatewayImpl;
import com.adaspace.mp.unispace.api.UnispaceFeignClient;
import com.adaspace.mp.unispace.dto.ProcessResult;
import com.adaspace.mp.unispace.dto.UnispaceLoadDataDto;
import com.alibaba.cola.dto.SingleResponse;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import javax.annotation.Resource;
import java.util.WeakHashMap;
/**
* @Author: Frost
* @Date: 2021/11/12 23:28
*/
@Component("kafkaTest")
@Slf4j
public class AiHandler implements MessageHandler {
/**
* {k:分区,v:offset} (待用)
*/
WeakHashMap<topicpartition, long=""> offsetMap = new WeakHashMap<>();
@Resource
private OrderGatewayImpl orderGatewayImpl;
@Override
public void handle(String message) {
JSON parse = (JSON) JSON.parse(message);
AIKafkaRespMessage aiKafkaRespMessage = JSON.toJavaObject(parse, AIKafkaRespMessage.class);
KafkaMessageTypeEnum kafkaMessageType = aiKafkaRespMessage.getKafkaMessageType();
switch (kafkaMessageType) {
case UNTREATED_TASK:
case AI_END_TASK:
orderGatewayImpl.updateOrderItemData(
aiKafkaRespMessage.getId(),
aiKafkaRespMessage.getStatus(),
aiKafkaRespMessage.getTargetTIFF()
);
Order order = orderGatewayImpl
.getOrder(aiKafkaRespMessage.getOrderId(), AlgorithmReqExtension.class, new OrderItem2Algorithm());
OrderItemData orderItemData =
order.getOrderItem().getOrderItemData()
.stream()
.filter(x -> x.getId()
.equals(aiKafkaRespMessage.getId())).findAny().orElse(null);
Assert.notNull(orderItemData, "接收orderItemData为null");
orderItemData.setStatus(aiKafkaRespMessage.getStatus());
orderItemData.setIsReady(aiKafkaRespMessage.getStatus() == 0);
orderGatewayImpl.fireEvent(order, OrderEventType.SHIPPED, aiKafkaRespMessage.getTargetTIFF());
break;
default:
log.info("消息消费未找到对应处理case");
}
}
}