hydro.spark

hydro.spark.fields(df: pyspark.sql.dataframe.DataFrame) → list[str]

Returns a list of names of all fields of a DataFrame, including nested ones.

Parameters

df – DataFrame that you want to extract all fields from

Returns

A list of column names, all strings

Example

import hydro.spark as hs
df = spark.range(1)
hs.fields(df)
['id']
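
To make "including nested ones" concrete, here is a pure-Python sketch of flattening a nested schema into dotted field names. It is not hydro's implementation: the dict-based schema, the helper name flatten_field_names, and the choice to return leaf fields only are all assumptions made for illustration.

```python
def flatten_field_names(schema, prefix=""):
    """Return dotted names of all leaf fields in a nested dict schema.

    Hypothetical helper: a dict value stands in for a struct type,
    any other value stands in for a primitive type.
    """
    names = []
    for name, dtype in schema.items():
        path = f"{prefix}{name}"
        if isinstance(dtype, dict):
            # Recurse into the struct, extending the dotted prefix.
            names.extend(flatten_field_names(dtype, prefix=f"{path}."))
        else:
            names.append(path)
    return names


schema = {"id": "long", "nest": {"key": "string", "society": "string"}}
print(flatten_field_names(schema))  # ['id', 'nest.key', 'nest.society']
```

The dotted paths follow the same notation as the nest.key argument in the drop_fields example.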
hydro.spark.fields_with_types(df: pyspark.sql.dataframe.DataFrame) → list[tuple[str, pyspark.sql.types.DataType]]

Returns a list of tuples of names and types of all fields of a DataFrame, including nested ones.

Parameters

df – DataFrame that you want to extract all fields and types from

Returns

A list of tuples of (column_name, type)

Example

import hydro.spark as hs
df = spark.range(1)
hs.fields_with_types(df)
[('id', LongType())]
hydro.spark.deduplicate_dataframe(df: pyspark.sql.dataframe.DataFrame, keys: list[str] | str = None, tiebreaking_columns: list[str] = None) → pyspark.sql.dataframe.DataFrame

Removes duplicates from a Spark DataFrame.

Parameters
  • df – The target DataFrame that contains duplicates.

  • keys – A column name or a list of column names used to distinguish rows. The order of this list does not matter. If not provided, operates the same as DataFrame.drop_duplicates().

  • tiebreaking_columns – A list of column names used for ordering. The order of this list matters, with earlier elements “weighing” more than later ones. The columns are evaluated in descending order. If not provided, operates the same as DataFrame.drop_duplicates(keys).

Returns

The deduplicated DataFrame

Example

Given an input DataFrame

+---+-----+-----------------------+
|id |type |ts                     |
+---+-----+-----------------------+
|1  |watch|2023-01-09 15:48:00.000|
|1  |watch|2023-01-09 15:48:00.001|
+---+-----+-----------------------+

There are two events with the same primary key id, but with a slightly different timestamp. Two rows should not share a primary key. A common way of dealing with this is to break the “tie” based on some orderable column(s).

This can be done with the following code:

import hydro.spark as hs
data = [{"id": 1, "type": "watch", "ts": "2023-01-09 15:48:00.000"}, {"id": 1, "type": "watch", "ts": "2023-01-09 15:48:00.001"}]
df = spark.createDataFrame(data)
hs.deduplicate_dataframe(df, ["id"], ["ts"])

Results in

+---+-----------------------+-----+
|id |ts                     |type |
+---+-----------------------+-----+
|1  |2023-01-09 15:48:00.001|watch|
+---+-----------------------+-----+
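
The tiebreaking rule can be sketched in plain Python over a list of dicts. This is an illustration of the semantics described above (one row per key, highest tiebreaking values win, earlier tiebreaking columns weigh more), not hydro's Spark implementation. The helper name deduplicate_rows is hypothetical.

```python
def deduplicate_rows(rows, keys, tiebreaking_columns):
    """Keep, for each key, the row that sorts highest on the tiebreaking columns."""
    best = {}
    for row in rows:
        key = tuple(row[k] for k in keys)
        # Tuples compare element by element, so earlier columns weigh more.
        rank = tuple(row[c] for c in tiebreaking_columns)
        if key not in best or rank > best[key][0]:
            best[key] = (rank, row)
    return [row for _, row in best.values()]


rows = [
    {"id": 1, "type": "watch", "ts": "2023-01-09 15:48:00.000"},
    {"id": 1, "type": "watch", "ts": "2023-01-09 15:48:00.001"},
]
print(deduplicate_rows(rows, ["id"], ["ts"]))
# [{'id': 1, 'type': 'watch', 'ts': '2023-01-09 15:48:00.001'}]
```

Because the comparison is descending, the row with the later timestamp survives, which matches the result table above.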
hydro.spark.hash_fields(df: pyspark.sql.dataframe.DataFrame, denylist_fields: list[str] = None, algorithm: str = 'xxhash64', num_bits=256) → pyspark.sql.column.Column

Generates a hash digest of all fields.

Parameters
  • df – Input dataframe that is to be hashed.

  • denylist_fields – Fields that will not be hashed.

  • algorithm

    The function that is used to generate the hash digest, includes:

    • xxhash64 (default) pyspark.sql.functions.xxhash64

    • md5 pyspark.sql.functions.md5

    • sha1 pyspark.sql.functions.sha1

    • sha2 pyspark.sql.functions.sha2

    • hash pyspark.sql.functions.hash

  • num_bits – Only for sha2. The desired bit length of the result.

Returns

A column that represents the hash.

Example

Given an input DataFrame

+---+---+-----+
| id| ts| type|
+---+---+-----+
|  1|  1|watch|
|  1|  1|watch|
|  1|  2|watch|
+---+---+-----+

Row hashes are very helpful and convenient when trying to compare rows, especially rows with a lot of columns.

import hydro.spark as hs
data = [{"id": 1, "type": "watch", "ts": "1"}, {"id": 1, "type": "watch", "ts": "1"}, {"id": 1, "type": "watch", "ts": "2"}]
df = spark.createDataFrame(data)
df.withColumn("row_hash", hs.hash_fields(df))

Results in

+---+-----+---+-------------------+
| id| type| ts|           row_hash|
+---+-----+---+-------------------+
|  1|watch|  1|8553228534919528539|
|  1|watch|  1|8553228534919528539|
|  1|watch|  2|8916583907181700702|
+---+-----+---+-------------------+

Rows with identical content have identical hash values. Rows with different content have different hash values (barring hash collisions).

This is very helpful when comparing rows. Instead of typing col1 = col1 AND col2 = col2 …, hash = hash can be used. This saves developer time and keystrokes.
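
The comparison idea can be sketched without Spark. The snippet below hashes each row (a dict) with hashlib.md5 and compares digests instead of comparing column by column. It is only an illustration: the helper name row_hash and the way values are serialized are assumptions, and hydro itself delegates to the pyspark.sql.functions hash functions listed above.

```python
import hashlib


def row_hash(row, denylist_fields=()):
    """Hash the values of a row (a dict), skipping denylisted fields."""
    parts = [
        f"{name}={row[name]!r}"
        for name in sorted(row)  # sort so key order does not affect the digest
        if name not in denylist_fields
    ]
    return hashlib.md5("|".join(parts).encode("utf-8")).hexdigest()


rows = [
    {"id": 1, "type": "watch", "ts": "1"},
    {"id": 1, "type": "watch", "ts": "1"},
    {"id": 1, "type": "watch", "ts": "2"},
]
hashes = [row_hash(r) for r in rows]
print(hashes[0] == hashes[1])  # True: identical content
print(hashes[0] == hashes[2])  # False: ts differs
# Denylisting ts makes the first and third rows compare equal.
print(row_hash(rows[0], ["ts"]) == row_hash(rows[2], ["ts"]))  # True
```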

hydro.spark.hash_schema(df: pyspark.sql.dataframe.DataFrame, denylist_fields: list[str] = None) → pyspark.sql.column.Column

Generates a hash digest of a DataFrame’s schema. Uses the hashlib.md5 algorithm.

Parameters
  • df – Input dataframe whose schema is to be hashed.

  • denylist_fields – Fields that will not be hashed.

Returns

A column that represents the hash.

Example

import hydro.spark as hs
df = spark.range(1)
df.withColumn("schema_hash", hs.hash_schema(df))
+---+--------------------------------+
|id |schema_hash                     |
+---+--------------------------------+
|0  |b80bb7740288fda1f201890375a60c8f|
+---+--------------------------------+

The schema of tables can change, and sometimes it is helpful to be able to determine if a table’s schema has changed.
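
As a rough sketch of schema-change detection, the snippet below computes an MD5 digest over (name, type) pairs and compares digests between two versions of a schema. The pair-list representation and the helper name schema_digest are assumptions for illustration; exactly what hydro feeds into hashlib.md5 is not stated here.

```python
import hashlib


def schema_digest(schema, denylist_fields=()):
    """Return an MD5 hex digest of a schema given as (name, type) pairs."""
    parts = [
        f"{name}:{dtype}"
        for name, dtype in schema
        if name not in denylist_fields
    ]
    return hashlib.md5(",".join(parts).encode("utf-8")).hexdigest()


old = [("id", "bigint"), ("type", "string")]
new = [("id", "bigint"), ("type", "string"), ("ts", "timestamp")]

print(schema_digest(old) == schema_digest(new))  # False: a column was added
# Ignoring the new column makes the digests match again.
print(schema_digest(old) == schema_digest(new, denylist_fields=["ts"]))  # True
```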

hydro.spark.map_fields(df: pyspark.sql.dataframe.DataFrame, field_list: list[str], function: Callable) → pyspark.sql.dataframe.DataFrame

Applies a function over fields that are specified in a list.

Parameters
  • df

  • field_list – A list of fields the function is to be applied to.

  • function – Any function from pyspark.sql.functions, or a lambda, that takes a column.

Returns

Resulting DataFrame

Example

import hydro.spark as hs
import pyspark.sql.functions as F
data = [{"empty": "   ", "ibm": "   🥴  ", "other": "thing"}]
df = spark.createDataFrame(data)
df.show()
+-----+-------+-----+
|empty|    ibm|other|
+-----+-------+-----+
|     |   🥴  |thing|
+-----+-------+-----+
hs.map_fields(df, ["empty", "ibm"], F.trim)
+-----+---+-----+
|empty|ibm|other|
+-----+---+-----+
|     | 🥴|thing|
+-----+---+-----+

A lambda function can be used to compose functions:

hs.map_fields(df, ["empty", "ibm"], lambda x: F.when(F.trim(x) == F.lit(""), None).otherwise(F.trim(x)))
+-----+---+-----+
|empty|ibm|other|
+-----+---+-----+
| null| 🥴|thing|
+-----+---+-----+

F.expr() can be used via interpolation with f-strings (it’s a little hard to read and write, though):

hs.map_fields(df, ["empty", "ibm"], lambda x: F.expr(f"nullif(trim({x}), '')"))
+-----+---+-----+
|empty|ibm|other|
+-----+---+-----+
| null| 🥴|thing|
+-----+---+-----+
hydro.spark.map_fields_by_regex(df: pyspark.sql.dataframe.DataFrame, regex: str, function: Callable) → pyspark.sql.dataframe.DataFrame

Applies a function over fields that match a regular expression.

Parameters
  • df

  • regex – Regular expression pattern. Uses Python’s re module.

  • function – Any function from pyspark.sql.functions, or a lambda, that takes a column.

Returns

Resulting DataFrame

See also

map_fields
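
Since this function has no example of its own, here is a plain-Python sketch of the idea: select field names with Python's re module, then apply a function only to the matching fields. The helper name map_matching_fields is hypothetical, and using re.search (rather than re.match or re.fullmatch) is an assumption, so anchor the pattern explicitly if exact matching matters.

```python
import re


def map_matching_fields(row, regex, function):
    """Apply `function` to the values of fields whose names match `regex`."""
    pattern = re.compile(regex)
    return {
        name: function(value) if pattern.search(name) else value
        for name, value in row.items()
    }


row = {"empty": "   ", "ibm": "   🥴  ", "other": " thing "}
# Trim only the fields named exactly "empty" or "ibm".
print(map_matching_fields(row, r"^(empty|ibm)$", str.strip))
# {'empty': '', 'ibm': '🥴', 'other': ' thing '}
```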

hydro.spark.map_fields_by_type(df: pyspark.sql.dataframe.DataFrame, target_type: pyspark.sql.types.DataType, function: Callable) → pyspark.sql.dataframe.DataFrame

Applies a function over fields that have a target type.

Parameters
  • df

  • target_type

  • function – Any function from pyspark.sql.functions, or a lambda, that takes a column.

Returns

Resulting DataFrame

See also

map_fields

hydro.spark.select_fields_by_type(df: pyspark.sql.dataframe.DataFrame, target_type: pyspark.sql.types.DataType)

Selects fields that have a provided data type. Works with nested fields (but un-nests them).

Parameters
  • df

  • target_type – A DataType type that is to be selected.

Returns

A new DataFrame with the selected fields

Example

data = [{"string1": "one", "int": 1, "string2": "two"}]
df = spark.createDataFrame(data)
import hydro.spark as hs
import pyspark.sql.types as T
hs.select_fields_by_type(df, T.StringType())
+-------+-------+
|string1|string2|
+-------+-------+
|    one|    two|
+-------+-------+
hydro.spark.select_fields_by_regex(df: pyspark.sql.dataframe.DataFrame, regex: str) → pyspark.sql.dataframe.DataFrame

Selects fields according to a provided regular expression pattern. Works with nested fields (but un-nests them)

Parameters
  • df

  • regex – Regular expression pattern. Uses Python’s re module.

Returns

A new DataFrame with the selected fields

Example

data = [{"string1": "one", "int": 1, "string2": "two"}]
df = spark.createDataFrame(data)
import hydro.spark as hs
hs.select_fields_by_regex(df, "string.*")
+-------+-------+
|string1|string2|
+-------+-------+
|    one|    two|
+-------+-------+
hydro.spark.drop_fields(df: pyspark.sql.dataframe.DataFrame, fields_to_drop: list[str]) → pyspark.sql.dataframe.DataFrame

Drops a DataFrame’s fields, including nested fields and top-level columns.

Parameters
  • df

  • fields_to_drop – A list of field names that are to be dropped

Returns

A new DataFrame without the specified fields

Example

from pyspark.sql import Row

# This is a silly way of creating a nested DataFrame; it's here for brevity
row = Row(nest=Row(key="val", society="spectacle"))
df = spark.createDataFrame([row])
df.printSchema()

And here is the schema:

root
 |-- nest: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- society: string (nullable = true)

Using hydro, we can simply drop nested fields.

import hydro.spark as hs
hs.drop_fields(df, ["nest.key"])

With the resulting schema:

root
 |-- nest: struct (nullable = true)
 |    |-- society: string (nullable = true)
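
The dotted-path addressing used above can be sketched on plain nested dicts. This only illustrates what dropping nest.key means structurally; it is not how hydro rewrites a Spark schema, and the helper name drop_nested is hypothetical.

```python
def drop_nested(record, dotted_path):
    """Return a copy of `record` without the field at `dotted_path`."""
    head, _, rest = dotted_path.partition(".")
    if head not in record:
        return dict(record)  # nothing to drop
    if not rest:
        # Last path segment: drop the field itself.
        return {k: v for k, v in record.items() if k != head}
    result = dict(record)
    if isinstance(record[head], dict):
        # Descend into the nested struct and drop from there.
        result[head] = drop_nested(record[head], rest)
    return result


row = {"nest": {"key": "val", "society": "spectacle"}}
print(drop_nested(row, "nest.key"))  # {'nest': {'society': 'spectacle'}}
```

The input is left unmodified, in the same way that drop_fields returns a new DataFrame.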
hydro.spark.infer_json_field(df: pyspark.sql.dataframe.DataFrame, target_field: str, options: dict[str, str] = None) → pyspark.sql.types.StructType

Parses a JSON string and infers its schema.

Parameters
  • df

  • target_field – A field that contains JSON strings that are to be inferred.

  • options – Standard JSON reader options. See pyspark.sql.DataFrameReader.json

Returns

The inferred StructType

Example

data = [{"id": 1, "payload": '{"salt": "white"}'}, {"id": 2, "payload": '{"pepper": "black"}'}]
df = spark.createDataFrame(data)
df.show()

Looks like:

+---+-------------------+
| id|            payload|
+---+-------------------+
|  1|  {"salt": "white"}|
|  2|{"pepper": "black"}|
+---+-------------------+

But there’s a problem: our schema doesn’t include salt and pepper.

root
 |-- id: long (nullable = true)
 |-- payload: string (nullable = true)

We can use hydro to fix this:

import hydro.spark as hs
schema = hs.infer_json_field(df, "payload")

And use the resulting schema to parse the fields:

import pyspark.sql.functions as F
df.withColumn("payload", F.from_json("payload", schema))

With the new schema:

root
 |-- id: long (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- pepper: string (nullable = true)
 |    |-- salt: string (nullable = true)

Notice how there are separate fields for salt and pepper. And now these are addressable leaf nodes.
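
To see why the inferred struct contains both salt and pepper even though no single row has both, consider this plain-Python sketch that takes the union of keys across all JSON strings. It only mimics the field-discovery part of inference (no type inference). The helper name infer_keys is hypothetical, and sorting the keys is a choice made here to mirror the field order shown in the schema above.

```python
import json


def infer_keys(json_strings):
    """Return the sorted union of top-level keys across JSON object strings."""
    keys = set()
    for text in json_strings:
        keys.update(json.loads(text).keys())
    return sorted(keys)


payloads = ['{"salt": "white"}', '{"pepper": "black"}']
print(infer_keys(payloads))  # ['pepper', 'salt']
```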

hydro.spark.infer_csv_field(df: pyspark.sql.dataframe.DataFrame, target_field: str, options: dict[str, str] = None) → pyspark.sql.types.StructType

Parses a CSV string and infers its schema.

Parameters
  • df

  • target_field – A field that contains CSV strings that are to be inferred.

  • options – Standard CSV reader options, including header. See pyspark.sql.DataFrameReader.csv

Returns

The inferred StructType

Example

data = {
    'id': 1, 'payload': '''type,date
watch,2023-01-09
watch,2023-01-10
''',
}
df = spark.createDataFrame([data])
df.show()
+---+-----------------------------------------------+
|id |payload                                        |
+---+-----------------------------------------------+
|1  |type,date\nwatch,2023-01-09\nwatch,2023-01-10\n|
+---+-----------------------------------------------+
import hydro.spark as hs
csv_options = {'header': 'True'}
schema = hs.infer_csv_field(df, 'payload', options=csv_options)
schema.simpleString()
'struct<type:string,date:string>'
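
The role of the header option can be illustrated with Python's standard csv module on the same payload. This is a sketch of the concept only, not of Spark's CSV reader: with a header, the first line supplies the field names; without one, it is treated as data.

```python
import csv
import io

payload = "type,date\nwatch,2023-01-09\nwatch,2023-01-10\n"

# With a header: the first line names the fields.
with_header = csv.DictReader(io.StringIO(payload))
print(with_header.fieldnames)  # ['type', 'date']
records = list(with_header)
print(records[0])  # {'type': 'watch', 'date': '2023-01-09'}

# Without a header: the first line is just another row of data.
without_header = list(csv.reader(io.StringIO(payload)))
print(without_header[0])  # ['type', 'date']
print(len(without_header))  # 3
```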