- flink sql 模式代码demo (Java)
(使用 Flink SQL 进行流式处理时,需注意源表与 sink 表之间的字段映射)
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
/**
 * Flink SQL demo job.
 *
 * <p>Reads JSON strings from the Kafka topic {@code topic_invoke_statistics}, parses each record
 * into a {@code Test} POJO with fastjson, registers the resulting stream as the temporary view
 * {@code kafkaInputTable} and inserts every row of that view into the JDBC sink table
 * {@code flink_test_table}.
 *
 * @author frost
 */
public class FlinkStreamJob {

    /**
     * Builds and launches the pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the pipeline cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment (old planner, streaming mode).
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        String topic = "topic_invoke_statistics";

        // Kafka consumer properties.
        // key.deserializer / value.deserializer are intentionally not set: FlinkKafkaConsumer
        // always reads raw bytes and decodes them with the DeserializationSchema passed to its
        // constructor (SimpleStringSchema below), so those two settings would be ignored.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.128.212:9092");
        properties.setProperty("group.id", "frost-consumer");
        properties.setProperty("auto.offset.reset", "latest");

        // Add the Kafka source and echo the raw records for debugging.
        DataStreamSource<String> data =
                env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.print();

        // DataStream step: convert each JSON string into a POJO; further Flink operators
        // can be applied to the resulting stream later on.
        DataStream<Test> mapDs = data.map(line -> JSON.parseObject(line, Test.class));
        mapDs.print();

        // Register the Kafka-backed stream as a temporary source view.
        Table kafkaInputTable = tableEnv.fromDataStream(mapDs);
        tableEnv.createTemporaryView("kafkaInputTable", kafkaInputTable);

        // Create the JDBC sink table (currently PostgreSQL; a MySQL URL is kept commented out).
        // Local debugging: the host's LAN address is handed out dynamically by Wi-Fi, so make
        // sure the IP below is the current one, otherwise the connection will fail.
        tableEnv.executeSql("CREATE TABLE flink_test_table (\n" +
                " host STRING,\n" +
                " productId INT,\n" +
                " referrer STRING,\n" +
                " remoteAddr STRING,\n" +
                " remotePort INT,\n" +
                " request STRING,\n" +
                " requestTime TIMESTAMP,\n" +
                " requestUri STRING,\n" +
                " scheme STRING,\n" +
                " tenantId INT,\n" +
                " userAgent STRING\n" +
                ") WITH (\n" +
                " 'connector.type' = 'jdbc', \n" +
                // " 'connector.url' = 'jdbc:mysql://192.168.20.109:3306/flink-test', \n" +
                " 'connector.url' = 'jdbc:postgresql://192.168.128.214:5432/flink_test', \n" +
                " 'connector.table' = 'flink_test_table',\n" +
                " 'connector.username' = 'postgres',\n" +
                " 'connector.password' = 'test', \n" +
                " 'connector.write.flush.max-rows' = '1' \n" +
                ")");

        // Debugging alternatives, kept for reference:
        // Table query = tableEnv.sqlQuery("select productId from kafkaInputTable");
        // tableEnv.toRetractStream(tableEnv.sqlQuery("select * from kafkaInputTable"), Row.class).print();
        // tableEnv.sqlQuery("select * from kafkaInputTable").execute().print();

        // "select *" relies on the view's column order and types lining up with the sink DDL.
        // NOTE(review): executeSql submits the INSERT on its own; depending on the Flink version,
        // print() on its result may block until that job ends - confirm that env.execute below
        // is reached as intended.
        tableEnv.executeSql("insert into flink_test_table select * from kafkaInputTable").print();

        // Runs the DataStream part of the program (the two print() sinks above).
        env.execute("StreamingJob");
    }
}
Flink Table Sink到File、Kafka、Es、Mysql
- 知识点
表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。
具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中。同时,表的输出跟更新模式有关。
更新模式(Update Mode)
对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(update mode)指定。
Flink Table API 中的更新模式有以下三种:
追加模式(Append Mode)
在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。
撤回模式(Retract Mode)
在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。
插入(Insert)会被编码为添加消息;
删除(Delete)则编码为撤回消息;
更新(Update)则会编码为已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。
在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。
更新插入模式(Upsert Mode)
在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。
这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息,外部连接器需要知道这个唯一 key 的属性。
插入(Insert)和更新(Update)都被编码为 Upsert 消息;
删除(Delete)编码为 Delete 消息。
这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率会更高。
- 文件代码案例
package guigu.table.sink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
/**
 * Demo: reads sensor readings from a CSV file, keeps the `sensor_1` rows and writes
 * their id and temperature to another CSV file through a registered output table.
 */
object FileSink {

  /** Registers the CSV file at `path` as the temporary table `name` with the given schema. */
  private def registerCsvTable(
      tEnv: StreamTableEnvironment,
      path: String,
      schema: Schema,
      name: String): Unit =
    tEnv
      .connect(new FileSystem().path(path))
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable(name)

