# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for interacting with the job REST APIs from the client.
For queries, there are three cases to consider:
1. jobs.insert: This always returns a job resource.
2. jobs.query, jobCreationMode=JOB_CREATION_REQUIRED:
This sometimes can return the results inline, but always includes a job ID.
3. jobs.query, jobCreationMode=JOB_CREATION_OPTIONAL:
This sometimes doesn't create a job at all, instead returning the results.
For better debugging, an auto-generated query ID is included in the
response.
Client.query() calls either (1) or (2), depending on what the user provides
for the api_method parameter. query() always returns a QueryJob object, which
can retry the query when the query job fails for a retriable reason.
Client.query_and_wait() calls (3). This returns a RowIterator that may wrap
local results from the response or may wrap a query job containing multiple
pages of results. Even though query_and_wait() waits for the job to complete,
we still need a separate job_retry object because there are different
predicates where it is safe to generate a new query ID.
"""

import copy
import functools
import os
import uuid
from typing import Any, Dict, TYPE_CHECKING, Optional

import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table

# Avoid circular imports
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.bigquery.client import Client

# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to
# happen before the client-side timeout. This is not strictly necessary, as the
# client retries client-side timeouts, but the hope is that by making the
# server-side timeout slightly shorter, we can save the server some unnecessary
# processing time.
#
# 250 milliseconds is chosen arbitrarily, though it should be about the right
# order of magnitude for network latency and switching delays. It is about the
# amount of time for light to circumnavigate the world twice.
_TIMEOUT_BUFFER_MILLIS = 250


def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str:
    """Construct an ID for a new job.

    Args:
        job_id: the user-provided job ID.
        prefix: the user-provided prefix for a job ID.

    Returns:
        str: A job ID
    """
    if job_id is not None:
        return job_id
    elif prefix is not None:
        return str(prefix) + str(uuid.uuid4())
    else:
        return str(uuid.uuid4())
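

# A minimal illustration (not part of the original module): the three ways
# make_job_id resolves its arguments. The _example_* helpers in this file are
# hypothetical and exist only to document behavior.
def _example_make_job_id() -> None:
    # An explicit job_id wins, even when a prefix is also given.
    assert make_job_id("my-job", prefix="ignored-") == "my-job"
    # A prefix is prepended to a fresh UUID4.
    assert make_job_id(prefix="daily-").startswith("daily-")
    # With neither argument, the ID is a bare UUID4 string.
    assert len(make_job_id()) == 36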


def job_config_with_defaults(
    job_config: Optional[job.QueryJobConfig],
    default_job_config: Optional[job.QueryJobConfig],
) -> Optional[job.QueryJobConfig]:
    """Create a copy of `job_config`, replacing unset values with those from
    `default_job_config`.
    """
    if job_config is None:
        return default_job_config

    if default_job_config is None:
        return job_config

    # Both job_config and default_job_config are not None, so make a copy of
    # job_config merged with default_job_config. Anything already explicitly
    # set on job_config should not be replaced.
    return job_config._fill_from_default(default_job_config)
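

# A sketch of the merge precedence (hypothetical example values): explicitly
# set options on job_config survive, unset ones fall back to the defaults.
def _example_job_config_with_defaults() -> None:
    default = job.QueryJobConfig(use_query_cache=False, maximum_bytes_billed=10**9)
    explicit = job.QueryJobConfig(maximum_bytes_billed=10**6)
    merged = job_config_with_defaults(explicit, default)
    assert merged.maximum_bytes_billed == 10**6  # explicit wins
    assert merged.use_query_cache is False  # filled from the default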


def query_jobs_insert(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    job_id: Optional[str],
    job_id_prefix: Optional[str],
    location: Optional[str],
    project: str,
    retry: Optional[retries.Retry],
    timeout: Optional[float],
    job_retry: Optional[retries.Retry],
) -> job.QueryJob:
    """Initiate a query using jobs.insert.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
    """
    job_id_given = job_id is not None
    job_id_save = job_id
    job_config_save = job_config

    def do_query():
        # Make a copy now, so that the original doesn't get changed by the
        # process below and to facilitate retry.
        job_config = copy.deepcopy(job_config_save)

        job_id = make_job_id(job_id_save, job_id_prefix)
        job_ref = job._JobReference(job_id, project=project, location=location)
        query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config)

        try:
            query_job._begin(retry=retry, timeout=timeout)
        except core_exceptions.Conflict as create_exc:
            # The thought is that if someone is providing their own job IDs
            # and they get their job ID generation wrong, this could end up
            # returning results for the wrong query. We thus only try to
            # recover if the job ID was not given.
            if job_id_given:
                raise create_exc

            try:
                query_job = client.get_job(
                    job_id,
                    project=project,
                    location=location,
                    retry=retry,
                    timeout=timeout,
                )
            except core_exceptions.GoogleAPIError:  # (includes RetryError)
                raise
            else:
                return query_job
        else:
            return query_job

    future = do_query()

    # The future might be in a failed state now, but if it's unrecoverable,
    # we'll find out when we ask for its result, at which point we may retry.
    if not job_id_given:
        future._retry_do_query = do_query  # in case we have to retry later
        future._job_retry = job_retry

    return future
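

# Hypothetical usage sketch (requires a real Client and project): start a
# query via jobs.insert with an auto-generated, prefixed job ID and block on
# the result.
def _example_query_jobs_insert(client: "Client") -> None:
    query_job = query_jobs_insert(
        client,
        "SELECT 1",
        job_config=None,
        job_id=None,  # let make_job_id generate one, enabling job retry
        job_id_prefix="example-",
        location=None,
        project=client.project,
        retry=None,
        timeout=None,
        job_retry=None,
    )
    print(list(query_job.result()))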


def _validate_job_config(request_body: Dict[str, Any], invalid_key: str):
    """Catch common mistakes, such as passing in a *JobConfig object of the
    wrong type.
    """
    if invalid_key in request_body:
        raise ValueError(f"got unexpected key {repr(invalid_key)} in job_config")


def _to_query_request(
    job_config: Optional[job.QueryJobConfig] = None,
    *,
    query: str,
    location: Optional[str] = None,
    timeout: Optional[float] = None,
) -> Dict[str, Any]:
    """Transform from Job resource to QueryRequest resource.

    Most of the keys in job.configuration.query are in common with
    QueryRequest. If any configuration property is set that is not available
    in jobs.query, it will result in a server-side error.
    """
    request_body = copy.copy(job_config.to_api_repr()) if job_config else {}

    _validate_job_config(request_body, job.CopyJob._JOB_TYPE)
    _validate_job_config(request_body, job.ExtractJob._JOB_TYPE)
    _validate_job_config(request_body, job.LoadJob._JOB_TYPE)

    # Move query.* properties to top-level.
    query_config_resource = request_body.pop("query", {})
    request_body.update(query_config_resource)

    # Default to standard SQL.
    request_body.setdefault("useLegacySql", False)

    # Since jobs.query can return results, ensure we use the lossless timestamp
    # format. See: https://github.com/googleapis/python-bigquery/issues/395
    request_body.setdefault("formatOptions", {})
    request_body["formatOptions"]["useInt64Timestamp"] = True  # type: ignore

    if timeout is not None:
        # Subtract a buffer for context switching, network latency, etc.
        request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)

    if location is not None:
        request_body["location"] = location

    request_body["query"] = query

    return request_body
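

# A worked example of the transform (assuming the 250ms buffer constant
# above): query.* keys are hoisted to the top level, defaults are applied,
# and the server-side timeout is shortened relative to the client's.
def _example_to_query_request() -> None:
    config = job.QueryJobConfig(use_query_cache=False)
    body = _to_query_request(config, query="SELECT 1", location="US", timeout=10.0)
    assert body["useQueryCache"] is False  # hoisted from configuration.query
    assert body["useLegacySql"] is False  # default applied
    assert body["formatOptions"]["useInt64Timestamp"] is True
    assert body["timeoutMs"] == 9750  # 10s minus _TIMEOUT_BUFFER_MILLIS
    assert body["location"] == "US"
    assert body["query"] == "SELECT 1"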


def _to_query_job(
    client: "Client",
    query: str,
    request_config: Optional[job.QueryJobConfig],
    query_response: Dict[str, Any],
) -> job.QueryJob:
    job_ref_resource = query_response["jobReference"]
    job_ref = job._JobReference._from_api_repr(job_ref_resource)
    query_job = job.QueryJob(job_ref, query, client=client)
    query_job._properties.setdefault("configuration", {})

    # Not all relevant properties are in the jobs.query response. Populate some
    # expected properties based on the job configuration.
    if request_config is not None:
        query_job._properties["configuration"].update(request_config.to_api_repr())

    query_job._properties["configuration"].setdefault("query", {})
    query_job._properties["configuration"]["query"]["query"] = query
    query_job._properties["configuration"]["query"].setdefault("useLegacySql", False)

    query_job._properties.setdefault("statistics", {})
    query_job._properties["statistics"].setdefault("query", {})
    query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get(
        "cacheHit"
    )
    query_job._properties["statistics"]["query"]["schema"] = query_response.get(
        "schema"
    )
    query_job._properties["statistics"]["query"][
        "totalBytesProcessed"
    ] = query_response.get("totalBytesProcessed")

    # Set errors if any were encountered.
    query_job._properties.setdefault("status", {})
    if "errors" in query_response:
        # Set errors but not errorResult. If there was an error that failed
        # the job, jobs.query behaves like jobs.getQueryResults and returns a
        # non-success HTTP status code.
        errors = query_response["errors"]
        query_job._properties["status"]["errors"] = errors

    # Avoid an extra call to `getQueryResults` if the query has finished.
    job_complete = query_response.get("jobComplete")
    if job_complete:
        query_job._query_results = google.cloud.bigquery.query._QueryResults(
            query_response
        )

    # We want job.result() to refresh the job state, so the conversion is
    # always "PENDING", even if the job is finished.
    query_job._properties["status"]["state"] = "PENDING"

    return query_job
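

# A sketch with a minimal fake jobs.query response (hypothetical values):
# even for a complete job, the state is forced to PENDING so that a later
# result() call refreshes from the server.
def _example_to_query_job() -> None:
    response = {
        "jobReference": {"projectId": "proj", "jobId": "abc", "location": "US"},
        "jobComplete": True,
        "cacheHit": False,
        "totalBytesProcessed": "0",
    }
    query_job = _to_query_job(None, "SELECT 1", None, response)
    assert query_job.state == "PENDING"
    assert query_job.cache_hit is False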


def _to_query_path(project: str) -> str:
    return f"/projects/{project}/queries"


def query_jobs_query(
    client: "Client",
    query: str,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    retry: retries.Retry,
    timeout: Optional[float],
    job_retry: retries.Retry,
) -> job.QueryJob:
    """Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

    See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
    """
    path = _to_query_path(project)
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=timeout
    )

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}
        api_response = client._call_api(
            retry,
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            method="POST",
            path=path,
            data=request_body,
            timeout=timeout,
        )
        return _to_query_job(client, query, job_config, api_response)

    future = do_query()

    # The future might be in a failed state now, but if it's unrecoverable,
    # we'll find out when we ask for its result, at which point we may retry.
    future._retry_do_query = do_query  # in case we have to retry later
    future._job_retry = job_retry

    return future
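

# Hypothetical usage sketch (requires a real Client): the jobs.query path
# still yields a QueryJob, so callers use the same result() interface as with
# jobs.insert; each retry attempt sends a fresh requestId.
def _example_query_jobs_query(client: "Client") -> None:
    query_job = query_jobs_query(
        client,
        "SELECT 1",
        job_config=None,
        location=None,
        project=client.project,
        retry=retries.Retry(),
        timeout=None,
        job_retry=retries.Retry(),
    )
    print(list(query_job.result()))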


def query_and_wait(
    client: "Client",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[float] = None,
    retry: Optional[retries.Retry],
    job_retry: Optional[retries.Retry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> table.RowIterator:
    """Run the query, wait for it to finish, and return the results.

    While ``jobCreationMode=JOB_CREATION_OPTIONAL`` is in preview in the
    ``jobs.query`` REST API, use the default ``jobCreationMode`` unless
    the environment variable ``QUERY_PREVIEW_ENABLED`` is set to ``true``.
    After ``jobCreationMode`` is GA, this method will always use
    ``jobCreationMode=JOB_CREATION_OPTIONAL``. See:
    https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query

    Args:
        client:
            BigQuery client to make API calls.
        query (str):
            SQL query to be executed. Defaults to the standard SQL
            dialect. Use the ``job_config`` parameter to change dialects.
        job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
            Extra configuration options for the job.
            To override any options that were previously set in
            the ``default_query_job_config`` given to the
            ``Client`` constructor, manually set those options to ``None``,
            or whatever value is preferred.
        location (Optional[str]):
            Location where to run the job. Must match the location of the
            table used in the query as well as the destination table.
        project (Optional[str]):
            Project ID of the project where the job runs. Defaults to the
            client's project.
        api_timeout (Optional[float]):
            The number of seconds to wait for the underlying HTTP transport
            before using ``retry``.
        wait_timeout (Optional[float]):
            The number of seconds to wait for the query to finish. If the
            query doesn't finish before this timeout, the client attempts
            to cancel the query.
        retry (Optional[google.api_core.retry.Retry]):
            How to retry the RPC. This only applies to making RPC
            calls. It isn't used to retry failed jobs. This has
            a reasonable default that should only be overridden
            with care.
        job_retry (Optional[google.api_core.retry.Retry]):
            How to retry failed jobs. The default retries
            rate-limit-exceeded errors. Passing ``None`` disables
            job retry. Not all jobs can be retried.
        page_size (Optional[int]):
            The maximum number of rows in each page of results from this
            request. Non-positive values are ignored.
        max_results (Optional[int]):
            The maximum total number of rows from this request.

    Returns:
        google.cloud.bigquery.table.RowIterator:
            Iterator of row data
            :class:`~google.cloud.bigquery.table.Row`-s. During each
            page, the iterator will have the ``total_rows`` attribute
            set, which counts the total number of rows **in the result
            set** (this is distinct from the total number of rows in the
            current page: ``iterator.page.num_items``).

            If the query is a special query that produces no results, e.g.
            a DDL query, an ``_EmptyRowIterator`` instance is returned.

    Raises:
        TypeError:
            If ``job_config`` is not an instance of the
            :class:`~google.cloud.bigquery.job.QueryJobConfig` class.
    """
    request_body = _to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    # Some API parameters aren't supported by the jobs.query API. In these
    # cases, fall back to a jobs.insert call.
    if not _supported_by_jobs_query(request_body):
        return _wait_or_cancel(
            query_jobs_insert(
                client=client,
                query=query,
                job_id=None,
                job_id_prefix=None,
                job_config=job_config,
                location=location,
                project=project,
                retry=retry,
                timeout=api_timeout,
                job_retry=job_retry,
            ),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    path = _to_query_path(project)

    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results

    if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
        request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

    def do_query():
        request_body["requestId"] = make_job_id()
        span_attributes = {"path": path}

        # For easier testing, handle the retries ourselves.
        if retry is not None:
            response = retry(client._call_api)(
                retry=None,  # We're calling the retry decorator ourselves.
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )
        else:
            response = client._call_api(
                retry=None,
                span_name="BigQuery.query",
                span_attributes=span_attributes,
                method="POST",
                path=path,
                data=request_body,
                timeout=api_timeout,
            )

        # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
        # to fetch, there will be a job ID for jobs.getQueryResults.
        query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
            response
        )
        page_token = query_results.page_token
        more_pages = page_token is not None

        if more_pages or not query_results.complete:
            # TODO(swast): Avoid a call to jobs.get in some cases (few
            # remaining pages) by waiting for the query to finish and calling
            # client._list_rows_from_query_results directly. Need to update
            # RowIterator to fetch destination table via the job ID if needed.
            return _wait_or_cancel(
                _to_query_job(client, query, job_config, response),
                api_timeout=api_timeout,
                wait_timeout=wait_timeout,
                retry=retry,
                page_size=page_size,
                max_results=max_results,
            )

        return table.RowIterator(
            client=client,
            api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
            path=None,
            schema=query_results.schema,
            max_results=max_results,
            page_size=page_size,
            total_rows=query_results.total_rows,
            first_page_response=response,
            location=query_results.location,
            job_id=query_results.job_id,
            query_id=query_results.query_id,
            project=query_results.project,
            num_dml_affected_rows=query_results.num_dml_affected_rows,
        )

    if job_retry is not None:
        return job_retry(do_query)()
    else:
        return do_query()
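

# Hypothetical end-to-end sketch (requires a real Client; the public dataset
# name is only illustrative): fetch rows without caring whether the backend
# returned them inline or via a created job.
def _example_query_and_wait(client: "Client") -> None:
    rows = query_and_wait(
        client,
        "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 10",
        job_config=None,
        location="US",
        project=client.project,
        retry=None,
        job_retry=None,
        page_size=5,
        max_results=10,
    )
    for row in rows:
        print(row["name"])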


def _supported_by_jobs_query(request_body: Dict[str, Any]) -> bool:
    """True if jobs.query can be used. False if jobs.insert is needed."""
    request_keys = frozenset(request_body.keys())

    # Per issue: https://github.com/googleapis/python-bigquery/issues/1867
    # use an allowlist here instead of a denylist because the backend API allows
    # unsupported parameters without any warning or failure. Instead, keep this
    # set in sync with those in QueryRequest:
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest
    keys_allowlist = {
        "kind",
        "query",
        "maxResults",
        "defaultDataset",
        "timeoutMs",
        "dryRun",
        "preserveNulls",
        "useQueryCache",
        "useLegacySql",
        "parameterMode",
        "queryParameters",
        "location",
        "formatOptions",
        "connectionProperties",
        "labels",
        "maximumBytesBilled",
        "requestId",
        "createSession",
    }

    unsupported_keys = request_keys - keys_allowlist
    return len(unsupported_keys) == 0
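

# A sketch of the allowlist in action (hypothetical request bodies): a
# destinationTable entry is a Job-only feature, so it forces the jobs.insert
# fallback in query_and_wait.
def _example_supported_by_jobs_query() -> None:
    assert _supported_by_jobs_query({"query": "SELECT 1", "useLegacySql": False})
    assert not _supported_by_jobs_query(
        {"query": "SELECT 1", "destinationTable": {"tableId": "t"}}
    )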


def _wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[float],
    retry: Optional[retries.Retry],
    page_size: Optional[int],
    max_results: Optional[int],
) -> table.RowIterator:
    """Wait for a job to complete and return the results.

    If we can't return the results within the ``wait_timeout``, try to cancel
    the job.
    """
    try:
        return job.result(
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
    except Exception:
        # Attempt to cancel the job since we can't return the results.
        try:
            job.cancel(retry=retry, timeout=api_timeout)
        except Exception:
            # Don't eat the original exception if cancel fails.
            pass
        raise