From 72fd1ffba5c80af88385aad4e11ae30b6d6e4f20 Mon Sep 17 00:00:00 2001 From: Anna Saiapina Date: Mon, 9 Sep 2024 08:51:55 +0000 Subject: [PATCH] Pipeline for agent status data --- .../agent_status_data_freshness.sqlx | 9 ++++++ definitions/sources/agent_status.sqlx | 6 ++++ definitions/staging/stg_agent_status.sqlx | 30 +++++++++++++++++++ 3 files changed, 45 insertions(+) create mode 100644 definitions/assertions/agent_status_data_freshness.sqlx create mode 100644 definitions/sources/agent_status.sqlx create mode 100644 definitions/staging/stg_agent_status.sqlx diff --git a/definitions/assertions/agent_status_data_freshness.sqlx b/definitions/assertions/agent_status_data_freshness.sqlx new file mode 100644 index 0000000..2684cea --- /dev/null +++ b/definitions/assertions/agent_status_data_freshness.sqlx @@ -0,0 +1,9 @@ +config { + type: "assertion", + tags: ["agent_status"] } + +select * from( +select max(timestamp) max_timestamp +from ${ref("pphe_five9_raw","agent_status_*")} +where _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d',current_date("Europe/Amsterdam"))) +where TIMESTAMP_DIFF(max_timestamp, TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY), hour) < 0 \ No newline at end of file diff --git a/definitions/sources/agent_status.sqlx b/definitions/sources/agent_status.sqlx new file mode 100644 index 0000000..8ed52a8 --- /dev/null +++ b/definitions/sources/agent_status.sqlx @@ -0,0 +1,6 @@ +config { + type: "declaration", + schema: "pphe_five9_raw", + name: "agent_status_*", + description: "" + } \ No newline at end of file diff --git a/definitions/staging/stg_agent_status.sqlx b/definitions/staging/stg_agent_status.sqlx new file mode 100644 index 0000000..f74ba2c --- /dev/null +++ b/definitions/staging/stg_agent_status.sqlx @@ -0,0 +1,30 @@ +config { + type: "incremental", + uniqueKey: ["AGENT_ID", "TIMESTAMP"], + schema: "pphe_five9_stg", + assertions: { + uniqueKey: ["AGENT_ID", "TIMESTAMP"], + nonNull: ["AGENT_ID", "TIMESTAMP"] + }, + dependencies: ["agent_status_data_freshness"], + description: "All history data for agent status" +} + +--Numerates by key for deduplication +with agent_status_deduplicated as ( +select *, + ROW_NUMBER() + OVER (PARTITION BY AGENT_ID, TIMESTAMP) + row_number +from ${ref("pphe_five9_raw","agent_status_*")} +${ when(incremental(), `where _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d',current_date("Europe/Amsterdam"))`) }) --last date table is taken +select AGENT_ID, + AGENT_NAME, + AGENT_START_DATE, + AGENT_GROUP, + STATE, + AGENT_STATE_TIME, + TIMESTAMP, + DATE +from agent_status_deduplicated +where row_number = 1 \ No newline at end of file