feat: Fix graph visualization to work with latest spanner-graph-notebook code; also, allow visualization when only some columns are json. #102

Open · wants to merge 21 commits into main

Commits (21)
308faf8  Multi-column support (ericfe-google, Mar 11, 2025)
22c9e8f  Remove 'rows' field in results, as it's not used by the Javascript. A… (ericfe-google, Mar 11, 2025)
cbffa75  reformat (ericfe-google, Mar 11, 2025)
43eb06f  Fix test_bigquery.py tests, remove unnecessary mocking of GraphServer (ericfe-google, Mar 11, 2025)
46dbcc1  reformat (ericfe-google, Mar 11, 2025)
57e22c0  Get basic graph visualization working against latest spanner code (ericfe-google, Mar 11, 2025)
36e0f34  Fix unit tests (ericfe-google, Mar 11, 2025)
469583e  Ignore columns we don't know how to visualize for visualization purpo… (ericfe-google, Mar 11, 2025)
e5d4ac0  Merge branch 'main' into graph3 (ericfe-google, Mar 12, 2025)
6223652  reformat (ericfe-google, Mar 12, 2025)
c929e33  Remove unused dependency on networkx (ericfe-google, Mar 12, 2025)
3cdf1ac  Implement stub callback for node expansion (ericfe-google, Mar 12, 2025)
ba92a07  Fix test_bigquery_graph_missing_spanner_deps. (ericfe-google, Mar 12, 2025)
176c854  reformat (ericfe-google, Mar 12, 2025)
2d899b4  Add unit test for GraphServerHandler::handler_post_node_expansion() (ericfe-google, Mar 12, 2025)
3b4903f  Add test for invalid node expansion request (ericfe-google, Mar 12, 2025)
49d1aac  reformat (ericfe-google, Mar 12, 2025)
9b4a567  Tweaks to improve code coverage (ericfe-google, Mar 12, 2025)
502149a  More tweaks to improve code coverage (ericfe-google, Mar 12, 2025)
148a74c  avoid list comprehension due to code coverage tooling (ericfe-google, Mar 12, 2025)
ce96f22  Fix visualization in colab. (ericfe-google, Mar 12, 2025)
58 changes: 41 additions & 17 deletions bigquery_magics/bigquery.py
@@ -596,19 +596,31 @@ def _handle_result(result, args):
return result


def _is_colab() -> bool:
"""Check if code is running in Google Colab"""
try:
import google.colab # noqa: F401
def _colab_query_callback(query: str, params: str):
return IPython.core.display.JSON(
graph_server.convert_graph_data(query_results=json.loads(params))
)

return True
except ImportError:
return False

def _colab_node_expansion_callback(request: dict, params_str: str):
"""Handle node expansion requests in Google Colab environment

Args:
request: A dictionary containing node expansion details including:
- uid: str - Unique identifier of the node to expand
- node_labels: List[str] - Labels of the node
- node_properties: List[Dict] - Properties of the node with key, value, and type
- direction: str - Direction of expansion ("INCOMING" or "OUTGOING")
- edge_label: Optional[str] - Label of edges to filter by
params_str: A JSON string containing connection parameters

def _colab_callback(query: str, params: str):
Returns:
JSON: A JSON-serialized response containing either:
- The query results with nodes and edges
- An error message if the request failed
"""
return IPython.core.display.JSON(
graph_server.convert_graph_data(query_results=json.loads(params))
graph_server.execute_node_expansion(params_str, request)
)


@@ -628,20 +640,30 @@ def _add_graph_widget(query_result):
# visualizer widget. In colab, we are not able to create an http server on a
# background thread, so we use a special colab-specific api to register a callback,
# to be invoked from Javascript.
if _is_colab():
port = None
try:
from google.colab import output

output.register_callback("graph_visualization.Query", _colab_callback)
else:
output.register_callback("graph_visualization.Query", _colab_query_callback)
output.register_callback(
"graph_visualization.NodeExpansion", _colab_node_expansion_callback
)

# In colab mode, the Javascript doesn't use the port value we pass in, as there is no
# graph server, but it still has to be set to avoid triggering an exception.
# TODO: Clean this up when the Javascript is fixed on the spanner-graph-notebook side.
port = 0
except ImportError:
global singleton_server_thread
alive = singleton_server_thread and singleton_server_thread.is_alive()
if not alive:
singleton_server_thread = graph_server.graph_server.init()
port = graph_server.graph_server.port

# Create html to invoke the graph server
html_content = generate_visualization_html(
query="placeholder query",
port=graph_server.graph_server.port,
port=port,
params=query_result.to_json().replace("\\", "\\\\").replace('"', '\\"'),
)
IPython.display.display(IPython.core.display.HTML(html_content))
@@ -656,11 +678,13 @@ def _is_valid_json(s: str):


def _supports_graph_widget(query_result: pandas.DataFrame):
num_rows, num_columns = query_result.shape
# Visualization is supported if we have any json items to display.
# (Non-json items are excluded from visualization, but we still want to bring up
# the visualizer for the json items.)
for column in query_result.columns:
if not query_result[column].apply(_is_valid_json).all():
return False
return True
if query_result[column].apply(_is_valid_json).any():
return True
return False


def _make_bq_query(
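Note on the _supports_graph_widget change above: the widget is now offered when at least one column contains valid JSON, rather than requiring every column to be JSON. A minimal, self-contained sketch of that rule follows; the _is_valid_json helper below is a stand-in, since its body is elided from the diff.

import json

import pandas as pd


def _is_valid_json(s) -> bool:
    # Stand-in for the helper in bigquery_magics/bigquery.py; treats any
    # parseable JSON string as valid.
    try:
        json.loads(s)
        return True
    except (TypeError, ValueError):
        return False


def supports_graph_widget(df: pd.DataFrame) -> bool:
    # Mirrors the updated check: any column with any valid-JSON cell enables
    # the widget; non-JSON columns are simply left out of the graph view.
    for column in df.columns:
        if df[column].apply(_is_valid_json).any():
            return True
    return False


mixed = pd.DataFrame(
    {
        "graph_json": ['[{"kind": "node", "labels": ["Person"], "properties": {"id": 1}}]'],
        "notes": ["plain text, not JSON"],
    }
)
print(supports_graph_widget(mixed))  # True under the new rule; the old all-columns rule returned False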
85 changes: 50 additions & 35 deletions bigquery_magics/graph_server.py
@@ -20,6 +20,10 @@
from typing import Dict, List


def execute_node_expansion(params, request):
return {"error": "Node expansion not yet implemented"}


def convert_graph_data(query_results: Dict[str, Dict[str, str]]):
"""
Converts graph data to the form expected by the visualization framework.
@@ -49,16 +53,12 @@ def convert_graph_data(query_results: Dict[str, Dict[str, str]]):
# does not even get called unless spanner_graphs has already been confirmed
# to exist upstream.
from google.cloud.spanner_v1.types import StructType, Type, TypeCode
import networkx
from spanner_graphs.conversion import (
columns_to_native_numpy,
prepare_data_for_graphing,
)
from spanner_graphs.conversion import get_nodes_edges

try:
fields: List[StructType.Field] = []
data = {}
rows = []
tabular_data = {}
for key, value in query_results.items():
column_name = None
column_value = None
@@ -73,45 +73,39 @@ def convert_graph_data(query_results: Dict[str, Dict[str, str]]):
StructType.Field(name=column_name, type=Type(code=TypeCode.JSON))
)
data[column_name] = []
tabular_data[column_name] = []
for value_key, value_value in column_value.items():
if not isinstance(value_key, str):
raise ValueError(
f"Expected inner key to be str, got {type(value_key)}"
)
if not isinstance(value_value, str):
raise ValueError(
f"Expected inner value to be str, got {type(value_value)}"
)
row_json = json.loads(value_value)

if row_json is not None:
try:
row_json = json.loads(value_value)
data[column_name].append(row_json)
rows.append([row_json])

d, ignored_columns = columns_to_native_numpy(data, fields)

graph: networkx.classes.DiGraph = prepare_data_for_graphing(
incoming=d, schema_json=None
)

nodes = []
for node_id, node in graph.nodes(data=True):
nodes.append(node)

edges = []
for from_id, to_id, edge in graph.edges(data=True):
edges.append(edge)
tabular_data[column_name].append(row_json)
except (ValueError, TypeError):
# Non-JSON columns cannot be visualized, but we still want them
# in the tabular view.
tabular_data[column_name].append(str(value_value))

nodes, edges = get_nodes_edges(data, fields, schema_json=None)

# Convert nodes and edges to json objects.
# (Unfortunately, the code coverage tooling does not allow this
# to be expressed as a list comprehension.)
nodes_json = []
for node in nodes:
nodes_json.append(node.to_json())
edges_json = []
for edge in edges:
edges_json.append(edge.to_json())

return {
"response": {
# These fields populate the graph result view.
"nodes": nodes,
"edges": edges,
"nodes": nodes_json,
"edges": edges_json,
# This populates the visualizer's schema view, but not yet implemented on the
# BigQuery side.
"schema": None,
# This field is used to populate the visualizer's tabular view.
"query_result": data,
"query_result": tabular_data,
}
}
except Exception as e:
@@ -133,6 +127,7 @@ class GraphServer:
endpoints = {
"get_ping": "/get_ping",
"post_ping": "/post_ping",
"post_node_expansion": "/post_node_expansion",
"post_query": "/post_query",
}

@@ -228,13 +223,33 @@ def handle_post_query(self):
response = convert_graph_data(query_results=json.loads(data["params"]))
self.do_data_response(response)

def handle_post_node_expansion(self):
"""Handle POST requests for node expansion.

Expects a JSON payload with:
- params: A JSON string containing connection parameters (project, instance, database, graph)
- request: A dictionary with node details (uid, node_labels, node_properties, direction, edge_label)
"""
data = self.parse_post_data()

# Execute node expansion with:
# - params_str: JSON string with connection parameters (project, instance, database, graph)
# - request: Dict with node details (uid, node_labels, node_properties, direction, edge_label)
self.do_data_response(
execute_node_expansion(
params=data.get("params"), request=data.get("request")
)
)

def do_GET(self):
assert self.path == GraphServer.endpoints["get_ping"]
self.handle_get_ping()

def do_POST(self):
if self.path == GraphServer.endpoints["post_ping"]:
self.handle_post_ping()
elif self.path == GraphServer.endpoints["post_node_expansion"]:
self.handle_post_node_expansion()
else:
assert self.path == GraphServer.endpoints["post_query"]
self.handle_post_query()
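For reference, node expansion is currently only a stub on the BigQuery side: whatever the /post_node_expansion handler receives, execute_node_expansion answers with an error payload. A small sketch, assuming the optional spanner-graph-notebook extra is installed so bigquery_magics.graph_server imports cleanly; the request fields follow the handler docstring and are otherwise illustrative.

import json

from bigquery_magics import graph_server

# Illustrative request shaped after handle_post_node_expansion's docstring;
# the stub ignores the contents and always reports "not yet implemented".
request = {
    "uid": "node-uid",
    "node_labels": ["Person"],
    "node_properties": [{"key": "id", "value": 1, "type": "INT64"}],
    "direction": "OUTGOING",
    "edge_label": None,
}
params = json.dumps({})  # connection parameters, serialized as a JSON string

print(graph_server.execute_node_expansion(params, request))
# -> {'error': 'Node expansion not yet implemented'}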
3 changes: 1 addition & 2 deletions setup.py
@@ -57,8 +57,7 @@
"bigframes": ["bigframes >= 1.17.0"],
"geopandas": ["geopandas >= 1.0.1"],
"spanner-graph-notebook": [
"spanner-graph-notebook >= 1.1.1, <=1.1.1",
"networkx",
"spanner-graph-notebook >= 1.1.3",
"portpicker",
],
}
37 changes: 34 additions & 3 deletions tests/unit/test_bigquery.py
@@ -891,8 +891,8 @@ def test_bigquery_graph_colab(monkeypatch):
graph_visualization is None or bigquery_storage is None,
reason="Requires `spanner-graph-notebook` and `google-cloud-bigquery-storage`",
)
def test_colab_callback():
result = bigquery_magics.bigquery._colab_callback(
def test_colab_query_callback():
result = bigquery_magics.bigquery._colab_query_callback(
"query", json.dumps({"result": {}})
)
assert result.data == {
@@ -905,6 +905,26 @@ def test_colab_callback():
}


@pytest.mark.usefixtures("ipython_interactive")
@pytest.mark.skipif(
graph_visualization is None or bigquery_storage is None,
reason="Requires `spanner-graph-notebook` and `google-cloud-bigquery-storage`",
)
def test_colab_node_expansion_callback():
result = bigquery_magics.bigquery._colab_node_expansion_callback(
request={
"uid": "test_uid",
"node_labels": ["label1, label2"],
"node_properites": {},
"direction": "INCOMING",
"edge_label": None,
},
params_str="{}",
)

assert result.data == {"error": "Node expansion not yet implemented"}


@pytest.mark.usefixtures("ipython_interactive")
@pytest.mark.skipif(
graph_visualization is not None or bigquery_storage is None,
@@ -932,7 +952,18 @@ def test_bigquery_graph_missing_spanner_deps(monkeypatch):
"google.cloud.bigquery_storage.BigQueryReadClient", bqstorage_mock
)
sql = "SELECT graph_json FROM t"
result = pandas.DataFrame([], columns=["graph_json"])
graph_json_rows = [
"""
[{"identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQI=","kind":"node","labels":["Person"],"properties":{"birthday":"1991-12-21T08:00:00Z","city":"Adelaide","country":"Australia","id":1,"name":"Alex"}},{"destination_node_identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEO","identifier":"mUZpbkdyYXBoLlBlcnNvbk93bkFjY291bnQAeJECkQ6ZRmluR3JhcGguUGVyc29uAHiRAplGaW5HcmFwaC5BY2NvdW50AHiRDg==","kind":"edge","labels":["Owns"],"properties":{"account_id":7,"create_time":"2020-01-10T14:22:20.222Z","id":1},"source_node_identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQI="},{"identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEO","kind":"node","labels":["Account"],"properties":{"create_time":"2020-01-10T14:22:20.222Z","id":7,"is_blocked":false,"nick_name":"Vacation Fund"}}]
""",
"""
[{"identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQY=","kind":"node","labels":["Person"],"properties":{"birthday":"1986-12-07T08:00:00Z","city":"Kollam","country":"India","id":3,"name":"Lee"}},{"destination_node_identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEg","identifier":"mUZpbkdyYXBoLlBlcnNvbk93bkFjY291bnQAeJEGkSCZRmluR3JhcGguUGVyc29uAHiRBplGaW5HcmFwaC5BY2NvdW50AHiRIA==","kind":"edge","labels":["Owns"],"properties":{"account_id":16,"create_time":"2020-02-18T13:44:20.655Z","id":3},"source_node_identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQY="},{"identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEg","kind":"node","labels":["Account"],"properties":{"create_time":"2020-01-28T01:55:09.206Z","id":16,"is_blocked":true,"nick_name":"Vacation Fund"}}]
""",
"""
[{"identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQQ=","kind":"node","labels":["Person"],"properties":{"birthday":"1980-10-31T08:00:00Z","city":"Moravia","country":"Czech_Republic","id":2,"name":"Dana"}},{"destination_node_identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEo","identifier":"mUZpbkdyYXBoLlBlcnNvbk93bkFjY291bnQAeJEEkSiZRmluR3JhcGguUGVyc29uAHiRBJlGaW5HcmFwaC5BY2NvdW50AHiRKA==","kind":"edge","labels":["Owns"],"properties":{"account_id":20,"create_time":"2020-01-28T01:55:09.206Z","id":2},"source_node_identifier":"mUZpbkdyYXBoLlBlcnNvbgB4kQQ="},{"identifier":"mUZpbkdyYXBoLkFjY291bnQAeJEo","kind":"node","labels":["Account"],"properties":{"create_time":"2020-02-18T13:44:20.655Z","id":20,"is_blocked":false,"nick_name":"Rainy Day Fund"}}]
""",
]
result = pandas.DataFrame(graph_json_rows, columns=["graph_json"])
run_query_patch = mock.patch("bigquery_magics.bigquery._run_query", autospec=True)
display_patch = mock.patch("IPython.display.display", autospec=True)
query_job_mock = mock.create_autospec(
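As background for the fixture above: the widget serializes the DataFrame with to_json() (column-oriented, so the payload looks like {"graph_json": {"0": "<json string>", ...}}), and convert_graph_data turns that into the nodes/edges/tabular response. A hedged sketch of the round trip, assuming the spanner-graph-notebook >= 1.1.3 extra is installed; the tiny node below is illustrative, and if spanner_graphs rejects it an {"error": ...} payload comes back instead.

import json

import pandas as pd

from bigquery_magics import graph_server

df = pd.DataFrame(
    {"graph_json": ['[{"identifier": "n1", "kind": "node", "labels": ["Person"], "properties": {"id": 1}}]']}
)

# Same serialization the generated HTML and _colab_query_callback use.
params = df.to_json()
result = graph_server.convert_graph_data(query_results=json.loads(params))

# On success: {"response": {"nodes": [...], "edges": [...], "schema": None, "query_result": {...}}}
# On failure: {"error": "<message>"}
print(list(result.keys()))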