Flink SQL的使用

SQL想必大家都不会陌生,在大数据开发和引用中,使用SQL是非常常用,而Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合ANSI标准 SQL 语义的开发语言,它面向用户的 API 层,不同于JAVA、SCALA去开发业务逻辑,虽然灵活,但有一些不足,并且Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一

虽然跟传统的MySQL方言很相似,但是意义并完全一样,在学习之前,我们需要准备好环境,可以使用docker run一个flink,或者直接在本机运行(我使用的是MAC)

进入flink/bin/

我们可以看到里面有很多.sh

其实不难发现,每个.sh文件命名都表明了运行的方式,之前有介绍过Flink的运行环境,此处我们使用到的是Flink SQL,所以直接打开./sql-cluster.sh

其实严格意义上来说,sql-client还只是一个“玩具”,看到上面写着BETA,我们生产环境中一般都不会用这种方式运行,像ververica platform都有提供封装好的SQL编辑器,后续我分享一下Streamx的使用,此处我们主要是熟悉Flink SQL

运行起来以后,就可以放一下,我们先学习一下语法

CREATE TABLE 建表,语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用

SELECT 查询数据

SHOW 用于列出所有的catalog、database、function等

INSERT INTO|OVERWRITE 用来向表中添加行(INTO是追加,OVERWRITE是覆盖)

是不是和我们学习MySQL的差不多,CREATE TABLE建表,SHOW查看建表的结果,INSERT INTO插入数据,SELECT查询出数据,这些都是我们最基本和最简单的SQL语法

其他语法后续介绍,先思考一下,如何使用Flink SQL,因为是流式计算引擎,所以我们需要source和sink,使用Flink SQL是通过INSERT sink数据到下游,那么source,我们一般会有kafka,将业务层的数据采集到kafka中,我们看一个例子

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
-- 定义数据的来源
 ) WITH (
 -- Using the kafka connector
'connector.type' = 'kafka', 
 -- kafka topic
'connector.topic' = 'user_behavior',
'connector.properties.bootstrap.servers'='服务地址',
 -- Read from start offset
'connector.startup-mode' = 'earliest-offset',
 -- The data source format is json
'format.type' = 'json'
 );
CREATE TABLE sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
 ) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://mysql:3306/test', -- jdbc url
'connector.table' = 'test_table',
'connector.username' = 'username',
'connector.password' = 'password',
-- Default 5000, changed to 100 for demonstration
'connector.write.flush.max-rows' = '100' 
 );

INSERT INTO sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm');

此时如果打开localhost:8080的web ui界面就能看到跟我们使用datastream api实现一样的效果,但是我们不需要繁琐的引包、建项目和编写JAVA/SCALA代码就能实现一个简单的ETL

下面介绍一下CDC,刚刚的例子我们使用到了kafka,当前的场景,假设是从mysql中提取数据到elasticsearch,需要什么呢?我们需要一个从mysql获取数据到kafka的工具或者平台,业界比较成熟的方案:debezium、canal、streamsets等等,我介绍一个基于debezium实现的flink cdc:https://github.com/ververica/flink-cdc-connectors

这是一个非常优秀的框架,同时解决了我们需要维护多套组件的问题

CDC意思是捕获数据的变化,底层使用到的debezium其实最早也是跟kafka对接,我们再从kafka中拿到数据,但是这个框架很好贴合了flink的原生,一站式解决ETL方案,具体的介绍可以去github看,我们来使用一下

将官方的包下载放入flink的lib目录下即可,结合打开sql-client.sh

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
   PRIMARY KEY (user_id) NOT ENFORCED
 ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = 'username',
   'password' = 'password',
   'database-name' = 'database',
   'table-name' = 'user_log'
 );
CREATE TABLE sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
 ) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://mysql:3306/test', -- jdbc url
'connector.table' = 'test_table',
'connector.username' = 'username',
'connector.password' = 'password',
-- Default 5000, changed to 100 for demonstration
'connector.write.flush.max-rows' = '100' 
 );

INSERT INTO sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm');

我们可以发现,除了with其他基本一样,如果我们需要把sink改成elasticsearch,同样的,修改一下with即可

CREATE TABLE es_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'pvuv'
 );

总结:

使用Flink SQL能大大降低我们的学习成本,提高开发效率,目前还有一些不足和不兼容的地方,社区也在慢慢完善,相信在将来能给我们展现更好的Flink SQL,虽然是不支持一些特性和API,但是我们可以做取舍,将但部分的业务转为Flink SQL,像字节跳动,基本上90%的业务都适用Flink SQL实现,可见其重要性,其他进阶和高级的使用后续我们再结合例子来介绍。

Leave a Reply

Your email address will not be published. Required fields are marked *