from elementary_python_sdk.core.cloud.cloud_client import ElementaryCloudClient
from elementary_python_sdk.core.tests import (
boolean_test,
elementary_test_context,
expected_range,
)
from elementary_python_sdk.core.types.asset import TableAsset
import pandas as pd
@boolean_test(
name="unique_ids",
description="All user IDs must be unique",
severity="ERROR",
)
def test_unique_ids(df: pd.DataFrame) -> bool:
return len(df["id"]) == len(df["id"].unique())
@expected_range(
name="average_age",
min=18,
max=50,
severity="ERROR",
)
def test_average_age(df: pd.DataFrame) -> float:
return df["age"].mean()
# Define your asset
asset = TableAsset(
name="users",
database_name="prod",
schema_name="public",
table_name="users"
)
# Run tests within context
with elementary_test_context(asset=asset) as ctx:
users_df = pd.DataFrame({"id": [1, 2, 3], "age": [25, 30, 35]})
# Run tests - results are automatically captured
test_unique_ids(users_df)
test_average_age(users_df)
# Send results to Elementary Cloud
PROJECT_ID = "my-python-project" # Your Python project identifier (used to deduplicate and identify assets)
API_KEY = "your-api-key"
URL = "https://app.elementary-data.com/sdk-ingest/{env_id}/batch"
client = ElementaryCloudClient(PROJECT_ID, API_KEY, URL)
client.send_to_cloud(ctx)