Using LogStore
Because traditional offline data warehouse architectures struggle to support real-time business needs, real-time data warehousing has evolved rapidly in recent years. In a real-time data warehouse architecture, Apache Kafka is often used as the storage system for real-time data. However, this creates a disconnect between the real-time data and the data held in the offline data warehouse.
Developers often need to keep track of data stored in HDFS as well as data in Kafka, which increases the complexity of business development. Amoro therefore adds an optional parameter, “LogStore enabled” (log-store.enabled), to the table configuration. With it, a single table can serve data with both sub-second and minute-level latency while ensuring eventual consistency between the two sources.
Real-Time data in LogStore
Amoro tables provide two types of storage: FileStore and LogStore. FileStore stores the full, large-volume data, while LogStore stores real-time incremental data.
Real-time data in LogStore becomes visible within seconds, and data consistency is ensured without enabling LogStore transactions.
LogStore's underlying storage can be connected to external message queuing middleware; currently only Kafka and Pulsar are supported.
Users can enable LogStore by setting the LogStore parameters when creating an Amoro table. For specific configurations, please refer to LogStore related configurations.
Overview
Flink | Kafka |
---|---|
Flink 1.15 | ✔ |
Flink 1.16 | ✔ |
Flink 1.17 | ✔ |
Kafka versions supported as LogStore:
Flink Version | Kafka Versions |
---|---|
1.15.x | 0.10.2.* 0.11.* 1.* 2.* 3.* |
1.16.x | 0.10.2.* 0.11.* 1.* 2.* 3.* |
1.17.x | 0.10.2.* 0.11.* 1.* 2.* 3.* |
Prerequisites for using LogStore
When creating an Amoro table, LogStore needs to be enabled.
- On the Terminal page of the Amoro Dashboard, select a specific Catalog and then create the table:
CREATE TABLE db.log_table (
    id int,
    name string,
    ts timestamp,
    primary key (id)
) using mixed_iceberg
tblproperties (
    "log-store.enabled" = "true",
    "log-store.topic" = "topic_log_test",
    "log-store.address" = "localhost:9092"
);
- You can also create the table with Flink SQL in the Flink SQL Client:
-- First use the USE CATALOG command to switch to the mixed-format catalog.
CREATE TABLE db.log_table (
    id int,
    name string,
    ts timestamp,
    primary key (id) not enforced
) WITH (
    'log-store.enabled' = 'true',
    'log-store.topic' = 'topic_log_test',
    'log-store.address' = 'localhost:9092'
);
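The catalog switch mentioned in the Flink SQL comment can be sketched as follows. This is a minimal illustration rather than a prescribed procedure: the catalog name amoro_catalog is hypothetical and stands for whatever mixed-format catalog is already registered in your Flink SQL session.

```sql
-- List the catalogs registered in the current session.
SHOW CATALOGS;

-- 'amoro_catalog' is a hypothetical name; replace it with your mixed-format catalog.
USE CATALOG amoro_catalog;

-- After running the CREATE TABLE statement, inspect the resulting schema.
DESCRIBE db.log_table;
```

Switching the catalog first lets the two-part name db.log_table resolve inside the mixed-format catalog instead of Flink's default catalog.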
Dual write to LogStore and FileStore
The Amoro connector writes data to LogStore and ChangeStore at the same time (dual write). It does not use Kafka transactions to keep the two consistent, because transactions would delay downstream tasks by up to several minutes (the exact delay depends on the upstream task's checkpoint interval).
INSERT INTO db.log_table /*+ OPTIONS('mixed-format.emit.mode'='log') */
SELECT id, name, ts from sourceTable;
Currently, only the Apache Flink engine implements dual write to LogStore and FileStore.
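As a hedged sketch of how the write and read sides might fit together in Flink SQL, the statements below reuse db.log_table and sourceTable from the examples on this page. Two things here are assumptions not stated on this page and should be checked against the Amoro Flink documentation for your version: that 'mixed-format.emit.mode' accepts the combined value 'file,log', and that a 'mixed-format.read.mode' option exists for reading from LogStore.

```sql
-- Assumption: 'file,log' asks the connector to write to FileStore and LogStore together.
INSERT INTO db.log_table /*+ OPTIONS('mixed-format.emit.mode'='file,log') */
SELECT id, name, ts FROM sourceTable;

-- Assumption: 'mixed-format.read.mode'='log' reads the real-time incremental data from LogStore.
SELECT id, name, ts
FROM db.log_table /*+ OPTIONS('mixed-format.read.mode'='log') */;
```

Passing these settings as SQL hints keeps the mode choice local to each job, so the same table definition can back both a low-latency consumer and a file-based one.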