This repository was archived by the owner on Feb 2, 2024. It is now read-only.

Commit 76d588d

Make PyArrow cpu_count correspond to NUMBA_NUM_THREADS (#349)
* Make PyArrow cpu_count correspond to NUMBA_NUM_THREADS: add a pyarrow_cpu_count context manager that always restores cpu_count to its previous value.
* Improve the benchmark test for read_csv(): use a single data configuration in all places, add a PyArrow benchmark, record the read_csv data size (1m, 10), show the size as [rows, cols], implement data-file caching, and move the generation functions to gen_csv.py.
* Use functools.wraps.
* Use contextlib.contextmanager.
1 parent 0648fee commit 76d588d
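
For context, a minimal sketch of what the change controls (illustrative only, not part of the commit; it assumes PyArrow and Numba are installed):

import numba
import pyarrow

# By default PyArrow sizes its internal thread pool from the machine's logical cores.
# The commit pins it to Numba's thread count for the duration of a pandas_read_csv()
# call and then restores the previous value.
print("pyarrow.cpu_count():", pyarrow.cpu_count())
print("NUMBA_NUM_THREADS:  ", numba.config.NUMBA_NUM_THREADS)

# What the context manager does on entry:
pyarrow.set_cpu_count(numba.config.NUMBA_NUM_THREADS)
print("pinned cpu_count:   ", pyarrow.cpu_count())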

3 files changed: +103 −23 lines changed


sdc/io/csv_ext.py

Lines changed: 23 additions & 0 deletions
@@ -25,6 +25,9 @@
 # *****************************************************************************


+import contextlib
+import functools
+
 import llvmlite.binding as ll
 from llvmlite import ir as lir
 from .. import hio
@@ -398,6 +401,26 @@ def to_varname(string):
     return re.sub(r'\W|^(?=\d)','_', string)


+@contextlib.contextmanager
+def pyarrow_cpu_count(cpu_count=pyarrow.cpu_count()):
+    old_cpu_count = pyarrow.cpu_count()
+    pyarrow.set_cpu_count(cpu_count)
+    try:
+        yield
+    finally:
+        pyarrow.set_cpu_count(old_cpu_count)
+
+
+def pyarrow_cpu_count_equal_numba_num_treads(func):
+    """Decorator. Set pyarrow cpu_count the same as NUMBA_NUM_THREADS."""
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        with pyarrow_cpu_count(numba.config.NUMBA_NUM_THREADS):
+            return func(*args, **kwargs)
+    return wrapper
+
+
+@pyarrow_cpu_count_equal_numba_num_treads
 def pandas_read_csv(
     filepath_or_buffer,
     sep=",",

sdc/tests/tests_perf/gen_csv.py

Lines changed: 43 additions & 0 deletions
@@ -43,3 +43,46 @@ def generate(rows, headers, providers, file_name):
         writer.writeheader()
         for i in range(rows):
             writer.writerow({k: p() for k, p in zip(headers, providers)})
+
+
+def md5(filename):
+    """Return MD5 sum of the file."""
+    import hashlib
+    hash_md5 = hashlib.md5()
+    with open(filename, "rb") as f:
+        for chunk in iter(lambda: f.read(4096), b""):
+            hash_md5.update(chunk)
+    return hash_md5.hexdigest()
+
+
+def csv_file_name(rows=10**5, columns=10, seed=0):
+    """Return file name for given parameters."""
+    return f"data_{rows}_{columns}_{seed}.csv"
+
+
+def generate_csv(rows=10**5, columns=10, seed=0):
+    """Generate CSV file and return file name."""
+    import random
+
+    md5_sums = {
+        (10**6, 10, 0): "6fa2a115dfeaee4f574106b513ad79e6"
+    }
+
+    file_name = csv_file_name(rows, columns, seed)
+
+    try:
+        if md5_sums.get((rows, columns, seed)) == md5(file_name):
+            return file_name
+    except:
+        pass
+
+    r = random.Random(seed)
+    generate(rows,
+             [f"f{c}" for c in range(columns)],
+             [lambda: r.uniform(-1.0, 1.0) for _ in range(columns)],
+             file_name
+             )
+
+    md5_sums[(rows, columns, seed)] = md5(file_name)
+
+    return file_name
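
The caching works by comparing the MD5 of an already-present file against a checksum recorded for the (rows, columns, seed) combination; only the (10**6, 10, 0) configuration has a pre-recorded checksum. A small usage sketch (illustrative, not part of the commit):

from sdc.tests.tests_perf.gen_csv import csv_file_name, generate_csv, md5

# First call writes data_1000000_10_0.csv; a later call with the same
# parameters returns the existing file if its MD5 matches the recorded sum.
name = generate_csv(rows=10**6, columns=10, seed=0)
assert name == csv_file_name(10**6, 10, 0)
print(name, md5(name))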

sdc/tests/tests_perf/test_perf_read_csv.py

Lines changed: 37 additions & 23 deletions
@@ -26,32 +26,15 @@
 # *****************************************************************************

 import time
-import random

 import pandas
+import pyarrow.csv
 import sdc

-from .test_perf_base import TestBase
-from .test_perf_utils import calc_compilation, get_times
+from sdc.tests.tests_perf.test_perf_base import TestBase
+from sdc.tests.tests_perf.test_perf_utils import calc_compilation, get_times

-from .gen_csv import generate
-
-
-def generate_csv():
-    """Generate CSV file and return file name."""
-    rows = 10**5
-    file_name = f"data_{rows}.csv"
-    r = random.Random(0) # seed=0
-    generate(rows,
-             ['A', 'B', 'C'],
-             [
-                 lambda: int(r.random() * 10000),
-                 lambda: r.uniform(-1.0, 1.0),
-                 lambda: r.uniform(-1.0, 1.0)
-             ],
-             file_name
-             )
-    return file_name
+from sdc.tests.tests_perf.gen_csv import generate_csv


 def make_func(file_name):
@@ -65,12 +48,23 @@ def _function():
     return _function


+def make_func_pyarrow(file_name):
+    """Create function implemented via PyArrow."""
+    def _function():
+        start = time.time()
+        df = sdc.io.csv_ext.pandas_read_csv(file_name)
+        return time.time() - start, df
+    return _function
+
+
 class TestPandasReadCSV(TestBase):

     @classmethod
     def setUpClass(cls):
         super().setUpClass()
-        cls.generated_file = generate_csv()
+        cls.rows = 10**6
+        cls.columns = 10
+        cls.generated_file = generate_csv(cls.rows, cls.columns)

     def _test_jitted(self, pyfunc, record, *args, **kwargs):
         # compilation time
@@ -92,7 +86,7 @@ def _test_python(self, pyfunc, record, *args, **kwargs):
     def _test_case(self, pyfunc, name):
         base = {
             "test_name": name,
-            "data_size": 10**5,
+            "data_size": f"[{self.rows},{self.columns}]",
         }

         record = base.copy()
@@ -107,3 +101,23 @@ def _test_case(self, pyfunc, name):

     def test_read_csv(self):
         self._test_case(make_func(self.generated_file), 'read_csv')
+
+    def test_read_csv_pyarrow(self):
+        pyfunc = make_func_pyarrow(self.generated_file)
+        name = 'read_csv'
+
+        base = {
+            "test_name": name,
+            "data_size": f"[{self.rows},{self.columns}]",
+        }
+
+        record = base.copy()
+        record["test_type"] = 'PyArrow'
+        self._test_python(pyfunc, record)
+        self.test_results.add(**record)
+
+
+if __name__ == "__main__":
+    print("Gererate data files...")
+    generate_csv(rows=10**6, columns=10)
+    print("Data files generated!")
