class ElementaryReporter:
def __init__(self, api_key, env_id, base_url):
self.client = ElementaryClient(
api_key=api_key,
base_url=base_url,
env_id=env_id
)
self.project = "my-project"
def report_table(self, table_info):
"""Report a table asset"""
table_asset = TableAsset(
id=f"{table_info.database}.{table_info.schema}.{table_info.table}",
name=table_info.table,
database_name=table_info.database,
schema_name=table_info.schema,
table_name=table_info.table,
db_type=table_info.db_type,
description=table_info.description,
owners=table_info.owners,
tags=table_info.tags
)
self._send([table_asset])
def report_test_result(self, test_id, result):
"""Report a test execution result"""
execution = TestExecution(
id=f"{test_id}_exec_{int(result.timestamp.timestamp())}",
test_id=test_id,
test_sub_unique_id=test_id,
sub_type=result.test_type,
status=result.status,
start_time=result.timestamp,
failure_count=result.failure_count,
duration_seconds=result.duration
)
self._send([execution])
def _send(self, objects):
"""Internal method to send objects"""
request = ElementaryCloudIngestRequest(
project=self.project,
timestamp=datetime.now(timezone.utc),
objects=objects
)
self.client.ingest(request)