# Flink DML

## Querying with SQL
Amoro tables support reading data in stream or batch mode through Flink SQL. You can switch modes using the following methods:

```sql
-- Run Flink tasks in streaming mode in the current session
SET execution.runtime-mode = streaming;

-- Run Flink tasks in batch mode in the current session
SET execution.runtime-mode = batch;
```
The following Hint Options are supported:

| Key | Default Value | Type | Required | Description |
|---|---|---|---|---|
| source.parallelism | (none) | Integer | No | Defines a custom parallelism for the source. By default, if this option is not set, the planner derives the parallelism for each statement individually, also taking the global configuration into account. |
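
For example, a minimal sketch of applying this hint (the table name test_table and the value 4 are illustrative; dynamic table options must be enabled, as shown later on this page):

```sql
-- Derive everything else automatically, but pin the source parallelism to 4
SELECT * FROM test_table /*+ OPTIONS('source.parallelism'='4') */;
```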

### Batch mode
Use batch mode to read full and incremental data from FileStore.
> **TIPS**: LogStore does not support bounded reading.

```sql
-- Run Flink tasks in batch mode in the current session
SET execution.runtime-mode = batch;

-- Enable dynamic table parameter configuration to make hint options configured in Flink SQL effective
SET table.dynamic-table-options.enabled = true;
```

#### Batch mode (non-primary key table)

In batch mode, non-primary key tables support reading the full data, reading the data of a specific snapshot by snapshot-id or timestamp, and reading the incremental data between two snapshots.

```sql
-- Read full data
SELECT * FROM unkeyed /*+ OPTIONS('streaming'='false')*/;

-- Read the data of a specified snapshot
SELECT * FROM unkeyed /*+ OPTIONS('snapshot-id'='4411985347497777546')*/;
```
The following parameters are supported for bounded reads on the BaseStore of non-primary key tables:

| Key | Default Value | Type | Required | Description |
|---|---|---|---|---|
| snapshot-id | (none) | Long | No | Reads the full data of the specified snapshot; only effective when streaming is false. |
| as-of-timestamp | (none) | Long | No | Reads the full data of the latest snapshot committed before the specified timestamp; only effective when streaming is false. |
| start-snapshot-id | (none) | Long | No | When streaming is false, end-snapshot-id must also be specified, and the incremental data within the interval (snapshot1, snapshot2] is read. When streaming is true, the incremental data after the specified snapshot is read; if not specified, the incremental data after the current snapshot (excluding the current snapshot) is read. |
| end-snapshot-id | (none) | Long | No | When streaming is false, start-snapshot-id must also be specified, and the incremental data within the interval (snapshot1, snapshot2] is read. |
| other table parameters | (none) | String | No | All parameters of an Amoro table can be modified dynamically through SQL hints, effective only for the current task. For the full list, please refer to Table Configurations. Permission-related configurations on the catalog can also be set in the hint, e.g. properties.auth.XXX as in the catalog DDL. |
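
As an illustration of the snapshot-related options above, a sketch of a timestamp-based read and a bounded incremental read (the timestamp and snapshot IDs are placeholder values):

```sql
-- Read the full data of the latest snapshot committed before the given timestamp
SELECT * FROM unkeyed /*+ OPTIONS('streaming'='false', 'as-of-timestamp'='1656633600000')*/;

-- Read the incremental data within the interval (start-snapshot-id, end-snapshot-id]
SELECT * FROM unkeyed /*+ OPTIONS('streaming'='false',
    'start-snapshot-id'='4411985347497777546',
    'end-snapshot-id'='4411985347497777547')*/;
```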

#### Batch mode (primary key table)

```sql
-- Merge-on-read the current mixed-format table, returning append-only data.
SELECT * FROM keyed /*+ OPTIONS('streaming'='false', 'scan.startup.mode'='earliest')*/;
```

### Streaming mode
Amoro supports reading incremental data from FileStore or LogStore in streaming mode.

#### Streaming mode (LogStore)

```sql
-- Run Flink tasks in streaming mode in the current session
SET execution.runtime-mode = streaming;

-- Enable dynamic table parameter configuration to make hint options configured in Flink SQL effective
SET table.dynamic-table-options.enabled = true;

SELECT * FROM test_table /*+ OPTIONS('mixed-format.read.mode'='log') */;
```
The following Hint Options are supported:

| Key | Default Value | Type | Required | Description |
|---|---|---|---|---|
| mixed-format.read.mode | file | String | No | Specifies whether to read data from the FileStore or the LogStore of an Amoro table; valid values are file and log. If set to log, the LogStore configuration must be enabled. |
| scan.startup.mode | latest | String | No | The valid values are 'earliest', 'latest', 'timestamp', 'group-offsets' and 'specific-offsets'. 'earliest' reads the data from the earliest offset possible. 'latest' reads the data from the latest offset. 'timestamp' reads from a specified time position and requires the 'scan.startup.timestamp-millis' parameter. 'group-offsets' reads the data from the offsets committed in ZK / Kafka brokers for a specific consumer group. 'specific-offsets' reads the data from user-supplied offsets for each partition and requires the 'scan.startup.specific-offsets' parameter. |
| scan.startup.timestamp-millis | (none) | Long | No | Valid when 'scan.startup.mode'='timestamp'; reads data from the specified Kafka position, given as a millisecond timestamp counted from 00:00:00.000 GMT on 1 Jan 1970. |
| scan.startup.specific-offsets | (none) | String | No | Specifies the offset for each partition in the 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
| properties.group.id | (none) | String | Required when the LogStore is Kafka and the table is queried; otherwise optional | The consumer group id used to read the Kafka topic. |
| properties.pulsar.admin.adminUrl | (none) | String | Required if the LogStore is Pulsar; otherwise optional | The HTTP URL of the Pulsar admin service, e.g. http://my-broker.example.com:8080 |
| properties.* | (none) | String | No | Parameters for the LogStore. For a Kafka LogStore ('log-store.type'='kafka', the default), all other parameters supported by the Kafka consumer can be set by prefixing the parameter name with properties., e.g. 'properties.batch.size'='16384'; see the official Kafka documentation for the complete list. For a Pulsar LogStore ('log-store.type'='pulsar'), all relevant configurations supported by Pulsar can be set the same way, e.g. 'properties.pulsar.client.requestTimeoutMs'='60000'; see the Flink-Pulsar-Connector documentation for the complete list. |
| log.consumer.changelog.modes | all-kinds | String | No | The kinds of RowKind generated when reading log data. Supported values: all-kinds, append-only. all-kinds reads CDC data, including +I/-D/-U/+U; append-only generates only INSERT data, which is recommended when reading tables without a primary key. |
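
For example, a sketch of starting a LogStore read from a point in time (the timestamp is a placeholder value in epoch milliseconds):

```sql
-- Read log data from the given time position onwards
SELECT * FROM test_table /*+ OPTIONS(
    'mixed-format.read.mode'='log',
    'scan.startup.mode'='timestamp',
    'scan.startup.timestamp-millis'='1656633600000') */;
```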
Notes:

- When log-store.type = pulsar, the parallelism of the Flink task must not be less than the number of partitions in the Pulsar topic; otherwise the data of some partitions cannot be read.
- When the number of topic partitions in the LogStore is less than the parallelism of the Flink task, some Flink subtasks will be idle. In that case, if the task uses a watermark, the parameter table.exec.source.idle-timeout must be configured (a sketch follows this list); otherwise the watermark will not advance. See the official Flink documentation for details.
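
A minimal sketch of setting the idle timeout in the current session (the 30 s value is illustrative):

```sql
SET table.exec.source.idle-timeout = '30 s';
```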

#### Streaming mode (FileStore non-primary key table)

```sql
-- Run Flink tasks in streaming mode in the current session
SET execution.runtime-mode = streaming;

-- Enable dynamic table parameter configuration to make hint options configured in Flink SQL effective
SET table.dynamic-table-options.enabled = true;

-- Read incremental data after the current snapshot
SELECT * FROM unkeyed /*+ OPTIONS('monitor-interval'='1s')*/;
```
The following Hint Options are supported:

| Key | Default Value | Type | Required | Description |
|---|---|---|---|---|
| streaming | true | Boolean | No | Whether to read data in an unbounded streaming fashion: false reads bounded data, true reads unbounded data. |
| mixed-format.read.mode | file | String | No | Specifies whether to read data from the FileStore or the LogStore of an Amoro table; valid values are file and log. If set to log, the LogStore configuration must be enabled. |
| monitor-interval | 10s | Duration | No | The time interval for monitoring newly committed data files; only takes effect when mixed-format.read.mode = file. |
| start-snapshot-id | (none) | Long | No | Reads incremental data starting from the specified snapshot (excluding the data in that snapshot). If not specified, reading starts after the current snapshot (excluding the data in the current snapshot). |
| other table parameters | (none) | String | No | All parameters of an Amoro table can be modified dynamically through SQL hints, effective only for the current task. For the full list, please refer to Table Configurations. Permission-related configurations on the catalog can also be set in the hint, e.g. [properties.auth.XXX in catalog DDL](./flink-ddl.md#flink-sql). |
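
For example, a sketch of an unbounded read that starts after a given snapshot (the snapshot ID is a placeholder value):

```sql
-- Read incremental data committed after the specified snapshot
SELECT * FROM unkeyed /*+ OPTIONS('monitor-interval'='5s',
    'start-snapshot-id'='4411985347497777546')*/;
```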

#### Streaming mode (FileStore primary key table)

After ingesting data into the lake with CDC (Change Data Capture), you can use the Flink engine to read both the existing data and the incremental data in a single task, without restarting the task and with consistent reads. The Amoro source stores the file offset information in the Flink state, so the task can resume reading from the last consumed offset, guaranteeing data consistency while processing newly arrived incremental data.

```sql
-- Run Flink tasks in streaming mode in the current session
SET execution.runtime-mode = streaming;

-- Enable dynamic table parameter configuration to make hint options configured in Flink SQL effective
SET table.dynamic-table-options.enabled = true;

-- Incremental unified reading of the BaseStore and the ChangeStore
SELECT * FROM keyed /*+ OPTIONS('streaming'='true', 'scan.startup.mode'='earliest')*/;
```
The following Hint Options are supported:

| Key | Default Value | Type | Required | Description |
|---|---|---|---|---|
| streaming | true | Boolean | No | Whether to read data in an unbounded streaming fashion: false reads bounded data, true reads unbounded data. |
| mixed-format.read.mode | file | String | No | Specifies whether to read data from the FileStore or the LogStore of an Amoro table; valid values are file and log. If set to log, the LogStore configuration must be enabled. |
| monitor-interval | 10s | Duration | No | The time interval for monitoring newly committed data files; only takes effect when mixed-format.read.mode = file. |
| scan.startup.mode | latest | String | No | The valid values are 'earliest' and 'latest'. 'earliest' reads the full table data and, when streaming=true, continues reading incremental data. 'latest' reads only the data after the current snapshot, excluding the data in the current snapshot. |
| other table parameters | (none) | String | No | All parameters of an Amoro table can be modified dynamically through SQL hints, effective only for the current task. For the full list, please refer to Table Configurations. Permission-related configurations on the catalog can also be set in the hint, e.g. properties.auth.XXX as in the catalog DDL. |
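
For comparison with the 'earliest' example above, a sketch that skips the existing data and reads only newly committed changes:

```sql
-- Read only the incremental data committed after the current snapshot
SELECT * FROM keyed /*+ OPTIONS('streaming'='true', 'scan.startup.mode'='latest')*/;
```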

## Writing with SQL
Amoro tables support writing data to LogStore or FileStore using Flink SQL.

### INSERT OVERWRITE

Currently, INSERT OVERWRITE is only supported for non-primary key tables. It replaces the data in the table, and the overwrite operation is atomic. Partitions are derived dynamically from the query, and the data in those partitions is fully replaced.

INSERT OVERWRITE can only run in Flink batch mode.

```sql
INSERT OVERWRITE unkeyed VALUES (1, 'a', '2022-07-01');

-- It is also possible to overwrite data in a specific partition:
INSERT OVERWRITE `mixed_catalog`.`mixed_db`.`unkeyed` PARTITION(data='2022-07-01') SELECT 5, 'b';
```
For non-partitioned tables, INSERT OVERWRITE replaces all the data in the table.

### INSERT INTO

For Amoro tables, you can specify whether to write data to the FileStore, the LogStore, or both. For Amoro primary key tables, writing to the FileStore also writes CDC data to the ChangeStore.

```sql
INSERT INTO `mixed_catalog`.`mixed_db`.`test_table`
    /*+ OPTIONS('mixed-format.emit.mode'='log,file') */
SELECT id, name FROM `source`;
```
The following Hint Options are supported:

| Key | Default Value | Type | Required | Description |
|---|---|---|---|---|
| mixed-format.emit.mode | auto | String | No | The write mode. Supported values are file, log, and auto. 'file': data is written only to the FileStore. 'log': data is written only to the LogStore. 'file,log': data is written to both the FileStore and the LogStore. 'auto': data is written to both stores if the LogStore of the Amoro table is enabled, and only to the FileStore otherwise. 'auto' is recommended. |
| mixed-format.emit.auto-write-to-logstore.watermark-gap | (none) | Duration | No | Only takes effect when 'mixed-format.emit.mode'='auto'. If the watermark of the Amoro writers is greater than the current system timestamp minus this value, the writers also start writing data to the LogStore. By default the LogStore writer is enabled immediately after the job starts. The value must be greater than 0. |
| log.version | v1 | String | No | The log data format. Currently there is only one version, so it can be left empty. |
| sink.parallelism | (none) | String | No | The parallelism for writing to the FileStore and the LogStore; each is determined separately. The parallelism of the file-commit operator is always 1. |
| write.distribution-mode | hash | String | No | The distribution mode for writing to the Amoro table. Supported values: none and hash. |
| write.distribution.hash-mode | auto | String | No | The hash strategy for writing to an Amoro table; only takes effect when write.distribution-mode=hash. Supported values: primary-key (shuffle by primary key), partition-key (shuffle by partition key), primary-partition-key (shuffle by primary key and partition key), and auto (use primary-partition-key if the table has both a primary key and partitions, primary-key if it has only a primary key, partition-key if it has only partitions, and none otherwise). |
| properties.pulsar.admin.adminUrl | (none) | String | Required if the LogStore is Pulsar; otherwise optional | The HTTP URL of the Pulsar admin service, e.g. http://my-broker.example.com:8080 |
| properties.* | (none) | String | No | Parameters for the LogStore. For a Kafka LogStore ('log-store.type'='kafka', the default), all other parameters supported by the Kafka consumer can be set by prefixing the parameter name with properties., e.g. 'properties.batch.size'='16384'; see the official Kafka documentation for the complete list. For a Pulsar LogStore ('log-store.type'='pulsar'), all relevant configurations supported by Pulsar can be set the same way, e.g. 'properties.pulsar.client.requestTimeoutMs'='60000'; see the Flink-Pulsar-Connector documentation for the complete list. |
| other table parameters | (none) | String | No | All parameters of an Amoro table can be modified dynamically through SQL hints, effective only for the current task. For the full list, please refer to Table Configurations. Permission-related configurations on the catalog can also be set in the hint, e.g. properties.auth.XXX as in the catalog DDL. |
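
As an illustration of combining these options, a sketch that writes only to the FileStore and shuffles records by primary key (table names are reused from the example above; the option values are illustrative):

```sql
INSERT INTO `mixed_catalog`.`mixed_db`.`test_table`
    /*+ OPTIONS('mixed-format.emit.mode'='file',
        'write.distribution-mode'='hash',
        'write.distribution.hash-mode'='primary-key') */
SELECT id, name FROM `source`;
```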

## Lookup join with SQL

A lookup join is used to enrich a table with data queried from an Amoro table. The join requires one table to have a processing-time attribute and the other table to be backed by a lookup source connector. The following example shows the syntax of a lookup join.

```sql
-- The table is backed by the Amoro Flink connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
    id INT,
    name STRING,
    country STRING,
    zip STRING
) WITH (
    'connector' = 'mixed-format',
    'metastore.url' = '',
    'mixed-format.catalog' = '',
    'mixed-format.database' = '',
    'mixed-format.table' = '',
    'lookup.cache.max-rows' = ''
);

-- Create a temporary left table, e.g. backed by Kafka
CREATE TEMPORARY TABLE orders (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'format' = 'json'
    ...
);

-- Enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```
The following Lookup Options are supported:

| Key | Default Value | Type | Required | Description |
|---|---|---|---|---|
| lookup.cache.max-rows | 10000 | Long | No | The maximum number of rows in the lookup cache; beyond this, the oldest rows expire. |
| lookup.reloading.interval | 10s | Duration | No | The interval at which the lookup data in RocksDB is reloaded. |
| lookup.cache.ttl-after-write | 0s | Duration | No | The TTL after which a row expires in the lookup cache. |
| rocksdb.auto-compactions | false | Boolean | No | Whether to enable automatic compactions during the initialization process. After initialization completes, auto-compaction is always enabled. |
| rocksdb.writing-threads | 5 | Int | No | The number of threads used to write data into RocksDB. |
| rocksdb.block-cache.capacity | 1048576 | Long | No | The size of the block cache, which uses the LRU strategy; configure it based on your memory requirements and available system resources. |
| rocksdb.block-cache.numShardBits | -1 | Int | No | The block cache is sharded into 2^numShardBits shards by the hash of the key. The default -1 means the value is determined automatically: every shard will be at least 512 KB and the number of shard bits will not exceed 6. |
| other table parameters | (none) | String | No | All parameters of an Amoro table can be modified dynamically through SQL hints, effective only for the current task. For the full list, please refer to Table Configurations. Permission-related configurations on the catalog can also be set in the hint, e.g. properties.auth.XXX as in the catalog DDL. |
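
Since lookup options are regular table parameters, they can also be overridden per query through dynamic table options; a sketch, assuming table.dynamic-table-options.enabled is set to true and reusing the tables from the example above (the option values are illustrative):

```sql
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN Customers /*+ OPTIONS('lookup.reloading.interval'='30s', 'lookup.cache.max-rows'='50000') */
    FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```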