Flink DataStream

Reading with DataStream

Amoro supports reading data in Batch or Streaming mode through the Java API.

Batch mode

Use Batch mode to read full and incremental data from the FileStore.

  • Non-primary key tables support reading full data in batch mode, snapshot data with a specified snapshot-id or timestamp, and incremental data within a specified snapshot interval.
  • Primary key tables currently only support reading the current full data and the subsequent CDC data.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
InternalCatalogBuilder catalogBuilder =
    InternalCatalogBuilder
        .builder()
        .metastoreUrl("thrift://<url>:<port>/<catalog_name>");

TableIdentifier tableId = TableIdentifier.of("catalog_name", "database_name", "test_table");
AmoroTableLoader tableLoader = AmoroTableLoader.of(tableId, catalogBuilder);

Map<String, String> properties = new HashMap<>();
// "streaming" defaults to true; set it to false to read in batch mode.
properties.put("streaming", "false");

DataStream<RowData> batch = 
    FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        // For primary key tables, which currently only support reading the current full data and subsequent CDC data, omit the properties parameter.
        .properties(properties)
        .build();

// Print all the read data
batch.print();

// Submit and execute the task
env.execute("Test Amoro Batch Read");

The properties map supports the keys below, currently only valid for non-primary key tables:

  • case-sensitive (Boolean, default false, optional): whether column matching is case-sensitive.
  • snapshot-id (Long, default none, optional): read the full data of the specified snapshot; only effective when streaming is false or not configured.
  • as-of-timestamp (String, default none, optional): read the full data of the last snapshot earlier than the timestamp; only effective when streaming is false or not configured.
  • start-snapshot-id (String, default none, optional): when streaming is false, must be used together with end-snapshot-id to read the incremental data in the interval (snapshot1, snapshot2]; when streaming is true, read the incremental data after the specified snapshot, or, if not specified, after the current snapshot (excluding the current one).
  • end-snapshot-id (String, default none, optional): used together with start-snapshot-id to read the incremental data in the interval (snapshot1, snapshot2].
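For example, a properties map for a bounded incremental read between two snapshots could be assembled like this (the helper class and snapshot ids below are illustrative, not part of the Amoro API; only the option keys come from the table above):

```java
import java.util.HashMap;
import java.util.Map;

public class IncrementalReadProps {
    // Build the properties map for a bounded incremental read
    // covering the interval (startSnapshotId, endSnapshotId].
    public static Map<String, String> between(long startSnapshotId, long endSnapshotId) {
        Map<String, String> properties = new HashMap<>();
        // Disable streaming so the snapshot-interval options take effect.
        properties.put("streaming", "false");
        properties.put("start-snapshot-id", String.valueOf(startSnapshotId));
        properties.put("end-snapshot-id", String.valueOf(endSnapshotId));
        return properties;
    }
}
```

The resulting map would then be passed to FlinkSource.forRowData()'s properties(...) call, as in the batch example above.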

Streaming mode

Amoro supports reading incremental data from the FileStore or LogStore through the Java API in Streaming mode.

Streaming mode (LogStore)

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
InternalCatalogBuilder catalogBuilder = 
    InternalCatalogBuilder
        .builder()
        .metastoreUrl("thrift://<url>:<port>/<catalog_name>");

TableIdentifier tableId = TableIdentifier.of("catalog_name", "database_name", "test_table");
AmoroTableLoader tableLoader = AmoroTableLoader.of(tableId, catalogBuilder);

AmoroTable table = AmoroUtils.load(tableLoader);
// Read all fields of the table. To read only some fields, construct the schema yourself, for example:
// Schema userSchema = new Schema(new ArrayList<Types.NestedField>() {{
//   add(Types.NestedField.optional(0, "f_boolean", Types.BooleanType.get()));
//   add(Types.NestedField.optional(1, "f_int", Types.IntegerType.get()));
// }});
Schema schema = table.schema();

// -----------Hidden Kafka--------------
LogKafkaSource source = LogKafkaSource.builder(schema, table.properties()).build();

or

// -----------Hidden Pulsar--------------
LogPulsarSource source = LogPulsarSource.builder(schema, table.properties()).build();

DataStream<RowData> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Log Source");

// Print all the read data
stream.print();

// Submit and execute the task
env.execute("Test Amoro Stream Read");

Streaming mode (FileStore)

StreamExecutionEnvironment env = ...;
InternalCatalogBuilder catalogBuilder = ...;
TableIdentifier tableId = ...;
AmoroTableLoader tableLoader = ...;

Map<String, String> properties = new HashMap<>();
// "streaming" defaults to true.
properties.put("streaming", "true");

DataStream<RowData> stream = 
    FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        // For primary key tables, which currently only support reading the current full data and subsequent CDC data, omit the properties parameter.
        .properties(properties)
        .build();

// Print all the read data
stream.print();

// Submit and execute the task
env.execute("Test Amoro Stream Read");

The DataStream API supports reading both primary key tables and non-primary key tables. For the configuration items supported by properties, refer to the Hint Options section of the Querying With SQL chapter.
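As a small sketch of those options in DataStream form, a streaming read can be resumed from a particular snapshot by combining streaming = true with start-snapshot-id (the helper class below is illustrative; only the option keys come from the option table earlier in this page):

```java
import java.util.HashMap;
import java.util.Map;

public class StreamingReadProps {
    // Build the properties map for a streaming read that consumes
    // incremental data committed after the given snapshot (exclusive).
    public static Map<String, String> fromSnapshot(long startSnapshotId) {
        Map<String, String> properties = new HashMap<>();
        // Keep streaming mode on (this is also the default).
        properties.put("streaming", "true");
        properties.put("start-snapshot-id", String.valueOf(startSnapshotId));
        return properties;
    }
}
```

If start-snapshot-id is omitted, the source reads the incremental data after the current snapshot instead.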

Writing with DataStream

Amoro tables support writing data to the LogStore or FileStore through the Java API.

Overwrite data

Amoro currently only supports dynamically overwriting the existing data of non-primary key tables.

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
InternalCatalogBuilder catalogBuilder = ...;
TableIdentifier tableId = ...;
AmoroTableLoader tableLoader = ...;

TableSchema FLINK_SCHEMA = TableSchema.builder()
    .field("id", DataTypes.INT())
    .field("name", DataTypes.STRING())
    .field("op_time", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
    .build();

FlinkSink
    .forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .flinkSchema(FLINK_SCHEMA)
    .build();

// Submit and execute the task
env.execute("Test Amoro Overwrite");

Appending data

Amoro tables support writing data to the FileStore or LogStore through the Java API.

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
InternalCatalogBuilder catalogBuilder = ...;
TableIdentifier tableId = ...;
AmoroTableLoader tableLoader = ...;

TableSchema FLINK_SCHEMA = TableSchema.builder()
    .field("id", DataTypes.INT())
    .field("name", DataTypes.STRING())
    .field("op_time", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
    .build();

AmoroTable table = AmoroUtils.loadAmoroTable(tableLoader);

table.properties().put("arctic.emit.mode", "log,file");

FlinkSink
    .forRowData(input)
    .table(table)
    .tableLoader(tableLoader)
    .flinkSchema(FLINK_SCHEMA)
    .build();

env.execute("Test Amoro Append");

The DataStream API supports writing to both primary key tables and non-primary key tables. For the configuration items supported by properties, refer to the Hint Options section of the Writing With SQL chapter.

TIPS

When arctic.emit.mode contains log, you need to configure log-store.enabled = true; see Enable Log Configuration.

When arctic.emit.mode contains file, primary key tables are written only to the ChangeStore, while non-primary key tables are written directly to the BaseStore.
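Putting the two tips together, here is a minimal sketch of the table properties needed to double-write to both LogStore and FileStore (the helper class is illustrative; the property keys and values come from the tips above):

```java
import java.util.HashMap;
import java.util.Map;

public class EmitModeProps {
    // Table properties for emitting data to both LogStore and FileStore.
    public static Map<String, String> dualEmit() {
        Map<String, String> properties = new HashMap<>();
        // Required whenever arctic.emit.mode contains "log".
        properties.put("log-store.enabled", "true");
        // Emit to the LogStore and the FileStore.
        properties.put("arctic.emit.mode", "log,file");
        return properties;
    }
}
```

These entries would be applied to table.properties() before building the FlinkSink, as in the appending example above.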