50 changes: 50 additions & 0 deletions docs/user-guide/flow-computation/manage-flow.md
@@ -101,6 +101,7 @@ CREATE [ OR REPLACE ] FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT '<string>' ]
[ WITH (<flow-option> = <value> [, ...]) ]
AS
<SQL>;
```
@@ -117,6 +118,8 @@ Conversely, when `IF NOT EXISTS` is specified, the command will have no effect i
- `EXPIRE AFTER` is an optional interval to expire the data from the Flow engine.
For more details, please refer to the [`EXPIRE AFTER`](#expire-after) part.
- `COMMENT` is the description of the flow.
- `WITH` specifies flow options.
For example, the experimental `experimental_enable_incremental_read` option enables incremental source reads for eligible batching flows.
- The `SQL` part defines the continuous aggregation query.
It specifies the source tables that provide data for the flow.
Each flow can have multiple source tables.
@@ -158,6 +161,53 @@ For example, if the flow engine processes the aggregation at 10:00:00 and the `'
any input data that arrives now with a time index older than 1 hour (before 09:00:00) will expire and be ignored.
Only data timestamped from 09:00:00 onwards will be used in the aggregation and written to the sink table.
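
The scenario above can be sketched as a flow with a one-hour expiration. This is an illustration only: the flow name `temp_monitoring_1h` and the sink table `temp_alerts_1h` are hypothetical, the source table `temp_sensor_data` is assumed to exist with `ts` as its time index, and the `'1 hour'::INTERVAL` literal is an assumption chosen to match the one-hour window described here.

```sql
-- Hypothetical flow: input rows whose time index is more than 1 hour old
-- at processing time are expected to be ignored by the aggregation.
CREATE FLOW IF NOT EXISTS temp_monitoring_1h
SINK TO temp_alerts_1h
EXPIRE AFTER '1 hour'::INTERVAL
COMMENT 'max temperature per sensor, 1 hour expiration'
AS
SELECT
    sensor_id,
    loc,
    max(temperature) AS max_temp,
    date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
    sensor_id,
    loc,
    time_window;
```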

### Experimental incremental source reads

:::warning Experimental feature
The `experimental_enable_incremental_read` option is experimental.
Its behavior and limitations may change in future releases.
:::

For batching SQL flows whose source tables are append-only, you can enable incremental source reads:

```sql
CREATE TABLE temp_sensor_data (
    sensor_id INT,
    loc STRING,
    temperature DOUBLE,
    ts TIMESTAMP TIME INDEX,
    PRIMARY KEY(sensor_id, loc)
) WITH ('append_mode' = 'true');

CREATE FLOW temp_monitoring
SINK TO temp_alerts
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
    sensor_id,
    loc,
    max(temperature) AS max_temp,
    date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM temp_sensor_data
GROUP BY
    sensor_id,
    loc,
    time_window;
```

When this option is enabled, Flow keeps per-region source sequence watermarks and attempts to read only newly appended source rows after the initial full snapshot.
This is an execution optimization and does not change the query result.
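
One way to check this is to append rows to the source table and then inspect the sink table. The following is a minimal sketch that reuses the `temp_sensor_data` table and `temp_monitoring` flow from the example above; the inserted values are made up for illustration, and the `ADMIN FLUSH_FLOW` call is optional and only assumed to be convenient here for triggering a run without waiting.

```sql
-- Append a few illustrative rows to the append-only source table.
-- Column order: sensor_id, loc, temperature, ts.
INSERT INTO temp_sensor_data VALUES
    (1, 'room1', 98.5, '2022-01-01 00:00:00'),
    (2, 'room2', 99.5, '2022-01-01 00:00:01');

-- Optionally trigger the flow right away instead of waiting for the next run.
ADMIN FLUSH_FLOW('temp_monitoring');

-- The sink table should hold the same aggregates whether or not
-- incremental source reads were used for this run.
SELECT sensor_id, loc, max_temp, time_window
FROM temp_alerts;
```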

The current limitations are:

- All source tables must be append-only tables created with `append_mode = 'true'`.
Flow creation fails if any source table is not append-only.
- The optimization only applies to batching SQL flows.
TQL flows, unsupported aggregate shapes, and simple projection/filter flows do not use incremental source reads.
- Source tables created with `ttl = 'instant'` currently use streaming mode and do not use this batching-mode option.
- The first run still needs a full snapshot.
Later runs may fall back to a full snapshot, or retry and repair, when GreptimeDB cannot safely use incremental source reads.
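
To illustrate the first limitation, the sketch below creates a source table without `append_mode` and then tries to enable the option on a flow that reads from it. The names `plain_sensor_data`, `plain_monitoring`, and `plain_alerts` are hypothetical, and the exact error returned is not shown because it may vary between releases.

```sql
-- Hypothetical source table created WITHOUT 'append_mode' = 'true'.
CREATE TABLE plain_sensor_data (
    sensor_id INT,
    loc STRING,
    temperature DOUBLE,
    ts TIMESTAMP TIME INDEX,
    PRIMARY KEY(sensor_id, loc)
);

-- Per the limitation above, this statement is expected to be rejected
-- because plain_sensor_data is not an append-only table.
CREATE FLOW plain_monitoring
SINK TO plain_alerts
WITH (experimental_enable_incremental_read = 'true')
AS
SELECT
    sensor_id,
    loc,
    max(temperature) AS max_temp,
    date_bin('10 seconds'::INTERVAL, ts) AS time_window
FROM plain_sensor_data
GROUP BY
    sensor_id,
    loc,
    time_window;
```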

### Write a SQL query

The `SQL` part of the flow is similar to a standard `SELECT` clause with a few differences. The syntax of the query is as follows: