Kafka策略模式

公共kafka工具模块

针对于不同场景的消费消息

代码结构如下


  • consumerListener
package com.adaspace.kafka.consumer;

import com.adaspace.kafka.handler.HandlerContext;
import com.adaspace.kafka.handler.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 公共kafka消费者监听器,使用策略模式来进行不同场景的消息处理
 *
 * @Author: Frost
 * @Date: 2021/11/12 23:23
 */
@Slf4j
@Component
public class ConsumerListener {

    @Resource
    private HandlerContext handlerContext;

    @KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", groupId = "${kafka.listener.group-id}")
    public void listen(ConsumerRecord record) {
        log.info("监听kafka消息,topic={},partition={},offset={}", record.topic(), record.partition(), record.offset());
        String topic = record.topic();
        try {
            MessageHandler handler = handlerContext.getHandler(topic);
            String message = String.valueOf(record.value());
            handler.handle(message);
        } catch (Exception e) {
            log.error("该topic消息策略不存在");
        }


    }
}


  • handler
package com.adaspace.kafka.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: Frost
 * @Date: 2021/11/12 23:24
 */
@Component
public class HandlerContext {

    @Autowired
    public final Map<string, messagehandler=""> map = new ConcurrentHashMap&lt;&gt;();

    /**
     * 放入对应的策略
     *
     * @param map {key: topicName value:MessageHandler}
     */
    public HandlerContext(Map<string, messagehandler=""> map) {
        map.forEach(this.map::put);
    }


    /**
     * 不同的topic进行MessageHandler的策略获取,通过公共kafka 监听器来触发不同的handler
     *
     * @param handler
     * @return
     */
    public MessageHandler getHandler(String handler) {
        MessageHandler messageHandler = map.get(handler);
        if (messageHandler == null) {
            throw new RuntimeException();
        }
        return messageHandler;
    }
}

</string,></string,>
  • MessageHandler接口
package com.adaspace.kafka.handler;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * @Author: Frost
 * @Date: 2021/11/12 23:18
 */
public interface MessageHandler {

    /**
     *  处理器
     * @param message 消费消息体
     */
    void handle(String message);
}

  • 示例handler 代码

通过Component 将topic 名注入spring,不同topic 进行不同策略实现

package com.adaspace.mp.order.handler;

import com.adaspace.kafka.handler.MessageHandler;
import com.adaspace.mp.order.domain.*;
import com.adaspace.mp.order.dto.event.AIKafkaRespMessage;
import com.adaspace.mp.order.dto.event.KafkaMessageTypeEnum;
import com.adaspace.mp.order.gatewayimpl.impl.OrderGatewayImpl;
import com.adaspace.mp.unispace.api.UnispaceFeignClient;
import com.adaspace.mp.unispace.dto.ProcessResult;
import com.adaspace.mp.unispace.dto.UnispaceLoadDataDto;
import com.alibaba.cola.dto.SingleResponse;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.util.WeakHashMap;


/**
 * @Author: Frost
 * @Date: 2021/11/12 23:28
 */

@Component("kafkaTest")
@Slf4j
public class AiHandler implements MessageHandler {
    /**
     * {k:分区,v:offset} (待用)
     */
    WeakHashMap<topicpartition, long=""> offsetMap = new WeakHashMap&lt;&gt;();

    @Resource
    private OrderGatewayImpl orderGatewayImpl;

    @Override
    public void handle(String message) {
        JSON parse = (JSON) JSON.parse(message);
        AIKafkaRespMessage aiKafkaRespMessage = JSON.toJavaObject(parse, AIKafkaRespMessage.class);
        KafkaMessageTypeEnum kafkaMessageType = aiKafkaRespMessage.getKafkaMessageType();
        switch (kafkaMessageType) {
            case UNTREATED_TASK:
            case AI_END_TASK:
                orderGatewayImpl.updateOrderItemData(
                        aiKafkaRespMessage.getId(),
                        aiKafkaRespMessage.getStatus(),
                        aiKafkaRespMessage.getTargetTIFF()
                );
                Order order = orderGatewayImpl
                        .getOrder(aiKafkaRespMessage.getOrderId(), AlgorithmReqExtension.class, new OrderItem2Algorithm());
                OrderItemData orderItemData =
                        order.getOrderItem().getOrderItemData()
                                .stream()
                                .filter(x -&gt; x.getId()
                                        .equals(aiKafkaRespMessage.getId())).findAny().orElse(null);
                Assert.notNull(orderItemData, "接收orderItemData为null");
                orderItemData.setStatus(aiKafkaRespMessage.getStatus());
                orderItemData.setIsReady(aiKafkaRespMessage.getStatus() == 0);
                orderGatewayImpl.fireEvent(order, OrderEventType.SHIPPED, aiKafkaRespMessage.getTargetTIFF());
                break;
            default:
                log.info("消息消费未找到对应处理case");
        }
    }
}
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