hydro.delta#

hydro.delta.scd(delta_table: delta.tables.DeltaTable, source: pyspark.sql.dataframe.DataFrame, keys: list[str] | str, effective_field: str, end_field: str = None, scd_type: int = 2) delta.tables.DeltaTable[source]#

Executes a slowly changing dimensions transformation and merge.

Supports Type 1 and Type 2 SCD.

Parameters
  • delta_table – The target Delta Lake table that is to be updated. See hydro.delta.bootstrap_scd2 if you need to create an SCD2 table.

  • source – The source data that will be used to merge with delta_table

  • keys – Column name(s) that identify unique rows. Can be a single column name as a string, or a list of strings, where order of the list does not matter.

  • effective_field – The name of the existing column that will be used as the “start” or “effective” indicator for a given entity. The column be of any orderable type, including timestamp, date, string, and numeric types.

  • end_field – Only required for type 2. The name of the non-existing column that will be used as the “end” indicator for a given entity. Its type will match the type of effective_field.

  • scd_type – The type of SCD that is to be performed

Returns

The same delta_table

Example

For `SCD Type 2 <https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row>`,

Given a Delta Lake table:

+---+-----------+----------+----------+
| id|   location|      date|  end_date|
+---+-----------+----------+----------+
|  1|      Kochi|2018-01-01|2019-01-01|
|  1|Lake Forest|2019-01-01|      null|
+---+-----------+----------+----------+

And a source DataFrame:

+---+--------+----------+
| id|location|      date|
+---+--------+----------+
|  1|  Irvine|2020-01-01|
|  2|  Venice|2022-01-01|
+---+--------+----------+
import hydro.delta as hd
hd.scd(delta_table, df, ["id"], effective_field="date", end_field="end_date")

Results in:

+---+-----------+----------+----------+
| id|   location|      date|  end_date|
+---+-----------+----------+----------+
|  1|      Kochi|2018-01-01|2019-01-01|
|  1|Lake Forest|2019-01-01|2020-01-01|
|  1|     Irvine|2020-01-01|      null|
|  2|     Venice|2022-01-01|      null|
+---+-----------+----------+----------+

See also

bootstrap_scd2

hydro.delta.bootstrap_scd2(source_df: pyspark.sql.dataframe.DataFrame, keys: list[str] | str, effective_field: str, end_field: str, table_properties: dict[str, str] = None, partition_columns: list[str] = None, comment: str = None, path: str = None, table_identifier: str = None) delta.tables.DeltaTable[source]#

Creates an SCD2-ready Delta Lake table from a source DataFrame.

