4 changes: 4 additions & 0 deletions docs/reference/sql/create.md
@@ -499,10 +499,14 @@ CREATE [OR REPLACE] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
[ WITH (<flow-option> = <value> [, ...]) ]
AS
<SQL>;
```

The `WITH` clause specifies flow options.
For example, the experimental `experimental_enable_incremental_read` option enables incremental source reads for eligible batching flows.

For `CREATE FLOW`, the query after `AS` can be a regular flow query or a TQL query. GreptimeDB also supports a strict TQL CTE form for cleaner flow definitions:

```sql
50 changes: 50 additions & 0 deletions docs/user-guide/flow-computation/manage-flow.md
@@ -101,6 +101,7 @@ CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
[ WITH (<flow-option> = <value> [, ...]) ]
AS
<SQL>;
```
@@ -117,6 +118,8 @@ Conversely, when `IF NOT EXISTS` is specified, the command will have no effect i
- `EXPIRE AFTER` is an optional interval to expire the data from the Flow engine.
For more details, please refer to the [`EXPIRE AFTER`](#expire-after) part.
- `COMMENT` is the description of the flow.
- `WITH` specifies flow options.
For example, the experimental `experimental_enable_incremental_read` option enables incremental source reads for eligible batching flows.
- `SQL` part defines the continuous aggregation query.
It defines the source tables that provide data for the flow.
Each flow can have multiple source tables.
@@ -158,6 +161,53 @@ For example, if the flow engine processes the aggregation at 10:00:00 and the `'
any input data that arrives now with a time index older than 1 hour (before 09:00:00) will expire and be ignored.
Only data timestamped from 09:00:00 onwards will be used in the aggregation and written to the sink table.

### Experimental incremental source reads

:::warning Experimental feature
The `experimental_enable_incremental_read` option is experimental.
Its behavior and limitations may change in future releases.
:::

For batching SQL flows whose source tables are append-only, you can enable incremental source reads:

```sql
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
) WITH ('append_mode' = 'true');

CREATE FLOW temp_monitoring
SINK TO temp_alerts
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
  sensor_id,
  loc,
  max(temperature) AS max_temp,
  date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
  sensor_id,
  loc,
  time_window;
```

When this option is enabled, Flow keeps per-region source sequence watermarks and attempts to read only newly appended source rows after the initial full snapshot.
This is an execution optimization and does not change the query result.

The current limitations are:

- All source tables must be append-only tables created with `append_mode = 'true'`.
Flow creation fails if any source table is not append-only.
- The optimization only applies to batching SQL flows.
TQL flows, unsupported aggregate shapes, and simple projection/filter flows do not use incremental source reads.
- Source tables created with `ttl = 'instant'` currently use streaming mode and do not use this batching-mode option.
- The first run still needs a full snapshot.
Later runs may fall back to a full snapshot, or retry or repair, when GreptimeDB cannot safely use incremental source reads.
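
Because the option is described as an execution optimization that leaves the query result unchanged, one way to exercise it is to append rows and compare the sink table between runs. The statements below are a minimal sketch, assuming the `temp_sensor_data` table and `temp_monitoring` flow from the example above already exist. The inserted values are made up for illustration, and `ADMIN FLUSH_FLOW` is used only to trigger a run right away. Nothing here reveals whether a given run read incrementally or fell back to a full snapshot.

```sql
-- Append a first batch to the append-only source table (illustrative values).
INSERT INTO temp_sensor_data VALUES
    (1, 'room1', 98.5, '2022-01-01 00:00:00'),
    (2, 'room2', 99.5, '2022-01-01 00:00:01');

-- Trigger a flow run now instead of waiting for the next scheduled run.
-- Per the description above, the first run still takes a full snapshot.
ADMIN FLUSH_FLOW('temp_monitoring');

-- Inspect the aggregated rows written to the sink table.
SELECT sensor_id, loc, max_temp, time_window FROM temp_alerts;

-- Append a second batch that lands in the same 10-second windows.
INSERT INTO temp_sensor_data VALUES
    (1, 'room1', 101.5, '2022-01-01 00:00:02'),
    (2, 'room2', 97.0, '2022-01-01 00:00:03');

-- Trigger another run. With the option enabled, this run may read only
-- the newly appended rows, or fall back to a full snapshot.
ADMIN FLUSH_FLOW('temp_monitoring');

-- The result should match what the same flow would produce without the option.
SELECT sensor_id, loc, max_temp, time_window FROM temp_alerts;
```

Comparing the second `SELECT` against the output of the same flow created without the `WITH` clause is a simple way to confirm that the results agree.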

### Write a SQL query

The `SQL` part of the flow is similar to a standard `SELECT` clause with a few differences. The syntax of the query is as follows:
@@ -503,10 +503,14 @@ CREATE [OR REPLACE] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
[ WITH (<flow-option> = <value> [, ...]) ]
AS
<SQL>;
```

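The `WITH` clause specifies flow options.
For example, the experimental `experimental_enable_incremental_read` option enables incremental source reads for eligible batching flows.

For `CREATE FLOW`, the query after `AS` can be a regular flow query or a TQL query. GreptimeDB now also supports a strictly restricted TQL CTE form that makes flow definitions clearer: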

```sql
@@ -90,6 +90,7 @@ CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
[ WITH (<flow-option> = <value> [, ...]) ]
AS
<SQL>;
```
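@@ -103,6 +104,8 @@ AS
- `EXPIRE AFTER` is an optional interval used to expire data from the Flow engine.
For more details, refer to the [`EXPIRE AFTER`](#expire-after) section.
- `COMMENT` is the description of the flow.
- `WITH` specifies flow options.
For example, the experimental `experimental_enable_incremental_read` option enables incremental reads of source tables for eligible batching flows.
- The `SQL` part defines the continuous aggregation query.
It defines the source tables that provide data for the flow.
Each flow can have multiple source tables.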
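@@ -144,6 +147,53 @@ Data in the source table that is older than the specified expiration time is no longer included in the flow's
any input data arriving now whose Time Index is more than 1 hour old (that is, earlier than 09:00:00) is treated as expired and ignored.
Only data timestamped 09:00:00 or later takes part in the aggregation and is written to the sink table.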

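### Experimental incremental source reads

:::warning Experimental feature
The `experimental_enable_incremental_read` option is experimental.
Its behavior and limitations may change in future releases.
:::

For batching SQL flows whose source tables are append-only, you can enable incremental source reads: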

```sql
CREATE TABLE temp_sensor_data (
  sensor_id INT,
  loc STRING,
  temperature DOUBLE,
  ts TIMESTAMP TIME INDEX,
  PRIMARY KEY(sensor_id, loc)
) WITH ('append_mode' = 'true');

CREATE FLOW temp_monitoring
SINK TO temp_alerts
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
  sensor_id,
  loc,
  max(temperature) AS max_temp,
  date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
  sensor_id,
  loc,
  time_window;
```

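When this option is enabled, Flow keeps per-region source sequence watermarks and attempts to read only newly appended source rows after the initial full snapshot.
This is an execution optimization and does not change the query result.

The current limitations are:

- All source tables must be append-only tables created with `append_mode = 'true'`.
Flow creation fails if any source table is not append-only.
- The optimization only applies to batching SQL flows.
TQL flows, unsupported aggregate shapes, and simple projection/filter flows do not use incremental source reads.
- Source tables created with `ttl = 'instant'` currently use streaming mode and do not use this batching-mode option.
- The first run still needs a full snapshot.
Later runs may fall back to a full snapshot, or retry or repair, when GreptimeDB cannot safely use incremental source reads.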

### Write a SQL query

The `SQL` part of the flow is similar to a standard `SELECT` clause, with a few differences. The syntax of the query is as follows: