{"id":562,"date":"2021-12-03T15:57:07","date_gmt":"2021-12-03T07:57:07","guid":{"rendered":"https:\/\/blog.frost-s.tk\/?p=562"},"modified":"2021-12-16T22:35:25","modified_gmt":"2021-12-16T14:35:25","slug":"kafka%e7%ad%96%e7%95%a5%e6%a8%a1%e5%bc%8f","status":"publish","type":"post","link":"https:\/\/blog.frost-s.com\/index.php\/2021\/12\/03\/kafka%e7%ad%96%e7%95%a5%e6%a8%a1%e5%bc%8f\/","title":{"rendered":"Kafka Strategy Pattern"},"content":{"rendered":"\n<figure class=\"wp-block-pullquote\"><blockquote><p>Common Kafka utility module<\/p><\/blockquote><\/figure>\n\n\n\n<h2 class=\"wp-block-heading\">Consuming messages for different scenarios<\/h2>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p><strong><span class=\"has-inline-color has-luminous-vivid-amber-color\">The code is structured as follows<\/span><\/strong><\/p><cite> <\/cite><\/blockquote>\n\n\n\n<figure class=\"wp-block-image size-large\"><a href=\"https:\/\/blog.frost-s.com\/wp-content\/uploads\/2021\/12\/image.png\"><img loading=\"lazy\" decoding=\"async\" width=\"386\" height=\"483\" src=\"https:\/\/blog.frost-s.com\/wp-content\/uploads\/2021\/12\/image-386x483.png\" alt=\"\" class=\"wp-image-615\"\/><\/a><\/figure>\n\n\n\n<hr 
class=\"wp-block-separator\"\/>\n\n\n\n<ul><li><strong>consumerListener<\/strong><\/li><\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>package com.adaspace.kafka.consumer;\n\nimport com.adaspace.kafka.handler.HandlerContext;\nimport com.adaspace.kafka.handler.MessageHandler;\nimport lombok.extern.slf4j.Slf4j;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\nimport org.springframework.kafka.annotation.KafkaListener;\nimport org.springframework.stereotype.Component;\n\nimport javax.annotation.Resource;\n\n\/**\n * Common Kafka consumer listener. Uses the strategy pattern to handle messages for different scenarios.\n *\n * @Author: Frost\n * @Date: 2021\/11\/12 23:23\n *\/\n@Slf4j\n@Component\npublic class ConsumerListener {\n\n    @Resource\n    private HandlerContext handlerContext;\n\n    \/\/ Topics and group id come from configuration; topics are a comma-separated list\n    @KafkaListener(topics = \"#{'${kafka.listener.topics}'.split(',')}\", groupId = \"${kafka.listener.group-id}\")\n    public void listen(ConsumerRecord&lt;?, ?&gt; record) {\n        log.info(\"Received Kafka message, topic={}, partition={}, offset={}\", record.topic(), record.partition(), record.offset());\n        String topic = record.topic();\n        try {\n            \/\/ Look up the strategy registered for this topic and delegate to it\n            MessageHandler handler = handlerContext.getHandler(topic);\n            String message = String.valueOf(record.value());\n            handler.handle(message);\n        } catch (Exception e) {\n            \/\/ Covers both a missing strategy and a failure inside the handler\n            log.error(\"Failed to process message for topic={}\", topic, e);\n        }\n    }\n}\n<\/code><\/pre>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<ul><li><strong>handler<\/strong><\/li><\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>package com.adaspace.kafka.handler;\n\nimport org.springframework.beans.factory.annotation.Autowired;\nimport org.springframework.stereotype.Component;\n\nimport java.util.Map;\nimport java.util.concurrent.ConcurrentHashMap;\n\n\/**\n * @Author: Frost\n * @Date: 2021\/11\/12 23:24\n 
*\/\n@Component\npublic class HandlerContext {\n\n    \/\/ Strategy registry: topic name -&gt; handler\n    private final Map&lt;String, MessageHandler&gt; map = new ConcurrentHashMap&lt;&gt;();\n\n    \/**\n     * Registers the available strategies.\n     *\n     * @param map {key: topic name (the handler's bean name), value: MessageHandler}\n     *\/\n    @Autowired\n    public HandlerContext(Map&lt;String, MessageHandler&gt; map) {\n        this.map.putAll(map);\n    }\n\n\n    \/**\n     * Returns the MessageHandler strategy for a topic. The common Kafka listener uses it to trigger the matching handler.\n     *\n     * @param topic topic name\n     * @return the handler registered for the topic\n     *\/\n    public MessageHandler getHandler(String topic) {\n        MessageHandler messageHandler = map.get(topic);\n        if (messageHandler == null) {\n            throw new IllegalArgumentException(\"No MessageHandler registered for topic: \" + topic);\n        }\n        return messageHandler;\n    }\n}\n<\/code><\/pre>\n\n\n\n<ul><li><strong>MessageHandler interface<\/strong><\/li><\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>package com.adaspace.kafka.handler;\n\n\/**\n * @Author: Frost\n * @Date: 2021\/11\/12 23:18\n *\/\npublic interface MessageHandler {\n\n    \/**\n     * Handles one consumed message.\n     *\n     * @param message body of the consumed message\n     *\/\n    void handle(String message);\n}\n<\/code><\/pre>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<ul><li><strong>Example handler code<\/strong><\/li><\/ul>\n\n\n\n<p class=\"wp-block-coblocks-highlight\"><mark class=\"wp-block-coblocks-highlight__content\">Each handler is registered with Spring under its topic name via @Component. Spring injects all MessageHandler beans into HandlerContext as a map keyed by bean name, so each topic gets its own strategy implementation.<\/mark><\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>package com.adaspace.mp.order.handler;\n\nimport com.adaspace.kafka.handler.MessageHandler;\nimport com.adaspace.mp.order.domain.*;\nimport 
com.adaspace.mp.order.dto.event.AIKafkaRespMessage;\nimport com.adaspace.mp.order.dto.event.KafkaMessageTypeEnum;\nimport com.adaspace.mp.order.gatewayimpl.impl.OrderGatewayImpl;\nimport com.alibaba.fastjson.JSON;\nimport lombok.extern.slf4j.Slf4j;\nimport org.apache.kafka.common.TopicPartition;\nimport org.springframework.stereotype.Component;\nimport org.springframework.util.Assert;\n\nimport javax.annotation.Resource;\nimport java.util.WeakHashMap;\n\n\n\/**\n * @Author: Frost\n * @Date: 2021\/11\/12 23:28\n *\/\n\n\/\/ The bean name must match the topic this handler serves\n@Component(\"kafkaTest\")\n@Slf4j\npublic class AiHandler implements MessageHandler {\n    \/**\n     * {key: partition, value: offset} (reserved for future use)\n     *\/\n    WeakHashMap&lt;TopicPartition, Long&gt; offsetMap = new WeakHashMap&lt;&gt;();\n\n    @Resource\n    private OrderGatewayImpl orderGatewayImpl;\n\n    @Override\n    public void handle(String message) {\n        \/\/ Deserialize the JSON message body\n        AIKafkaRespMessage aiKafkaRespMessage = JSON.parseObject(message, AIKafkaRespMessage.class);\n        KafkaMessageTypeEnum kafkaMessageType = aiKafkaRespMessage.getKafkaMessageType();\n        switch (kafkaMessageType) {\n            \/\/ Both task types are handled the same way\n            case UNTREATED_TASK:\n            case AI_END_TASK:\n                orderGatewayImpl.updateOrderItemData(\n                        aiKafkaRespMessage.getId(),\n                        aiKafkaRespMessage.getStatus(),\n                        aiKafkaRespMessage.getTargetTIFF()\n                );\n                Order order = orderGatewayImpl\n                        .getOrder(aiKafkaRespMessage.getOrderId(), AlgorithmReqExtension.class, new OrderItem2Algorithm());\n                OrderItemData orderItemData =\n                    
    order.getOrderItem().getOrderItemData()\n                                .stream()\n                                .filter(x -&gt; x.getId().equals(aiKafkaRespMessage.getId()))\n                                .findAny()\n                                .orElse(null);\n                Assert.notNull(orderItemData, \"Received orderItemData is null\");\n                orderItemData.setStatus(aiKafkaRespMessage.getStatus());\n                orderItemData.setIsReady(aiKafkaRespMessage.getStatus() == 0);\n                orderGatewayImpl.fireEvent(order, OrderEventType.SHIPPED, aiKafkaRespMessage.getTargetTIFF());\n                break;\n            default:\n                log.info(\"No matching case found for the consumed message\");\n        }\n    }\n}<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Common Kafka utility module Consuming messages for different scenarios The code is structured as follows consumerListener handle 
[&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":377,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[10,8,6,5],"tags":[],"_links":{"self":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts\/562"}],"collection":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/comments?post=562"}],"version-history":[{"count":9,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts\/562\/revisions"}],"predecessor-version":[{"id":821,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts\/562\/revisions\/821"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/media\/377"}],"wp:attachment":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/media?parent=562"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/categories?post=562"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/tags?post=562"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}