Python in dbt

dbt v1.3.0 introduced Python models to cover use cases that you can’t solve with SQL. Python lets you analyze data with tools from the open source Python ecosystem, including state-of-the-art packages for data science and statistics. Previously, running Python required separate infrastructure and orchestration.

For more information, review the Python models documentation.

What about tests?

Just as with models, we’d like to test our data using those powerful tools and the convenient Python runtime. Problems such as validating JSON or comparing tables become much easier to solve. dbt doesn’t currently provide an out-of-the-box mechanism for running tests in Python, so Elementary now provides one, built from dbt’s own building blocks.

Elementary Python tests

This is a beta feature, introduced in version 0.5.3 of Elementary’s dbt package.

Elementary introduces a generic dbt test called elementary.python that allows users to run Python code and validate their data.

How does it work?

Let’s start with a basic example and proceed gradually to show the full potential of Python tests.

A Python test is defined like any other dbt test.

models/schema.yml
- name: orders
  tests:
    - elementary.python:
        code_macro: check_undelivered_orders

Then, we need to define a macro under macros/<macro>.sql that contains the Python code the test will execute.

macros/py_check_undelivered_orders.sql
{% macro check_undelivered_orders(args) %}
def test(model_df, ref, session):
    return model_df.filter(model_df['ORDER_STATUS'] != 'delivered')
{% endmacro %}

As you can see, the macro’s name matches the code_macro argument passed to the test. The macro must contain a def test(model_df, ref, session) function that holds the test’s logic.

A test function receives the following parameters:

  1. model_df: The model’s DataFrame object.
  2. ref: A function for referencing other resources, for example ref('my_other_model').
  3. session: A Session object, either Snowpark or PySpark.

In practice, you’ll most likely only need the model_df when testing a model. We’ll show examples below for the usage of the other parameters.

You can return either of the following data types:

  • DataFrame: Either a Spark or a Pandas DataFrame. The test’s failed results count will be the number of rows in the DataFrame.
  • int (number): The number of failed results.
  • bool: Whether the test passed (True) or failed (False).

In the example above, the test keeps only the rows whose ORDER_STATUS column is not 'delivered' and returns the resulting DataFrame, so every undelivered order counts as a failed result. Let’s move on to demonstrate Python’s true power when it comes to testing.

Example use cases

Comparing tables

Let’s compare two different tables, or views, within our warehouse.

models/schema.yml
- name: orders
  tests:
    - elementary.python:
        code_macro: compare_tables
        macro_args:
          other_table: raw_orders

We’re passing an additional argument to the test called macro_args. This argument allows us to make our tests generic, meaning that we can simply change the other_table field and the compared table will be different without ever touching the Python code.

Here’s the actual implementation.

macros/py_compare_tables.sql
{% macro compare_tables(args) %}
import pandas as pd

def test(model_df, ref, session):
    original_table = model_df.toPandas().drop_duplicates()
    other_table = ref('{{ args.other_table }}').toPandas().drop_duplicates()
    return pd.concat([original_table, other_table]).drop_duplicates(keep=False)
{% endmacro %}

Note that the macro receives an args parameter, which holds the macro_args we passed in the test’s definition. We use ref('{{ args.other_table }}') to get the other table, just as we would in SQL. We then return a new Pandas DataFrame holding the rows that appear in only one of the two tables. If the DataFrame is not empty, the test fails, and the number of failed results equals the number of rows.
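To see why concatenating the two de-duplicated tables and calling drop_duplicates(keep=False) leaves only the differing rows, here is the same logic sketched with plain Python sets instead of Pandas. The sample rows and the helper name table_diff are made up for illustration; rows are modelled as tuples.

```python
# Made-up sample rows: (order_id, status). Row 1 appears twice in `orders`.
orders = [(1, "delivered"), (1, "delivered"), (2, "shipped"), (3, "placed")]
raw_orders = [(1, "delivered"), (2, "returned"), (3, "placed")]


def table_diff(left, right):
    """Return the rows that appear in exactly one of the two tables."""
    # set(...) plays the role of drop_duplicates() on each table.
    # After de-duplication, a row present in both tables occurs twice in the
    # concatenation, so drop_duplicates(keep=False) removes every copy of it.
    # What remains is the symmetric difference of the two row sets.
    return sorted(set(left) ^ set(right))


diff = table_diff(orders, raw_orders)
print(diff)       # [(2, 'returned'), (2, 'shipped')]
print(len(diff))  # 2
```

Note that a changed row shows up twice in the result, once per table, so a single differing order yields a failed results count of 2.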

Validating JSONs

In this example we’ll validate a JSON column according to a pre-defined schema.

models/schema.yml
- name: login_events
  tests:
    - elementary.python:
        code_macro: validate_json
        macro_args:
          schema: "{'type': 'object', 'properties': {'country': {'type': 'string'}}}"
          column: geolocation_json
        packages: ["jsonschema"]

Here we test that the country provided in the geolocation_json column of the login_events model is a string.

macros/py_validate_json.sql
{% macro validate_json(args) %}
import json
import jsonschema

def test(model_df, ref, session):
    model_df = model_df.toPandas()
    invalid_row_count = 0
    for json_row in model_df['{{ args.column }}']:
        try:
            jsonschema.validate(json.loads(json_row), {{ args.schema }})
        except (json.JSONDecodeError, jsonschema.ValidationError):
            invalid_row_count += 1
    return invalid_row_count
{% endmacro %}

In the test’s definition, we pass a packages argument that lists the additional PyPI packages the code requires. Here, we’ve added jsonschema to our requirements. Through macro_args, we specify the JSON schema that we want to validate against and the column name. The validate_json macro then interpolates both values into the Python code.

The code iterates over the column’s values, parses each one as JSON, and validates it against the schema. If a value can’t be parsed or doesn’t conform to the schema, the row is counted as invalid. At the end, we return the number of invalid rows. In this example we return an int, whereas the previous example returned a DataFrame.
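The counting logic can be tried outside the warehouse. The sketch below is a dependency-free stand-in, not the test itself: instead of calling jsonschema, it hand-checks the one rule expressed by the example schema (the value is an object, and country, if present, is a string). The sample rows and the helper name count_invalid_rows are made up for illustration.

```python
import json


def count_invalid_rows(rows):
    """Count rows that are not valid JSON or that break the schema rule."""
    invalid_row_count = 0
    for json_row in rows:
        try:
            value = json.loads(json_row)
        except json.JSONDecodeError:
            invalid_row_count += 1  # not parseable as JSON
            continue
        # Hand-written equivalent of the example schema:
        # {'type': 'object', 'properties': {'country': {'type': 'string'}}}
        if not isinstance(value, dict):
            invalid_row_count += 1  # top-level value is not an object
        elif "country" in value and not isinstance(value["country"], str):
            invalid_row_count += 1  # country is present but not a string
    return invalid_row_count


sample_rows = [
    '{"country": "IL"}',  # valid
    '{"city": "Paris"}',  # valid: the schema does not mark country as required
    '{"country": 972}',   # invalid: country is a number
    '{"country": ',       # invalid: malformed JSON
]
print(count_invalid_rows(sample_rows))  # 2
```

The second sample row passes because the schema only constrains country when it is present. To reject rows without a country, the schema would also need JSON Schema’s required keyword.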

API Reference

  • elementary.python dbt test:
    • code_macro: str: Name of the macro that returns the Python code.
    • macro_args: dict: Arguments that are supplied to the macro - {{ code_macro(macro_args) }}.
    • where_expression: str: SQL WHERE clause to limit the test scope.
    • All available configuration in Python models such as:
      • packages: list[str]: PyPI package requirements.
      • submission_method: str: cluster or serverless for BigQuery.
  • def test(model_df, ref, session) Python function:
    • Arguments
      • model_df: DataFrame: The model’s DataFrame object.
      • ref: Function: A function for referencing other resources, for example ref('my_other_model').
      • session: Session: A Session object, either Snowpark or PySpark.
    • Returns
      • DataFrame: Either a Spark or a Pandas DataFrame. The test’s failed results count will be the number of rows in the DataFrame.
      • int: The number of failed results.
      • bool: Whether the test passed (True) or failed (False).

Setup

Additional setup might be required depending on your warehouse. Because Elementary’s Python tests are built on the same building blocks as Python models, the requirements are the same as those described in the Python models documentation.

Beta feature

Running Python within the warehouse is a new feature in dbt and even newer in Elementary, so there might be some bumps along the road. Please contact us through whichever channel suits you, and we’ll be glad to help.