  def main(args: Array[String]): Unit = {
    // 1. Environment setup
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(streamEnv)

    // 2. Read the data and register the input table
    val sourceSchema = new Schema()
      .field("id", DataTypes.STRING())
      .field("temperature", DataTypes.DOUBLE())
      .field("timestamp", DataTypes.BIGINT())
    registerCsvTable(
      tEnv,
      "E:\\java\\demo\\src\\main\\resources\\file\\data5.csv",
      sourceSchema,
      "inputTable")

    // 3. The same projection + filter written three ways.
    //    Only the last variant is written to the sink; the first two are shown for comparison.
    // 3a. Table API
    val source = tEnv.from("inputTable")
    val viaTableApi = source.select("id,temperature").where("id = 'sensor_1'")
    // 3b. Single-line SQL
    val viaSql = tEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_1'")
    // 3c. SQL written as a multi-line string template
    val viaSqlTemplate = tEnv.sqlQuery(
      """
        |select id,temperature
        |from inputTable
        |where id = 'sensor_1'
        |""".stripMargin)

    // 4. Register the output table
    val sinkSchema = new Schema()
      .field("id", DataTypes.STRING())
      .field("temperature", DataTypes.DOUBLE())
    registerCsvTable(
      tEnv,
      "E:\\java\\demo\\src\\main\\resources\\file\\outputFile.csv",
      sinkSchema,
      "outputTable")

    // 5. Run the job
    viaSqlTemplate.insertInto("outputTable")
    tEnv.execute("Flink Sink Flie Test")
  }
}
- Es代码案例
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.10.1</version>
</dependency>
package table.tableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, Elasticsearch, FileSystem, Json, Kafka, Schema}
/**
 * Demo: counts the rows per sensor id read from a CSV file and writes the counts
 * to Elasticsearch through a table registered in upsert mode.
 */
object EsSink {
  def main(args: Array[String]): Unit = {
    // 1. Environment setup
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 2. Read the input file and register it as a table
    val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val schema: Schema = new Schema()
      .field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())
    streamTableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable("inputTable")

    // 3. Table transformation: number of rows per sensor id -> (id STRING, count BIGINT)
    val inputTable: Table = streamTableEnv.from("inputTable")
    val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'count)

    // 4. Register the Elasticsearch output table
    streamTableEnv.connect(
      new Elasticsearch()
        .version("6")
        .host("localhost", 9200, "http")
        .index("sensor")
        .documentType("_doc")
        // Required: flush after every action, otherwise the records stay buffered
        // in memory and are not written to Elasticsearch.
        .bulkFlushMaxActions(1)
    )
      .inUpsertMode()
      .withFormat(new Json())
      // The sink schema has to match the inserted aggTable: (id STRING, count BIGINT).
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("count", DataTypes.BIGINT()))
      .createTemporaryTable("outputEsTable")

    // 5. Run the job
    aggTable.insertInto("outputEsTable")
    env.execute()
  }
}
- Kafka代码案例
package table.tableSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}
/**
 * Demo: reads sensor readings from a CSV file, keeps the `sensor_1` rows and writes
 * their id and temperature as CSV records to the Kafka topic `sinkTest`.
 */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    // 1. Table environment setup
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 2. Read the input file and register it as a table
    val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val schema: Schema = new Schema()
      .field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())
    streamTableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable("inputTable")

    // 3. Basic transformation: projection + filter (insert-only result)
    val inputTable: Table = streamTableEnv.from("inputTable")
    val resultTable: Table = inputTable.select("id,temperature").where("id = 'sensor_1'")

    // 4. Register the Kafka output table
    streamTableEnv.connect(
      new Kafka()
        .version("0.11")
        .topic("sinkTest")
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("outputKafkaTable")

    // 5. Run the job
    resultTable.insertInto("outputKafkaTable")
    env.execute()
  }
}
- mysql代码案例
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.10.1</version>
</dependency>
package table.tableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors._
/**
 * Demo: counts the rows per sensor id read from a CSV file and writes the counts
 * to the MySQL table `sensor_count` through a JDBC table declared with SQL DDL.
 */
object MysqlSink {
  def main(args: Array[String]): Unit = {
    // 1. Environment setup
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // 2. Read the input file and register it as a table
    val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val schema: Schema = new Schema()
      .field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())
    streamTableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable("inputTable")

    // 3. Table transformation: number of rows per sensor id -> (id, cnt)
    val inputTable: Table = streamTableEnv.from("inputTable")
    val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'cnt)

    // 4. Declare the JDBC sink with DDL and execute it in the table environment;
    //    the WITH clause carries the connector settings.
    val sinkDDL: String =
      """
        |create table jdbcOutputTable (
        | id varchar(20) not null,
        | cnt bigint not null
        |) with (
        | 'connector.type' = 'jdbc',
        | 'connector.url' = 'jdbc:mysql://localhost:3306/test',
        | 'connector.table' = 'sensor_count',
        | 'connector.driver' = 'com.mysql.jdbc.Driver',
        | 'connector.username' = 'root',
        | 'connector.password' = '123456'
        |) """.stripMargin
    streamTableEnv.sqlUpdate(sinkDDL)

    // 5. Run the job
    aggTable.insertInto("jdbcOutputTable")
    env.execute()
  }
}
flink 1.12 注意事项
flink 依赖加载问题
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
classloader.resolve-order: parent-first
需要修改类加载顺序:如上面的配置注释所述,child-first 允许用户程序使用与 classpath 中不同版本的依赖;在 flink 1.12 下遇到依赖加载问题时,将 classloader.resolve-order 改回 parent-first。
添加 mysql 驱动依赖
<!-- mysql 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
添加 postgreSql 驱动
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.19</version>
</dependency>