|
11 | 11 | import xarray as xr |
12 | 12 | import xarray_sql as qr |
13 | 13 |
|
| 14 | + |
| 15 | +def local_data(start: str, end: str) -> xr.Dataset: |
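| | +  """Generates a fake, ERA5-like dataset of random weather data.""" |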
| 16 | +  import numpy as np |
| 17 | +  import pandas as pd |
| 18 | +
| 19 | +  np.random.seed(42) |
| 20 | +
| 21 | +  lat = np.linspace(-90, 90, num=720) |
| 22 | +  lon = np.linspace(-180, 180, num=1440) |
| 23 | +  time = pd.date_range(start, end, freq='H') |
| 24 | +  level = np.array([1000, 500], dtype=np.int32) |
| 25 | +  reference_time = pd.Timestamp(start) |
| 26 | +
| 27 | +  temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) |
| 28 | +  precipitation = 10 * np.random.rand(720, 1440, len(time), len(level)) |
| 29 | +
| 30 | +  return xr.Dataset( |
| 31 | +      data_vars=dict( |
| 32 | +          sea_surface_temperature=( |
| 33 | +              ['lat', 'lon', 'time', 'level'], |
| 34 | +              temperature, |
| 35 | +          ), |
| 36 | +          precipitation=(['lat', 'lon', 'time', 'level'], precipitation), |
| 37 | +      ), |
| 38 | +      coords=dict( |
| 39 | +          lat=lat, |
| 40 | +          lon=lon, |
| 41 | +          time=time, |
| 42 | +          level=level, |
| 43 | +          reference_time=reference_time, |
| 44 | +      ), |
| 45 | +      attrs=dict(description='Random weather.'), |
| 46 | +  ) |
| 47 | + |
| 48 | + |
14 | 49 | parser = argparse.ArgumentParser() |
15 | | -parser.add_argument('--start', type=str, default='2020-01-01', help='start time ISO string') |
16 | | -parser.add_argument('--end', type=str, default='2020-01-02', help='end time ISO string') |
17 | | -parser.add_argument('--cluster', action='store_true', help='deploy on coiled cluster') |
| 50 | +parser.add_argument( |
| 51 | +    '--start', type=str, default='2020-01-01', help='start time ISO string' |
| 52 | +) |
| 53 | +parser.add_argument( |
| 54 | +    '--end', type=str, default='2020-01-02', help='end time ISO string' |
| 55 | +) |
| 56 | +parser.add_argument( |
| 57 | +    '--cluster', |
| 58 | +    action='store_true', |
| 59 | +    help='deploy on coiled cluster, default: local cluster', |
| 60 | +) |
| 61 | +parser.add_argument( |
| 62 | +    '--fake', |
| 63 | +    action='store_true', |
| 64 | +    help='use local dummy data, default: ARCO-ERA5 data', |
| 65 | +) |
18 | 66 |
|
19 | 67 | args = parser.parse_args() |
20 | 68 |
|
21 | 69 | if args.cluster: |
22 | 70 | from coiled import Cluster |
23 | 71 |
|
24 | 72 | cluster = Cluster( |
25 | | - region='us-central1', |
26 | | - worker_memory='16 GiB', |
27 | | - spot_policy='spot_with_fallback', |
28 | | - arm=True, |
| 73 | +      region='us-central1', |
| 74 | +      worker_memory='16 GiB', |
| 75 | +      spot_policy='spot_with_fallback', |
| 76 | +      arm=True, |
29 | 77 | ) |
30 | 78 | client = cluster.get_client() |
31 | 79 | cluster.adapt(minimum=1, maximum=100) |
32 | 80 | else: |
33 | 81 | from dask.distributed import LocalCluster |
| 82 | + |
34 | 83 | cluster = LocalCluster(processes=False) |
35 | 84 | client = cluster.get_client() |
36 | 85 |
|
37 | | -era5_ds = xr.open_zarr( |
38 | | - 'gs://gcp-public-data-arco-era5/ar/' |
39 | | - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', |
40 | | - chunks={'time': 240, 'level': 1} |
41 | | -) |
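| | +# Use locally generated dummy data when --fake is set, else open ARCO-ERA5. |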
| 86 | +if args.fake: |
| 87 | +  era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1}) |
| 88 | +else: |
| 89 | +  era5_ds = xr.open_zarr( |
| 90 | +      'gs://gcp-public-data-arco-era5/ar/' |
| 91 | +      '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', |
| 92 | +      chunks={'time': 240, 'level': 1}, |
| 93 | +  ) |
| 94 | + |
42 | 95 | print('dataset opened.') |
43 | 96 | era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( |
44 | | - time=slice(args.start, args.end), |
45 | | - level=1000, # surface level only. |
| 97 | +    time=slice(args.start, args.end), |
| 98 | +    level=1000,  # surface level only. |
46 | 99 | ) |
47 | 100 |
|
48 | 101 | c = qr.Context() |
49 | 102 | # chunk sizes determined from VM memory limit of 16 GiB. |
50 | 103 | c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) |
51 | 104 |
|
52 | 105 | print('beginning query.') |
53 | | -df = c.sql(""" |
| 106 | +# TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box. |
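| | +# For now, aggregate per raw (hourly) timestamp rather than per calendar day. |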
| 107 | +df = c.sql( |
| 108 | + """ |
54 | 109 | SELECT |
55 | | - DATE("time") as date, |
| 110 | + "time", |
56 | 111 | AVG("sea_surface_temperature") as daily_avg_sst |
57 | 112 | FROM |
58 | 113 | "era5" |
59 | 114 | GROUP BY |
60 | | - DATE("time") |
61 | | -""") |
| 115 | + "time" |
| 116 | +""" |
| 117 | +) |
62 | 118 |
|
63 | | -df.to_csv(f'global_avg_sst_{args.start}-{args.end}_*.cvs') |
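| | +# Dask's to_csv writes one file per partition, replacing '*' with its number. |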
| 119 | +df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.csv') |