Extenders, UDFs, and Parsers
This reference explains how to extend Fabricks with custom Python code and reusable SQL assets:

- Extenders: Python functions that transform a Spark DataFrame before it is written.
- UDFs: User-defined functions you register on the Spark session and use in SQL.
- Parsers: Source-specific readers/cleaners that return a DataFrame (optional, advanced).
Use these to encapsulate business logic, reuse patterns, and keep SQL jobs focused and readable.
Extenders
Extenders are Python functions that take a DataFrame and return a transformed DataFrame. They are applied during job execution (typically in Silver or Gold), just before the write.
- Location: Put Python modules under your runtime, for example fabricks/extenders.
- Referencing: In job YAML, use extender: name and optionally extender_options: { ... } to pass arguments.

Example (adds a country column):
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
from fabricks.core.extenders import extender
@extender(name="add_country")
def add_country(df: DataFrame, **kwargs) -> DataFrame:
    return df.withColumn("country", lit(kwargs.get("country", "Unknown")))
In a job:
- job:
    step: silver
    topic: sales_analytics
    item: daily_summary
    options:
      mode: update
      extender: add_country
      extender_options:
        country: CH
Notes:

- Extenders should be idempotent and fast.
- Avoid heavy I/O; handle source reading in Bronze or via a parser.
- Prefer small, composable transformations (see the sketch below).
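A minimal sketch of such an extender, assuming a defaults mapping is passed through extender_options (the name add_defaults and its option key are illustrative, not part of Fabricks):

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
from fabricks.core.extenders import extender

@extender(name="add_defaults")
def add_defaults(df: DataFrame, **kwargs) -> DataFrame:
    # Only add columns that are still missing, so re-applying the extender is a no-op.
    for column, value in (kwargs.get("defaults") or {}).items():
        if column not in df.columns:
            df = df.withColumn(column, lit(value))
    return df

Because it only touches missing columns, re-running a job or chaining it with other extenders leaves already-extended data unchanged.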
UDFs
UDFs are registered on the Spark session and then callable from SQL. Place the UDF registration code in your runtime (e.g., fabricks/udfs), and ensure it runs before jobs that need it (for example during initialization or via a notebook hook).
Example (simple addition UDF):
from pyspark.sql import SparkSession
from fabricks.core.udfs import udf
@udf(name="addition")
def addition(spark: SparkSession):
    def _add(a: int, b: int) -> int:
        return a + b

    # Register the inner function on the Spark session under the name used in SQL.
    spark.udf.register("udf_addition", _add)
Using built-in helpers and a custom UDF in SQL:
select
    udf_identity(s.id, id) as __identity,
    udf_key(array(s.id)) as __key,
    s.id as id,
    s.name as monarch,
    udf_addition(1, 2) as addition
from silver.monarch_scd1__current s
Notes:

- UDFs should validate inputs and avoid side effects.
- Prefer Spark SQL built-ins and DataFrame functions where possible for performance (see the example below).
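As an illustration of the last point, the same addition can be computed with built-in column expressions in the DataFrame API (or plain arithmetic in SQL), so no Python UDF is involved; the DataFrame df is assumed to already exist:

from pyspark.sql import functions as F

# Built-in arithmetic is evaluated inside the JVM; no Python round trip.
# The SQL equivalent is simply: 1 + 2 as addition
df = df.withColumn("addition", F.lit(1) + F.lit(2))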
Parsers
Parsers read and lightly clean raw data. They return a DataFrame and should not write output or mutate state.
What a parser should do:

- Read raw data from data_path (batch or stream, depending on the stream flag)
- Optionally apply/validate a schema from schema_path
- Perform light, source-specific cleanup (drops/renames/type fixes)
- Return a Spark DataFrame (no writes, no side effects)
Inputs:

- data_path: source location (directory or file)
- schema_path: optional schema location (e.g., JSON/DDL)
- spark: active SparkSession
- stream: when true, prefer readStream where supported
Reference in a job:
- job:
    step: bronze
    topic: demo
    item: source
    options:
      mode: append
      uri: /mnt/demo/raw/demo
      parser: monarch
      parser_options:
        file_format: parquet
        read_options:
          mergeSchema: true
Example implementation:
from pyspark.sql import DataFrame, SparkSession
from fabricks.core.parsers import parser
from fabricks.utils.path import Path
@parser(name="monarch")
class MonarchParser:
    def parse(self, data_path: Path, schema_path: Path, spark: SparkSession, stream: bool) -> DataFrame:
        df = spark.read.parquet(data_path.string)
        # Drop source-specific technical columns before handing the DataFrame to Fabricks.
        cols_to_drop = [c for c in df.columns if c.startswith("BEL_")]
        if cols_to_drop:
            df = df.drop(*cols_to_drop)
        return df
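The example above ignores schema_path and stream. Below is a minimal sketch of a parser that honors both, assuming schema_path points at a plain-text DDL schema readable from the driver; the name monarch_stream and the parquet format are illustrative:

from pyspark.sql import DataFrame, SparkSession
from fabricks.core.parsers import parser
from fabricks.utils.path import Path

@parser(name="monarch_stream")
class MonarchStreamParser:
    def parse(self, data_path: Path, schema_path: Path, spark: SparkSession, stream: bool) -> DataFrame:
        # Load an optional DDL schema string (assumed to be a plain text file).
        schema = None
        if schema_path is not None:
            with open(schema_path.string) as f:
                schema = f.read().strip()

        # Streaming parquet sources require an explicit schema.
        reader = spark.readStream.format("parquet") if stream else spark.read.format("parquet")
        if schema:
            reader = reader.schema(schema)
        return reader.load(data_path.string)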
Related
- Steps: Bronze • Silver • Gold
- Reference: Checks & Data Quality • Table Options
- Runtime: Runtime Configuration