Flink | 六十、FlinkSQL之SQL 客户端

作者: 雨中散步撒哈拉
  • [一、SQL 客户端]
  • [二、SQL客户端使用流程]
    • [1. 首先启动本地集群]
    • [2. 启动 Flink SQL 客户端]
    • [3. 设置运行模式]
    • [4. 执行 SQL 查询]

使用直白的sql进行运算,是方便的、是集中注意力到sql编写中的

一、SQL 客户端

有了 Table API 和 SQL,我们就可以使用熟悉的 SQL 来编写查询语句进行流处理了。不过,这种方式还是将 SQL 语句嵌入到 Java/Scala 代码中进行的;另外,写完的代码后想要提交作业还需要使用工具进行打包。这都给 Flink 的使用设置了门槛,如果不是 Java/Scala 程序员,即使是非常熟悉SQL 的工程师恐怕也会望而生畏了。

基于这样的考虑,Flink 为我们提供了一个工具来进行 Flink 程序的编写、测试和提交,这工具叫作"SQL 客户端"。SQL 客户端提供了一个命令行交互界面(CLI),我们可以在里面非常容易地编写SQL 进行查询,就像使用 MySQL 一样;整个 Flink 应用编写、提交的过程全变成了写 SQL,不需要写一行Java/Scala 代码。

把自己的sql和业务代码脱离,减少打包过程和调试代码的过程

二、SQL客户端使用流程

1. 首先启动本地集群

./bin/start-cluster.sh
./bin/sql-client.sh

SQL 客户端的启动脚本同样位于 Flink 的 bin 目录下。默认的启动模式是 embedded,也就是说客户端是一个嵌入在本地的进程,这是目前唯一支持的模式。未来会支持连接到远程 SQL 客户端的模式。

3. 设置运行模式

启动客户端后,就进入了命令行界面,这时就可以开始写 SQL 了。一般我们会在开始之前对环境做一些设置,比较重要的就是运行模式。

首先是表环境的运行时模式,有流处理和批处理两个选项。默认为流处理:
Flink SQL> SET ‘execution.runtime-mode’ = ‘streaming’;

其次是 SQL 客户端的"执行结果模式",主要有 table、changelog、tableau 三种,默认为table 模式:
Flink SQL> SET ‘sql-client.execution.result-mode’ = ‘table’;
table 模式就是最普通的表处理模式,结果会以逗号分隔每个字段;changelog 则是更新日志模式,会在数据前加上"+"(表示插入)或"-"(表示撤回)的前缀;而 tableau 则是经典的可视化表模式,结果会是一个虚线框的表格。

此外我们还可以做一些其它可选的设置,比如之前提到的空闲状态生存时间(TTL):
Flink SQL> SET ‘table.exec.state.ttl’ = ‘1000’;

除了在命令行进行设置,我们也可以直接在 SQL 客户端的配置文件 sql-cli-defaults.yaml 中进行各种配置,甚至还可以在这个 yaml 文件里预定义表、函数和 catalog。关于配置文件的更多用法,大家可以查阅官网的详细说明。

4. 执行 SQL 查询

接下来就可以愉快的编写 SQL 语句了,这跟操作 MySQL、Oracle 等关系型数据库没什么区别。

我们可以尝试把一开始举的简单聚合例子写一下:

Flink SQL > CREATE TABLE EventTable(
user STRING,

url STRING,

`timestamp` BIGINT

) WITH (

'connector' = 'filesystem',

'path'= 'events.csv',

'format'= 'csv'

 );

Flink SQL > CREATE TABLE ResultTable (
user STRING,

cnt BIGINT

) WITH (
'connector' = 'print'
);
Flink SQL> INSERT INTO ResultTable SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user;

这里我们直接用 DDL 创建两张表,注意需要有 WITH 定义的外部连接。一张表叫作EventTable,是从外部文件 events.csv 中读取数据的,这是输入数据表;另一张叫作ResultTable,连接器为"print",其实就是标准控制台打印,当然就是输出表了。所以接下来就可以直接执行 SQL 查询,并将查询结果 INSERT 写入结果表中了。

在 SQL 客户端中,每定义一个 SQL 查询,就会把它作为一个 Flink 作业提交到集群上执行。所以通过这种方式,我们可以快速地对流处理程序进行开发测试。

文章作者: 雨中散步撒哈拉

文章列表

更多推荐

更多
  • Apache StreamPark-用户、团队、角色以及成员管理 用户管理 ADMIN 创建或修改用户时可以指定用户类型,用户类型有 ADMIN 和 USER 两种。ADMIN 表示系统管理员,即:StreamPark 的超级管理员,有 StreamPark 管理页面以及各个团队的所有权限。USER ...
  • Apache StreamPark-Docker 快速使用教程 使用 Docker 完成StreamPark的部署。 前置条件 Docker 1.13.1+ Docker Compose 1.28.0+ 安装docker 使用 docker 启动服务,使用 docker-compose ...
  • Apache StreamPark-快速开始 本章节看看如果用 streampark-console 快速部署运行一个作业, 用 streampark 开发的项目都做了很好的支持,下面我们使用 streampark-quickstart 来快速开启 streampark-console...
  • Apache StreamPark-变量管理 背景介绍 ...
  • Apache StreamPark-LDAP 快速使用教程 LDAP简介 LDAP(Light Directory Access Portocol),它是基于X.500标准的轻量级目录访问协议。 ...
  • Apache StreamPark-安装部署 StreamPark 总体组件栈架构如下, 由 streampark-core 和 streampark-console 两个大的部分组成, 定位是一个综合实时数据平台,流式数仓平台, 低代码 ( Low Code )...
  • Apache StreamPark FlinkSQL-数据类型 Flink SQL有一组丰富的本地数据类型可供用户使用。 数据类型描述表生态系统中值的逻辑类型,它可用于声明操作的输入和/或输出类型。 ...
  • Apache StreamPark FlinkSQL-查询配置 任务执行配置 以下选项可用于调优查询执行的性能。table.exec.async-lookup,table.exec.deduplicate,以下配置可用于调整查询优化器,以获得更好的执行计划。table.optimizer.agg-...
  • Apache StreamPark FlinkSQL-性能调整 SQL是数据分析中使用最广泛的语言。Flink的Table API和SQL使用户可以用更少的时间和精力去开发高效的流分析应用程序。 此外,Flink Table API和SQL都被进行了有效的优化,集成了大量查询优化和算子优化实现。
  • Apache StreamPark FlinkSQL-读写hive Apache Hive 已经成为了数据仓库生态系统中的核心。它不仅仅是一个用于大数据分析和ETL场景的SQL引擎。 Flink 与 Hive 的集成包含两个层。一是利用了 Hive 的 MetaStore 作为持久化的 ...
  • 近期文章

    更多
    文章目录

      推荐作者

      更多