Skip to main content
Follow these best practices to get the most out of the Elementary Python SDK and ensure reliable data quality monitoring.

Object ID Management

Use Consistent ID Formats

Use a consistent format for all object IDs across your application:
# Good: Consistent format
table_id = f"{database}.{schema}.{table}"
test_id = f"{table_id}_{test_name}"
execution_id = f"{test_id}_exec_{timestamp}"

# Bad: Inconsistent formats
table_id = f"{table}"  # Missing database and schema
test_id = f"test_{random_id}"  # Random IDs are hard to track

Make IDs Deterministic

Generate IDs deterministically so the same object always has the same ID:
# Good: Deterministic ID
table_id = f"{database}.{schema}.{table}"

# Bad: Non-deterministic ID
table_id = f"table_{uuid.uuid4()}"  # Changes every time

Data Freshness

Send Timestamps Accurately

Always use UTC timestamps and include timezone information:
from datetime import datetime, timezone

# Good: UTC timezone
timestamp = datetime.now(timezone.utc)

# Bad: Local timezone without conversion
timestamp = datetime.now()  # May not be UTC

Update Objects Regularly

Send updated objects when metadata changes:
# When table description changes, send updated table asset
updated_table_asset = TableAsset(
    id=table_id,
    name=table_name,
    # ... other fields ...
    description="Updated description",  # New description
    # ... other fields ...
)

Test Execution Reporting

Report All Test Runs

Send test execution results for both passing and failing tests:
# Good: Report all results
# The status is derived for every run, so passing tests are reported too.
status = (
    TestExecutionStatus.PASS if test_passed else TestExecutionStatus.FAIL
)

test_execution = TestExecution(
    # ... fields ...
    status=status,
    failure_count=failure_count,
)

Include Detailed Failure Information

For failed tests, include as much detail as possible:
test_execution = TestExecution(
    # ... required fields ...
    status=TestExecutionStatus.FAIL,
    failure_count=failed_row_count,
    description=f"Found {failed_row_count} rows that failed validation",
    code=test_query,  # Include the query that was run
    exception=error_message if error_occurred else None,
    traceback=traceback_string if error_occurred else None
)

Error Handling

Implement Robust Error Handling

Always wrap SDK calls in try-except blocks:
try:
    client.ingest(request)
except ElementaryAPIError as e:
    # The API rejected the request; the error carries a status code and message.
    logger.error(f"API error: {e.status_code} - {e.message}")
    # Handle API errors appropriately
except Exception as e:
    # Anything else is unexpected, so keep the traceback for debugging.
    logger.error(f"Unexpected error: {e}", exc_info=True)
    # Handle unexpected errors

Use Retry Logic

Implement retry logic for transient failures:
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
def send_with_retry(client, request):
    """Send ``request`` via ``client.ingest``, retrying when the call raises.

    The decorator allows up to three attempts and waits between them with
    exponential backoff bounded to the 2-10 second range.

    NOTE(review): no ``retry=`` predicate is configured, so every exception
    is retried, not only transient ones - confirm that is intended.

    Args:
        client: Object exposing an ``ingest(request)`` method.
        request: The ingest request to send.

    Returns:
        Whatever ``client.ingest`` returns for the successful attempt.
    """
    return client.ingest(request)

Performance Optimization

Batch Objects Efficiently

Collect objects and send them in batches:
# Good: Batch multiple objects
# Gather each table's asset, its tests, and its recent executions into one
# list so everything goes out in a single ingest request.
objects = []
for table in tables:
    objects += [
        create_table_asset(table),
        *create_tests_for_table(table),
        *get_recent_executions(table),
    ]

request = ElementaryCloudIngestRequest(
    project="my-project",
    timestamp=datetime.now(timezone.utc),
    objects=objects,
)
client.ingest(request)

Avoid Redundant Sends

Don’t send the same data multiple times unnecessarily:
# Good: Track what's been sent
sent_executions = set()

for execution in new_executions:
    # Skip executions whose ID was already queued in this run.
    if execution.id in sent_executions:
        continue
    sent_executions.add(execution.id)
    objects.append(execution)

Security

Protect API Keys

Never hardcode API keys or commit them to version control:
# Good: Use environment variables
import os
api_key = os.getenv("ELEMENTARY_API_KEY")

