--- a/tests/test_dag_integrity.py +++ b/tests/test_dag_integrity.py @@ -44,3 +44,79 @@ assert dag.catchup is False, ( f"{dag_id} has catchup=True — set catchup=False or justify in docstring" ) + + +# ── daily_product_usage tests ─────────────────────────────────────────── + + +def test_product_usage_dag_exists(dagbag): + dag = dagbag.get_dag("daily_product_usage") + assert dag is not None + + +def test_product_usage_dag_schedule(dagbag): + dag = dagbag.get_dag("daily_product_usage") + assert dag.schedule_interval == "0 6 * * *" + + +def test_product_usage_dag_tags(dagbag): + dag = dagbag.get_dag("daily_product_usage") + assert "product-usage" in dag.tags + assert "exec-metrics" in dag.tags + assert "etl" in dag.tags + + +def test_product_usage_dag_max_active_runs(dagbag): + dag = dagbag.get_dag("daily_product_usage") + assert dag.max_active_runs == 1 + + +def test_product_usage_task_count(dagbag): + dag = dagbag.get_dag("daily_product_usage") + # start, wait_for_partition, validate, 2 transforms, 2 loads, notify, end + assert len(dag.tasks) == 9 + + +def test_product_usage_has_expected_tasks(dagbag): + dag = dagbag.get_dag("daily_product_usage") + task_ids = {t.task_id for t in dag.tasks} + expected = { + "start", + "wait_for_partition", + "validate_partition", + "transform_clean_events", + "transform_daily_aggregates", + "load_clean_events", + "load_daily_aggregates", + "notify_success", + "end", + } + assert expected == task_ids + + +def test_product_usage_sensor_uses_reschedule(dagbag): + dag = dagbag.get_dag("daily_product_usage") + sensor = dag.get_task("wait_for_partition") + assert sensor.mode == "reschedule" + + +def test_product_usage_dependency_chain(dagbag): + """Validate critical dependency ordering.""" + dag = dagbag.get_dag("daily_product_usage") + + def upstream_ids(task_id): + return {t.task_id for t in dag.get_task(task_id).upstream_list} + + def downstream_ids(task_id): + return {t.task_id for t in dag.get_task(task_id).downstream_list} + + # Sensor must follow start + assert "start" in upstream_ids("wait_for_partition") + # Validate must follow sensor + assert "wait_for_partition" in upstream_ids("validate_partition") + # Transforms follow validation + assert "validate_partition" in upstream_ids("transform_clean_events") + # Loads exist downstream of transforms + assert "load_clean_events" in downstream_ids("transform_clean_events") + # Notify precedes end + assert "notify_success" in upstream_ids("end")