Build a transactional data lake using Apache Iceberg, AWS Glue, and cross-account data shares using AWS Lake Formation and Amazon Athena


Building a data lake on Amazon Simple Storage Service (Amazon S3) provides numerous benefits for an organization. It allows you to access diverse data sources, build business intelligence dashboards, build AI and machine learning (ML) models to provide customized customer experiences, and accelerate the curation of new datasets for consumption by adopting a modern data architecture or data mesh architecture.

However, many use cases, like performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake, require handling data at a record level. Performing an operation like inserting, updating, and deleting individual records from a dataset requires the processing engine to read all the objects (files), make the changes, and rewrite entire datasets as new files. Furthermore, making the data available in the data lake in near-real time often leads to the data being fragmented over many small files, resulting in poor query performance and compaction maintenance.

In 2022, we announced that you can enforce fine-grained access control policies using AWS Lake Formation and query data stored in any supported file format using table formats such as Apache Iceberg, Apache Hudi, and more using Amazon Athena queries. You get the flexibility to choose the table and file format best suited for your use case and get the benefit of centralized data governance to secure data access when using Athena.

In this post, we show you how to configure Lake Formation using Iceberg table formats. We also explain how to upsert and merge in an S3 data lake using an Iceberg framework and apply Lake Formation access control using Athena.

Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon S3. Iceberg also helps guarantee data correctness under concurrent write scenarios.

Solution overview

To explain this setup, we present the following architecture, which integrates Amazon S3 for the data lake (Iceberg table format), Lake Formation for access control, AWS Glue for ETL (extract, transform, and load), and Athena for querying the latest inventory data from the Iceberg tables using standard SQL.

The solution workflow consists of the following steps, including data ingestion (Steps 1–3), data governance (Step 4), and data access (Step 5):

  1. We use AWS Database Migration Service (AWS DMS) or a similar tool to connect to the data source and move incremental data (CDC) to Amazon S3 in CSV format.
  2. An AWS Glue PySpark job reads the incremental data from the S3 input bucket and performs deduplication of the records.
  3. The job then invokes Iceberg’s MERGE statements to merge the data with the target S3 bucket.
  4. We use the AWS Glue Data Catalog as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of S3 buckets to automatically detect the schema. Lake Formation allows you to centrally manage permissions and access control for Data Catalog resources in your S3 data lake. You can use fine-grained access control in Lake Formation to restrict access to data in query results.
  5. We use Athena integrated with Lake Formation to query data from the Iceberg table using standard SQL and validate table- and column-level access on Iceberg tables.
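The deduplication in Step 2 can be illustrated without Spark. The following is a minimal plain-Python sketch, not the Glue job itself: the helper name latest_per_key and the sample rows are hypothetical. For each product_id it keeps only the row with the greatest last_update_time, which is the idea the PySpark job expresses with a window function.

```python
from typing import Dict, Iterable, List


def latest_per_key(rows: Iterable[Dict], key: str = "product_id",
                   ts: str = "last_update_time") -> List[Dict]:
    """Keep only the most recent row for each key (hypothetical helper)."""
    latest: Dict[object, Dict] = {}
    for row in rows:
        current = latest.get(row[key])
        # Timestamps formatted as 'YYYY-MM-DD HH:MM:SS' sort correctly as strings.
        # On an exact tie this sketch keeps the first row it saw.
        if current is None or row[ts] > current[ts]:
            latest[row[key]] = row
    return list(latest.values())


# Hypothetical CDC rows: product 200 is inserted and later updated.
cdc_rows = [
    {"op": "I", "product_id": 200, "quantity_available": 25,
     "last_update_time": "2023-01-19 09:51:40"},
    {"op": "U", "product_id": 200, "quantity_available": 20,
     "last_update_time": "2023-01-20 10:15:00"},
    {"op": "I", "product_id": 201, "quantity_available": 20,
     "last_update_time": "2023-01-19 09:51:40"},
]
deduplicated = latest_per_key(cdc_rows)
```

Only the deduplicated rows are then handed to the MERGE step, so each target record is touched at most once per run.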

For this solution, we assume that the raw data files are already available in Amazon S3, and focus on processing the data using AWS Glue with Iceberg table format. We use sample item data that has the following attributes:

  • op – This represents the operation on the source record. It takes the value I for inserts, U for updates, and D for deletes. Make sure this attribute is included in your CDC incremental data before it is written to Amazon S3, so that your ETL logic can take the appropriate action while merging.
  • product_id – This is the primary key column in the source data table.
  • category – This column represents the category of an item.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to showcase the functionality.
  • last_update_time – This is the time when the item record was updated at the source data.
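To make the role of the op attribute concrete, here is a small plain-Python sketch of how I, U, and D rows could be applied to a keyed table. It is an illustration under assumptions, not Iceberg's implementation: apply_cdc and the in-memory dictionary are hypothetical, and the input is assumed to hold at most one change per product_id.

```python
from typing import Dict, List


def apply_cdc(target: Dict[int, Dict], changes: List[Dict]) -> Dict[int, Dict]:
    """Apply deduplicated CDC rows to an in-memory table keyed by product_id."""
    for change in changes:
        pid = change["product_id"]
        if pid in target:
            if change["op"] == "D":
                # Matched and flagged as a delete: drop the record.
                del target[pid]
            else:
                # Matched otherwise: update the mutable attributes.
                target[pid]["quantity_available"] = change["quantity_available"]
                target[pid]["last_update_time"] = change["last_update_time"]
        else:
            # Not matched: insert the record without the op column.
            target[pid] = {k: v for k, v in change.items() if k != "op"}
    return target


inventory = {
    200: {"product_id": 200, "quantity_available": 25,
          "last_update_time": "2023-01-19 09:51:40"},
    201: {"product_id": 201, "quantity_available": 20,
          "last_update_time": "2023-01-19 09:51:40"},
}
changes = [
    {"op": "U", "product_id": 200, "quantity_available": 20,
     "last_update_time": "2023-01-20 10:15:00"},
    {"op": "D", "product_id": 201, "quantity_available": 0,
     "last_update_time": "2023-01-20 10:16:00"},
    {"op": "I", "product_id": 202, "quantity_available": 30,
     "last_update_time": "2023-01-20 10:17:00"},
]
inventory = apply_cdc(inventory, changes)
```

Without the op column, the merge logic could not tell an update from a delete for a matched key, which is why the attribute has to survive the CDC export.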

We demonstrate implementing the solution with the following steps:

  1. Create an S3 bucket for input and output data.
  2. Create input and output tables using Athena.
  3. Insert the data into the Iceberg table from Athena.
  4. Query the Iceberg table using Athena.
  5. Upload incremental (CDC) data for further processing.
  6. Run the AWS Glue job again to process the incremental files.
  7. Query the Iceberg table again using Athena.
  8. Define Lake Formation policies.

Prerequisites

For Athena queries, we need to configure an Athena workgroup with engine version 3 to support the Iceberg table format.

To validate cross-account access through Lake Formation for the Iceberg table, in this post we used two accounts (primary and secondary).

Now let’s dive into the implementation steps.

Create an S3 bucket for input and output data

Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process them with AWS Glue PySpark code for the output.

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name as iceberg-blog and leave the remaining fields as default.

S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use the same name throughout the rest of the implementation steps. Formatting the bucket name as <Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE} might help you get a unique name.

  4. On the bucket details page, choose Create folder.
  5. Create two subfolders. For this post, we create iceberg-blog/raw-csv-input and iceberg-blog/iceberg-output.
  6. Upload the LOAD00000001.csv file into the raw-csv-input folder.
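If you prefer to script these console steps, the following sketch shows one way to derive a bucket name in the suggested name-account-Region pattern and upload the sample file with boto3. The helper names, the account ID, and the Region are placeholders chosen for illustration, and the validation pattern covers only the basic S3 naming rules.

```python
import os
import re

# Basic S3 naming rules: 3-63 characters, lowercase letters, digits, dots, hyphens.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")


def unique_bucket_name(base: str, account_id: str, region: str) -> str:
    """Build '<base>-<account>-<region>' and validate it (hypothetical helper)."""
    name = f"{base}-{account_id}-{region}".lower()
    if not _BUCKET_RE.match(name):
        raise ValueError(f"not a valid S3 bucket name: {name}")
    return name


def upload_sample(bucket: str, local_path: str = "LOAD00000001.csv") -> None:
    """Upload the sample CSV into the raw-csv-input folder (needs AWS credentials)."""
    import boto3  # imported lazily so the helper above works without boto3

    key = f"raw-csv-input/{os.path.basename(local_path)}"
    boto3.client("s3").upload_file(local_path, bucket, key)


bucket_name = unique_bucket_name("iceberg-blog", "123456789012", "us-east-1")
```

Calling upload_sample(bucket_name) afterwards would place the file under raw-csv-input/, assuming the bucket already exists.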

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena query editor and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_lf_db;

As we explain later in this post, it’s essential to record the data locations when incorporating Lake Formation access controls.

-- Create external table on the input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_lf_db.csv_input(
op string,
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
'areColumnsQuoted'='false',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'typeOfData'='file');

-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_lf_db.iceberg_table_lf (
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time timestamp)
PARTITIONED BY (category, bucket(16,product_id))
LOCATION 's3://glue-iceberg-demo/iceberg_blog/iceberg-output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912'
);

-- Validate the input data
SELECT * FROM iceberg_lf_db.csv_input;

SELECT * FROM iceberg_lf_db.iceberg_table_lf;

Alternatively, you can use an AWS Glue crawler to create the table definition for the input files.
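The same statements can also be submitted programmatically. The sketch below assumes boto3 and an Athena workgroup on engine version 3; the workgroup name iceberg-v3, the results location, and the helper names are placeholders, not values from this post.

```python
from typing import Dict, List, Optional


def build_athena_request(sql: str, workgroup: str,
                         database: Optional[str] = None,
                         output_location: Optional[str] = None) -> Dict:
    """Assemble keyword arguments for Athena's start_query_execution call."""
    request: Dict = {"QueryString": sql, "WorkGroup": workgroup}
    if database:
        request["QueryExecutionContext"] = {"Database": database}
    if output_location:
        request["ResultConfiguration"] = {"OutputLocation": output_location}
    return request


def run_statements(statements: List[str], workgroup: str) -> List[str]:
    """Submit statements one by one and return their query execution IDs.

    start_query_execution is asynchronous, so a real script should poll
    get_query_execution before submitting a statement that depends on an
    earlier one.
    """
    import boto3  # imported lazily; requires AWS credentials at call time

    athena = boto3.client("athena")
    ids = []
    for sql in statements:
        response = athena.start_query_execution(**build_athena_request(sql, workgroup))
        ids.append(response["QueryExecutionId"])
    return ids


request = build_athena_request(
    "SELECT * FROM iceberg_lf_db.csv_input",
    workgroup="iceberg-v3",  # placeholder workgroup name
    output_location="s3://<bucket-name>/athena-results/",
)
```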

Insert the data into the Iceberg table from Athena

Optionally, we can insert data into the Iceberg table through Athena using the following code:

insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (200,'Mobile','Mobile model 1',25,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (201,'Laptop','Laptop model 1',20,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (202,'Tablet','Kindle',30,cast('2023-01-19 09:51:41' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (203,'Speaker','Alexa',10,cast('2023-01-19 09:51:42' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (204,'Speaker','Alexa',50,cast('2023-01-19 09:51:43' as timestamp));

For this post, we load the data using an AWS Glue job. Complete the following steps to create the job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Visual with a blank canvas.
  4. Choose Create.
  5. Choose Edit script.
  6. Replace the script with the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max

from pyspark.conf import SparkConf

# iceberg_job_catalog_warehouse must be listed here so it is resolved from the job parameters
args = getResolvedOptions(sys.argv, ["JOB_NAME", "iceberg_job_catalog_warehouse"])
conf = SparkConf()

## spark.sql.catalog.job_catalog.warehouse is passed as a
## runtime argument with the S3 path as its value.
## Make sure to pass the runtime argument
## --iceberg_job_catalog_warehouse with the S3 path as its value.
conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


## Read input table
## glueContext.create_data_frame.from_catalog can be more
## performant and can be used in place of
## create_dynamic_frame.from_catalog.

IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_lf_db", table_name = "csv_input", transformation_ctx = "IncrementalInputDyF")
IncrementalInputDF = IncrementalInputDyF.toDF()

if not IncrementalInputDF.rdd.isEmpty():
    ## Apply de-duplication logic on input data, to pick up the latest record based on timestamp and operation
    IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)

    # Add a new column that holds the latest timestamp for each product_id
    inputDFWithTS = IncrementalInputDF.withColumn("max_op_date", max(IncrementalInputDF.last_update_time).over(IDWindowDF))

    # Filter out new records that are inserted, then select the latest record from existing records and merge both to get deduplicated output
    NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    # Register the deduplicated input as a temporary table to use in Iceberg Spark SQL statements
    finalInputDF.createOrReplaceTempView("incremental_input_data")
    finalInputDF.show()

    ## Perform the merge operation on incremental input data with MERGE INTO. This section of the code uses Spark SQL to showcase the expressive SQL approach of Iceberg to perform a merge operation
    IcebergMergeOutputDF = spark.sql("""
    MERGE INTO job_catalog.iceberg_lf_db.iceberg_table_lf t
    USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
    ON t.product_id = s.product_id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
    WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
    """)

# Commit so that the job bookmark advances past the files processed in this run
job.commit()

  7. On the Job details tab, specify the job name (iceberg-lf).
  8. For IAM Role, assign an AWS Identity and Access Management (IAM) role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  9. For Glue version, choose Glue 4.0 (Glue 3.0 is also supported).
  10. For Language, choose Python 3.
  11. Make sure Job bookmark has the default value of Enable.
  12. For Job parameters, add the following:
    1. Add the key --datalake-formats with the value iceberg.
    2. Add the key --iceberg_job_catalog_warehouse with the value as your S3 path (s3://<bucket-name>/<iceberg-warehouse-path>).
  13. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement.
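The job parameters above can also be supplied when starting a run through the API. The following sketch assumes boto3; the helper names are hypothetical, and the --job-bookmark-option entry is included here only to mirror the Enable setting chosen on the console.

```python
from typing import Dict


def iceberg_job_arguments(warehouse_path: str) -> Dict[str, str]:
    """Build the Arguments map for a Glue job run that writes to Iceberg."""
    if not warehouse_path.startswith("s3://"):
        raise ValueError("warehouse_path must be an S3 URI")
    return {
        # Loads the Iceberg libraries into the Glue runtime.
        "--datalake-formats": "iceberg",
        # Read by the script through getResolvedOptions.
        "--iceberg_job_catalog_warehouse": warehouse_path,
        # Mirrors the Job bookmark setting of Enable.
        "--job-bookmark-option": "job-bookmark-enable",
    }


def start_job(job_name: str, warehouse_path: str) -> str:
    """Start the job and return its run ID (requires AWS credentials)."""
    import boto3  # imported lazily so the helper above works without boto3

    response = boto3.client("glue").start_job_run(
        JobName=job_name, Arguments=iceberg_job_arguments(warehouse_path)
    )
    return response["JobRunId"]


arguments = iceberg_job_arguments("s3://<bucket-name>/<iceberg-warehouse-path>")
```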

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_lf_db.iceberg_table_lf limit 10;

The output of the query should match the input, with one difference: the Iceberg output table doesn’t have the op column.

Upload incremental (CDC) data for further processing

After we process the initial full load file, let’s upload an incremental file.

This file includes updated records on two items.

Run the AWS Glue job again to process incremental files

Because the AWS Glue job has bookmarks enabled, the job picks up the new incremental file and performs a MERGE operation on the Iceberg table.

To run the job again, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Select the job and choose Run.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).
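When a run is started from a script rather than the console, you usually need to wait for it to finish before querying the table. The sketch below polls the run state through a Glue client that is passed in; wait_for_job_run is a hypothetical helper, and the polling interval is an arbitrary choice.

```python
import time

# Job run states after which the run will not change any further.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def wait_for_job_run(glue, job_name: str, run_id: str,
                     poll_seconds: float = 30.0) -> str:
    """Poll get_job_run until the run reaches a terminal state, then return it.

    `glue` is expected to behave like boto3.client("glue").
    """
    while True:
        run = glue.get_job_run(JobName=job_name, RunId=run_id)
        state = run["JobRun"]["JobRunState"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
```

Passing the client in as a parameter keeps the helper easy to exercise with a stub, and in real use you would call it as wait_for_job_run(boto3.client("glue"), "iceberg-lf", run_id).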

Query the Iceberg table using Athena after incremental data processing

When the incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for items 200 and 201.

The following screenshot shows the output.

Define Lake Formation policies

For data governance, we use Lake Formation. Lake Formation is a fully managed service that simplifies data lake setup, supports centralized security management, and provides transactional access on top of your data lake. Moreover, it enables data sharing across accounts and organizations. There are two ways to share data resources in Lake Formation: named resource access control (NRAC) and tag-based access control (TBAC). NRAC uses AWS Resource Access Manager (AWS RAM) to share data resources across accounts using Lake Formation V3. These are consumed via resource links that are based on created resource shares. Lake Formation tag-based access control (LF-TBAC) is another approach to share data resources in Lake Formation, which defines permissions based on attributes. These attributes are called LF-tags.

In this example, we create databases in the primary account. Our NRAC database is shared with a data domain via AWS RAM. Access to data tables that we register in this database will be handled through NRAC.

Configure access controls in the primary account

In the primary account, complete the following steps to set up access controls using Lake Formation:

  1. On the Lake Formation console, choose Data lake locations in the navigation pane.
  2. Choose Register location.
  3. Update the Iceberg Amazon S3 location path shown in the following screenshot.
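Registering the location can also be done through the Lake Formation API. The sketch below assumes boto3 and the service-linked role; both helper names are hypothetical, and the path shown is the Iceberg table location used earlier in this post.

```python
def s3_location_arn(s3_uri: str) -> str:
    """Convert an s3:// URI into the ARN form that register_resource expects."""
    prefix = "s3://"
    if not s3_uri.startswith(prefix):
        raise ValueError("expected an s3:// URI")
    return "arn:aws:s3:::" + s3_uri[len(prefix):].rstrip("/")


def register_location(s3_uri: str) -> None:
    """Register the S3 path with Lake Formation (requires AWS credentials).

    Using the service-linked role is an assumption made for this sketch;
    a custom role can be supplied through the RoleArn parameter instead.
    """
    import boto3  # imported lazily so the helper above works without boto3

    boto3.client("lakeformation").register_resource(
        ResourceArn=s3_location_arn(s3_uri), UseServiceLinkedRole=True
    )


location_arn = s3_location_arn("s3://glue-iceberg-demo/iceberg_blog/iceberg-output/")
```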

Grant access to the database to the secondary account

To grant database access to the external (secondary) account, complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Choose External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the database name.

The first grant should be at the database level, and the second grant is at the table level.

  6. For Database permissions, specify your permissions (for this post, we select Describe).
  7. Choose Grant.

Now you need to grant permissions at the table level.

  1. Select External accounts and enter the secondary account number.
  2. Select Named data catalog resources.
  3. Verify the table name.
  4. For Table permissions, specify the permissions you want to grant. For this post, we select Select and Describe.
  5. Choose Grant.
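The two console grants above correspond to two grant_permissions calls. The following sketch builds the request payloads in plain Python; the account ID is a placeholder, and the helper names are hypothetical.

```python
from typing import Dict, Iterable


def database_grant(account_id: str, database: str) -> Dict:
    """Database-level grant: DESCRIBE for the external account."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": account_id},
        "Resource": {"Database": {"Name": database}},
        "Permissions": ["DESCRIBE"],
    }


def table_grant(account_id: str, database: str, table: str) -> Dict:
    """Table-level grant: SELECT and DESCRIBE for the external account."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": account_id},
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        "Permissions": ["SELECT", "DESCRIBE"],
    }


def apply_grants(requests: Iterable[Dict]) -> None:
    """Send each payload to Lake Formation (requires AWS credentials)."""
    import boto3  # imported lazily so the builders above work without boto3

    lakeformation = boto3.client("lakeformation")
    for request in requests:
        lakeformation.grant_permissions(**request)


SECONDARY_ACCOUNT = "111122223333"  # placeholder account ID
grants = [
    database_grant(SECONDARY_ACCOUNT, "iceberg_lf_db"),
    table_grant(SECONDARY_ACCOUNT, "iceberg_lf_db", "iceberg_table_lf"),
]
```

Keeping the database grant first in the list follows the order described above.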

If you see the following error, you must revoke IAMAllowedPrincipals from the data lake permissions.

To do so, select IAMAllowedPrincipals and choose Revoke.

Choose Revoke again to confirm.

After you revoke the data permissions, the permissions should appear as shown in the following screenshot.
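The revoke step can be expressed as a revoke_permissions call against the IAM_ALLOWED_PRINCIPALS group. The sketch below only builds the payload; the helper name is hypothetical, and it assumes the group holds the ALL (Super) permission, which is the usual default.

```python
from typing import Dict, Optional


def revoke_iam_allowed_principals(database: str,
                                  table: Optional[str] = None) -> Dict:
    """Build a revoke_permissions payload for the IAMAllowedPrincipals group."""
    if table:
        resource = {"Table": {"DatabaseName": database, "Name": table}}
    else:
        resource = {"Database": {"Name": database}}
    return {
        "Principal": {"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"},
        "Resource": resource,
        # Assumption: the group was granted ALL (shown as Super on the console).
        "Permissions": ["ALL"],
    }


revoke_request = revoke_iam_allowed_principals("iceberg_lf_db", "iceberg_table_lf")
```

With credentials in place, the payload would be sent with boto3.client("lakeformation").revoke_permissions(**revoke_request).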

Add AWS Glue IAM role permissions

Because IAMAllowedPrincipals was revoked, the AWS Glue IAM role that was used in the AWS Glue job needs to be granted access explicitly, as shown in the following screenshot.

You need to repeat these steps for the AWS Glue IAM role at the table level.

Verify the permissions granted to the AWS Glue IAM role on the Lake Formation console.

Grant access to the Iceberg table to the external account

In the secondary account, complete the following steps to grant access to the Iceberg table to the external account.

  1. On the AWS RAM console, choose Resource shares in the navigation pane.
  2. Choose the resource share invitation sent from the primary account.
  3. Choose Accept resource share.

The resource status should now be active.
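Accepting the invitation can be scripted with the AWS RAM API. The following sketch filters a get_resource_share_invitations response for pending invitations from the primary account; the helper names, the account IDs, and the sample response are illustrative.

```python
from typing import Dict, List


def pending_invitation_arns(response: Dict, sender_account_id: str) -> List[str]:
    """Return ARNs of pending invitations sent by the given account."""
    return [
        invitation["resourceShareInvitationArn"]
        for invitation in response.get("resourceShareInvitations", [])
        if invitation.get("status") == "PENDING"
        and invitation.get("senderAccountId") == sender_account_id
    ]


def accept_pending(sender_account_id: str) -> List[str]:
    """Accept every pending invitation from the sender (requires AWS credentials)."""
    import boto3  # imported lazily so the filter above works without boto3

    ram = boto3.client("ram")
    arns = pending_invitation_arns(ram.get_resource_share_invitations(),
                                   sender_account_id)
    for arn in arns:
        ram.accept_resource_share_invitation(resourceShareInvitationArn=arn)
    return arns


# Illustrative response shape with placeholder ARNs and account IDs.
sample_response = {
    "resourceShareInvitations": [
        {"resourceShareInvitationArn": "arn:aws:ram:example:invitation/a",
         "senderAccountId": "444455556666", "status": "PENDING"},
        {"resourceShareInvitationArn": "arn:aws:ram:example:invitation/b",
         "senderAccountId": "444455556666", "status": "ACCEPTED"},
        {"resourceShareInvitationArn": "arn:aws:ram:example:invitation/c",
         "senderAccountId": "777788889999", "status": "PENDING"},
    ]
}
to_accept = pending_invitation_arns(sample_response, "444455556666")
```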

Next, you need to create a resource link for the shared Iceberg table and access it through Athena.

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Select the Iceberg table (shared from the primary account).
  3. On the Actions menu, choose Create resource link.
  4. For Resource link name, enter a name (for this post, iceberg_table_lf_demo).
  5. For Database, choose your database and verify the shared table and database are automatically populated.
  6. Choose Create.
  7. Select your table and on the Actions menu, choose View data.

You’re redirected to the Athena console, where you can query the data.
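A resource link is stored in the Data Catalog as a table whose TargetTable points at the shared table, so it can also be created with the AWS Glue API. In the sketch below, the owner account ID and the local database name are placeholders, and the helper names are hypothetical.

```python
from typing import Dict


def resource_link_input(link_name: str, owner_account_id: str,
                        shared_database: str, shared_table: str) -> Dict:
    """Build the TableInput for a resource link to a table in another account."""
    return {
        "Name": link_name,
        "TargetTable": {
            "CatalogId": owner_account_id,  # account that owns the shared table
            "DatabaseName": shared_database,
            "Name": shared_table,
        },
    }


def create_resource_link(local_database: str, table_input: Dict) -> None:
    """Create the link in the secondary account (requires AWS credentials)."""
    import boto3  # imported lazily so the builder above works without boto3

    boto3.client("glue").create_table(DatabaseName=local_database,
                                      TableInput=table_input)


link = resource_link_input(
    "iceberg_table_lf_demo",
    owner_account_id="444455556666",  # placeholder primary account ID
    shared_database="iceberg_lf_db",
    shared_table="iceberg_table_lf",
)
```

Once the link exists, Athena queries in the secondary account reference the link name inside the local database.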

Grant column-based access in the primary account

For column-level restricted access, you need to grant access at the column level on the Iceberg table. Complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Select External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the table name.
  6. For Table permissions, choose the permissions you want to grant. For this post, we select Select.
  7. Under Data permissions, choose Column-based access.
  8. Select Include columns and choose your permission filters (for this post, Category and Quantity_available).
  9. Choose Grant.

Data with restricted columns can now be queried through the Athena console.
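In API terms, a column-based grant uses the TableWithColumns resource instead of Table. The following sketch builds such a payload for the two columns chosen in this post; the account ID is a placeholder, and column_grant is a hypothetical helper.

```python
from typing import Dict, Sequence


def column_grant(account_id: str, database: str, table: str,
                 columns: Sequence[str]) -> Dict:
    """Build a SELECT grant limited to the listed columns."""
    if not columns:
        raise ValueError("at least one column is required")
    return {
        "Principal": {"DataLakePrincipalIdentifier": account_id},
        "Resource": {
            "TableWithColumns": {
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),  # the Include columns choice
            }
        },
        "Permissions": ["SELECT"],
    }


restricted = column_grant(
    "111122223333",  # placeholder secondary account ID
    "iceberg_lf_db",
    "iceberg_table_lf",
    ["category", "quantity_available"],
)
```

The payload would be sent with boto3.client("lakeformation").grant_permissions(**restricted), after which queries from the secondary account see only those two columns.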

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. In your secondary account, log in to the Lake Formation console.
  2. Drop the resource share table.
  3. In your primary account, log in to the Lake Formation console.
  4. Revoke the access you configured.
  5. Drop the AWS Glue tables and database.
  6. Delete the AWS Glue job.
  7. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

This post explains how you can use the Iceberg framework with AWS Glue and Lake Formation to define cross-account access controls and query data using Athena. It provides an overview of Iceberg and its features and integration approaches, and explains how you can ingest data, grant cross-account access, and query data through a step-by-step guide.

We hope this gives you a great starting point for using Iceberg to build your data lake platform along with AWS analytics services to implement your solution.


About the Authors

Vikram Sahadevan is a Senior Resident Architect on the AWS Data Lab team. He enjoys efforts that focus on providing prescriptive architectural guidance, sharing best practices, and removing technical roadblocks with joint engineering engagements between customers and AWS technical resources that accelerate data, analytics, artificial intelligence, and machine learning initiatives.

Suvendu Kumar Patra possesses 18 years of experience in infrastructure, database design, and data engineering, and he currently holds the position of Senior Resident Architect at Amazon Web Services. He is a member of the specialized focus group, AWS Data Lab, and his primary duties entail working with executive leadership teams of strategic AWS customers to develop their roadmaps for data, analytics, and AI/ML. Suvendu collaborates closely with customers to implement data engineering, data hub, data lake, data governance, and EDW solutions, as well as enterprise data strategy and data management.
