The `elementary.python` dbt test allows users to run Python code and validate their data.
To use it, create a macro under `macros/<macro>.sql` that contains the Python code the test will execute, and pass the macro's name through the `code_macro` argument of the test. The macro needs to contain a `def test(model_df, ref, session)` function with the test's logic.
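As a rough sketch of that structure (the file and macro names here are placeholders, not part of Elementary's API), a macro under `macros/my_python_test.sql` could look like this:

```
{% macro my_python_test(args) %}

def test(model_df, ref, session):
    # The test's logic goes here; returning True means the test passed.
    return True

{% endmacro %}
```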
A test function receives the following parameters:

- `model_df`: The model's DataFrame object.
- `ref`: A function that allows you to reference other resources, e.g. `ref('my_other_model')`.
- `session`: A Session object, either Snowpark or PySpark.

In the simplest case, you only need `model_df` when testing a model. We'll show examples below for the usage of the other parameters.
You can return any of the following data types:

- `DataFrame`: Either a Spark or a Pandas DataFrame. The test's failed results count will be the number of rows in the DataFrame.
- `int`: The number of failed results.
- `bool`: Whether the test passed (`True`) or failed (`False`).

For example, a simple test can filter the model down to the rows whose `ORDER_STATUS` column is not `delivered` and return the new DataFrame.
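A hedged sketch of what such a test could look like, assuming the model arrives as a Pandas DataFrame with an `ORDER_STATUS` column (the macro name is a placeholder):

```
{% macro validate_orders_delivered(args) %}

def test(model_df, ref, session):
    # Keep only the rows that are not delivered -- these are the failed results.
    failed_df = model_df[model_df["ORDER_STATUS"] != "delivered"]
    # Returning a DataFrame: the failed results count is its number of rows.
    return failed_df

{% endmacro %}
```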
Let's move on to demonstrate Python's true power when it comes to testing.
The next example compares the tested model to another table, whose name is passed through the `macro_args` argument. This argument allows us to make our tests generic: we can simply change the `other_table` field and the compared table will be different, without ever touching the Python code.
Now for the implementation itself. The macro receives an `args` argument, which is the `macro_args` that we passed in the test's definition. We use `ref('{{ args.other_table }}')` to get the other table, just like we would with SQL. Afterwards, we return a new Pandas DataFrame that holds the difference between the two tables. If that DataFrame is not empty, the test fails, with the failed results count equal to its number of rows.
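Below is a hedged sketch of what such a macro might look like, assuming the test's definition passes an `other_table` key in `macro_args` and that both tables come back as Pandas DataFrames; the macro name and the merge-based diff are illustrative, not Elementary's exact implementation:

```
{% macro compare_tables(args) %}

def test(model_df, ref, session):
    # Jinja renders args.other_table into the Python code before it runs.
    other_df = ref('{{ args.other_table }}')

    # Rows that appear in only one of the two tables are the failed results.
    diff_df = model_df.merge(other_df, how="outer", indicator=True)
    diff_df = diff_df[diff_df["_merge"] != "both"].drop(columns=["_merge"])
    return diff_df

{% endmacro %}
```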
The next example validates that the `country` value provided in the `geolocation_json` column of the `login_events` model is a string.
This test uses the `packages` argument, which lists additional PyPI Python packages that are required for the code to work. Here, we've added `jsonschema` to our requirements. Using the `macro_args`, we specify the JSON schema that we want to test against, and the column name.
In our `validate_json` macro, we format these arguments into the Python code accordingly. The code iterates over the column's rows, loads each row as JSON and validates it against the schema. If a row either fails to load as JSON or is out of schema, it is marked as invalid. At the end, we return the number of invalid rows. Note that in this example we return an `int`, as opposed to the previous one in which we returned a DataFrame.
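Here is a hedged sketch of such a `validate_json` macro, assuming `macro_args` passes the column name and the JSON schema under the keys `column_name` and `json_schema` (those key names, and the way the schema is rendered into the code, are assumptions):

```
{% macro validate_json(args) %}

import json
from jsonschema import validate, ValidationError

def test(model_df, ref, session):
    # The schema and the column name are rendered into the code by Jinja.
    schema = {{ args.json_schema }}
    column = "{{ args.column_name }}"

    invalid_rows = 0
    for value in model_df[column]:
        try:
            # A row fails if it is not valid JSON or does not match the schema.
            validate(instance=json.loads(value), schema=schema)
        except (ValueError, ValidationError):
            invalid_rows += 1

    # Returning an int: the number of failed results.
    return invalid_rows

{% endmacro %}
```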
To summarize, the `elementary.python` dbt test accepts the following arguments:

- `code_macro: str`: Name of the macro that returns the Python code.
- `macro_args: dict`: Arguments that are supplied to the macro - `{{ code_macro(macro_args) }}`.
- `where_expression: str`: SQL WHERE clause to limit the test scope.
- `packages: list[str]`: PyPI package requirements.
- `submission_method: str`: `cluster` or `serverless` for BigQuery.

The `def test(model_df, ref, session)` Python function can return any of the following:

- `DataFrame`: Either a Spark or a Pandas DataFrame. The test's failed results count will be the number of rows in the DataFrame.
- `int`: The number of failed results.
- `bool`: Whether the test passed (`True`) or failed (`False`).