feat(BigQuery): Programmatic retries for a continuous query #13201
base: main
Conversation
Hello @norlove, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
This pull request introduces programmatic retries for continuous queries in BigQuery. It adds a new Python script (programmatic-retries.py) that defines a Cloud Function to retry failed BigQuery queries. The function parses Pub/Sub messages containing log entries, extracts the failed SQL query, updates the timestamp in the query, and resubmits the query with a new job ID. Additionally, the pull request updates the requirements.txt and requirements-test.txt files to include necessary dependencies like functions-framework, google-cloud-bigquery, google-auth, and pytest.
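The flow described above can be sketched roughly as follows. This is a sketch, not the PR's exact code: the function name matches the changelog, but the audit-log field names are assumptions based on the BigQuery jobChange log shape.

```python
import base64
import json
import uuid


def retry_bigquery_query(event, context):
    """Sketch: retry a failed BigQuery continuous query from a Pub/Sub log entry."""
    # Pub/Sub delivers the log entry as base64-encoded JSON.
    log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

    job = log_entry["protoPayload"]["metadata"]["jobChange"]["job"]
    sql_query = job["jobConfig"]["queryConfig"]["query"]
    failed_job_id = job["jobName"]
    end_timestamp = job["jobStats"]["endTime"]

    # A fresh job ID keeps the retry distinct from the failed job.
    new_job_id = f"retry-{uuid.uuid4()}"
    print(f"Retrying {failed_job_id} as {new_job_id} from {end_timestamp}")
    # ...update the timestamp inside sql_query and resubmit via the jobs API...
```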
Highlights
- New Feature: Implements programmatic retries for BigQuery continuous queries using a Cloud Function triggered by Pub/Sub messages.
- Timestamp Handling: Updates the timestamp in the failed SQL query to ensure the retried query processes data from the point of failure.
- Dependency Management: Adds necessary dependencies to requirements.txt and requirements-test.txt for the Cloud Function to run correctly.
Changelog
- bigquery/continuous-queries/programmatic-retries.py
  - Introduces a new Cloud Function retry_bigquery_query to handle retries.
  - Parses Pub/Sub messages to extract the failed SQL query and job ID.
  - Updates the timestamp in the SQL query based on the previous job's end time.
  - Resubmits the query with a new job ID using the BigQuery API.
  - Includes error handling and logging for debugging.
- bigquery/continuous-queries/requirements-test.txt
  - Adds pytest==8.2.0 as a testing dependency.
- bigquery/continuous-queries/requirements.txt
  - Adds functions-framework==3.*, google-cloud-bigquery, and google-auth as dependencies for the Cloud Function.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e., a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
Feature | Command | Description
---|---|---
Code Review | /gemini review | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state.
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments.
Help | /gemini help | Displays a list of available commands.
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Trivia time!
What is the purpose of UUIDs (Universally Unique Identifiers) as used in the code?
Answer: UUIDs are used to generate unique job IDs for the retried queries, preventing conflicts with previous job IDs and ensuring each retry is treated as a distinct job.
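That usage can be sketched in a few lines. The prefix value here is a placeholder (the sample asks you to supply your own CUSTOM_JOB_ID_PREFIX); only the uuid4-based uniqueness is the point.

```python
import uuid

# Placeholder value; the sample expects you to provide your own prefix.
CUSTOM_JOB_ID_PREFIX = "continuous-query-retry-"


def make_job_id(prefix: str = CUSTOM_JOB_ID_PREFIX) -> str:
    """Return a job ID that is unique on every retry."""
    return f"{prefix}{uuid.uuid4()}"
```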
Footnotes
1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
Code Review
The code introduces a mechanism to retry failed BigQuery continuous queries by extracting information from log entries and resubmitting the query with an updated timestamp. This is a valuable addition to improve the resilience of continuous queries. The implementation appears well-structured, but there are a few areas that could benefit from refinement.
Summary of Findings
- Error Handling and Logging: The error handling could be improved by providing more context-specific error messages and potentially implementing retry logic with exponential backoff to avoid overwhelming the system.
- Timestamp Handling: The logic for updating timestamps in the SQL query could be made more robust to handle various timestamp formats and edge cases.
- Security Considerations: Storing the service account directly in the code is not recommended. It should be managed more securely, potentially through environment variables or a secrets management system.
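The first finding, bounded retries with exponential backoff, can be sketched as a small wrapper, assuming the job-submission call is factored out into a function (the helper name is illustrative, not from the PR):

```python
import time


def submit_with_backoff(submit, max_attempts=5, base_delay=1.0):
    """Call submit() until it succeeds, sleeping base_delay * 2**attempt
    seconds between tries; re-raise the last error once attempts run out."""
    for attempt in range(max_attempts):
        try:
            return submit()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            # Delay doubles each attempt: 1s, 2s, 4s, ...
            time.sleep(base_delay * (2 ** attempt))
```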
Merge Readiness
The code provides a functional solution for retrying failed BigQuery continuous queries. However, before merging, it's recommended to address the error handling, timestamp handling, and security considerations outlined in the review comments. I am unable to directly approve this pull request, and recommend that others review and approve this code before merging.
This is a good start. There are some blocking issues that need to be addressed before we can merge.
# - How to handle limiting retries or using exponential backoff.
# Make sure you provide your SERVICE_ACCOUNT and CUSTOM_JOB_ID_PREFIX.
issue: include region tags.
See https://googlecloudplatform.github.io/samples-style-guide/#region-tags
Addressed via updated code.
# Check if 'protoPayload' exists
if 'protoPayload' in log_entry:
    # Extract the SQL query
    sql_query = log_entry['protoPayload']['metadata']['jobChange']['job']['jobConfig']['queryConfig']['query']
issue: break this into multiple lines. This line is pretty long--I recommend putting the log_entry part of the expression between parentheses and moving it onto a subsequent line.
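Applying that suggestion could look like this (a sketch; the sample log-entry dict here is a stand-in for the real jobChange audit log):

```python
# Sample entry shaped like the jobChange audit log in the snippet above.
log_entry = {
    "protoPayload": {"metadata": {"jobChange": {"job": {
        "jobConfig": {"queryConfig": {"query": "SELECT 1"}},
    }}}}
}

# Parentheses let the chained lookups wrap instead of running past the line limit.
sql_query = (
    log_entry["protoPayload"]["metadata"]["jobChange"]
    ["job"]["jobConfig"]["queryConfig"]["query"]
)
```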
question: are we sure that this series of keys will always be present in the dict? It seems like there's a danger of a KeyError being raised.
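One way to guard against that KeyError is a small helper that walks the keys defensively (a sketch; `get_nested` is a hypothetical name, not part of the PR):

```python
def get_nested(mapping, *keys, default=None):
    """Walk nested dict keys, returning default instead of raising KeyError."""
    current = mapping
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current
```

The long lookup above would then become a single call like `get_nested(log_entry, "protoPayload", "metadata", "jobChange", "job", "jobConfig", "queryConfig", "query")`, which returns None instead of raising when a key is absent.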
Addressed via updated code.
sql_query = log_entry['protoPayload']['metadata']['jobChange']['job']['jobConfig']['queryConfig']['query']

# Record Job ID that failed and will attempt to be restarted
failed_job_id = log_entry['protoPayload']['metadata']['jobChange']['job']['jobName']
See previous about a potential KeyError.
Addressed via updated code.
print(f"Retrying failed job: {failed_job_id}")

# Extract the endTime from the log entry
end_timestamp = log_entry['protoPayload']['metadata']['jobChange']['job']['jobStats']['endTime']
See previous about a potential KeyError.
Addressed via updated code.
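For context, the endTime extracted in this thread drives the retried query's new start point. One hedged way to splice it into the SQL (the TIMESTAMP-literal pattern is an assumption about the query's shape, not the PR's actual code; adapt the regex to your query):

```python
import re


def update_timestamp(sql_query: str, end_timestamp: str) -> str:
    """Replace the first TIMESTAMP literal so the retried query resumes
    at the failed job's endTime. Assumes the query anchors its start time
    in a TIMESTAMP '...' literal."""
    return re.sub(
        r"TIMESTAMP\s+'[^']*'",
        f"TIMESTAMP '{end_timestamp}'",
        sql_query,
        count=1,
    )
```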
log_entry = json.loads(base64.b64decode(event['data']).decode('utf-8'))

# Check if 'protoPayload' exists
if 'protoPayload' in log_entry:
issue: reduce cyclomatic complexity. There is a series of nested ifs in this code; the more code paths there are, the harder the code is to read and to test.
Consider using the "Return Early Pattern" to reduce the number of branches.
See https://googlecloudplatform.github.io/samples-style-guide/#complexity.
I recommend switching to the Return Early Pattern here.
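A sketch of that refactor (hypothetical helper name; guard clauses return before any real work happens, so the happy path is never deeply nested):

```python
def extract_query(log_entry):
    """Return the failed query, or None, using the Return Early Pattern."""
    if "protoPayload" not in log_entry:
        print("Not a protoPayload entry; ignoring.")
        return None

    job = (
        log_entry["protoPayload"]
        .get("metadata", {})
        .get("jobChange", {})
        .get("job")
    )
    if job is None:
        print("No jobChange payload; ignoring.")
        return None

    # Only the happy path remains at full indentation.
    return job.get("jobConfig", {}).get("queryConfig", {}).get("query")
```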
Addressed via updated code.
access_token = credentials.token

# API endpoint
url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"
issue: use the BigQuery client library whenever possible.
Unfortunately the continuous queries flag isn't available yet in the BigQuery client. So right now this needs to be an API call. But we can change this in the future once the client has been updated.
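For readers following along, a direct call to the jobs endpoint can look roughly like this. The `continuous` field and body shape are assumptions to verify against the BigQuery REST reference; the access token would come from google-auth as in the snippet above, and the function names here are illustrative.

```python
import json
import urllib.request

BQ_JOBS_URL = "https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"


def build_job_body(project: str, sql_query: str, job_id: str) -> dict:
    """Request body for jobs.insert; `continuous` is the flag the
    google-cloud-bigquery client does not yet expose (shape assumed)."""
    return {
        "configuration": {
            "query": {
                "query": sql_query,
                "useLegacySql": False,
                "continuous": True,
            }
        },
        "jobReference": {"projectId": project, "jobId": job_id},
    }


def submit_continuous_query(project, sql_query, job_id, access_token):
    """POST the job to the REST API; access_token comes from google-auth."""
    request = urllib.request.Request(
        BQ_JOBS_URL.format(project=project),
        data=json.dumps(build_job_body(project, sql_query, job_id)).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.read()
```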
Here is the summary of changes. You are about to add 6 region tags.
This comment is generated by snippet-bot.
@telpirion I've gone ahead and updated the code. Can you please do another pass?
Two last items then this is good-to-go:
- Ensure that you include imports inside the sample region_tags.
- Provide an integration test for this sample. Ping me if you have questions about how to best do this.
# [START functions_bigquery_retry_decode]
# Decode and parse the Pub/Sub message data
log_entry = json.loads(base64.b64decode(event['data']).decode('utf-8'))
# [END functions_bigquery_retry_decode]
issue: include the import statements in the code sample.
See: https://googlecloudplatform.github.io/samples-style-guide/#imports
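In practice that guidance means the imports move inside the tagged region so the published snippet is self-contained. A sketch (the region tag name matches the snippet above; the function wrapper is illustrative):

```python
# [START functions_bigquery_retry_decode]
import base64
import json


def decode_log_entry(event: dict) -> dict:
    """Decode the base64-encoded Pub/Sub payload into a log-entry dict."""
    return json.loads(base64.b64decode(event["data"]).decode("utf-8"))
# [END functions_bigquery_retry_decode]
```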
Done.
# [END functions_bigquery_retry_extract_query]

# Check if required fields are missing
if not all([sql_query, failed_job_id, end_timestamp]):
praise: nice use of the Return Early Pattern! Thanks.
You can thank Gemini! LOL
@telpirion I've made the requested changes and added an integration test which I validated works successfully. Back to you!
Looks good! Thank you for your flexibility and responsiveness.
Looks like all that's needed is to run
Description
Fixes #
Note: Before submitting a pull request, please open an issue for discussion if you are not associated with Google.
Checklist
- nox -s py-3.9 (see Test Environment Setup)
- nox -s lint (see Test Environment Setup)