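Flink's [Table API] is a unified API for stream and batch processing. This means that Table API & SQL have the same semantics whether the input is bounded batch input or unbounded streaming input. Because traditional relational algebra and SQL were originally designed for batch processing, relational queries on streaming input are harder to understand than relational queries on batch input.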
Table programs that run in streaming mode leverage all capabilities of Flink as a stateful stream processor. In particular, a table program can be configured with a [state backend] for handling different requirements regarding state size and fault tolerance. It is possible to take a savepoint of a running Table API & SQL pipeline and to restore the application's state at a later point in time.
State Usage
Due to the declarative nature of Table API & SQL programs, it is not always obvious where and how much state is used within a pipeline. The planner decides whether state is necessary to compute a correct result. A pipeline is optimized to claim as little state as possible given the current set of optimizer rules.
Conceptually, source tables are never kept entirely in state. An implementer deals with logical tables (i.e. [dynamic tables]). Their state requirements depend on the operations used.
Queries such as `SELECT ... FROM ... WHERE` that only consist of field projections or filters are usually stateless pipelines. However, operations such as joins, aggregations, or deduplications require keeping intermediate results in fault-tolerant storage, for which Flink's state abstractions are used.
Please refer to the individual operator documentation for more details about how much state is required and how to limit a potentially ever-growing state size.
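The difference between the two kinds of pipelines can be sketched with two queries over the same input. This is a minimal example, assuming a hypothetical `clicks` table with `sessionId` and `userId` columns that is backed by Flink's `datagen` connector so the statements can be tried in the SQL client; the column names and the filter are illustrative only.

```sql
-- Hypothetical source table: the datagen connector emits random rows.
CREATE TABLE clicks (
  sessionId STRING,
  userId    INT
) WITH (
  'connector' = 'datagen',
  'fields.sessionId.length' = '4',
  'fields.userId.min' = '1',
  'fields.userId.max' = '100'
);

-- Stateless: a projection plus a filter; every row is processed on its own.
SELECT sessionId, userId
FROM clicks
WHERE userId <= 10;

-- Stateful: the aggregation must remember a running count per sessionId.
SELECT sessionId, COUNT(*) AS cnt
FROM clicks
GROUP BY sessionId;
```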
For example, a regular SQL join of two tables requires the operator to keep both input tables in state entirely. For correct SQL semantics, the runtime needs to assume that a match could occur at any point in time from both sides. Flink provides [optimized window and interval joins] that keep the state small by restricting the time range in which matches can occur. Another example is the following query, which computes the number of clicks per session.
```sql
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
```
The `sessionId` attribute is used as a grouping key, and the continuous query maintains a count for each `sessionId` it observes. The `sessionId` attribute evolves over time: `sessionId` values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of `sessionId` and must expect that every `sessionId` value can occur at any point in time, so it keeps a count for every `sessionId` value it has ever observed. Consequently, the total state size of the query grows continuously as more and more `sessionId` values are observed.
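Coming back to the join example earlier in this section, the contrast between a regular join and an interval join can be sketched as follows. The `orders` and `shipments` tables, their columns, and the four-hour bound are hypothetical; both tables use the `datagen` connector and declare a processing-time attribute with `PROCTIME()` so that the interval condition has a time attribute to work with.

```sql
-- Hypothetical inputs: datagen produces random order ids, PROCTIME()
-- adds a processing-time attribute to each table.
CREATE TABLE orders (
  order_id   INT,
  order_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '1000'
);

CREATE TABLE shipments (
  order_id  INT,
  ship_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '1000'
);

-- Regular join: both inputs are kept in state indefinitely.
SELECT o.order_id
FROM orders o JOIN shipments s ON o.order_id = s.order_id;

-- Interval join: a shipment only matches orders from the preceding 4 hours,
-- so rows outside that bound can eventually be dropped from state.
SELECT o.order_id, s.order_id AS shipped_order_id
FROM orders o, shipments s
WHERE o.order_id = s.order_id
  AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
```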
Idle State Retention Time
The Idle State Retention Time parameter [table.exec.state.ttl] defines how long the state of a key is retained without being updated before it is removed. For the previous example query, the count of a `sessionId` would be removed as soon as it has not been updated for the configured period of time.
By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record arrives whose key had its state removed earlier, the record is treated as if it were the first record with that key. For the example above, this means that the count of a `sessionId` would start again from zero.
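In the SQL client, the retention time can be set with a `SET` statement before the query is submitted. This is a sketch assuming the quoted `SET 'key' = 'value'` syntax of the Flink 1.13+ SQL client and the `clicks` table from the session example; the one-hour value is arbitrary.

```sql
-- Remove per-key state that has not been updated for one hour.
SET 'table.exec.state.ttl' = '1 h';

-- The count of a sessionId that stays idle for longer than one hour is removed;
-- a later click for that sessionId is counted as if it were the first one.
SELECT sessionId, COUNT(*) AS cnt
FROM clicks
GROUP BY sessionId;
```

Choosing the value is a trade-off: a shorter retention time keeps the state smaller, but any key that stays idle longer than the retention time loses its accumulated result.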
Stateful Upgrades and Evolution
Table programs that are executed in streaming mode are intended as standing queries, which means they are defined once and are continuously evaluated as static end-to-end pipelines.
For stateful pipelines, any change to either the query or Flink's planner might lead to a completely different execution plan. This makes stateful upgrades and the evolution of table programs challenging at the moment. The community is working on improving these shortcomings.
For example, when a filter predicate is added, the optimizer might decide to reorder joins or change the schema of an intermediate operator. This prevents restoring from a savepoint because of either a changed topology or a different column layout within the state of an operator.
The query implementer must ensure that the optimized plans before and after the change are compatible. Use the `EXPLAIN` command in SQL or `table.explain()` in the Table API to [get insights].
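One way to do this in SQL is to print the plan of the old and the new version of a query and compare the two outputs. The sketch below assumes the `clicks` table from the session example and uses an added filter as the hypothetical change.

```sql
-- Plan of the query as it currently runs.
EXPLAIN PLAN FOR
SELECT sessionId, COUNT(*) AS cnt
FROM clicks
GROUP BY sessionId;

-- Plan after the intended change (here: an added filter predicate).
-- Compare the operators and their schemas with the output above.
EXPLAIN PLAN FOR
SELECT sessionId, COUNT(*) AS cnt
FROM clicks
WHERE sessionId IS NOT NULL
GROUP BY sessionId;
```

Matching plans are a useful signal, but they are not a guarantee: the framework does not promise that state can be mapped onto a changed operator topology.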
Since new optimizer rules are continuously added, and operators become more efficient and specialized, upgrading to a newer Flink version can also lead to incompatible plans.
Currently, the framework cannot guarantee that state can be mapped from a savepoint to a new table operator topology.
In other words: Savepoints are only supported if both the query and the Flink version remain constant.
Since the community rejects contributions that modify the optimized plan and the operator topology in a patch version (e.g. an upgrade to 1.13.2 from an earlier 1.13 release), it should be safe to upgrade a Table API & SQL pipeline to a newer bug-fix release. However, major or minor version upgrades (e.g. an upgrade to 1.13 from an earlier minor release) are not supported.
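For such a bug-fix upgrade, the unchanged statement can be resubmitted from a savepoint taken on the old version. A sketch for the SQL client, assuming the `execution.savepoint.path` option as documented for the Flink 1.13 SQL client (option names may differ in other versions); the savepoint path and the `session_counts` sink table are hypothetical placeholders.

```sql
-- Hypothetical path of a savepoint taken from the job before the upgrade.
SET 'execution.savepoint.path' = '/tmp/flink-savepoints/savepoint-example';

-- Resubmit exactly the same statement; DML jobs submitted after the SET
-- are restored from the configured savepoint. session_counts is hypothetical.
INSERT INTO session_counts
SELECT sessionId, COUNT(*) AS cnt
FROM clicks
GROUP BY sessionId;
```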
For both shortcomings (i.e. a modified query and a modified Flink version), we recommend investigating whether the state of an updated table program can be “warmed up” (i.e. initialized) with historical data again before switching to real-time data. The Flink community is working on a [hybrid source] to make this switch as convenient as possible.
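- [Dynamic Tables]: describes the concept of dynamic tables.
- [Time Attributes]: explains time attributes and how they are used in Table API & SQL.
- [Joins in Continuous Queries]: the different kinds of joins supported on streams.
- [Temporal Tables]: describes the concept of temporal tables.
- [Query Configuration]: configuration specific to Table API & SQL.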