当前位置:首页 >> 编程语言 >> 【SQL篇】一、Flink动态表与流的关系以及DDL语法,汉王电纸书n510(flink 动态表)

【SQL篇】一、Flink动态表与流的关系以及DDL语法,汉王电纸书n510(flink 动态表)

cpugpu芯片开发光刻机 编程语言 1
文件名:【SQL篇】一、Flink动态表与流的关系以及DDL语法,汉王电纸书n510 【SQL篇】一、Flink动态表与流的关系以及DDL语法

文章目录 1、启动SQL客户端2、SQL客户端常用配置3、动态表和持续查询4、将流转为动态表5、用SQL持续查询6、动态表转为流7、时间属性8、DDL-数据库相关9、DDL-表相关

1、启动SQL客户端

启动Flink(基于yarn-session模式为例):

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

再启动sql的客户端:

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session

简单看下:

show databases; 2、SQL客户端常用配置

设置结果的显示模式,默认table,还可以设置为tableau、changelog

SET sql-client.execution.result-mode=changelog;

设置执行环境,默认streaming,也可以设置batch

SET execution.runtime-mode=streaming;

设置默认并行度:

SET parallelism.default=1;

设置状态TTL:

SET table.exec.state.ttl=1000;

通过SQL文件初始化,可以发现,exit退出客户端时,刚创建的库表都被清空了,这个SQL初始化文件就是在启动客户端时你想执行的SQL语句:

# 创建SQL文件vim conf/sql-client-init.sqlSET sql-client.execution.result-mode=tableau;CREATE DATABASE mydatabase; # 启动时,-i指定SQL文件/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql 3、动态表和持续查询

和MySQL等关系型表不同的是,无限流下,会有源源不断的数据过来进入表中,即动态表,来一条数据,往表中插入一条数据。对应的,想获取最新结果就要写条SQL去不间断的查询,即持续查询(每次数据到来都会触发查询操作),持续查询的结果也是一个动态表。

关系型表流处理的动态表处理的数据对象字段元组的有界集合字段元组的无限序列查询时对数据的访问可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入查询终止条件生成固定大小的结果集后终止永不停止, 根据持续收到的数据不断更新查询结果

如图,持续查询的流程为:

流(stream)被转换为动态表(dynamic table)对动态表进行持续查询(continuous query),生成新的动态表生成的动态表被转换成流

如此,就通过执行SQL实现了对数据流的处理。

4、将流转为动态表

把流看作一张表,来一条数据,insert一次,比如有个记录用户点击事件的无限流:

5、用SQL持续查询

代码中定义一条查询SQL:

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

此时,结果表(动态),可能是简单的insert,如Bob这条数据,也可能是对旧数据的更新update,如Alice,这就是更新查询。 此时,结果表转DataStream调用toChangelogStream()方法。

修改查询SQL,使用TUMBLE加一个开窗,每个窗口触发时,输出结果,此时对结果表就只有insert追加数据,没有update,即追加查询。

6、动态表转为流 仅追加(Append-only)流

动态表仅仅通过insert来修改,转为流时,对应一个仅追加的流,流中的每条数据,就是动态表的每行数据。

撤回(Retract)流

流中有添加消息add和撤回消息retract两种,对应表中:

insert就是add消息delete就是retractupdate就是被改行的retract+新结果的add

更新插入(Upsert)流

流中有更新插入消息upsert和删除消息delete两种,对应表中:

update和insert是upsert消息delete为delete消息

最后,注意,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流两种。

7、时间属性

在表中加个时间字段,数据类型为TimeStamp,分为事件时间和处理时间。事件时间通过watermark语句来定义:

CREATE TABLE EventTable(user STRING,url STRING,ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH (...);# TIMESTAMP后的3为精确度

以上,ts字段为事件时间属性,且基于ts设置5s的水位线延迟,注意,延迟秒数5必须加单引号。时间戳类型需要转为秒或者毫秒时,可:

# ...ts BIGINT,time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),# ...3即精确到毫秒

定义处理时间属性用procTime函数:

CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()) WITH (...); 8、DDL-数据库相关

建库:

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1=val1, key2=val2, ...)

查询所有库:

SHOW DATABASES

查当前库:

SHOW CURRENT DATABASE

修改库的某些属性:

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

删库:

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

注意,

RESTRICT:删除非空数据库会触发异常。默认启用CASCADE:删除非空数据库也会删除所有相关的表和函数,慎用

切换当前库:

USE database_name; 9、DDL-表相关

建表:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name# 字段({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]# 定义watermark[ <watermark_definition> ][ <table_constraint> ][ , ...n]) # 注释 [COMMENT table_comment]# 分区[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]# Flink特色WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] | AS select_query ]

关于表中的字段:physical_column就是常规列。metadata_column是元数据列,可访问到数据源本身的一些元数据,必须加METADATA关键字标识,如:读取数据写入Kafka时,Kafka引擎给数据打上的时间戳标记:

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' # !!!) WITH ('connector' = 'kafka'...);

自定义的列名称和 Connector 中定义 metadata 字段的名称一样时,后面的FROM省略:

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,`timestamp` TIMESTAMP_LTZ(3) METADATA # !!!) WITH ('connector' = 'kafka'...);

自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致时,会自动强转,因此这两个类型必须可以强转:

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,-- 将时间戳强转为 BIGINT`timestamp` BIGINT METADATA) WITH ('connector' = 'kafka'...);

默认metadata_column列可读可写,加VIRTUAL表示只读:

CREATE TABLE MyTable (`timestamp` BIGINT METADATA, `offset` BIGINT METADATA VIRTUAL, # !!!!`user_id` BIGINT,`name` STRING,) WITH ('connector' = 'kafka'...);

computed_column即计算列,把几列的计算结果做为新列,这在关系型SQL中一般在查询语句中完成,而不存成一个新列。

CREATE TABLE MyTable (`user_id` BIGINT,`price` DOUBLE,`quantity` DOUBLE,`cost` AS price * quanitity # !!!) WITH ('connector' = 'kafka'...);

主键的定义,只支持 not enforced:

CREATE TABLE MyTable (`user_id` BIGINT,`name` STRING,PARYMARY KEY(user_id) not enforced # !!!) WITH ('connector' = 'kafka'...);

with子句,用于指定这个表相关的外部系统的相关配置,如Kafka:

CREATE TABLE KafkaTable (`user_id` BIGINT,`name` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv')

like子句,即在现有表的基础上,创建另一种表:

CREATE TABLE Orders (`user` BIGINT,product STRING,order_time TIMESTAMP(3)) WITH ( 'connector' = 'kafka','scan.startup.mode' = 'earliest-offset');CREATE TABLE Orders_with_watermark (-- Add watermark definitionWATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH (-- Overwrite the startup-mode'scan.startup.mode' = 'latest-offset')LIKE Orders; # !!!!

举例:新表中的value字段加偏引号是因为value和关键字冲突了

CREATE TABLE test(id INT, ts BIGINT, vc INT) WITH ('connector' = 'print');CREATE TABLE test1 (`value` STRING)LIKE test;

create-table-as-select,即CTAS语句,通过查询结果创建表:

CREATE TABLE my_ctas_tableWITH ('connector' = 'kafka',...)AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;# 注意此时不能自己来定义列

查所有表:

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

查某张表信息:

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

修改表名:

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

修改表属性:

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

删表:

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name
协助本站SEO优化一下,谢谢!
关键词不能为空
同类推荐
«    2025年12月    »
1234567
891011121314
15161718192021
22232425262728
293031
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
搜索
最新留言
文章归档
网站收藏
友情链接