diff --git a/docs/reference/sql/create.md b/docs/reference/sql/create.md
index f7c229881..84c13e6f8 100644
--- a/docs/reference/sql/create.md
+++ b/docs/reference/sql/create.md
@@ -499,10 +499,14 @@ CREATE [OR REPLACE] FLOW [ IF NOT EXISTS ]
 SINK TO
 [ EXPIRE AFTER ]
 [ COMMENT '' ]
+[ WITH ( = [, ...]) ]
 AS
 ;
 ```
 
+The `WITH` clause specifies flow options.
+For example, the experimental `experimental_enable_incremental_read` option enables incremental source reads for eligible batching flows.
+
 For `CREATE FLOW`, the query after `AS` can be a regular flow query or a TQL query. GreptimeDB also supports a strict TQL CTE form for cleaner flow definitions:
 
 ```sql
diff --git a/docs/user-guide/flow-computation/manage-flow.md b/docs/user-guide/flow-computation/manage-flow.md
index ccfeed50c..a073452f6 100644
--- a/docs/user-guide/flow-computation/manage-flow.md
+++ b/docs/user-guide/flow-computation/manage-flow.md
@@ -101,6 +101,7 @@ CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ]
 SINK TO
 [ EXPIRE AFTER ]
 [ COMMENT '' ]
+[ WITH ( = [, ...]) ]
 AS
 ;
 ```
@@ -117,6 +118,8 @@ Conversely, when `IF NOT EXISTS` is specified, the command will have no effect i
 - `EXPIRE AFTER` is an optional interval to expire the data from the Flow engine.
   For more details, please refer to the [`EXPIRE AFTER`](#expire-after) part.
 - `COMMENT` is the description of the flow.
+- `WITH` specifies flow options.
+  For example, the experimental `experimental_enable_incremental_read` option enables incremental source reads for eligible batching flows.
 - `SQL` part defines the continuous aggregation query.
   It defines the source tables provide data for the flow.
   Each flow can have multiple source tables.
@@ -158,6 +161,53 @@ For example, if the flow engine processes the aggregation at 10:00:00 and the `'
 any input data that arrive now with a time index older than 1 hour (before 09:00:00) will expire and be ignore.
 Only data timestamped from 09:00:00 onwards will be used in the aggregation and update to sink table.
 
+### Experimental incremental source reads
+
+:::warning Experimental feature
+The `experimental_enable_incremental_read` option is experimental.
+Its behavior and limitations may change in future releases.
+:::
+
+For batching SQL flows whose source tables are append-only, you can enable incremental source reads:
+
+```sql
+CREATE TABLE temp_sensor_data (
+  sensor_id INT,
+  loc STRING,
+  temperature DOUBLE,
+  ts TIMESTAMP TIME INDEX,
+  PRIMARY KEY(sensor_id, loc)
+) WITH ('append_mode' = 'true');
+
+CREATE FLOW temp_monitoring
+SINK TO temp_alerts
+WITH (experimental_enable_incremental_read = 'true')
+AS
+SELECT
+  sensor_id,
+  loc,
+  max(temperature) AS max_temp,
+  date_bin('10 seconds'::INTERVAL, ts) AS time_window
+FROM temp_sensor_data
+GROUP BY
+  sensor_id,
+  loc,
+  time_window;
+```
+
+When this option is enabled, Flow keeps per-region source sequence watermarks and attempts to read only newly appended source rows after the initial full snapshot.
+This is an execution optimization and does not change the query result.
+
+The current limitations are:
+
+- All source tables must be append-only tables created with `append_mode = 'true'`.
+  Flow creation fails if any source table is not append-only.
+- The optimization only applies to batching SQL flows.
+  TQL flows, unsupported aggregate shapes, and simple projection/filter flows do not use incremental source reads.
+- Source tables created with `ttl = 'instant'` currently use streaming mode and do not use this batching-mode option.
+- The first run still needs a full snapshot.
+  Later runs may fall back to full snapshot or retry/repair when GreptimeDB cannot safely use incremental source reads.
+
 ### Write a SQL query
 
 The `SQL` part of the flow is similar to a standard `SELECT` clause with a few differences. The syntax of the query is as follows:
diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/reference/sql/create.md b/i18n/zh/docusaurus-plugin-content-docs/current/reference/sql/create.md
index c7d9895f4..f3ab4f251 100644
--- a/i18n/zh/docusaurus-plugin-content-docs/current/reference/sql/create.md
+++ b/i18n/zh/docusaurus-plugin-content-docs/current/reference/sql/create.md
@@ -503,10 +503,14 @@ CREATE [OR REPLACE] FLOW [ IF NOT EXISTS ]
 SINK TO
 [ EXPIRE AFTER ]
 [ COMMENT '' ]
+[ WITH ( = [, ...]) ]
 AS
 ;
 ```
 
+`WITH` 子句用于指定 flow 选项。
+例如,实验性的 `experimental_enable_incremental_read` 选项可以为符合条件的 batching flow 启用增量 source 读取。
+
 对于 `CREATE FLOW`,`AS` 后面的查询既可以是常规 Flow 查询,也可以是 TQL 查询。GreptimeDB 现在还支持一种严格受限的 TQL CTE 写法,用来让 Flow 定义更清晰:
 
 ```sql
diff --git a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/flow-computation/manage-flow.md b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/flow-computation/manage-flow.md
index 07636520a..6d7da7353 100644
--- a/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/flow-computation/manage-flow.md
+++ b/i18n/zh/docusaurus-plugin-content-docs/current/user-guide/flow-computation/manage-flow.md
@@ -90,6 +90,7 @@ CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ]
 SINK TO
 [ EXPIRE AFTER ]
 [ COMMENT '' ]
+[ WITH ( = [, ...]) ]
 AS
 ;
 ```
@@ -103,6 +104,8 @@ AS
 - `EXPIRE AFTER` 是一个可选的时间间隔,用于从 Flow 引擎中过期数据。
   有关更多详细信息,请参考 [`EXPIRE AFTER`](#expire-after) 部分。
 - `COMMENT` 是 flow 的描述。
+- `WITH` 指定 flow 选项。
+  例如,实验性的 `experimental_enable_incremental_read` 选项可以为符合条件的 batching flow 启用增量读取 source 表。
 - `SQL` 部分定义了用于持续聚合的查询。
   它定义了为 flow 提供数据的源表。
   每个 flow 可以有多个源表。
@@ -144,6 +147,53 @@ source 表中超出指定过期时间的数据将不再被包含在 flow 的计
 当前时刻若输入数据的 Time Index 超过 1 小时(即早于 09:00:00),则会被判定为过期数据并被忽略。
 仅时间戳为 09:00:00 及之后的数据会参与聚合计算,并更新到目标表。
 
+### 实验性的增量 source 读取
+
+:::warning 实验性功能
+`experimental_enable_incremental_read` 选项是实验性的。
+它的行为和限制可能会在未来版本中变化。
+:::
+
+对于 source 表为 append-only 表的 batching SQL flow,可以启用增量 source 读取:
+
+```sql
+CREATE TABLE temp_sensor_data (
+  sensor_id INT,
+  loc STRING,
+  temperature DOUBLE,
+  ts TIMESTAMP TIME INDEX,
+  PRIMARY KEY(sensor_id, loc)
+) WITH ('append_mode' = 'true');
+
+CREATE FLOW temp_monitoring
+SINK TO temp_alerts
+WITH (experimental_enable_incremental_read = 'true')
+AS
+SELECT
+  sensor_id,
+  loc,
+  max(temperature) AS max_temp,
+  date_bin('10 seconds'::INTERVAL, ts) AS time_window
+FROM temp_sensor_data
+GROUP BY
+  sensor_id,
+  loc,
+  time_window;
+```
+
+启用该选项后,Flow 会维护每个 region 的 source sequence watermark,并在初始全量快照之后尝试只读取新追加的 source 行。
+这是一个执行优化,不会改变查询结果。
+
+当前限制如下:
+
+- 所有 source 表都必须是使用 `append_mode = 'true'` 创建的 append-only 表。
+  如果任意 source 表不是 append-only 表,创建 Flow 会失败。
+- 该优化只适用于 batching SQL flow。
+  TQL flow、不支持的聚合形态以及简单的 projection/filter flow 不会使用增量 source 读取。
+- 使用 `ttl = 'instant'` 创建的 source 表当前会使用 streaming 模式,不会使用这个 batching 模式选项。
+- 首次运行仍然需要全量快照。
+  之后的运行在 GreptimeDB 无法安全使用增量 source 读取时,可能会回退到全量快照,或者进行重试/修复。
+
 ### 编写 SQL 查询
 
 flow 的 `SQL` 部分类似于标准的 `SELECT` 子句,但有一些不同之处。查询的语法如下: