Commit 8295c65

Got SST script to work with dummy data locally.
1 parent 8e2498f · commit 8295c65

2 files changed: 18 additions, 21 deletions

demo/sst.py

Lines changed: 18 additions & 13 deletions
@@ -8,7 +8,10 @@
 coiled login
 coiled setup gcp --region us-central1
 ```
-To run the demo:
+To run the local demo:
+./demo/sst.py --timeframe day --fake
+
+To run the remote demo:
 ```
 ./demo/sst.py --timeframe month --cluster small
 ```
@@ -17,7 +20,8 @@
 
 import numpy as np
 import xarray as xr
-import xarray_sql as qr
+
+import xarray_sql as xql
 
 # Instead of letting users choose arbitrary time frames, we only allow
 # the following choices. This design prevents users from accidentally
@@ -30,7 +34,7 @@
     'all': slice('1940-01-01', '2023-11-01'),
 }
 
-CLUSTERS = ['local', 'small', 'arm', 'mem-opt']
+CLUSTERS = ['none', 'local', 'small', 'arm', 'mem-opt']
 
 
 def rand_wx(times) -> xr.Dataset:
@@ -42,7 +46,7 @@ def rand_wx(times) -> xr.Dataset:
   time = xr.date_range(times.start, times.stop, freq='H')
   level = np.array([1000, 500], dtype=np.int32)
 
-  temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level))
+  temperature = 15 + 8 * np.random.randn(720, 1440, len(time))
   precipitation = 10 * np.random.rand(720, 1440, len(time), len(level))
 
   return xr.Dataset(
@@ -69,9 +73,9 @@ def rand_wx(times) -> xr.Dataset:
 parser.add_argument(
     '--cluster',
     choices=CLUSTERS,
-    default='local',
-    help='Choose the Dask cluster type. '
-    'Either: a local cluster, ARM VMs or memory-optimized VMs in GCP via Coiled.',
+    default='none',
+    help='Choose the Dask cluster type, or none at all.'
+    'Either: no cluster, a local cluster, ARM VMs or memory-optimized VMs in GCP via Coiled.',
 )
 parser.add_argument(
     '--fake',
@@ -114,7 +118,8 @@ def rand_wx(times) -> xr.Dataset:
 
   client = cluster.get_client()
   cluster.adapt(minimum=1, maximum=25)
-else:
+
+elif args.cluster == 'local':
   from dask.distributed import LocalCluster
 
   cluster = LocalCluster(processes=False)
@@ -134,7 +139,7 @@ def rand_wx(times) -> xr.Dataset:
 
 print(f'sst_size={era5_sst_ds.nbytes / 2**30:.5f} GiBs')
 
-c = qr.Context()
+c = xql.XarrayContext()
 # `time=48` produces 190 MiB chunks
 # `time=96` produces 380 MiB chunks
 # `time=192` produces 760 MiB chunks
@@ -143,7 +148,7 @@ def rand_wx(times) -> xr.Dataset:
 time_chunks = 96  # four day chunks.
 if args.cluster == 'mem-opt':
   time_chunks = 720  # one month chunks.
-c.create_table('era5', era5_sst_ds, chunks=dict(time=time_chunks))
+c.from_dataset('era5', era5_sst_ds, chunks=dict(time=time_chunks))
 
 print('beginning query.')
 # TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box.
@@ -164,7 +169,7 @@ def rand_wx(times) -> xr.Dataset:
 results_name = f'global_avg_sst_{args.timeframe}_{now}'
 if args.fake:
   results_name = 'fake_' + results_name
-if args.cluster == 'local':
-  df.to_csv(results_name + '_*.csv')
+if args.cluster == 'local' or args.cluster == 'none':
+  df.to_pandas().to_csv(results_name + '.csv')
 else:
-  df.to_parquet(f'gs://xarray-sql-experiments/{results_name}/')
+  df.write_parquet(f'gs://xarray-sql-experiments/{results_name}/')
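
Taken together, the demo/sst.py changes let the script run entirely locally against fake data. The sketch below pieces that flow together from calls visible in this diff (rand_wx-style fake data, xql.XarrayContext, from_dataset, to_pandas). It is illustrative only: the dimension and variable names, the SELECT statement, and the assumption that the context exposes a DataFusion-style sql() method are guesses, not code from the repository.

```python
# Minimal sketch of the new local path (`--cluster none --fake`), under the
# assumptions stated above. Names and the query are illustrative.
import numpy as np
import xarray as xr
import xarray_sql as xql

# Fake weather cube, shaped like the demo's rand_wx() helper (720 x 1440 grid).
time = xr.date_range('2023-01-01', '2023-01-02', freq='H')
ds = xr.Dataset(
    {'temperature': (['lat', 'lon', 'time'],
                     15 + 8 * np.random.randn(720, 1440, len(time)))},
    coords={
        'lat': np.linspace(-90, 90, 720),
        'lon': np.linspace(-180, 180, 1440),
        'time': time,
    },
)

c = xql.XarrayContext()
# Register the dataset as a SQL table, chunked along time as in the demo.
c.from_dataset('era5', ds, chunks=dict(time=48))

# Illustrative aggregation only; the demo's real query differs.
df = c.sql('SELECT AVG(temperature) AS avg_temp FROM era5')

# With no cluster, results are written to a local CSV, mirroring the diff.
df.to_pandas().to_csv('fake_global_avg_temp.csv')
```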

xarray_sql/df.py

Lines changed: 0 additions & 8 deletions
@@ -5,18 +5,10 @@
 import pandas as pd
 import pyarrow as pa
 import xarray as xr
-from datafusion.context import ArrowStreamExportable
 
 Block = t.Dict[t.Hashable, slice]
 Chunks = t.Optional[t.Dict[str, int]]
 
-# Turn on Dask-Expr
-dask.config.set({'dataframe.query-planning-warning': False})
-dask.config.set({'dataframe.query-planning': True})
-# Turn on Copy-On-Write (needs Pandas 2.0).
-pd.options.mode.copy_on_write = True
-
-
 # Borrowed from Xarray
 def _get_chunk_slicer(
     dim: t.Hashable, chunk_index: t.Mapping, chunk_bounds: t.Mapping
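
For context, the block removed here had applied global configuration at import time. If a downstream application still wants those settings with an older Dask/pandas stack, a rough sketch of opting in from application code is below; it simply reuses the exact option names deleted in this diff and assumes a Dask release that still recognizes the query-planning keys plus pandas 2.0 or newer.

```python
# Opt-in equivalents of the configuration xarray_sql/df.py used to apply at
# import time. Assumes a Dask version that still accepts these keys and
# pandas >= 2.0 for Copy-on-Write.
import dask
import pandas as pd

dask.config.set({'dataframe.query-planning-warning': False})
dask.config.set({'dataframe.query-planning': True})  # turn on Dask-Expr
pd.options.mode.copy_on_write = True  # turn on pandas Copy-on-Write
```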