# Bad: Hardcoded key
api_key = "sk-1234567890abcdef"  # Never do this!

Use Secrets Management

For production, use proper secrets management:
# Example with AWS Secrets Manager
import boto3

def get_api_key():
    """Return the API key stored in AWS Secrets Manager.

    Looks up the secret with ID ``elementary/api-key`` and returns the
    ``SecretString`` field of the response.
    """
    secrets_manager = boto3.client('secretsmanager')
    secret = secrets_manager.get_secret_value(SecretId='elementary/api-key')
    return secret['SecretString']

Monitoring and Observability

Log SDK Operations

Log important SDK operations for debugging:
import logging

logger = logging.getLogger(__name__)

try:
    client.ingest(request)
except Exception as e:
    # exc_info=True attaches the traceback to the log record.
    logger.error(f"Failed to send objects: {e}", exc_info=True)
else:
    # Logged outside the try body so that a problem while building this
    # message is never misreported as a failed send.
    logger.info(f"Successfully sent {len(request.objects)} objects")

Track Metrics

Monitor SDK usage and performance:
import time
from datetime import datetime, timezone

# perf_counter() is a monotonic clock, so the measured duration cannot be
# skewed by wall-clock adjustments that happen while the request is in flight.
start_time = time.perf_counter()
try:
    client.ingest(request)
    duration = time.perf_counter() - start_time
    logger.info(f"Ingest completed in {duration:.2f}s")
except Exception as e:
    duration = time.perf_counter() - start_time
    logger.error(f"Ingest failed after {duration:.2f}s: {e}")

Code Organization

Create Helper Functions

Organize SDK usage into a reusable helper class with small, focused methods:
class ElementaryReporter:
    """Helper that builds SDK objects and sends them through a single client.

    Each ``report_*`` method constructs one object and sends it in its own
    ingest request via ``_send``.
    """

    def __init__(self, api_key, env_id, base_url, project="my-project"):
        """Create the underlying client and remember the project name.

        Args:
            api_key: API key passed to ``ElementaryClient``.
            env_id: Environment ID passed to ``ElementaryClient``.
            base_url: Base URL passed to ``ElementaryClient``.
            project: Project name attached to every ingest request.
                Defaults to ``"my-project"``.
        """
        self.client = ElementaryClient(
            api_key=api_key,
            base_url=base_url,
            env_id=env_id,
        )
        self.project = project

    def report_table(self, table_info):
        """Report a table asset.

        Args:
            table_info: Object exposing ``database``, ``schema``, ``table``,
                ``db_type``, ``description``, ``owners`` and ``tags``
                attributes.
        """
        table_asset = TableAsset(
            # Deterministic ID: the fully qualified table name.
            id=f"{table_info.database}.{table_info.schema}.{table_info.table}",
            name=table_info.table,
            database_name=table_info.database,
            schema_name=table_info.schema,
            table_name=table_info.table,
            db_type=table_info.db_type,
            description=table_info.description,
            owners=table_info.owners,
            tags=table_info.tags,
        )
        self._send([table_asset])

    def report_test_result(self, test_id, result):
        """Report a test execution result.

        Args:
            test_id: ID of the test that was run; also used as
                ``test_sub_unique_id``.
            result: Object exposing ``timestamp`` (a datetime), ``test_type``,
                ``status``, ``failure_count`` and ``duration`` attributes.
        """
        # The execution ID combines the test ID with the whole-second epoch
        # time of the run, so the same run always maps to the same ID.
        # NOTE(review): presumably result.timestamp is timezone-aware UTC;
        # a naive datetime would be interpreted as local time - confirm.
        execution = TestExecution(
            id=f"{test_id}_exec_{int(result.timestamp.timestamp())}",
            test_id=test_id,
            test_sub_unique_id=test_id,
            sub_type=result.test_type,
            status=result.status,
            start_time=result.timestamp,
            failure_count=result.failure_count,
            duration_seconds=result.duration,
        )
        self._send([execution])

    def _send(self, objects):
        """Wrap ``objects`` in an ingest request and send it.

        The request carries this reporter's project name and the current
        UTC time as its timestamp.

        Args:
            objects: List of SDK objects to include in the request.
        """
        request = ElementaryCloudIngestRequest(
            project=self.project,
            timestamp=datetime.now(timezone.utc),
            objects=objects,
        )
        self.client.ingest(request)