Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import collections
import logging
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Set
Expand Down Expand Up @@ -52,11 +53,13 @@ class LineageEdge:
immutability and hashability.

Attributes:
platform: The upstream entry's platform (bigquery, gcs, etc.)
entry_id: The upstream entry ID in Dataplex format
audit_stamp: When this lineage was observed
lineage_type: Type of lineage (TRANSFORMED, COPY, etc.)
"""

platform: str
entry_id: str
audit_stamp: datetime
lineage_type: str = DatasetLineageTypeClass.TRANSFORMED
Expand Down Expand Up @@ -340,11 +343,13 @@ def build_lineage_map(

# Convert upstream FQNs to LineageEdge objects
for upstream_fqn in lineage_data.get("upstream", []):
# Extract dataset ID from FQN (full path like project.dataset.table)
upstream_dataset_id = self._extract_entry_id_from_fqn(upstream_fqn)
# Extract platform and dataset ID from FQN
result = self._extract_platform_and_entry_id_from_fqn(upstream_fqn)

if upstream_dataset_id:
if result is not None:
upstream_platform, upstream_dataset_id = result
edge = LineageEdge(
platform=upstream_platform or entry.source_platform,
entry_id=upstream_dataset_id,
audit_stamp=datetime.now(timezone.utc),
lineage_type=DatasetLineageTypeClass.TRANSFORMED,
Expand All @@ -366,20 +371,21 @@ def build_lineage_map(

return lineage_by_full_dataset_id

def _extract_entry_id_from_fqn(self, fqn: str) -> Optional[str]:
def _extract_platform_and_entry_id_from_fqn(self, fqn: str) -> Optional[tuple]:
"""
Extract entry ID from a fully qualified name.
Extract platform and entry ID from a fully qualified name.

Handles platform-specific FQN formats:
- BigQuery: bigquery:{project}.{dataset}.{table} -> {project}.{dataset}.{table}
- GCS: gcs:{bucket}/{path} -> {bucket}/{path}
- GCS: gcs:{bucket} -> {bucket}
- BigQuery: bigquery:{project}.{dataset}.{table} -> ("bigquery", "{project}.{dataset}.{table}")
- GCS: gcs:{bucket}/{path} -> ("gcs", "{bucket}/{path}")
- GCS: gcs:{bucket} -> ("gcs", "{bucket}")

Args:
fqn: Fully qualified name in format "{platform}:{identifier}"

Returns:
Entry ID (everything after the platform prefix) or None if extraction fails
Tuple of (platform, entry_id) or None if extraction fails.
Platform may be None if the FQN has no platform prefix.
"""
try:
if ":" in fqn:
Expand All @@ -389,15 +395,51 @@ def _extract_entry_id_from_fqn(self, fqn: str) -> Optional[str]:
if platform not in ["bigquery", "gcs", "dataplex"]:
logger.warning(f"Unexpected platform '{platform}' in FQN: {fqn}")

return entry_part
# Normalize GCS entry IDs to match DataHub GCS source URN format
if platform == "gcs":
entry_part = self._normalize_gcs_entry_id(entry_part)

return (platform, entry_part)
else:
# No platform prefix, return as-is (shouldn't happen in practice)
logger.warning(f"FQN missing platform prefix: {fqn}")
return fqn
return (None, fqn)
except Exception as e:
logger.error(f"Failed to extract entry ID from FQN '{fqn}': {e}")
return None

def _extract_entry_id_from_fqn(self, fqn: str) -> Optional[str]:
    """Return only the entry ID portion of *fqn*.

    Backward-compatible wrapper around
    ``_extract_platform_and_entry_id_from_fqn``: discards the platform
    component and yields just the entry ID, or ``None`` when the FQN
    could not be parsed.
    """
    parsed = self._extract_platform_and_entry_id_from_fqn(fqn)
    # Drop the platform element; propagate a parse failure as None.
    return None if parsed is None else parsed[1]

def _normalize_gcs_entry_id(self, entry_id: str) -> str:
"""Normalize a GCS entry ID from Data Lineage API format to DataHub URN format.

The GCP Data Lineage API uses: bucket.`path/to/files/*.csv`
DataHub GCS source uses: bucket/path/to/files

Transformations:
1. Strip backticks
2. Replace first dot (bucket.path separator) with slash
3. Remove trailing glob patterns (/*.csv, /*.parquet, etc.)
"""
# Strip backticks
normalized = entry_id.replace("`", "")
# Replace first dot with slash (bucket.path -> bucket/path)
# The Data Lineage API uses dot as bucket/path separator.
# Only replace if the dot comes before the first slash (or there is no slash),
# to avoid double-normalizing already-slash-separated paths.
dot_pos = normalized.find(".")
slash_pos = normalized.find("/")
if dot_pos != -1 and (slash_pos == -1 or dot_pos < slash_pos):
normalized = normalized[:dot_pos] + "/" + normalized[dot_pos + 1 :]
# Remove trailing glob pattern (e.g. /*.csv, /*.parquet, /*)
normalized = re.sub(r"/\*(\.\w+)?$", "", normalized)
return normalized

def get_lineage_for_table(
self, dataset_id: str, dataset_urn: str, platform: str
) -> Optional[UpstreamLineageClass]:
Expand All @@ -418,9 +460,9 @@ def get_lineage_for_table(
upstream_list: list[UpstreamClass] = []

for lineage_edge in self.lineage_by_full_dataset_id[dataset_id]:
# Generate URN for the upstream entry using the full dataset_id
# Generate URN for the upstream entry using its own platform
upstream_urn = builder.make_dataset_urn_with_platform_instance(
platform=platform,
platform=lineage_edge.platform,
name=lineage_edge.entry_id,
platform_instance=None,
env=self.config.env,
Expand Down
Loading
Loading