Spark Writes

Writing with SQL

INSERT OVERWRITE

INSERT OVERWRITE replaces partitions in a table with the results of a query.

The default overwrite mode of Spark is static. To change the overwrite mode, run:

SET spark.sql.sources.partitionOverwriteMode=dynamic

To demonstrate the behavior of dynamic and static overwrites, a test table is defined using the following DDL:

CREATE TABLE arctic_catalog.db.sample (
    id int,
    data string,
    ts timestamp,
    primary key (id))
USING arctic
PARTITIONED BY (days(ts))

When Spark’s overwrite mode is dynamic, the partitions of the rows generated by the SELECT query will be replaced.

INSERT OVERWRITE arctic_catalog.db.sample values 
(1, 'aaa',  timestamp(' 2022-1-1 09:00:00 ')), 
(2, 'bbb',  timestamp(' 2022-1-2 09:00:00 ')), 
(3, 'ccc',  timestamp(' 2022-1-3 09:00:00 '))
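
As a minimal sketch of the dynamic behavior described above, assume the three rows from the previous statement have already been written. The follow-up overwrite below only produces a row for the 2022-01-01 day partition, so under dynamic mode only that partition should be replaced and the partitions for 2022-01-02 and 2022-01-03 should be left as they are. The row values are made up for illustration.

```sql
-- Make sure dynamic overwrite mode is active for this session
SET spark.sql.sources.partitionOverwriteMode=dynamic;

-- Illustrative row: it falls into the 2022-01-01 day partition only,
-- so only that partition is expected to be rewritten
INSERT OVERWRITE arctic_catalog.db.sample VALUES
(4, 'ddd', timestamp('2022-01-01 10:00:00'));
```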

When Spark’s overwrite mode is static, the PARTITION clause determines which partitions are replaced with the result of the query. If the PARTITION clause is omitted, all partitions will be replaced.

INSERT OVERWRITE arctic_catalog.db.sample 
partition( dt = '2021-1-1')  values 
(1, 'aaa'), (2, 'bbb'), (3, 'ccc') 

Static mode does not support partition columns that are defined with transforms.
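
The sample table defined earlier is partitioned by days(ts), which is a transform, so a static overwrite with a dt partition value needs a table partitioned directly on a dt column. The sketch below assumes such a table; the name sample_by_dt and its schema are hypothetical and only serve to make the static example self-contained.

```sql
-- Hypothetical table partitioned by a plain column (no transform)
CREATE TABLE arctic_catalog.db.sample_by_dt (
    id int,
    data string,
    dt string,
    primary key (id))
USING arctic
PARTITIONED BY (dt);

-- Static is the default mode; it is set explicitly here for clarity
SET spark.sql.sources.partitionOverwriteMode=static;

-- Only the dt = '2021-1-1' partition is targeted by this overwrite
INSERT OVERWRITE arctic_catalog.db.sample_by_dt
PARTITION (dt = '2021-1-1') VALUES
(1, 'aaa'), (2, 'bbb'), (3, 'ccc');
```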

You can enable a uniqueness check on the primary keys of the source data by setting spark.sql.arctic.check-source-data-uniqueness.enabled = true in Spark SQL. If the source data contains duplicate primary keys, the write operation fails with an error.
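
If the check reports duplicates, a plain aggregation over the source can help locate them before retrying the write. This is a generic SQL sketch, not part of the check itself; source_table is a hypothetical name for the table that feeds the write, and id is assumed to be the primary key column.

```sql
-- List primary key values that occur more than once in the source
-- (source_table is a hypothetical name)
SELECT id, count(*) AS cnt
FROM arctic_catalog.db.source_table
GROUP BY id
HAVING count(*) > 1;
```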

INSERT INTO

To append new data to a table, use INSERT INTO.

INSERT INTO arctic_catalog.db.sample VALUES (1, 'a', timestamp('2022-01-01 09:00:00')), (2, 'b', timestamp('2022-01-02 09:00:00'))

INSERT INTO prod.db.table SELECT ...

Upsert to table with primary keys.

When adding new data to a table with a primary key, the write.upsert.enabled table property controls whether UPSERT is enabled.

When UPSERT is enabled, a row whose primary key already exists in the table is written as an UPDATE, and a row whose primary key does not exist is written as an INSERT.

When UPSERT is disabled, only INSERT operations are performed, even if rows with the same primary key already exist in the table.

CREATE TABLE arctic_catalog.db.keyedTable (
    id int,
    data string,
    primary key (id))
USING arctic
TBLPROPERTIES ('write.upsert.enabled' = 'true')

INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a'), (2, 'b')

INSERT INTO prod.db.keyedTable SELECT ...
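
To illustrate the UPSERT behavior described above, the sketch below writes a second batch into keyedTable, which was created with write.upsert.enabled set to true. The values are made up. Following the description, the row with id = 1 should be updated and the row with id = 3 should be inserted, while id = 2 is left unchanged.

```sql
-- id = 1 already exists, so it is expected to be updated;
-- id = 3 does not exist, so it is expected to be inserted
INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a2'), (3, 'c');

-- Inspect the result
SELECT id, data FROM arctic_catalog.db.keyedTable ORDER BY id;
```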

You can enable a uniqueness check on the primary keys of the source data by setting spark.sql.arctic.check-source-data-uniqueness.enabled = true in Spark SQL. If the source data contains duplicate primary keys, the write operation fails with an error.

DELETE FROM

The DELETE FROM statement deletes rows from a table.

DELETE FROM arctic_catalog.db.sample
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'

DELETE FROM arctic_catalog.db.sample
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

DELETE FROM arctic_catalog.db.sample AS t1
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)

UPDATE

The UPDATE statement modifies rows in the table.

UPDATE arctic_catalog.db.sample
SET c1 = 'update_c1', c2 = 'update_c2'
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'

UPDATE arctic_catalog.db.sample
SET session_time = 0, ignored = true
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

UPDATE arctic_catalog.db.sample AS t1
SET order_status = 'returned'
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)

MERGE INTO

MERGE INTO prod.db.target t   -- a target table
USING (SELECT ...) s          -- the source updates
ON t.id = s.id                -- condition to find updates for target rows
WHEN ...                      -- updates

The MERGE INTO statement supports multiple actions: WHEN MATCHED ... THEN ... clauses execute UPDATE or DELETE, and WHEN NOT MATCHED ... THEN ... clauses execute INSERT.


MERGE INTO prod.db.target t   
USING prod.db.source s       
ON t.id = s.id             
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1          
WHEN NOT MATCHED THEN INSERT *
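
For comparison, here is a small sketch that spells out the columns instead of using INSERT *. It uses standard Spark SQL MERGE syntax against the keyedTable defined earlier; the inline source rows are made up, and whether every clause variant is supported by the arctic source is an assumption that should be checked against your version.

```sql
MERGE INTO arctic_catalog.db.keyedTable t
USING (
    -- Illustrative source rows built inline
    SELECT 1 AS id, 'a2' AS data
    UNION ALL
    SELECT 3 AS id, 'c' AS data
) s
ON t.id = s.id
-- Existing keys take the new data value
WHEN MATCHED THEN UPDATE SET t.data = s.data
-- New keys are inserted with explicit column mapping
WHEN NOT MATCHED THEN INSERT (id, data) VALUES (s.id, s.data);
```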

Writing with DataFrames

Appending data

Use append() to add data to a MixedFormat table.

val data: DataFrame = ...
data.writeTo("arctic_catalog.db.sample").append()

Overwriting data

Use overwritePartitions() to overwrite data.

val data: DataFrame = ...
data.writeTo("arctic_catalog.db.sample").overwritePartitions()

Creating tables

create() creates a table and writes data to it, just like CREATE TABLE AS SELECT.

val data: DataFrame = ...
data.writeTo("arctic_catalog.db.sample").create()
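
For readers more familiar with SQL, the statement that create() is compared to can be sketched as follows. This assumes the DataFrame's rows are available as a temporary view; source_view is a hypothetical name, and the column list is taken from the sample table used throughout this page.

```sql
-- source_view is a hypothetical temporary view holding the DataFrame's rows
CREATE TABLE arctic_catalog.db.sample
USING arctic
AS SELECT id, data, ts FROM source_view;
```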

Partition keys and primary keys can be specified with partitionBy() and option("primary.keys", "'xxx'"), respectively.

val data: DataFrame = ...
data.write.format("arctic")
    .partitionBy("data")
    .option("primary.keys", "'xxx'")
    .save("arctic_catalog.db.sample")