
Enhance Your Cloud Data Applications with DuckDB and Iceberg API | by Alon Agmon | Dec, 2022


Photo by Hubert Neufeld on Unsplash

Apache Iceberg is mostly known for making it possible for modern query engines, such as Spark, Dremio, and Trino, to reliably query and manipulate data in huge tables stored in data lakes, and to do so at scale while ensuring safe concurrent reads and writes. As such, it addresses some of the major problems that characterize modern data-lake platforms, such as data integrity, performance, maintenance, and cost.

Much of what enables Iceberg’s magic is the way it efficiently organizes a table’s metadata, tracking its data files, their statistics, and their version history. This is also what makes queries on Iceberg tables much faster and less scan-heavy. Queries on tables that do not use or save file-level metadata (e.g., Hive) typically involve expensive list and scan operations just to figure out which data files the query needs to run over. In contrast, when the data we need in order to satisfy a query lies in a specific set of files, Iceberg can in principle tell us so immediately, without even scanning a single data file.

Iceberg is a table format, but it is also a library that exposes a powerful API and a set of procedures that implement its main functionality. Iceberg’s API enables query engines to take advantage of its robust metadata structure and run queries that scan fewer files and avoid expensive list operations.

However, not all use cases necessarily require or justify the heavy lifting of query engines such as Spark or Trino, which might not always be available anyway. Fortunately, we don’t have to use a hefty compute engine to take advantage of the functionality exposed by Iceberg’s API. Any application that can handle Parquet files can use Iceberg tables and their API in order to query more efficiently. The purpose of this short and hands-on post is to demonstrate how.

DuckDB is a popular in-process database that, among other features, excels at reading and running SQL queries over Parquet files. It therefore gives cloud data applications an important capability that can make them more robust and less dependent on hefty query engines. Yet its ability to handle large volumes of data is limited, mostly by the memory of the machine or process that runs it. That’s exactly where the Iceberg API kicks in.

This post proceeds as follows: Section 2 presents the relevant use case and explains how the Iceberg API + DuckDB can address it efficiently. Next, Section 3 demonstrates how to work with the Iceberg API, and Section 4 adds DuckDB to the picture. Section 5 explains how the two play out together to address the problem stated above. Section 6 concludes.

(I should point out at the outset that I mostly use Scala in the code examples, though they rely on Iceberg’s Java API and can be easily ported.)

We have a huge Iceberg table that is partitioned by date and contains event data streamed from our customers. Each event is identified by a field named account_id, contains a bunch of counters, and has an event_type field. Because most data consumers typically add the account_id to their WHERE clause, we decided to sort the files in each partition by account_id (this is an important bit; I will explain more shortly).
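To make that layout concrete, here is a rough sketch of how such a table could be declared with Spark SQL and the Iceberg extensions. This is not part of the service itself, and the catalog, database, and counter column names are my assumptions; the point is simply that the table is partitioned by date, and the write order asks Iceberg to sort the files inside each partition by account_id.

import org.apache.spark.sql.SparkSession

// hypothetical setup: a Spark session configured with the Iceberg runtime and a Glue catalog named glue_catalog
val spark = SparkSession.builder().appName("create-customer-events").getOrCreate()

// partition the table by date
spark.sql("""
  |CREATE TABLE glue_catalog.db_name.customer_events (
  |  `date`      date,
  |  account_id  string,
  |  event_type  string,
  |  counter_a   bigint,
  |  counter_b   bigint)
  |USING iceberg
  |PARTITIONED BY (`date`)
  |""".stripMargin)

// ask Iceberg to sort the data files within each partition by account_id on write
spark.sql("ALTER TABLE glue_catalog.db_name.customer_events WRITE ORDERED BY account_id")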

The requirement is to create a service that tells us how many events a given customer had on a specific day. The service receives a date as a parameter, as well as an account id, and returns an aggregation by event_type (in JSON format). In short, the service needs to run something like the following query:

SELECT date, account_id, event_type, count(*) as events
FROM customer_events
WHERE date = date '2022-01-02' AND account_id = '2500'
GROUP BY 1,2,3

Sounds simple enough? Well, not necessarily. This service needs to answer dozens of requests per day for something like 100 customers out of 10K. Therefore, it makes sense to run it on demand, only when the data of a specific customer is required. Such a use case does not seem to justify spinning up a Spark or Trino cluster, and yet the data it has to scan looks quite big. Even if we know the required partition, list-ing and scanning through the partition is likely to be memory and compute intensive.

This is where the Iceberg API shines. If the table is partitioned by date and its files are sorted by account_id (as in the image below), then the table’s metadata will show that, rather than scanning the whole partition and its files, there is only one data file the query above actually needs to scan: data file #2 (WHERE date=2022-01-02 AND account_id=2500). Indeed, running this query against an Iceberg table would result in a scan of just this file, and that is what the Iceberg API will tell us.

Assuming that each data file weighs around 250MB, querying and scanning a single file looks far more feasible for a data application to handle.

As for the way we are going to run these queries over the Parquet files, I have already mentioned that we will use the increasingly popular DuckDB, due to its ability to efficiently run SQL queries over Parquet files sitting in S3, as well as the ease with which it can be embedded in data applications.

So now we can lay out the flow of our service (see diagram below): the service is invoked with a date variable and an account_id. Next, we call the Iceberg API to query the table’s metadata for the data files that are relevant to our query. Finally, once we get the list of files, we create and execute a DuckDB query statement that also formats the result set as JSON (a neat DuckDB feature) and returns it to the caller.

If you need a good intro to the Iceberg API, feel free to check some of the links at the bottom of the post, though I believe the code is simple enough even if you know very little about Iceberg.

The first thing we do is create a filtering Expression object that Iceberg will use in order to query the table metadata for the files that contain data matching our filter. You can chain expressions, as well as combine them, as I did below.

import org.apache.iceberg.expressions.Expressions

val partitionPredicate = Expressions.and(
  Expressions.equal("date", "2022-11-05"),
  Expressions.equal("account_id", "0012345000"))

Next, we create a Catalog object, which is the entry point to Iceberg’s API. We use AWS Glue as our catalog, but you can use JDBC and Hadoop catalogs as well. (Note that for this to work you need AWS credentials in your environment or path.) The second function in the code block below simply returns an Iceberg Table object that will be used to invoke the required operations, and the last function executes a table scan with the optional Expression filter we defined above. It is this last function that will essentially tell us, on the basis of Iceberg’s metadata, which files are relevant to the query.


import org.apache.iceberg.{CatalogProperties, Table, TableScan}
import org.apache.iceberg.aws.glue.GlueCatalog
import org.apache.iceberg.aws.s3.S3FileIO
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expression
import scala.collection.JavaConverters._
import scala.util.Try

private def getGlueCatalog(): Try[GlueCatalog] = Try {
  val catalog = new GlueCatalog
  val props = Map(
    CatalogProperties.CATALOG_IMPL -> classOf[GlueCatalog].getName,
    CatalogProperties.WAREHOUSE_LOCATION -> "s3://Doesnt_Matter_In_This_Context",
    CatalogProperties.FILE_IO_IMPL -> classOf[S3FileIO].getName
  ).asJava
  catalog.initialize("myCatalog", props)
  catalog
}

private def getIcebergTableByName(namespace: String, tableName: String, catalog: GlueCatalog): Try[Table] =
  Try {
    val tableID = TableIdentifier.of(namespace, tableName)
    catalog.loadTable(tableID)
  }

private def scanTableWithPartitionPredicate(table: Table, partitionPredicate: Expression): Try[TableScan] =
  Try(table.newScan.filter(partitionPredicate))

After we have created an Iceberg Catalog, loaded our Table, and executed a new scan with a filtering Expression, Iceberg returns a TableScan object. The TableScan object will hopefully contain a list of the planned data files that satisfy our filtering expression. The function below simply extracts the file paths from the table scan and chains them into a long string that will allow us to query them using DuckDB.

private def getDataFilesLocations(tableScan: TableScan): Try[String] = Try {
  // chain all the files to scan in a single string => "'file1','file2'"
  tableScan.planFiles().asScala
    .map(f => "'" + f.file.path.toString + "'")
    .mkString(",")
}

This sums up our service’s usage of the Iceberg API. We start with a Catalog and, using an Expression filter, initiate a table scan that uses our table’s metadata to determine the files that are relevant to our query.
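As a side note (this snippet is not part of the flow above, just a sketch built on the same objects and imports), the FileScanTask items returned by planFiles() also carry per-file metadata such as record counts and file sizes, which can be handy for logging how much data the scan actually pruned:

// print the files Iceberg planned for the scan, together with their basic statistics
// (uses the same scala.collection.JavaConverters._ import as above)
tableScan.planFiles().asScala.foreach { task =>
  val dataFile = task.file()
  println(s"planned file: ${dataFile.path()} records=${dataFile.recordCount()} bytes=${dataFile.fileSizeInBytes()}")
}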

DuckDB is an increasingly popular in-process OLAP database that excels at running aggregate queries over a variety of data sources. DuckDB differs from similar products (such as SQLite) in the performance it offers for OLAP queries, as well as in the flexibility it provides. In short, it is essentially an in-process mini-DWH that enables us to run aggregation-heavy queries on relatively large datasets.
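Just to illustrate how little ceremony is involved (a toy example of mine, not from the post, assuming a local events.parquet file), embedding DuckDB is essentially a matter of opening a JDBC connection and firing SQL at it:

import java.sql.DriverManager

// open an in-memory DuckDB instance and aggregate over a local Parquet file
val con = DriverManager.getConnection("jdbc:duckdb:")
val rs = con.createStatement().executeQuery(
  "SELECT event_type, count(*) FROM parquet_scan('events.parquet') GROUP BY 1")
while (rs.next()) println(s"${rs.getString(1)} -> ${rs.getLong(2)}")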

However, some datasets are just too big for the kind of machines or pods we have available or want to use. And that is exactly what makes the combination of DuckDB with Iceberg such a powerful one: with the Iceberg API we can run queries over much less data in much less time.

DuckDB allows querying Parquet files using the parquet_scan() keyword, which can be used in a query like this:

SELECT date, account_id, event_type, count(*) as events
FROM parquet_scan([ <FILES_LIST>])
WHERE date = date '2022-01-02' AND account_id = '2500'
GROUP BY 1,2,3

So, after getting the list of data files using Iceberg’s API, we can simply replace the string “<FILES_LIST>” in the query above with the list of files we received from Iceberg, and execute the query over the filtered files only.

As the code block below shows, we start by initializing DuckDB in-memory. Because we want to work with tables in S3, we first need to install and load the httpfs extension (in addition to the parquet extension, which should already be loaded by default). Extensions are loaded in DuckDB by executing statements, which also include the variable setters we use here to set AWS credentials. The second function in the code below simply injects the list of files into the query, as we saw above, and the last function executes the statement.

import java.sql.{Connection, DriverManager, ResultSet}
import scala.util.Try

private def initDuckDBConnection: Try[Connection] = Try {
  val con = DriverManager.getConnection("jdbc:duckdb:")
  val init_statement =
    s"""
       |INSTALL httpfs;
       |LOAD httpfs;
       |SET s3_region='eu-west-1';
       |SET s3_access_key_id='${sys.env.get("AWS_ACCESS_KEY_ID").get}';
       |SET s3_secret_access_key='${sys.env.get("AWS_SECRET_ACCESS_KEY").get}';
       |SET s3_session_token='${sys.env.get("AWS_SESSION_TOKEN").get}';
       |""".stripMargin
  con.createStatement().execute(init_statement)
  con
}

private def formatQuery(query: String, dataFilesStr: String): Try[String] = Try {
  query.replaceAll("<FILES_LIST>", dataFilesStr)
}

private def executeQuery(connection: Connection, query: String): Try[ResultSet] = Try {
  connection.createStatement.executeQuery(query)
}

That settles the core of our service on the query side. Once we get the list of files, we just need to make sure DuckDB is properly initialized, format the query, and execute it.

As mentioned earlier, the processing logic of our service involves 2 main stages that unfold in the 7 steps detailed in the code block below (Scala’s for comprehension actually makes this pretty simple).


val queryStatement = """
  |SELECT row_to_json(resultsDF)
  |FROM (
  |  SELECT date, account_id, event_type, count(*) as events
  |  FROM parquet_scan([ <FILES_LIST>])
  |  WHERE account_id = '2500' AND date = '2022-01-01'
  |  GROUP BY 1,2,3
  |) resultsDF
  |""".stripMargin

val filterExpr = Expressions.and(
  Expressions.equal("date", "2022-01-01"),
  Expressions.equal("account_id", "2500"))

val jsonDataRows = for {
  catalog         <- getGlueCatalog
  table           <- getIcebergTableByName("db_name", "table_name", catalog)
  tableScan       <- scanTableWithPartitionPredicate(table, filterExpr)
  dataFilesString <- getDataFilesLocations(tableScan)
  query           <- formatQuery(queryStatement, dataFilesString)
  dbConnection    <- initDuckDBConnection
  resultSet       <- executeQuery(dbConnection, query)
} yield resultSet.toStringList // collects the JSON rows into a list of strings (sketched further below)

The Iceberg API is first used to obtain a table reference and execute a table scan that fetches the names and locations of the data files containing the date and account_id we are interested in. Once we get the filtered list of files and their locations in S3, we chain them into a long string and inject it into the query. Finally, the query is executed over the relevant files only using DuckDB, and the results are obtained.

As you can see, the query template above is wrapped in a row_to_json function that transforms the result set into JSON documents, which is what we wanted to achieve.
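One caveat: toStringList is not a standard JDBC method; presumably it is a small helper defined in the full source linked below. A minimal sketch of what such a helper might look like, assuming each result row holds the single JSON column produced by row_to_json (the names here are my own):

import java.sql.ResultSet
import scala.collection.mutable.ListBuffer

object ResultSetSyntax {
  // hypothetical extension: gather the row_to_json column of every row into a list of
  // JSON strings, so that resultSet.toStringList reads as in the for comprehension above
  implicit class ResultSetOps(resultSet: ResultSet) {
    def toStringList: List[String] = {
      val rows = ListBuffer.empty[String]
      while (resultSet.next()) rows += resultSet.getString(1)
      rows.toList
    }
  }
}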

Apache Iceberg is quickly becoming a standard for metadata management and organization of huge data lake tables. It is therefore not surprising that modern query engines, such as Impala, Spark, and Trino, as well as public cloud providers, were quick to announce support for the Iceberg table format and API.

The purpose of this post was to demonstrate a powerful way in which data applications can take advantage of the benefits offered by Iceberg tables independently of a hefty query engine. It showed how we can use Iceberg’s API together with DuckDB in order to create lightweight yet powerful data applications that can run efficient queries over huge tables. We saw that by using the API exposed by Iceberg, we can essentially create an “optimized” query that only scans the files relevant to it. The ease with which DuckDB can be used to run queries over Parquet files stored in cloud storage makes the combination of the two an extremely powerful one, and highlights the potential of the Iceberg table format and its features.

The full source code is available here

I hope this will be useful!

* Resources

A great intro to Iceberg can be found here and here

Some more focused information about the Java API can be found here

** All images, unless otherwise noted, are by the author
