From 564c2848e6502032d15a1d71c943cc16123198fd Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Sat, 23 Mar 2024 20:55:21 +0530
Subject: [PATCH 01/17] Simplest version of the SST demo.

---
 demo/sst | 41 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)
 create mode 100644 demo/sst

diff --git a/demo/sst b/demo/sst
new file mode 100644
index 0000000..6e38b6e
--- /dev/null
+++ b/demo/sst
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+"""Demo of calculating global average sea surface temperature (SST) with SQL.
+
+Please run the following to access the ERA5 dataset:
+```
+gcloud auth application-default login
+```
+"""
+import xarray as xr
+import xarray_sql as qr
+
+# TODO(alxmrs): Add coiled or dask cluster code.
+
+era5_ds = xr.open_zarr(
+    'gs://gcp-public-data-arco-era5/ar/'
+    '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2',
+    chunks={'time': 240, 'level': 1}
+)
+print('dataset opened.')
+# TODO(alxmrs): Slice to small time range based on script args.
+era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(
+    level=1000,  # surface level only.
+)
+
+# chunk sizes determined from VM memory limit of 16 GB.
+c = qr.Context()
+c.create_table('era5', era5_sst_ds, chunks=dict(time=24))
+
+print('beginning query.')
+df = c.sql("""
+SELECT
+  DATE("time") as date,
+  AVG("sea_surface_temperature") as daily_avg_sst
+FROM
+  "era5"
+GROUP BY
+  DATE("time")
+""")
+
+# TODO(alxmrs): time slice should be in file name.
+df.to_csv('global_avg_sst_*.csv')
\ No newline at end of file

From 22540afabb5a303d4a17021fc9b2f6bb263d7de8 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Sat, 23 Mar 2024 21:17:09 +0530
Subject: [PATCH 02/17] Added Coiled cluster config.

---
 demo/sst | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/demo/sst b/demo/sst
index 6e38b6e..178bfa4 100644
--- a/demo/sst
+++ b/demo/sst
@@ -9,7 +9,16 @@ gcloud auth application-default login
 import xarray as xr
 import xarray_sql as qr
 
-# TODO(alxmrs): Add coiled or dask cluster code.
+from coiled import Cluster
+
+cluster = Cluster(
+    region='us-central1',
+    worker_memory='16 GiB',
+    spot_policy='spot_with_fallback',
+    arm=True,
+)
+client = cluster.get_client()
+cluster.adapt(minimum=1, maximum=100)
 
 era5_ds = xr.open_zarr(
     'gs://gcp-public-data-arco-era5/ar/'
@@ -22,7 +31,7 @@ era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(
     level=1000,  # surface level only.
 )
 
-# chunk sizes determined from VM memory limit of 16 GB.
+# chunk sizes determined from VM memory limit of 16 GiB.
 c = qr.Context()
 c.create_table('era5', era5_sst_ds, chunks=dict(time=24))
 

From f2f567bf816d9824e46e69794810c0dc5391c52f Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Sat, 23 Mar 2024 22:50:39 +0530
Subject: [PATCH 03/17] Added script arguments.

---
 demo/sst       | 50 --------------------------------------------------
 demo/sst.py    | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++
 pyproject.toml |  3 +++
 3 files changed, 66 insertions(+), 50 deletions(-)
 delete mode 100644 demo/sst
 create mode 100755 demo/sst.py

diff --git a/demo/sst b/demo/sst
deleted file mode 100644
index 178bfa4..0000000
--- a/demo/sst
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env python3
-"""Demo of calculating global average sea surface temperature (SST) with SQL.
-
-Please run the following to access the ERA5 dataset:
-```
-gcloud auth application-default login
-```
-"""
-import xarray as xr
-import xarray_sql as qr
-
-from coiled import Cluster
-
-cluster = Cluster(
-    region='us-central1',
-    worker_memory='16 GiB',
-    spot_policy='spot_with_fallback',
-    arm=True,
-)
-client = cluster.get_client()
-cluster.adapt(minimum=1, maximum=100)
-
-era5_ds = xr.open_zarr(
-    'gs://gcp-public-data-arco-era5/ar/'
-    '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2',
-    chunks={'time': 240, 'level': 1}
-)
-print('dataset opened.')
-# TODO(alxmrs): Slice to small time range based on script args.
-era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(
-    level=1000,  # surface level only.
-)
-
-# chunk sizes determined from VM memory limit of 16 GiB.
-c = qr.Context()
-c.create_table('era5', era5_sst_ds, chunks=dict(time=24))
-
-print('beginning query.')
-df = c.sql("""
-SELECT
-  DATE("time") as date,
-  AVG("sea_surface_temperature") as daily_avg_sst
-FROM
-  "era5"
-GROUP BY
-  DATE("time")
-""")
-
-# TODO(alxmrs): time slice should be in file name.
-df.to_csv('global_avg_sst_*.csv')
\ No newline at end of file
diff --git a/demo/sst.py b/demo/sst.py
new file mode 100755
index 0000000..1482ec5
--- /dev/null
+++ b/demo/sst.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+"""Demo of calculating global average sea surface temperature (SST) with SQL.
+
+Please run the following to set up cloud resources:
+```
+gcloud auth application-default login
+coiled setup
+```
+"""
+import argparse
+import xarray as xr
+import xarray_sql as qr
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--start', type=str, default='2020-01-01', help='start time ISO string')
+parser.add_argument('--end', type=str, default='2020-01-02', help='end time ISO string')
+parser.add_argument('--cluster', action='store_true', help='deploy on coiled cluster')
+
+args = parser.parse_args()
+
+if args.cluster:
+  from coiled import Cluster
+
+  cluster = Cluster(
+    region='us-central1',
+    worker_memory='16 GiB',
+    spot_policy='spot_with_fallback',
+    arm=True,
+  )
+  client = cluster.get_client()
+  cluster.adapt(minimum=1, maximum=100)
+else:
+  from dask.distributed import LocalCluster
+  cluster = LocalCluster(processes=False)
+  client = cluster.get_client()
+
+era5_ds = xr.open_zarr(
+    'gs://gcp-public-data-arco-era5/ar/'
+    '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2',
+    chunks={'time': 240, 'level': 1}
+)
+print('dataset opened.')
+era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(
+    time=slice(args.start, args.end),
+    level=1000,  # surface level only.
+)
+
+c = qr.Context()
+# chunk sizes determined from VM memory limit of 16 GiB.
+c.create_table('era5', era5_sst_ds, chunks=dict(time=24))
+
+print('beginning query.')
+df = c.sql("""
+SELECT
+  DATE("time") as date,
+  AVG("sea_surface_temperature") as daily_avg_sst
+FROM
+  "era5"
+GROUP BY
+  DATE("time")
+""")
+
+df.to_csv(f'global_avg_sst_{args.start}-{args.end}_*.csv')
diff --git a/pyproject.toml b/pyproject.toml
index d6b1c04..57153c1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,9 @@ test = [
     "xarray[io]",
     "gcsfs",
 ]
+demo = [
+    "coiled"
+]
 
 [project.urls]
 Homepage = "https://github.com/alxmrs/xarray-sql"

From 4bf1d9c9c67b05e68613b3a29dfb1cb4b6efc397 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Sun, 24 Mar 2024 16:45:12 +0530
Subject: [PATCH 04/17] SST demo works with local fake data.
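
With the fake-data path in place, the demo can be exercised end-to-end
without touching cloud data. For example (both invocations below use only
flags and defaults added in the patch series so far):

```
# Random, locally generated weather data on a LocalCluster:
./demo/sst.py --fake

# The real ARCO-ERA5 dataset on an adaptive Coiled cluster:
./demo/sst.py --start 2020-01-01 --end 2020-01-02 --cluster
```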
--- demo/sst.py | 94 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 19 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index 1482ec5..6b83e75 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -11,10 +11,58 @@ import xarray as xr import xarray_sql as qr + +def local_data(start: str, end: str) -> xr.Dataset: + import numpy as np + import pandas as pd + + np.random.seed(42) + + lat = np.linspace(-90, 90, num=720) + lon = np.linspace(-180, 180, num=1440) + time = pd.date_range(start, end, freq='H') + level = np.array([1000, 500], dtype=np.int32) + reference_time = pd.Timestamp(start) + + temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) + precipitation = 10 * np.random.rand(720, 1440, len(time), len(level)) + + return xr.Dataset( + data_vars=dict( + sea_surface_temperature=( + ['lat', 'lon', 'time', 'level'], + temperature, + ), + precipitation=(['lat', 'lon', 'time', 'level'], precipitation), + ), + coords=dict( + lat=lat, + lon=lon, + time=time, + level=level, + reference_time=reference_time, + ), + attrs=dict(description='Random weather.'), + ) + + parser = argparse.ArgumentParser() -parser.add_argument('--start', type=str, default='2020-01-01', help='start time ISO string') -parser.add_argument('--end', type=str, default='2020-01-02', help='end time ISO string') -parser.add_argument('--cluster', action='store_true', help='deploy on coiled cluster') +parser.add_argument( + '--start', type=str, default='2020-01-01', help='start time ISO string' +) +parser.add_argument( + '--end', type=str, default='2020-01-02', help='end time ISO string' +) +parser.add_argument( + '--cluster', + action='store_true', + help='deploy on coiled cluster, default: local cluster', +) +parser.add_argument( + '--fake', + action='store_true', + help='use local dummy data, default: ARCO-ERA5 data', +) args = parser.parse_args() @@ -22,27 +70,32 @@ from coiled import Cluster cluster = Cluster( - region='us-central1', - worker_memory='16 GiB', - spot_policy='spot_with_fallback', - arm=True, + region='us-central1', + worker_memory='16 GiB', + spot_policy='spot_with_fallback', + arm=True, ) client = cluster.get_client() cluster.adapt(minimum=1, maximum=100) else: from dask.distributed import LocalCluster + cluster = LocalCluster(processes=False) client = cluster.get_client() -era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} -) +if args.fake: + era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1}) +else: + era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1}, + ) + print('dataset opened.') era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - time=slice(args.start, args.end), - level=1000, # surface level only. + time=slice(args.start, args.end), + level=1000, # surface level only. ) c = qr.Context() @@ -50,14 +103,17 @@ c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) print('beginning query.') -df = c.sql(""" +# TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box. 
+df = c.sql(
+    """
 SELECT
-  DATE("time") as date,
+  "time",
   AVG("sea_surface_temperature") as daily_avg_sst
 FROM
   "era5"
 GROUP BY
-  DATE("time")
-""")
+  "time"
+"""
+)
 
-df.to_csv(f'global_avg_sst_{args.start}-{args.end}_*.csv')
+df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.csv')

From 29706c93c2cd57c0ecf92b786dfd504cd4fa0e86 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Thu, 28 Mar 2024 11:45:17 +0530
Subject: [PATCH 05/17] Renamed a method; added memory-optimized cluster option.

---
 demo/sst.py | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/demo/sst.py b/demo/sst.py
index 6b83e75..0fe1274 100755
--- a/demo/sst.py
+++ b/demo/sst.py
@@ -12,7 +12,7 @@ import xarray_sql as qr
 
 
-def local_data(start: str, end: str) -> xr.Dataset:
+def rand_wx(start: str, end: str) -> xr.Dataset:
   import numpy as np
   import pandas as pd
 
@@ -58,6 +58,11 @@ def local_data(start: str, end: str) -> xr.Dataset:
     action='store_true',
     help='deploy on coiled cluster, default: local cluster',
 )
+parser.add_argument(
+    '--memory-opt-cluster',
+    action='store_true',
+    help='deploy on memory-optimized coiled cluster, default: local cluster',
+)
 parser.add_argument(
     '--fake',
     action='store_true',
@@ -75,8 +80,21 @@ def local_data(start: str, end: str) -> xr.Dataset:
       spot_policy='spot_with_fallback',
       arm=True,
   )
+
   client = cluster.get_client()
   cluster.adapt(minimum=1, maximum=100)
+elif args.mem_opt_cluster:
+  from coiled import Cluster
+
+  cluster = Cluster(
+      region='us-central1',
+      spot_policy='spot_with_fallback',
+      worker_vm_types=['m3-ultramem-32'],
+      arm=True,
+  )
+
+  client = cluster.get_client()
+  cluster.adapt(minimum=1, maximum=50)
 else:
   from dask.distributed import LocalCluster
 
@@ -84,7 +102,7 @@ def local_data(start: str, end: str) -> xr.Dataset:
   client = cluster.get_client()
 
 if args.fake:
-  era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1})
+  era5_ds = rand_wx(args.start, args.end).chunk({'time': 240, 'level': 1})
 else:
   era5_ds = xr.open_zarr(

From d7f5368acd7f03c1a11125e89b811f8b14a2be3d Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Thu, 28 Mar 2024 14:08:16 +0530
Subject: [PATCH 06/17] Detail-focused updates.

- Using the v3 ARCO-ERA5 dataset that has the full range of data.
- Looking up VM instance types to see what's appropriate.
- Choosing chunks based on resource and dataset size math.
- Writing output to parquet when running on a cluster.
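
For reference, the chunk sizes quoted in the new code comments follow from
simple array math; a quick sketch (assuming float32 SST values on the
0.25-degree, 721 x 1440 ERA5 grid):

```
# Approximate in-memory size of one (time, lat, lon) SST chunk.
LAT, LON, BYTES_PER_VALUE = 721, 1440, 4  # float32
for time_chunk in (48, 96, 192, 240, 720):
  mib = LAT * LON * time_chunk * BYTES_PER_VALUE // 2**20
  print(f'time={time_chunk}: ~{mib} MiB per chunk')
# time=48: ~190 MiB, time=96: ~380 MiB, ..., time=720: ~2851 MiB
```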
--- demo/sst.py | 79 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index 0fe1274..78b8b6a 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -8,21 +8,20 @@ ``` """ import argparse + +import numpy as np import xarray as xr import xarray_sql as qr -def rand_wx(start: str, end: str) -> xr.Dataset: - import numpy as np - import pandas as pd - +def rand_wx(start_time: str, end_time: str) -> xr.Dataset: + """Produce a random ARCO-ERA5-like weather dataset.""" np.random.seed(42) lat = np.linspace(-90, 90, num=720) lon = np.linspace(-180, 180, num=1440) - time = pd.date_range(start, end, freq='H') + time = xr.date_range(start_time, end_time, freq='H') level = np.array([1000, 500], dtype=np.int32) - reference_time = pd.Timestamp(start) temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) precipitation = 10 * np.random.rand(720, 1440, len(time), len(level)) @@ -40,18 +39,22 @@ def rand_wx(start: str, end: str) -> xr.Dataset: lon=lon, time=time, level=level, - reference_time=reference_time, ), attrs=dict(description='Random weather.'), ) +def tfmt(time: np.datetime64, unit='h') -> str: + """Returns a bucket-friendly date string from a numpy datetime.""" + return np.datetime_as_string(time, unit=unit).replace(':', '') + + parser = argparse.ArgumentParser() parser.add_argument( - '--start', type=str, default='2020-01-01', help='start time ISO string' + '--start', type=str, default='1940-01-01', help='start time ISO string' ) parser.add_argument( - '--end', type=str, default='2020-01-02', help='end time ISO string' + '--end', type=str, default='1940-01-02', help='end time ISO string' ) parser.add_argument( '--cluster', @@ -76,25 +79,23 @@ def rand_wx(start: str, end: str) -> xr.Dataset: cluster = Cluster( region='us-central1', - worker_memory='16 GiB', spot_policy='spot_with_fallback', - arm=True, + worker_mv_types=['t2a-standard-16'], # 4 GiBs RAM per CPU, ARM. ) client = cluster.get_client() cluster.adapt(minimum=1, maximum=100) elif args.mem_opt_cluster: - from coiled import Cluster + from coiled import Cluster - cluster = Cluster( - region='us-central1', - spot_policy='spot_with_fallback', - worker_vm_types=['m3-ultramem-32'], - arm=True, - ) + cluster = Cluster( + region='us-central1', + spot_policy='spot_with_fallback', + worker_vm_types=['m3-ultramem-32'], # 30.5 GiBs RAM per CPU, x86. + ) - client = cluster.get_client() - cluster.adapt(minimum=1, maximum=50) + client = cluster.get_client() + cluster.adapt(minimum=1, maximum=25) else: from dask.distributed import LocalCluster @@ -105,20 +106,37 @@ def rand_wx(start: str, end: str) -> xr.Dataset: era5_ds = rand_wx(args.start, args.end).chunk({'time': 240, 'level': 1}) else: era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3/', chunks={'time': 240, 'level': 1}, ) + assert np.datetime64(args.start) >= np.datetime64( + '1940-01-01' + ), 'ARCO-ERA5 does not go back before 1940-01-01!' + + assert ( + np.datetime64(args.end) <= era5_ds.time[-1].values + ), f'ARCO-ERA5 does not run until {args.end}!' + print('dataset opened.') -era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( + +era5_sst_ds = era5_ds.sel( time=slice(args.start, args.end), level=1000, # surface level only. 
-)
+).sea_surface_temperature
+
+print(f'sst_size={era5_sst_ds.nbytes / 2**40}TiBs')
 
 c = qr.Context()
-# chunk sizes determined from VM memory limit of 16 GiB.
-c.create_table('era5', era5_sst_ds, chunks=dict(time=24))
+# `time=48` produces 190 MiB chunks
+# `time=96` produces 380 MiB chunks
+# `time=192` produces 760 MiB chunks
+# `time=240` produces 950 MiB chunks
+# `time=720` produces 2851 MiB chunks --> utilizes 30 GiBs memory per CPU.
+time_chunks = 96  # four day chunks.
+if args.mem_opt_cluster:
+  time_chunks = 720  # one month chunks.
+c.create_table('era5', era5_sst_ds, chunks=dict(time=time_chunks))
 
 print('beginning query.')
 # TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box.
@@ -134,4 +152,11 @@ def tfmt(time: np.datetime64, unit='h') -> str:
 """
 )
 
-df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.csv')
+# Store the results for visualization later on.
+start, end = tfmt(era5_sst_ds.time[0].values), tfmt(era5_sst_ds.time[-1].values)
+now = tfmt(np.datetime64('now'), 's')
+results_name = f'global_avg_sst_{start}_to_{end}.{now}'
+if args.cluster or args.mem_opt_cluster:
+  df.to_parquet(f'gs://xarray-sql-experiments/{results_name}/')
+else:
+  df.to_csv(results_name + '_*.csv')

From 9ff6c17787fda4552e2d790b54fa36c2dc3f3f23 Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Sat, 30 Mar 2024 13:21:21 +0530
Subject: [PATCH 07/17] Fixed issues found with fake data.

---
 demo/sst.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/demo/sst.py b/demo/sst.py
index 78b8b6a..99f5134 100755
--- a/demo/sst.py
+++ b/demo/sst.py
@@ -85,7 +85,7 @@ def tfmt(time: np.datetime64, unit='h') -> str:
 
   client = cluster.get_client()
   cluster.adapt(minimum=1, maximum=100)
-elif args.mem_opt_cluster:
+elif args.memory_opt_cluster:
   from coiled import Cluster
 
   cluster = Cluster(
@@ -120,10 +120,10 @@ def tfmt(time: np.datetime64, unit='h') -> str:
 
 print('dataset opened.')
 
-era5_sst_ds = era5_ds.sel(
+era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(
     time=slice(args.start, args.end),
     level=1000,  # surface level only.
-).sea_surface_temperature
+)
 
 print(f'sst_size={era5_sst_ds.nbytes / 2**40}TiBs')
@@ -134,7 +134,7 @@ def tfmt(time: np.datetime64, unit='h') -> str:
 # `time=240` produces 950 MiB chunks
 # `time=720` produces 2851 MiB chunks --> utilizes 30 GiBs memory per CPU.
 time_chunks = 96  # four day chunks.
-if args.mem_opt_cluster:
+if args.memory_opt_cluster:
   time_chunks = 720  # one month chunks.
 c.create_table('era5', era5_sst_ds, chunks=dict(time=time_chunks))
 
@@ -156,7 +156,7 @@ def tfmt(time: np.datetime64, unit='h') -> str:
 start, end = tfmt(era5_sst_ds.time[0].values), tfmt(era5_sst_ds.time[-1].values)
 now = tfmt(np.datetime64('now'), 's')
 results_name = f'global_avg_sst_{start}_to_{end}.{now}'
-if args.cluster or args.mem_opt_cluster:
+if args.cluster or args.memory_opt_cluster:
   df.to_parquet(f'gs://xarray-sql-experiments/{results_name}/')
 else:
   df.to_csv(results_name + '_*.csv')

From 580d4ac0105eaa4e19601a887bd554e3c747e40b Mon Sep 17 00:00:00 2001
From: Alex Merose
Date: Sat, 30 Mar 2024 17:58:30 +0530
Subject: [PATCH 08/17] Fix cluster VM argument.

---
 demo/sst.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/demo/sst.py b/demo/sst.py
index 99f5134..f531471 100755
--- a/demo/sst.py
+++ b/demo/sst.py
@@ -80,7 +80,7 @@ def tfmt(time: np.datetime64, unit='h') -> str:
   cluster = Cluster(
       region='us-central1',
       spot_policy='spot_with_fallback',
-      worker_mv_types=['t2a-standard-16'],  # 4 GiBs RAM per CPU, ARM.
+ worker_vm_types='t2a-standard-16', # 4 GiBs RAM per CPU, ARM. ) client = cluster.get_client() @@ -91,7 +91,7 @@ def tfmt(time: np.datetime64, unit='h') -> str: cluster = Cluster( region='us-central1', spot_policy='spot_with_fallback', - worker_vm_types=['m3-ultramem-32'], # 30.5 GiBs RAM per CPU, x86. + worker_vm_types='m3-ultramem-32', # 30.5 GiBs RAM per CPU, x86. ) client = cluster.get_client() From 0fa8434872c2fb3d98390eb644d26a2f658297b3 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 30 Mar 2024 18:42:59 +0530 Subject: [PATCH 09/17] Safer alternative to time ranges. --- demo/sst.py | 50 +++++++++++++++++++++----------------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index f531471..0e034ec 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -13,14 +13,25 @@ import xarray as xr import xarray_sql as qr - -def rand_wx(start_time: str, end_time: str) -> xr.Dataset: +# Instead of letting users choose arbitrary time frames, we only allow +# the following choices. This design prevents users from accidentally +# processing way more data than they might have meant to. We don't +# want to bankrupt folks because they were off a few digits. +TIMEFRAMES = { + 'day': slice('1940-01-01', '1940-01-02'), + 'month': slice('1940-01-01', '1940-02-01'), + 'year': slice('1940-01-01', '1941-01-01'), + 'all': slice('1940-01-01', '2023-11-01'), +} + + +def rand_wx(times) -> xr.Dataset: """Produce a random ARCO-ERA5-like weather dataset.""" np.random.seed(42) lat = np.linspace(-90, 90, num=720) lon = np.linspace(-180, 180, num=1440) - time = xr.date_range(start_time, end_time, freq='H') + time = xr.date_range(times.start, times.stop, freq='H') level = np.array([1000, 500], dtype=np.int32) temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) @@ -44,18 +55,8 @@ def rand_wx(start_time: str, end_time: str) -> xr.Dataset: ) -def tfmt(time: np.datetime64, unit='h') -> str: - """Returns a bucket-friendly date string from a numpy datetime.""" - return np.datetime_as_string(time, unit=unit).replace(':', '') - - parser = argparse.ArgumentParser() -parser.add_argument( - '--start', type=str, default='1940-01-01', help='start time ISO string' -) -parser.add_argument( - '--end', type=str, default='1940-01-02', help='end time ISO string' -) +parser.add_argument('--timeframe', choices=TIMEFRAMES.keys(), default='day') parser.add_argument( '--cluster', action='store_true', @@ -73,6 +74,7 @@ def tfmt(time: np.datetime64, unit='h') -> str: ) args = parser.parse_args() +timeframe = TIMEFRAMES[args.timeframe] if args.cluster: from coiled import Cluster @@ -103,29 +105,20 @@ def tfmt(time: np.datetime64, unit='h') -> str: client = cluster.get_client() if args.fake: - era5_ds = rand_wx(args.start, args.end).chunk({'time': 240, 'level': 1}) + era5_ds = rand_wx(timeframe).chunk({'time': 240, 'level': 1}) else: era5_ds = xr.open_zarr( 'gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3/', chunks={'time': 240, 'level': 1}, ) - assert np.datetime64(args.start) >= np.datetime64( - '1940-01-01' - ), 'ARCO-ERA5 does not go back before 1940-01-01!' - - assert ( - np.datetime64(args.end) <= era5_ds.time[-1].values - ), f'ARCO-ERA5 does not run until {args.end}!' - print('dataset opened.') era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - time=slice(args.start, args.end), - level=1000, # surface level only. 
+ time=timeframe, level=1000 ) -print(f'sst_size={era5_sst_ds.nbytes / 2**40}TiBs') +print(f'sst_size={era5_sst_ds.nbytes / 2**40:.5f}TiBs') c = qr.Context() # `time=48` produces 190 MiB chunks @@ -153,9 +146,8 @@ def tfmt(time: np.datetime64, unit='h') -> str: ) # Store the results for visualization later on. -start, end = tfmt(era5_sst_ds.time[0].values), tfmt(era5_sst_ds.time[-1].values) -now = tfmt(np.datetime64('now'), 's') -results_name = f'global_avg_sst_{start}_to_{end}.{now}' +now = np.datetime_as_string(np.datetime64('now'), unit='s').replace(':', '') +results_name = f'global_avg_sst_{timeframe.start}_to_{timeframe.stop}.{now}' if args.cluster or args.memory_opt_cluster: df.to_parquet(f'gs://xarray-sql-experiments/{results_name}/') else: From 29cb8af8f3d0a0219414b870f9ad810cdc92d1ad Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 30 Mar 2024 18:53:55 +0530 Subject: [PATCH 10/17] Choices for cluster; simplifying output name. --- demo/sst.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index 0e034ec..6475e20 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -24,6 +24,8 @@ 'all': slice('1940-01-01', '2023-11-01'), } +CLUSTERS = ['local', 'arm', 'mem-opt'] + def rand_wx(times) -> xr.Dataset: """Produce a random ARCO-ERA5-like weather dataset.""" @@ -59,13 +61,10 @@ def rand_wx(times) -> xr.Dataset: parser.add_argument('--timeframe', choices=TIMEFRAMES.keys(), default='day') parser.add_argument( '--cluster', - action='store_true', - help='deploy on coiled cluster, default: local cluster', -) -parser.add_argument( - '--memory-opt-cluster', - action='store_true', - help='deploy on memory-optimized coiled cluster, default: local cluster', + choices=CLUSTERS, + default='local', + help='Choose the Dask cluster type. ' + 'Either: a local cluster, ARM VMs or memory-optimized VMs in GCP via Coiled.', ) parser.add_argument( '--fake', @@ -76,7 +75,7 @@ def rand_wx(times) -> xr.Dataset: args = parser.parse_args() timeframe = TIMEFRAMES[args.timeframe] -if args.cluster: +if args.cluster == 'arm': from coiled import Cluster cluster = Cluster( @@ -87,7 +86,7 @@ def rand_wx(times) -> xr.Dataset: client = cluster.get_client() cluster.adapt(minimum=1, maximum=100) -elif args.memory_opt_cluster: +elif args.cluster == 'mem-opt': from coiled import Cluster cluster = Cluster( @@ -127,7 +126,7 @@ def rand_wx(times) -> xr.Dataset: # `time=240` produces 950 MiB chunks # `time=720` produces 2851 MiB chunks --> utilizes 30 GiBs memory per CPU. time_chunks = 96 # four day chunks. -if args.memory_opt_cluster: +if args.cluster == 'mem-opt': time_chunks = 720 # one month chunks. c.create_table('era5', era5_sst_ds, chunks=dict(time=time_chunks)) @@ -146,9 +145,9 @@ def rand_wx(times) -> xr.Dataset: ) # Store the results for visualization later on. -now = np.datetime_as_string(np.datetime64('now'), unit='s').replace(':', '') -results_name = f'global_avg_sst_{timeframe.start}_to_{timeframe.stop}.{now}' -if args.cluster or args.memory_opt_cluster: - df.to_parquet(f'gs://xarray-sql-experiments/{results_name}/') -else: +now = np.datetime64('now', 's').astype(int) +results_name = f'global_avg_sst_{args.timeframe}_{now}' +if args.cluster == 'local': df.to_csv(results_name + '_*.csv') +else: + df.to_parquet(f'gs://xarray-sql-experiments/{results_name}/') From 6a692849a3b5f411ee3818bd2ce85867bfce41ae Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 30 Mar 2024 19:17:51 +0530 Subject: [PATCH 11/17] Added a "small" cluster as an option. 
Added more docs for setting up cloud resources and how to run. --- demo/sst.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index 6475e20..ecaf980 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -4,7 +4,12 @@ Please run the following to set up cloud resources: ``` gcloud auth application-default login -coiled setup +coiled login +coiled setup gcp --region us-central1 +``` +To run the demo: +``` +./demo/sst.py --timeframe month --cluster small ``` """ import argparse @@ -24,7 +29,7 @@ 'all': slice('1940-01-01', '2023-11-01'), } -CLUSTERS = ['local', 'arm', 'mem-opt'] +CLUSTERS = ['local', 'small', 'arm', 'mem-opt'] def rand_wx(times) -> xr.Dataset: @@ -75,13 +80,23 @@ def rand_wx(times) -> xr.Dataset: args = parser.parse_args() timeframe = TIMEFRAMES[args.timeframe] -if args.cluster == 'arm': +if args.cluster == 'small': from coiled import Cluster cluster = Cluster( region='us-central1', spot_policy='spot_with_fallback', - worker_vm_types='t2a-standard-16', # 4 GiBs RAM per CPU, ARM. + n_workers=8, + ) + + client = cluster.get_client() +elif args.cluster == 'arm': + from coiled import Cluster + + cluster = Cluster( + region='us-central1', + spot_policy='spot_with_fallback', + arm=True, ) client = cluster.get_client() From c86be6988305788d4539250ab8b9ecd9dc92df10 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 30 Mar 2024 19:35:37 +0530 Subject: [PATCH 12/17] Fixed bug found from testing Zarr on local cluster. --- demo/sst.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index ecaf980..1f069b6 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -47,7 +47,7 @@ def rand_wx(times) -> xr.Dataset: return xr.Dataset( data_vars=dict( sea_surface_temperature=( - ['lat', 'lon', 'time', 'level'], + ['lat', 'lon', 'time'], temperature, ), precipitation=(['lat', 'lon', 'time', 'level'], precipitation), @@ -129,10 +129,10 @@ def rand_wx(times) -> xr.Dataset: print('dataset opened.') era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - time=timeframe, level=1000 + time=timeframe ) -print(f'sst_size={era5_sst_ds.nbytes / 2**40:.5f}TiBs') +print(f'sst_size={era5_sst_ds.nbytes / 2**30:.5f} GiBs') c = qr.Context() # `time=48` produces 190 MiB chunks From 7c2416178dc2ff2d4deedcd0caf307175684d556 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 8 Apr 2024 16:44:53 +0530 Subject: [PATCH 13/17] Adding "fake" prefix to results file name when fake data is used. --- demo/sst.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/demo/sst.py b/demo/sst.py index 1f069b6..c179478 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -162,6 +162,8 @@ def rand_wx(times) -> xr.Dataset: # Store the results for visualization later on. now = np.datetime64('now', 's').astype(int) results_name = f'global_avg_sst_{args.timeframe}_{now}' +if args.fake: + results_name = 'fake_' + results_name if args.cluster == 'local': df.to_csv(results_name + '_*.csv') else: From b74949f873b259d0ae99f23339a243cf8b3d2299 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 13 Apr 2024 09:47:44 +0400 Subject: [PATCH 14/17] Turning on dask-expr for dask-sql to work. 
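
Per this change, dask-sql needs Dask's query-planning (dask-expr) backend
enabled. On Dask versions where query planning is still opt-in, the setting
has to be applied before `dask.dataframe` is first imported; a sketch of the
same toggle this patch hard-codes in xarray_sql/df.py:

```
import dask

dask.config.set({'dataframe.query-planning': True})

# Import order matters: the flag is read when dask.dataframe first loads.
import dask.dataframe as dd
```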
--- demo/sst.py | 2 ++ pyproject.toml | 11 ++++++++--- xarray_sql/df.py | 6 ++++++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index c179478..2da54b0 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -3,6 +3,7 @@ Please run the following to set up cloud resources: ``` +pip install ".[demo]" gcloud auth application-default login coiled login coiled setup gcp --region us-central1 @@ -62,6 +63,7 @@ def rand_wx(times) -> xr.Dataset: ) +# TODO(alxmrs): Make spot instances a flag. parser = argparse.ArgumentParser() parser.add_argument('--timeframe', choices=TIMEFRAMES.keys(), default='day') parser.add_argument( diff --git a/pyproject.toml b/pyproject.toml index 57153c1..ccd3306 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,12 +32,17 @@ dependencies = [ ] [project.optional-dependencies] +io = [ + "xarray[io]", + "gcsfs", +] test = [ - "pytest", - "xarray[io]", - "gcsfs", + "xarray_sql[io]", + "pytest", ] + demo = [ + "xarray_sql[io]", "coiled" ] diff --git a/xarray_sql/df.py b/xarray_sql/df.py index 8b700ad..7259b35 100644 --- a/xarray_sql/df.py +++ b/xarray_sql/df.py @@ -10,6 +10,12 @@ Block = t.Dict[t.Hashable, slice] Chunks = t.Optional[t.Dict[str, int]] +# Turn on Dask-Expr +dask.config.set({'dataframe.query-planning-warning': False}) +dask.config.set({'dataframe.query-planning': True}) +# Turn on Copy-On-Write (needs Pandas 2.0). +pd.options.mode.copy_on_write = True + # Borrowed from Xarray def _get_chunk_slicer( From 8e2498f31741856626dc62e0a2eeeb1fbf703176 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 9 Jun 2024 15:17:50 +0200 Subject: [PATCH 15/17] reformatted sst script. --- demo/sst.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index 2da54b0..a0f16d3 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -96,9 +96,9 @@ def rand_wx(times) -> xr.Dataset: from coiled import Cluster cluster = Cluster( - region='us-central1', - spot_policy='spot_with_fallback', - arm=True, + region='us-central1', + spot_policy='spot_with_fallback', + arm=True, ) client = cluster.get_client() @@ -130,9 +130,7 @@ def rand_wx(times) -> xr.Dataset: print('dataset opened.') -era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - time=timeframe -) +era5_sst_ds = era5_ds[['sea_surface_temperature']].sel(time=timeframe) print(f'sst_size={era5_sst_ds.nbytes / 2**30:.5f} GiBs') From 8295c65d94ec227271935c01d6c57169d3cec96c Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 13 Jul 2025 14:36:37 -0700 Subject: [PATCH 16/17] Got SST script to work with dummy data locally. --- demo/sst.py | 31 ++++++++++++++++++------------- xarray_sql/df.py | 8 -------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index a0f16d3..acff4d4 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -8,7 +8,10 @@ coiled login coiled setup gcp --region us-central1 ``` -To run the demo: +To run the local demo: +./demo/sst.py --timeframe day --fake + +To run the remote demo: ``` ./demo/sst.py --timeframe month --cluster small ``` @@ -17,7 +20,8 @@ import numpy as np import xarray as xr -import xarray_sql as qr + +import xarray_sql as xql # Instead of letting users choose arbitrary time frames, we only allow # the following choices. 
This design prevents users from accidentally @@ -30,7 +34,7 @@ 'all': slice('1940-01-01', '2023-11-01'), } -CLUSTERS = ['local', 'small', 'arm', 'mem-opt'] +CLUSTERS = ['none', 'local', 'small', 'arm', 'mem-opt'] def rand_wx(times) -> xr.Dataset: @@ -42,7 +46,7 @@ def rand_wx(times) -> xr.Dataset: time = xr.date_range(times.start, times.stop, freq='H') level = np.array([1000, 500], dtype=np.int32) - temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) + temperature = 15 + 8 * np.random.randn(720, 1440, len(time)) precipitation = 10 * np.random.rand(720, 1440, len(time), len(level)) return xr.Dataset( @@ -69,9 +73,9 @@ def rand_wx(times) -> xr.Dataset: parser.add_argument( '--cluster', choices=CLUSTERS, - default='local', - help='Choose the Dask cluster type. ' - 'Either: a local cluster, ARM VMs or memory-optimized VMs in GCP via Coiled.', + default='none', + help='Choose the Dask cluster type, or none at all.' + 'Either: no cluster, a local cluster, ARM VMs or memory-optimized VMs in GCP via Coiled.', ) parser.add_argument( '--fake', @@ -114,7 +118,8 @@ def rand_wx(times) -> xr.Dataset: client = cluster.get_client() cluster.adapt(minimum=1, maximum=25) -else: + +elif args.cluster == 'local': from dask.distributed import LocalCluster cluster = LocalCluster(processes=False) @@ -134,7 +139,7 @@ def rand_wx(times) -> xr.Dataset: print(f'sst_size={era5_sst_ds.nbytes / 2**30:.5f} GiBs') -c = qr.Context() +c = xql.XarrayContext() # `time=48` produces 190 MiB chunks # `time=96` produces 380 MiB chunks # `time=192` produces 760 MiB chunks @@ -143,7 +148,7 @@ def rand_wx(times) -> xr.Dataset: time_chunks = 96 # four day chunks. if args.cluster == 'mem-opt': time_chunks = 720 # one month chunks. -c.create_table('era5', era5_sst_ds, chunks=dict(time=time_chunks)) +c.from_dataset('era5', era5_sst_ds, chunks=dict(time=time_chunks)) print('beginning query.') # TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box. @@ -164,7 +169,7 @@ def rand_wx(times) -> xr.Dataset: results_name = f'global_avg_sst_{args.timeframe}_{now}' if args.fake: results_name = 'fake_' + results_name -if args.cluster == 'local': - df.to_csv(results_name + '_*.csv') +if args.cluster == 'local' or args.cluster == 'none': + df.to_pandas().to_csv(results_name + '.csv') else: - df.to_parquet(f'gs://xarray-sql-experiments/{results_name}/') + df.write_parquet(f'gs://xarray-sql-experiments/{results_name}/') diff --git a/xarray_sql/df.py b/xarray_sql/df.py index 7259b35..10031fd 100644 --- a/xarray_sql/df.py +++ b/xarray_sql/df.py @@ -5,18 +5,10 @@ import pandas as pd import pyarrow as pa import xarray as xr -from datafusion.context import ArrowStreamExportable Block = t.Dict[t.Hashable, slice] Chunks = t.Optional[t.Dict[str, int]] -# Turn on Dask-Expr -dask.config.set({'dataframe.query-planning-warning': False}) -dask.config.set({'dataframe.query-planning': True}) -# Turn on Copy-On-Write (needs Pandas 2.0). 
-pd.options.mode.copy_on_write = True - - # Borrowed from Xarray def _get_chunk_slicer( dim: t.Hashable, chunk_index: t.Mapping, chunk_bounds: t.Mapping From a09290e4b2626f7362bccd572a4e1c686bc025fb Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 13 Jul 2025 14:46:52 -0700 Subject: [PATCH 17/17] pyink fmt --- xarray_sql/df.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xarray_sql/df.py b/xarray_sql/df.py index 10031fd..7c40c85 100644 --- a/xarray_sql/df.py +++ b/xarray_sql/df.py @@ -9,6 +9,7 @@ Block = t.Dict[t.Hashable, slice] Chunks = t.Optional[t.Dict[str, int]] + # Borrowed from Xarray def _get_chunk_slicer( dim: t.Hashable, chunk_index: t.Mapping, chunk_bounds: t.Mapping