
Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue


As the implementation of data lakes and modern data architecture increases, customers' expectations around its features also increase, which include ACID transactions, UPSERT, time travel, schema evolution, auto compaction, and many more. By default, Amazon Simple Storage Service (Amazon S3) objects are immutable, which means you can't update records in your data lake because it supports append-only transactions. But there are use cases where you might be receiving incremental updates with change data capture (CDC) from your source systems, and you might need to update existing data in Amazon S3 to have a golden copy. Previously, you had to overwrite the complete S3 object or folders, but with the evolution of frameworks such as Apache Hudi, Apache Iceberg, Delta Lake, and governed tables in AWS Lake Formation, you can get database-like UPSERT features in Amazon S3.

Apache Hudi integration is already supported with AWS analytics services, and recently AWS Glue, Amazon EMR, and Amazon Athena announced support for Apache Iceberg. Apache Iceberg is an open table format originally developed at Netflix, which was open-sourced as an Apache project in 2018 and graduated from incubator mid-2020. It's designed to support ACID transactions and UPSERT on petabyte-scale data lakes, and is getting popular because of its flexible SQL syntax for CDC-based MERGE, full schema evolution, and hidden partitioning features.

In this post, we walk you through a solution to implement CDC-based UPSERT or MERGE in an S3 data lake using Apache Iceberg and AWS Glue.

Configure Apache Iceberg with AWS Glue

You can integrate Apache Iceberg JARs into AWS Glue through its AWS Marketplace connector. The connector supports AWS Glue versions 1.0, 2.0, and 3.0, and is free to use. Configuring this connector is as easy as clicking a few buttons on the user interface.

The following steps guide you through the setup process:

  1. Navigate to the AWS Marketplace connector page.
  2. Choose Continue to Subscribe and then Accept Terms.
  3. Choose Continue to Configuration.
  4. Choose the AWS Glue version and software version.
  5. Choose Continue to Launch.
  6. Choose Usage Instructions, which opens a page that has a link to activate the connector.
  7. Create a connection by providing a name and choosing Create connection and activate connector.

You can confirm your new connection on the AWS Glue Studio Connectors page.

To use this connector, when you create an AWS Glue job, make sure you add it to your job. Later in the implementation steps, when you create the AWS Glue job, we show how to use the connector you just configured.

Solution overview

Let's assume you have a relational database that has product inventory data, and you want to move it into an S3 data lake on a continuous basis, so that your downstream applications or consumers can use it for analytics. After your initial data movement to Amazon S3, you're supposed to receive incremental updates from the source database as CSV files using AWS DMS or equivalent tools, where each record has an additional column to represent an insert, update, or delete operation. While processing the incremental CDC data, one of the primary requirements is to merge the CDC data in the data lake and provide the capability to query previous versions of the data.

To solve this use case, we present the following simple architecture that integrates Amazon S3 for the data lake, AWS Glue with the Apache Iceberg connector for ETL (extract, transform, and load), and Athena for querying the data using standard SQL. Athena helps in querying the latest product inventory data from the Iceberg table's latest snapshot, and Iceberg's time travel feature helps in identifying a product's quantity at any previous date.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  • Data ingestion:
    • Steps 1.1 and 1.2 use AWS Database Migration Service (AWS DMS), which connects to the source database and moves incremental data (CDC) to Amazon S3 in CSV format.
    • Steps 1.3 and 1.4 consist of the AWS Glue PySpark job, which reads incremental data from the S3 input bucket, performs deduplication of the records, and then invokes Apache Iceberg's MERGE statements to merge the data with the target UPSERT S3 bucket.
  • Data access:
    • Steps 2.1 and 2.2 represent Athena integration to query data from the Iceberg table using standard SQL and validate the time travel feature of Iceberg.
  • Data Catalog:
    • The AWS Glue Data Catalog is treated as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of the S3 buckets to automatically detect the schema.

We have referenced AWS DMS as part of the architecture, but while showcasing the solution steps, we assume that the AWS DMS output is already available in Amazon S3, and focus on processing the data using AWS Glue and Apache Iceberg.

To demo the implementation steps, we use sample product inventory data that has the following attributes:

  • op – Represents the operation on the source record. This shows values I to represent insert operations, U to represent updates, and D to represent deletes. You need to make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3. AWS DMS lets you include this attribute, but if you're using other mechanisms to move data, make sure you capture it, so that your ETL logic can take appropriate action while merging.
  • product_id – This is the primary key column in the source database's products table.
  • category – This column represents the product's category, such as Electronics or Cosmetics.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory for a product. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to demonstrate the functionality.
  • last_update_time – This is the time when the product record was updated at the source database.

If you're using AWS DMS to move data from your relational database to Amazon S3, then by default AWS DMS includes the op attribute for incremental CDC data, but it's not included by default for the initial load. If you're using CSV as your target file format, you can set IncludeOpForFullLoad to true in the S3 target endpoint settings of AWS DMS to have the op attribute included in your initial full load file as well. To learn more about the Amazon S3 settings in AWS DMS, refer to S3Settings.
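
For reference, the following boto3 sketch shows how such an S3 target endpoint with IncludeOpForFullLoad could be created; the endpoint identifier, IAM role ARN, and bucket values are placeholders you would replace with your own.

import boto3

dms = boto3.client("dms", region_name="us-east-1")

# Hypothetical S3 target endpoint; replace the identifier, role ARN, and bucket with your own values.
response = dms.create_endpoint(
    EndpointIdentifier="product-inventory-s3-target",
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "ServiceAccessRoleArn": "arn:aws:iam::111122223333:role/dms-s3-access-role",
        "BucketName": "glue-iceberg-demo",
        "BucketFolder": "raw-csv-input",
        "DataFormat": "csv",
        "CsvDelimiter": ",",
        "IncludeOpForFullLoad": True,  # include the op column in the initial full load file too
    },
)
print(response["Endpoint"]["EndpointArn"])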

To implement the solution, we create AWS resources such as an S3 bucket and an AWS Glue job, and integrate the Iceberg code for processing. Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process them with the AWS Glue PySpark code to produce the output.

Prerequisites

Before getting started on the implementation, make sure you have the required permissions to create and manage the AWS resources used in this post in your AWS account.

For this post, we use the us-east-1 Region, but you can integrate it in your preferred Region if the AWS services included in the architecture are available in that Region.

Now let's dive into the implementation steps.

Create an S3 bucket for input and output

To create an S3 bucket, complete the following steps (a boto3 sketch that scripts the same steps follows the list):

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name as glue-iceberg-demo, and leave the remaining fields as default.
    S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use that same name throughout the rest of the implementation steps. Formatting the bucket name as <Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE} might help you get a unique name.
  4. Choose Create bucket.
  5. On the bucket details page, choose Create folder.
  6. Create two subfolders: raw-csv-input and iceberg-output.
  7. Upload the LOAD00000001.csv file into the raw-csv-input folder of the bucket.
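
If you prefer to script these steps, the following boto3 sketch creates the bucket, the two prefixes, and uploads the initial load file; the bucket name and the local file path are assumptions to replace with your own.

import boto3

s3 = boto3.client("s3", region_name="us-east-1")
bucket = "glue-iceberg-demo"  # replace with your unique bucket name

# In us-east-1 no CreateBucketConfiguration is needed; other Regions require a LocationConstraint.
s3.create_bucket(Bucket=bucket)

# S3 has no real folders; zero-byte keys ending in "/" act as the console's folder placeholders.
for prefix in ("raw-csv-input/", "iceberg-output/"):
    s3.put_object(Bucket=bucket, Key=prefix)

# Upload the sample initial full load file (assumed to be in the current directory).
s3.upload_file("LOAD00000001.csv", bucket, "raw-csv-input/LOAD00000001.csv")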

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create the input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena console and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_demo;
-- Create external table on the input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_demo.raw_csv_input(
  op string, 
  product_id bigint, 
  category string, 
  product_name string, 
  quantity_available bigint, 
  last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',', 
  'typeOfData'='file');
-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_demo.iceberg_output (
  product_id bigint,
  category string,
  product_name string,
  quantity_available bigint,
  last_update_time timestamp) 
PARTITIONED BY (category, bucket(16, product_id)) 
LOCATION 's3://glue-iceberg-demo/iceberg-output/' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912' 
);
-- Validate the input data
SELECT * FROM iceberg_demo.raw_csv_input;

Alternatively, you can integrate an AWS Glue crawler on top of the input data to create the table.
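
If you want to script that crawler alternative, the following boto3 sketch defines and starts a crawler over the raw CSV prefix; the crawler name and IAM role ARN are placeholders, and the role needs read access to the input prefix.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Hypothetical crawler over the raw CSV prefix; replace the role ARN and bucket name with your own.
glue.create_crawler(
    Name="iceberg-demo-raw-csv-crawler",
    Role="arn:aws:iam::111122223333:role/GlueCrawlerRole",
    DatabaseName="iceberg_demo",
    Targets={"S3Targets": [{"Path": "s3://glue-iceberg-demo/raw-csv-input/"}]},
)
glue.start_crawler(Name="iceberg-demo-raw-csv-crawler")

Next, let's create the AWS Glue PySpark job to process the input data.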

Create the AWS Glue job

Complete the following steps to create an AWS Glue job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Spark script editor.
  4. For Options, select Create a new script with boilerplate code.
  5. Choose Create.
  6. Replace the script with the following script:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    from pyspark.sql.functions import *
    from awsglue.dynamicframe import DynamicFrame
    
    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, max
    
    from pyspark.conf import SparkConf
    
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'iceberg_job_catalog_warehouse'])
    conf = SparkConf()
    
    ## Make sure to pass the runtime argument --iceberg_job_catalog_warehouse with the S3 warehouse path as its value
    conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
    conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")
    
    sc = SparkContext(conf=conf)
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    ## Read the input table
    IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_demo", table_name = "raw_csv_input", transformation_ctx = "IncrementalInputDyF")
    IncrementalInputDF = IncrementalInputDyF.toDF()
    
    if not IncrementalInputDF.rdd.isEmpty():
        ## Apply de-duplication logic on the input data to pick up the latest record based on timestamp and operation
        IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)
    
        # Add a column that captures the latest update timestamp for each product_id
        inputDFWithTS = IncrementalInputDF.withColumn("max_op_date", max(IncrementalInputDF.last_update_time).over(IDWindowDF))
        
        # Keep only the latest record per product_id: new inserts on one side, updates/deletes on the other, then union both to get the deduplicated output
        NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
        UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
        finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)
    
        # Register the deduplicated input as a temporary table to use in Iceberg Spark SQL statements
        finalInputDF.createOrReplaceTempView("incremental_input_data")
        finalInputDF.show()
        
        ## Perform the merge operation on the incremental input data with MERGE INTO. This section of the code uses Spark SQL to showcase the expressive SQL approach of Iceberg for a merge operation
        IcebergMergeOutputDF = spark.sql("""
        MERGE INTO job_catalog.iceberg_demo.iceberg_output t
        USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) AS last_update_time FROM incremental_input_data) s
        ON t.product_id = s.product_id
        WHEN MATCHED AND s.op = 'D' THEN DELETE
        WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time 
        WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
        """)
    
        job.commit()

  7. On the Job details tab, specify the job name.
  8. For IAM Role, assign an IAM role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  9. For Glue version, choose Glue 3.0.
  10. For Language, choose Python 3.
  11. Make sure Job bookmark has the default value of Enable.
  12. Under Connections, choose the Iceberg connector.
  13. Under Job parameters, specify Key as --iceberg_job_catalog_warehouse and Value as your S3 path (for example, s3://<bucket-name>/<iceberg-warehouse-path>).
  14. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement. (A boto3 sketch of an equivalent job definition follows these steps.)
Because the target table is empty in the first run, the Iceberg MERGE statement runs an INSERT for all records.

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_demo.iceberg_output LIMIT 10;

The output of the query should match the input, with one difference: the Iceberg output table doesn't have the op column.

Add incremental (CDC) data for further processing

After we process the initial full load file, let's upload the following two incremental files, which include insert, update, and delete records for a few products.

The following is a snapshot of the first incremental file (20220302-1134010000.csv).

The following is a snapshot of the second incremental file (20220302-1135010000.csv), which shows that record 102 has another update transaction before the next ETL job processing.
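
You can upload both files from the console, or with a short boto3 call like the following sketch (local file paths and bucket name assumed):

import boto3

s3 = boto3.client("s3")
bucket = "glue-iceberg-demo"  # replace with your bucket name

# Upload both incremental CDC files next to the initial load file
for filename in ("20220302-1134010000.csv", "20220302-1135010000.csv"):
    s3.upload_file(filename, bucket, f"raw-csv-input/{filename}")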

After you upload both incremental files, you should see them in the S3 bucket.

Run the AWS Glue job again to process the incremental files

Because we enabled bookmarks on the AWS Glue job, the next job run picks up only the two new incremental files and performs a merge operation on the Iceberg table.

To run the job again, complete the following steps:

  • On the AWS Glue console, choose Jobs in the navigation pane.
  • Select the job and choose Run.

As explained earlier, the PySpark script is expected to deduplicate the input data before merging it into the target Iceberg table, which means it only picks up the latest record of product 102.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).
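
As a minimal example of triggering the job outside the console, the following boto3 sketch starts a run; the job name is whatever you specified earlier, and the warehouse path placeholder is the same as in the job parameters.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Trigger a run of the merge job; per-run Arguments override the job's default arguments.
run = glue.start_job_run(
    JobName="iceberg-cdc-merge-job",  # replace with your job name
    Arguments={"--iceberg_job_catalog_warehouse": "s3://<bucket-name>/<iceberg-warehouse-path>"},
)
print(run["JobRunId"])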

Query the Iceberg table using Athena after incremental data processing

After incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for record 102 and that product record 103 is deleted.

The following screenshot shows the output.

Query a previous version of data with Iceberg's time travel feature

You can run the following SQL query in Athena, which uses Iceberg's AS OF TIMESTAMP syntax to query a previous version of the data:

SELECT * FROM iceberg_demo.iceberg_output FOR SYSTEM_TIME AS OF TIMESTAMP '2022-03-23 18:56:00';

The following screenshot shows the output. As you can see, the quantity value of product ID 102 is 30, which was the value available during the initial load.

Note that you need to change the AS OF TIMESTAMP value based on your runtime.
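
If you also want to time travel from the Glue PySpark side, a minimal sketch (assuming the same job_catalog Spark catalog configuration shown in the Glue job, and an illustrative epoch timestamp in milliseconds) could use Iceberg's as-of-timestamp read option:

# Assumes the SparkSession was configured with the job_catalog Iceberg catalog as in the Glue job above.
as_of_millis = 1648061760000  # illustrative epoch milliseconds; use a timestamp from your own run

historical_df = (
    spark.read.format("iceberg")
    .option("as-of-timestamp", str(as_of_millis))
    .load("job_catalog.iceberg_demo.iceberg_output")
)
historical_df.show()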

This concludes the implementation steps.

Considerations

The following are a few considerations you should keep in mind while integrating Apache Iceberg with AWS Glue:

  • Athena support for Iceberg became generally available recently, so make sure you review the considerations and limitations of using this feature.
  • AWS Glue provides DynamicFrame APIs to read from different source systems and write to different targets. For this post, we used Spark DataFrames instead of AWS Glue DynamicFrames because Iceberg's MERGE statements aren't supported with the AWS Glue DynamicFrame APIs.
    To learn more about AWS integration, refer to Iceberg AWS Integrations.

Conclusion

This post explains how you can use the Apache Iceberg framework with AWS Glue to implement UPSERT on an S3 data lake. It provides an overview of Apache Iceberg, its features and integration approaches, and explains how you can implement it through a step-by-step guide.

I hope this gives you a great starting point for using Apache Iceberg with AWS analytics services, and that you can build on top of it to implement your solution.

Appendix: AWS Glue DynamicFrame sample code to interact with Iceberg tables

  • The following code sample demonstrates how you can use the DynamicFrame method to read from an Iceberg table:
IcebergDyF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="market.spark",
        connection_options={
            "path": "job_catalog.iceberg_demo.iceberg_output",
            "connectionName": "Iceberg Connector for Glue 3.0",
        },
        transformation_ctx="IcebergDyF",
    )
)

## Optionally, convert to a Spark DataFrame if you plan to leverage Iceberg's SQL-based MERGE statements
InputIcebergDF = IcebergDyF.toDF()

  • The following sample code shows how you can use the DynamicFrame method to write to an Iceberg table in append-only mode:
## Use the following 2 lines to convert a Spark DataFrame to a DynamicFrame, if you plan to leverage the DynamicFrame API to write to the final target
from awsglue.dynamicframe import DynamicFrame 
finalDyF = DynamicFrame.fromDF(InputIcebergDF,glueContext,"finalDyF")

WriteIceberg = glueContext.write_dynamic_frame.from_options(
    frame=finalDyF,
    connection_type="marketplace.spark",
    connection_options={
        "path": "job_catalog.iceberg_demo.iceberg_output",
        "connectionName": "Iceberg Connector for Glue 3.0",
    },
    format="parquet",
    transformation_ctx="WriteIcebergDyF",
)


About the Author

Sakti Mishra is a Principal Data Lab Solution Architect at AWS, where he helps customers modernize their data architecture and helps define end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.
