{"id":696,"date":"2021-12-10T11:08:46","date_gmt":"2021-12-10T03:08:46","guid":{"rendered":"https:\/\/blog.frost-s.tk\/?p=696"},"modified":"2021-12-20T09:35:47","modified_gmt":"2021-12-20T01:35:47","slug":"flink-kafka-sink-to-rdbs-%e6%b5%8b%e8%af%95demo","status":"publish","type":"post","link":"https:\/\/blog.frost-s.com\/index.php\/2021\/12\/10\/flink-kafka-sink-to-rdbs-%e6%b5%8b%e8%af%95demo\/","title":{"rendered":"Flink Kafka Sink to RDBMS Test Demo"},"content":{"rendered":"\n<ul class=\"has-very-light-gray-to-cyan-bluish-gray-gradient-background has-background\" style=\"font-size:20px\"><li><strong>Flink SQL demo code (Java)<\/strong><\/li><\/ul>\n\n\n\n<p><strong>(When doing stream processing with Flink SQL, pay attention to the field type mapping)<\/strong><\/p>\n\n\n\n<div class=\"wp-block-buttons is-content-justification-center is-layout-flex wp-block-buttons-is-layout-flex\">\n<div class=\"wp-block-button is-style-shadow\"><a class=\"wp-block-button__link has-light-green-cyan-to-vivid-green-cyan-gradient-background has-background\" href=\"https:\/\/nightlies.apache.org\/flink\/flink-docs-release-1.13\/docs\/dev\/table\/types\/\" target=\"_blank\" rel=\"noreferrer noopener\"> Official docs: type mapping <\/a><\/div>\n<\/div>\n\n\n\n<pre class=\"wp-block-code\"><code>import com.alibaba.fastjson.JSON;\nimport org.apache.flink.streaming.api.datastream.DataStream;\nimport org.apache.flink.streaming.api.datastream.DataStreamSource;\nimport org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;\nimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;\nimport org.apache.flink.table.api.EnvironmentSettings;\nimport org.apache.flink.table.api.Table;\nimport org.apache.flink.table.api.bridge.java.StreamTableEnvironment;\nimport org.apache.flink.api.common.serialization.SimpleStringSchema;\nimport java.util.Properties;\n\/**\n * @author frost\n *\/\npublic 
class FlinkStreamJob {\n    public static void main(String[] args) throws Exception {\n        \/\/ set up the streaming execution environment\n        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()\n                .useOldPlanner()\n                .inStreamingMode()\n                .build();\n        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\n        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);\n        String topic = \"topic_invoke_statistics\";\n        \/\/ Kafka properties\n        Properties properties = new Properties();\n        properties.setProperty(\"bootstrap.servers\", \"192.168.128.212:9092\");\n        properties.setProperty(\"group.id\", \"frost-consumer\");\n        properties.setProperty(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");\n        properties.setProperty(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");\n        properties.setProperty(\"auto.offset.reset\", \"latest\");\n        \/\/ add the Kafka source\n        DataStreamSource&lt;String> data = env.addSource(new FlinkKafkaConsumer&lt;>(topic, new SimpleStringSchema(), properties));\n        data.print();\n        \/\/ DataStream operations: for now just parse the JSON string into a POJO; Flink operators can then process the data set downstream\n        DataStream&lt;Test> mapDs = data.map(line ->\n                JSON.parseObject(line, Test.class));\n        mapDs.print();\n        Table kafkaInputTable = tableEnv.fromDataStream(mapDs);\n        \/\/ register the Kafka stream as a temporary source table\n        tableEnv.createTemporaryView(\"kafkaInputTable\", kafkaInputTable);\n        \/\/ create the JDBC sink table\n        \/\/ for local debugging: the host's LAN IP is assigned dynamically by the Wi-Fi network, so confirm the IP address first, otherwise the connection will fail\n        tableEnv.executeSql(\"CREATE TABLE flink_test_table (\\n\" +\n                \"    host STRING,\\n\" +\n                \"    productId INT,\\n\" +\n                \"    referrer STRING,\\n\" +\n                \"    remoteAddr STRING,\\n\" +\n                \"    remotePort INT,\\n\" +\n                \"    request STRING,\\n\" +\n                \"    requestTime TIMESTAMP,\\n\" +\n                \"    requestUri STRING,\\n\" +\n                \"    scheme STRING,\\n\" +\n                \"    tenantId INT,\\n\" +\n                \"    userAgent STRING\\n\" +\n                \") WITH (\\n\" +\n                \"    'connector.type' = 'jdbc', \\n\" +\n\/\/                \"    'connector.url' = 'jdbc:mysql:\/\/192.168.20.109:3306\/flink-test', \\n\" +\n                \"    'connector.url' = 'jdbc:postgresql:\/\/192.168.128.214:5432\/flink_test', \\n\" +\n                \"    'connector.table' = 'flink_test_table',\\n\" +\n                \"    'connector.username' = 'postgres',\\n\" +\n                \"    'connector.password' = 'test', \\n\" +\n                \"    'connector.write.flush.max-rows' = '1' \\n\" +\n                \")\");\n\/\/        Table query = tableEnv.sqlQuery(\"select productId from kafkaInputTable\");\n        Table query1 = tableEnv.sqlQuery(\"select * from kafkaInputTable\");\n\/\/        tableEnv.toRetractStream(query1, Row.class).print();\n\/\/        tableEnv.sqlQuery(\"select * from kafkaInputTable\").execute().print();\n        tableEnv.executeSql(\"insert into flink_test_table select * from kafkaInputTable\").print();\n        env.execute(\"StreamingJob\");\n    }\n}<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">Flink Table Sink to File, Kafka, ES, and MySQL<\/h2>\n\n\n\n<ul class=\"is-style-checkbox 
has-light-green-cyan-to-vivid-green-cyan-gradient-background has-background\" style=\"font-size:20px\"><li><strong>Key points<\/strong><\/li><\/ul>\n\n\n\n<p>       A table is written out by emitting its data to a TableSink. TableSink is a generic interface that can support different file formats, storage databases, and message queues.<\/p>\n\n\n\n<p>       Concretely, the most direct way to output a table is to write it into a registered TableSink via the Table.insertInto() method. How a table is emitted also depends on its update mode.<\/p>\n\n\n\n<p>Update Modes<\/p>\n\n\n\n<p>       For streaming queries, you must declare how the conversion between the (dynamic) table and the external connector is performed. The type of messages exchanged with the external system is specified by the update mode.<\/p>\n\n\n\n<p>The Flink Table API supports the following three update modes:<\/p>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p>Append Mode<\/p><cite>In append mode, the (dynamic) table and the external connector only exchange Insert messages.<\/cite><\/blockquote>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p>Retract Mode<\/p><cite>In retract mode, the table and the external connector exchange Add and Retract messages.<br>An Insert is encoded as an Add message;<br>a Delete is encoded as a Retract message;<br>an Update is encoded as a Retract message for the previous (updated) row plus an Add message for the new row.<br>In this mode no key may be defined, which is completely different from upsert mode.<\/cite><\/blockquote>\n\n\n\n<hr class=\"wp-block-separator\"\/>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p>Upsert Mode<\/p><cite>In upsert mode, the dynamic table and the external connector exchange Upsert and Delete messages.<br>This mode requires a unique key through which update messages can be propagated; to apply the messages correctly, the external connector needs to know which attribute is this unique key.<br>An Insert and an Update are both encoded as Upsert messages;<br>a Delete is encoded as a Delete message.<br>The main difference from retract mode is that an Update is encoded in a single message, so it is more efficient.<\/cite><\/blockquote>\n\n\n\n<ul class=\"has-very-light-gray-to-cyan-bluish-gray-gradient-background has-background\" style=\"font-size:18px\"><li><strong>File sink example<\/strong><\/li><\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>package guigu.table.sink\nimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment\nimport org.apache.flink.table.api.{DataTypes, Table}\nimport org.apache.flink.table.api.scala.StreamTableEnvironment\nimport org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}\nobject FileSink {\n  def main(args: Array[String]): Unit = {\n    \/\/ 1. set up the environment\n    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment\n    env.setParallelism(1)\n    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)\n    \/\/ 2. read the data and create a table view\n    val inputFile = \"E:\\\\java\\\\demo\\\\src\\\\main\\\\resources\\\\file\\\\data5.csv\"\n    tableEnv.connect(new FileSystem().path(inputFile))\n      .withFormat(new Csv())\n      .withSchema(new Schema()\n        .field(\"id\",DataTypes.STRING())\n        .field(\"temperature\",DataTypes.DOUBLE())\n        .field(\"timestamp\",DataTypes.BIGINT())\n      )\n      .createTemporaryTable(\"inputTable\")\n    \/\/ 3. Table API transformations\n    val tableApi: Table = tableEnv.from(\"inputTable\")\n    val apiResult: Table = tableApi.select(\"id,temperature\").where(\"id = 'sensor_1'\")\n    val sqlResult: Table = tableEnv.sqlQuery(\"select id,temperature from inputTable where id = 'sensor_1'\")\n    \/\/ SQL string template\n    val sqlModelResult: Table = tableEnv.sqlQuery(\n      \"\"\"\n        |select id,temperature\n        |from inputTable\n        |where id = 'sensor_1'\n      \"\"\".stripMargin)\n    \/\/ 4. create the output table view\n    val outputFile = 
\"E:\\\\java\\\\demo\\\\src\\\\main\\\\resources\\\\file\\\\outputFile.csv\"\n    tableEnv.connect(new FileSystem().path(outputFile))\n      .withFormat(new Csv())\n      .withSchema(new Schema()\n        .field(\"id\",DataTypes.STRING())\n        .field(\"temperature\",DataTypes.DOUBLE())\n        )\n      .createTemporaryTable(\"outputTable\")\n    \/\/ 5. execute\n    sqlModelResult.insertInto(\"outputTable\")\n    tableEnv.execute(\"Flink Sink File Test\")\n  }\n}<\/code><\/pre>\n\n\n\n<ul class=\"has-very-light-gray-to-cyan-bluish-gray-gradient-background has-background\" style=\"font-size:18px\"><li><strong>ES sink example<\/strong><\/li><\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>    &lt;dependency&gt;\n      &lt;groupId&gt;org.apache.flink&lt;\/groupId&gt;\n      &lt;artifactId&gt;flink-json&lt;\/artifactId&gt;\n      &lt;version&gt;1.10.1&lt;\/version&gt;\n    &lt;\/dependency&gt;<\/code><\/pre>\n\n\n\n<pre class=\"wp-block-code\"><code>package table.tableSink\nimport org.apache.flink.streaming.api.scala._\nimport org.apache.flink.table.api.scala.StreamTableEnvironment\nimport org.apache.flink.table.api.scala._\nimport org.apache.flink.table.api.{DataTypes, Table}\nimport org.apache.flink.table.descriptors.{Csv, Elasticsearch, FileSystem, Json, Kafka, Schema}\nobject EsSink {\n  def main(args: Array[String]): Unit = {\n    \/\/ 1. set up the environment\n    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment\n    env.setParallelism(1)\n    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)\n    \/\/ 2. read the data and register it as a table view\n    val filePath = \"G:\\\\Java\\\\Flink\\\\guigu\\\\flink\\\\src\\\\main\\\\resources\\\\sensor.txt\"\n    val schema: Schema = new Schema().field(\"id\", DataTypes.STRING())\n      .field(\"timestamp\", DataTypes.BIGINT())\n      .field(\"temperature\", DataTypes.DOUBLE())\n    
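\/\/ note: TableEnvironment.connect and the descriptor API below are Flink's legacy connector API; newer releases deprecate them in favor of SQL DDL via executeSql\n    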
streamTableEnv.connect(new FileSystem().path(filePath))\n      .withFormat(new Csv())\n      .withSchema(schema)\n      .createTemporaryTable(\"inputTable\")\n    \/\/ 3. table transformations\n    val inputTable: Table = streamTableEnv.from(\"inputTable\")\n    val resultTable: Table = inputTable.select(\"id,temperature\").where(\"id = 'sensor_1'\")\n    val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'count)\n    \/\/ 4. register the output table view, sinking to ES\n    streamTableEnv.connect(\n      new Elasticsearch()\n        .version(\"6\")\n        .host(\"localhost\", 9200, \"http\")\n        .index(\"sensor\")\n        .documentType(\"_doc\")\n        .bulkFlushMaxActions(1) \/\/ required: otherwise the data stays buffered in memory and never reaches ES\n    )\n      .inUpsertMode()\n      .withFormat(new Json())\n      .withSchema(new Schema().field(\"id\",DataTypes.STRING())\n      .field(\"temperature\",DataTypes.DOUBLE()))\n      .createTemporaryTable(\"outputEsTable\")\n    \/\/ 5. execute\n    aggTable.insertInto(\"outputEsTable\")\n    env.execute()\n  }\n}<\/code><\/pre>\n\n\n\n<ul class=\"has-very-light-gray-to-cyan-bluish-gray-gradient-background has-background\" style=\"font-size:18px\"><li><strong>Kafka sink example<\/strong><\/li><\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>package table.tableSink\nimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment\nimport org.apache.flink.table.api.scala.StreamTableEnvironment\nimport org.apache.flink.table.api.{DataTypes, Table}\nimport org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}\nobject KafkaSink {\n  def main(args: Array[String]): Unit = {\n    \/\/ 1. set up the table environment\n    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment\n    
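\/\/ parallelism 1 keeps the demo output in a single ordered stream for easy local verification\n    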
env.setParallelism(1)\n    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)\n    \/\/ 2. read the data and register it as a table view\n    val filePath = \"G:\\\\Java\\\\Flink\\\\guigu\\\\flink\\\\src\\\\main\\\\resources\\\\sensor.txt\"\n    val outputPath = \"G:\\\\Java\\\\Flink\\\\guigu\\\\flink\\\\src\\\\main\\\\resources\\\\output.txt\"\n    val schema: Schema = new Schema().field(\"id\", DataTypes.STRING())\n      .field(\"timestamp\", DataTypes.BIGINT())\n      .field(\"temperature\", DataTypes.DOUBLE())\n    streamTableEnv.connect(new FileSystem().path(filePath))\n      .withFormat(new Csv())\n      .withSchema(schema)\n      .createTemporaryTable(\"inputTable\")\n    \/\/ 3. basic table transformations\n    val inputTable: Table = streamTableEnv.from(\"inputTable\")\n    val resultTable: Table = inputTable.select(\"id,temperature\").where(\"id = 'sensor_1'\")\n    \/\/ 4. register the output table view, sinking to Kafka\n    streamTableEnv.connect(\n      new Kafka()\n        .version(\"0.11\")\n        .topic(\"sinkTest\")\n        .property(\"zookeeper.connect\", \"localhost:2181\")\n        .property(\"bootstrap.servers\", \"localhost:9092\")\n    )\n      .withFormat(new Csv())\n      .withSchema(new Schema().field(\"id\",DataTypes.STRING())\n      .field(\"temperature\",DataTypes.DOUBLE()))\n      .createTemporaryTable(\"outputKafkaTable\")\n    \/\/ 5. execute\n    resultTable.insertInto(\"outputKafkaTable\")\n    env.execute()\n  }\n}<\/code><\/pre>\n\n\n\n<ul class=\"has-very-light-gray-to-cyan-bluish-gray-gradient-background has-background\" style=\"font-size:18px\"><li><strong>MySQL sink example<\/strong><\/li><\/ul>\n\n\n\n<pre class=\"wp-block-code\"><code>    &lt;dependency&gt;\n      &lt;groupId&gt;org.apache.flink&lt;\/groupId&gt;\n      &lt;artifactId&gt;flink-jdbc_2.12&lt;\/artifactId&gt;\n      &lt;version&gt;1.10.1&lt;\/version&gt; 
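\n      &lt;!-- note: flink-jdbc is the legacy JDBC connector; from Flink 1.11 on it is superseded by flink-connector-jdbc --&gt;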
\n    &lt;\/dependency&gt;<\/code><\/pre>\n\n\n\n<pre class=\"wp-block-code\"><code>package table.tableSink\nimport org.apache.flink.streaming.api.scala._\nimport org.apache.flink.table.api.scala.{StreamTableEnvironment, _}\nimport org.apache.flink.table.api.{DataTypes, Table}\nimport org.apache.flink.table.descriptors._\nobject MysqlSink {\n  def main(args: Array[String]): Unit = {\n    \/\/ 1. set up the environment\n    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment\n    env.setParallelism(1)\n    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)\n    \/\/ 2. read the data and register it as a table view\n    val filePath = \"G:\\\\Java\\\\Flink\\\\guigu\\\\flink\\\\src\\\\main\\\\resources\\\\sensor.txt\"\n    val schema: Schema = new Schema().field(\"id\", DataTypes.STRING())\n      .field(\"timestamp\", DataTypes.BIGINT())\n      .field(\"temperature\", DataTypes.DOUBLE())\n    streamTableEnv.connect(new FileSystem().path(filePath))\n      .withFormat(new Csv())\n      .withSchema(schema)\n      .createTemporaryTable(\"inputTable\")\n    \/\/ 3. table transformations\n    val inputTable: Table = streamTableEnv.from(\"inputTable\")\n    val resultTable: Table = inputTable.select(\"id,temperature\").where(\"id = 'sensor_1'\")\n    val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'cnt)\n    \/\/ 4. build the MySQL DDL and execute it in the environment; the with clause configures the connector\n    val sinkDDL: String =\n      \"\"\"\n        |create table jdbcOutputTable (\n        | id varchar(20) not null,\n        | cnt bigint not null\n        |) with (\n        | 'connector.type' = 'jdbc',\n        | 'connector.url' = 'jdbc:mysql:\/\/localhost:3306\/test',\n        | 'connector.table' = 'sensor_count',\n        | 'connector.driver' = 'com.mysql.jdbc.Driver',\n        | 'connector.username' = 'root',\n        | 
'connector.password' = '123456'\n        |) \"\"\".stripMargin\n    streamTableEnv.sqlUpdate(sinkDDL)\n    \/\/ 5. execute\n    aggTable.insertInto(\"jdbcOutputTable\")\n    env.execute()\n  }\n}<\/code><\/pre>\n\n\n\n<h3 class=\"wp-block-heading\">Notes for Flink 1.12<\/h3>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p>Flink dependency-loading issue<\/p><\/blockquote>\n\n\n\n<pre class=\"wp-block-code\"><code># Child first classloading allows users to use different dependency\/library\n# versions in their application than those in the classpath. Switching back\n# to 'parent-first' may help with debugging dependency issues.\n#\n classloader.resolve-order: parent-first<\/code><\/pre>\n\n\n\n<p>The class loading setting needs to be changed, because Flink 1.12 no longer loads dependencies in parent-first order by default.<\/p>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p>Add the MySQL driver dependency<\/p><\/blockquote>\n\n\n\n<pre class=\"wp-block-code\"><code> &lt;!-- MySQL driver --&gt;\n        &lt;dependency&gt;\n            &lt;groupId&gt;mysql&lt;\/groupId&gt;\n            &lt;artifactId&gt;mysql-connector-java&lt;\/artifactId&gt;\n            &lt;version&gt;8.0.26&lt;\/version&gt;\n        &lt;\/dependency&gt;<\/code><\/pre>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p>Add the PostgreSQL driver<\/p><\/blockquote>\n\n\n\n<pre class=\"wp-block-code\"><code>        &lt;dependency&gt;\n            &lt;groupId&gt;org.postgresql&lt;\/groupId&gt;\n            &lt;artifactId&gt;postgresql&lt;\/artifactId&gt;\n            &lt;version&gt;42.2.19&lt;\/version&gt;\n        &lt;\/dependency&gt;<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Flink SQL demo code (Java) (When doing stream processing with Flink SQL, pay attention to the field type mapping) F [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":695,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[11,10,9,8,6,5],"tags":[],"_links":{"self":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts\/696"}],"collection":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/comments?post=696"}],"version-history":[{"count":8,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts\/696\/revisions"}],"predecessor-version":[{"id":847,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/posts\/696\/revisions\/847"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/media\/695"}],"wp:attachment":[{"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/media?parent=696"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/categories?post=696"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blog.frost-s.com\/index.php\/wp-json\/wp\/v2\/tags?post=696"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}