Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ repos:
- types-pytz
- types-pymysql
- types-requests
- repo: https://github.com/tconbeer/sqlfmt
rev: v0.27.0
hooks:
- id: sqlfmt
language_version: python
additional_dependencies: ['.[jinjafmt]']
- repo: https://github.com/sqlfluff/sqlfluff
rev: 3.4.2
hooks:
Expand Down
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ build-backend = "hatchling.build"

[tool.sqlfmt]
line_length = 120
# Paths sqlfmt must not touch: build artifacts, vendored dbt packages, and one staging model.
# NOTE(review): presumably the ovs encodejob model is excluded because sqlfmt cannot format it
# cleanly -- confirm and record the reason here.
exclude=[
    "target/**/*",
    "dbt_packages/**/*",
    "src/ol_dbt/models/staging/ovs/stg__ovs__studio__postgres__ui_encodejob.sql"
]

[tool.sqlfluff.core]
templater = "jinja"
Expand Down
34 changes: 12 additions & 22 deletions src/ol_dbt/macros/apply_deduplication_query.sql
Original file line number Diff line number Diff line change
@@ -1,39 +1,29 @@
{% macro deduplicate_query(cte_name1="source", cte_name2="most_recent_source", partition_columns="id") %}
    {#
        Emit two extra CTEs that keep a single row per `partition_columns` value from `cte_name1`.

        The rendered SQL starts with a comma, so the macro must be called directly after an
        existing CTE inside an already-open `with` clause.

        Args:
            cte_name1: name of the CTE to read rows from.
            cte_name2: name given to the deduplicated CTE.
            partition_columns: SQL column list that identifies one logical record.

        Rows are ranked by `_airbyte_emitted_at` descending and only rank 1 is kept; rows that
        share the same timestamp are ranked in an unspecified order. The helper column `row_num`
        remains in the output of `cte_name2` because of the `select *`.
    #}
    ,
    /*
    Add additional queries to handle duplicated data introduced by airbyte sync mode "Incremental Sync - Append"
    where cursor is set to a timestamp field. The deduplication logic works like this:
    - If there are multiple copies of the same record based on partition_columns, we will use the record from most
    recent sync in the source table.
    - If there is only one record for the same partition_columns, it will just load that record as it is.
    */
    source_sorted as (
        select *, row_number() over (partition by {{ partition_columns }} order by _airbyte_emitted_at desc) as row_num
        from {{ cte_name1 }}
    ),
    {{ cte_name2 }} as (select * from source_sorted where row_num = 1)

{% endmacro %}

{% macro deduplicate_raw_table(order_by="_airbyte_extracted_at", partition_columns="id") %}
    {#
        Emit the CTEs `source_sorted` and `most_recent_source`, keeping one row per
        `partition_columns` value from a CTE that must be named `source`.

        The rendered SQL starts with a comma, so the macro must be called directly after the
        `source` CTE inside an already-open `with` clause.

        Args:
            order_by: SQL expression used to rank rows; the highest value wins (descending order).
            partition_columns: SQL column list that identifies one logical record.

        The helper column `row_num` remains in `most_recent_source` because of the `select *`.
    #}
    ,
    /*
    This dedupe applied to the raw tables via Incremental Sync from Airbyte. It will deduplicate the data based on
    the partition_columns and order by columns.
    */
    source_sorted as (
        select *, row_number() over (partition by {{ partition_columns }} order by {{ order_by }} desc) as row_num
        from source
    ),
    most_recent_source as (select * from source_sorted where row_num = 1)
{% endmacro %}
8 changes: 4 additions & 4 deletions src/ol_dbt/macros/apply_grants_macro_override.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{% macro apply_grants(relation, grant_config, should_revoke) %}
    {#
        Project-level override of the `apply_grants` macro.

        When the active target is named "production", the call is forwarded unchanged to the
        implementation resolved by `adapter.dispatch("apply_grants", "dbt")`. For every other
        target the macro does nothing, so no grants are applied.

        Args:
            relation, grant_config, should_revoke: passed through as-is to the dispatched macro.
    #}
    {% if target.name == "production" %}
        {# Only apply grants in production schema #}
        {{ return(adapter.dispatch("apply_grants", "dbt")(relation, grant_config, should_revoke)) }}
    {% else %}
        {# Bypass code-based grant application in non-prod targets. #}
        {# This is so that we don't accidentally expose test tables to end-consumers of data #}
    {% endif %}
{% endmacro %}
10 changes: 5 additions & 5 deletions src/ol_dbt/macros/cast_date_to_iso8601.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{% macro cast_date_to_iso8601(column_name) %}
    {#
        Render a SQL expression that returns `column_name` as an ISO 8601 date string.

        Args:
            column_name: SQL expression (usually a column reference) holding a date or a date string.

        The first branch is taken whenever `try_cast(... as date)` succeeds, i.e. the value is a
        date or anything castable to one. Otherwise the value is cast to varchar and parsed with
        `from_iso8601_date`.
        NOTE(review): the fallback is not wrapped in `try`, so an unparseable string presumably
        fails the query rather than yielding null -- confirm this is intended.
    #}
    case
        when try_cast({{ column_name }} as date) is not null  -- value is a date or can be cast to one
        then to_iso8601(try_cast({{ column_name }} as date))

        else to_iso8601(from_iso8601_date(try_cast({{ column_name }} as varchar)))  -- Convert to ISO 8601 from a string
    end
{% endmacro %}
12 changes: 6 additions & 6 deletions src/ol_dbt/macros/cast_timestamp_to_iso8601.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{% macro cast_timestamp_to_iso8601(column_name) %}
    {#
        Render a SQL expression that returns `column_name` as an ISO 8601 timestamp string.

        Args:
            column_name: SQL expression (usually a column reference) holding a timestamp or a
                timestamp string.

        The first branch is taken whenever `try_cast(... as timestamp)` succeeds, i.e. the value is
        a timestamp or anything castable to one. Otherwise the value is cast to varchar and parsed
        with `from_iso8601_timestamp`.
        NOTE(review): the fallback is not wrapped in `try`, so an unparseable string presumably
        fails the query rather than yielding null -- confirm this is intended.
    #}
    case
        -- If the value is a timestamp (or can be cast to one), convert to ISO 8601
        when try_cast({{ column_name }} as timestamp) is not null
        then to_iso8601(try_cast({{ column_name }} as timestamp))
        -- otherwise, convert to ISO 8601 from a string
        else to_iso8601(from_iso8601_timestamp(try_cast({{ column_name }} as varchar)))
    end
{% endmacro %}
74 changes: 36 additions & 38 deletions src/ol_dbt/macros/extract_course_id.sql
Original file line number Diff line number Diff line change
@@ -1,49 +1,47 @@
{% macro extract_course_id_from_tracking_log(course_id_has_old_format=false) %}
    {#
        Render a SQL `case` expression that pulls a course ID out of a tracking-log row.

        The expression reads the unqualified columns `context` (JSON), `event_type` and `page`, so
        the calling query must expose them. Sources are tried in this order and the first one that
        matches the course ID pattern wins:
            1. `$.course_id` in `context` -- the whole JSON value is returned, not only the match
            2. `$.path` in `context`
            3. `event_type`
            4. `page`
        If none match, the expression evaluates to null (there is no `else` branch).

        Args:
            course_id_has_old_format: when true, the pattern accepts both ID formats listed below;
                when false (default), only the `course-v<digit>:` format is accepted.

        Course ID formats:
            {key type}:{org}+{course number}+{run tag}  for courses created since Fall 2014
            {org}/{course number}/{run tag}             for courses created before Fall 2014
        Course number and run tag can be letters, numbers, period, dashes, underscores.
    #}
    {% if course_id_has_old_format %}
        {% set course_id_regex = "(([\w\.\-\_]+):([\w\.\-\_]+)[+]([\w\.\-\_]+)[+]([\w\.\-\_]+))|(([\w\.\-\_]+)[/]([\w\.\-\_]+)[/]([\w\.\-\_]+))" %}
    {% else %}
        {% set course_id_regex = "course-v(\d{1}):([\w\.\-\_]+)\+([\w\.\-\_]+)\+([\w\.\-\_]+)" %}
    {% endif %}

    case
        when regexp_extract(json_query(context, 'lax $.course_id' omit quotes), '{{ course_id_regex }}') is not null
        then json_query(context, 'lax $.course_id' omit quotes)
        when regexp_extract(json_query(context, 'lax $.path' omit quotes), '{{ course_id_regex }}') is not null
        then regexp_extract(json_query(context, 'lax $.path' omit quotes), '{{ course_id_regex }}')
        when regexp_extract(event_type, '{{ course_id_regex }}') is not null
        then regexp_extract(event_type, '{{ course_id_regex }}')
        when regexp_extract(page, '{{ course_id_regex }}') is not null
        then regexp_extract(page, '{{ course_id_regex }}')
    end
{% endmacro %}


{% macro extract_course_readable_id(courserun_readable_id) %}
    {#
        Render a SQL expression that derives a course ID from a course-run ID.

        Args:
            courserun_readable_id: SQL expression holding the course-run ID, either
                course-v1:{org}+{course number}+{run tag}  (courses created since Fall 2014) or
                {org}/{course number}/{run tag}            (courses created before Fall 2014).

        Output: course_readable_id in course-v1:{org}+{course number} format.

        When the input contains 'course-v', the leading `course-v<digit>:{org}+{course number}`
        part is extracted as-is. Otherwise the leading `{org}/{course number}` part is extracted,
        '/' is replaced with '+', and 'course-v1:' is prepended.

        NOTE(review): the course-number class `[a-zA-Z0-9.-]` does not include '_', so a course
        number containing an underscore would be cut at the underscore; likewise `[\w]+` for the
        old-format org does not match '.' or '-'. Confirm whether such IDs can occur.
    #}
    case
        when position('course-v' in {{ courserun_readable_id }}) > 0
        then regexp_extract({{ courserun_readable_id }}, 'course-v(\d{1}):([\w\.\-]+)\+([a-zA-Z0-9.-]+)')
        else
            concat(
                'course-v1:', replace(regexp_extract({{ courserun_readable_id }}, '([\w]+)/([a-zA-Z0-9.-]+)'), '/', '+')
            )
    end
{% endmacro %}

{% macro format_course_id(column_name="courserun_readable_id", convert_to_old_format=true) %}
    {#
        Render a SQL expression that converts a course ID between its two formats.

        Course IDs come in two formats from different sources. This ensures that course IDs are
        consistently converted in all the downstream models.

        Args:
            column_name: SQL expression holding the course ID.
            convert_to_old_format: when true (default) produce {org}/{course number}/{run};
                when false produce course-v1:{org}+{course}+{run}.

        Both directions leave a value that is already in the target format unchanged. The
        new-format branch checks for an existing 'course-v1:' prefix so such a value is not
        prefixed a second time.
    #}
    {% if convert_to_old_format %}
        -- format as {org}/{course number}/{run}
        replace(replace({{ column_name }}, 'course-v1:', ''), '+', '/')
    {% else %}
        -- format as course-v1:{org}+{course}+{run}
        case
            when {{ column_name }} like 'course-v1:%' then {{ column_name }}
            else 'course-v1:' || replace({{ column_name }}, '/', '+')
        end
    {% endif %}
{% endmacro %}
12 changes: 6 additions & 6 deletions src/ol_dbt/macros/generate_hash_id.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{% macro generate_hash_id(string) %}
    {#
        Render a SQL expression that returns the lowercase hexadecimal SHA-256 digest of `string`.

        Args:
            string: SQL expression to hash; it is cast to varbinary before hashing.
    #}
    -- Be cautious about changing the hash function as it will impact the primary key used by Hightouch
    lower(
        to_hex(
            sha256(
                cast({{ string }} as varbinary)  -- noqa
            )
        )
    )
{% endmacro %}
Loading
Loading