Parameters
  • source_df – Source data that will populate the final Delta Lake table

  • keys – Column name(s) that identify unique rows. Can be a single column name as a string, or a list of strings, where order of the list does not matter.

  • effective_field – The name of the existing column that will be used as the “start” or “effective” timestamp for a given entity. The column be of any orderable type, including timestamp, date, string, and numeric types.

  • end_field – The name of the non-existing column that will be used as the “end” timestamp for a given entity. Its type will match the type of effective_field.

  • table_properties – A set of [Delta Lake table properties](https://docs.delta.io/latest/table-properties.html) or custom properties.

  • partition_columns – A set of column names that will be used to partition the resulting Delta Lake table.

  • comment – Comment that describes the table.

  • path – Specify the path to the directory where table data is stored, which could be a path on distributed storage.

  • table_identifier – The table name. Optionally qualified with a database name [catalog_name.] [database_name.] table_name.

Returns

The resulting DeltaTable object

Example

Given a DataFrame:

+---+-----------+----------+
| id|   location|      date|
+---+-----------+----------+
|  1|      Kochi|2018-01-01|
|  1|Lake Forest|2019-01-01|
+---+-----------+----------+

Run bootstrap_scd2:

import hydro.delta as hd
delta_table = hd.bootstrap_scd2(
    df,
    keys=["id"],
    effective_field="date",
    end_field="end_date",
    path="/path/to/delta/table",
)

Results in the below data being persisted to the given path, in this case, /path/to/delta/table

+---+-----------+----------+----------+
| id|   location|      date|  end_date|
+---+-----------+----------+----------+
|  1|      Kochi|2018-01-01|2019-01-01|
|  1|Lake Forest|2019-01-01|      null|
+---+-----------+----------+----------+

See also

scd

hydro.delta.partial_update_set(delta_frame: pyspark.sql.dataframe.DataFrame | delta.tables.DeltaTable, source_alias: str = 'source', target_alias: str = 'target') pyspark.sql.functions.col[source]#

Generates an update set for a Delta Lake MERGE operation where the source data provides partial updates.

Partial updates in this case are when some columns in the data are NULL, but are meant to be non-destructive, or is there no semantic meaning to the NULLs.

Parameters
  • delta_frame – A DeltaTable or DataFrame that describes a source MERGE dataframe

  • source_alias – A temporary name given to the source data of the MERGE

  • target_alias – A temporary name given to the target Delta Table of the MERGE

Returns

A dictionary that describes non-destructive updates for all fields in delta_frame in {key: coalesce(source.key, target.key)} form

Example

Example

Given a Delta Lake table:

+---+----------+------+
| id|  location|status|
+---+----------+------+
|  1|california|  null|
+---+----------+------+

And some source data that will partially update the Delta Lake table:

+---+--------+------+
| id|location|status|
+---+--------+------+
|  1|    null|active|
+---+--------+------+

Perform the following MERGE:

import hydro.delta as hd
delta_table.alias("target").merge(
    df.alias("source"), "source.id = target.id"
).whenNotMatchedInsertAll().whenMatchedUpdate(set=hd.partial_update_set(df)).execute()

With the resulting Delta Lake table:

+---+----------+------+
| id|  location|status|
+---+----------+------+
|  1|california|active|
+---+----------+------+
hydro.delta.file_stats(delta_table: delta.tables.DeltaTable) pyspark.sql.dataframe.DataFrame[source]#

Returns detailed information about the files of the current snapshot of a Delta Lake table, including (per-file):

  • name of file

  • size of file

  • partition values

  • modification time

  • is data change

  • statistics (min, max, and null counts)

  • tags

This is done by scanning the table’s transaction log, so it is fast, cheap, and scalable.

Returns

A DataFrame that describes the physical files that compose a given Delta Lake table

hydro.delta.partition_stats(delta_table: delta.tables.DeltaTable) pyspark.sql.dataframe.DataFrame[source]#

Returns detailed information about the partitions of the current snapshot of a Delta Lake table including (per-partition):

  • total size in bytes

  • file byte size quantiles (0, .25, .5, .75, 1.0) - where 0 is min and 1.0 is max

  • total number of records

  • total number of files

  • oldest and newest timestamps

This is done by scanning the table’s transaction log, so it is fast, cheap, and scalable.

Returns

A DataFrame that describes the size of all partitions in the table

hydro.delta.zordering_stats(delta_table: delta.tables.DeltaTable) pyspark.sql.dataframe.DataFrame[source]#

Returns a DataFrame that describes the Z-Ordering that has been applied to the table.

The resulting DataFrame has schema of zOrderBy, count.

Returns

A DataFrame with schema zOrderBy, count.

hydro.delta.detail(delta_table: delta.tables.DeltaTable) dict[str, Any][source]#

Returns details about a Delta Lake table including:

  • table created timestamp

  • description

  • table format

  • table id

  • table last modified

  • location

  • minimum reader version

  • minimum writer version

  • table name

  • total number of records of current snapshot

  • total number of files of current snapshot

  • partition columns

  • total number of partitions of the current snapshot

  • properties

  • total data size of the current snapshot

  • percentage collected stats of the current snapshot

  • snapshot version

Returns

A dictionary representing enhanced details of a Delta Lake table

hydro.delta.summarize_all_files(delta_table: delta.tables.DeltaTable, humanize: bool = True) dict[str, str][source]#

Lists and summarizes all of the contents of a Delta Lake table’s data directory.

The directory will contain:

  • data files that are part of the current snapshot

  • data files that are “tombstoned” and not part of the current snapshot

Returns summary statistics including:

  • the total number of files

  • the total size of the files

  • the oldest timestamp of the files

Param

humanize: Whether or not the results should be made more easily read by humans. Turn this to False if you’re looking to do calculations on the raw metrics.

Returns

A dictionary containing summary statistics about all of the data files under the Delta Lake table’s location

hydro.delta.idempotency_markers(delta_table: delta.tables.DeltaTable) str[source]#

Exposes a Delta Lake table’s idempotency markers, i.e txnAppId and txnVersion.

Currently this returns a string representation of a Scala map.

Returns

A string that represents a Scala map of the idempotency markers. This map is in the form Map(key-> value) where key is a given txnAppId and value is the associated version.