Skip to content

Commit a016130

Browse files
committed
BulkResponse: Add wrapper for improved decoding of HTTP bulk responses
CrateDB HTTP bulk responses include `rowcount=` items, either signalling if a bulk operation succeeded or failed. - success means `rowcount=1` - failure means `rowcount=-2` https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
1 parent 2b6b835 commit a016130

File tree

4 files changed

+163
-0
lines changed

4 files changed

+163
-0
lines changed

CHANGES.txt

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Unreleased
1414
server stacktraces into exception messages.
1515
- Refactoring: The module namespace ``crate.client.test_util`` has been
1616
renamed to ``crate.testing.util``.
17+
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
18+
responses including ``rowcount=`` items.
1719

1820
.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
1921
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/

src/crate/client/result.py

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import typing as t
2+
from functools import cached_property
3+
4+
5+
class BulkResultItem(t.TypedDict):
6+
"""
7+
Define the shape of a CrateDB bulk request response item.
8+
"""
9+
10+
rowcount: int
11+
12+
13+
class BulkResponse:
14+
"""
15+
Manage a response to a CrateDB bulk request.
16+
Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
17+
18+
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
19+
"""
20+
21+
def __init__(
22+
self,
23+
records: t.List[t.Dict[str, t.Any]],
24+
results: t.List[BulkResultItem]):
25+
if records is None:
26+
raise ValueError("Processing a bulk response without records is an invalid operation")
27+
if results is None:
28+
raise ValueError("Processing a bulk response without results is an invalid operation")
29+
self.records = records
30+
self.results = results
31+
32+
@cached_property
33+
def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
34+
"""
35+
Compute list of failed records.
36+
37+
CrateDB signals failed inserts using `rowcount=-2`.
38+
39+
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
40+
"""
41+
errors: t.List[t.Dict[str, t.Any]] = []
42+
for record, status in zip(self.records, self.results):
43+
if status["rowcount"] == -2:
44+
errors.append(record)
45+
return errors
46+
47+
@cached_property
48+
def record_count(self) -> int:
49+
"""
50+
Compute bulk size / length of parameter list.
51+
"""
52+
if not self.records:
53+
return 0
54+
return len(self.records)
55+
56+
@cached_property
57+
def success_count(self) -> int:
58+
"""
59+
Compute number of succeeding records within a batch.
60+
"""
61+
return self.record_count - self.failed_count
62+
63+
@cached_property
64+
def failed_count(self) -> int:
65+
"""
66+
Compute number of failed records within a batch.
67+
"""
68+
return len(self.failed_records)

tests/client/test_result.py

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import sys
2+
import unittest
3+
4+
from crate import client
5+
from crate.client.exceptions import ProgrammingError
6+
from .layer import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
7+
from .settings import crate_host
8+
9+
10+
class BulkOperationTest(unittest.TestCase):
11+
12+
def setUp(self):
13+
setUpCrateLayerBaseline(self)
14+
15+
def tearDown(self):
16+
tearDownDropEntitiesBaseline(self)
17+
18+
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
19+
def test_executemany_with_bulk_response_partial(self):
20+
21+
# Import at runtime is on purpose, to permit skipping the test case.
22+
from crate.client.result import BulkResponse
23+
24+
connection = client.connect(crate_host)
25+
cursor = connection.cursor()
26+
27+
# Run SQL DDL.
28+
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")
29+
30+
# Run a batch insert that only partially succeeds.
31+
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
32+
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)
33+
34+
# Verify CrateDB response.
35+
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])
36+
37+
# Verify decoded response.
38+
bulk_response = BulkResponse(invalid_records, result)
39+
self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
40+
self.assertEqual(bulk_response.record_count, 2)
41+
self.assertEqual(bulk_response.success_count, 1)
42+
self.assertEqual(bulk_response.failed_count, 1)
43+
44+
cursor.execute("REFRESH TABLE foobar;")
45+
cursor.execute("SELECT * FROM foobar;")
46+
result = cursor.fetchall()
47+
self.assertEqual(result, [[1, "Hotzenplotz 1"]])
48+
49+
cursor.close()
50+
connection.close()
51+
52+
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
53+
def test_executemany_empty(self):
54+
55+
connection = client.connect(crate_host)
56+
cursor = connection.cursor()
57+
58+
# Run SQL DDL.
59+
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")
60+
61+
# Run a batch insert that is empty.
62+
with self.assertRaises(ProgrammingError) as cm:
63+
cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", [])
64+
self.assertEqual(
65+
str(cm.exception),
66+
"SQLParseException[The query contains a parameter placeholder $1, "
67+
"but there are only 0 parameter values]")
68+
69+
cursor.close()
70+
connection.close()
71+
72+
@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
73+
def test_bulk_response_empty_records_or_results(self):
74+
75+
# Import at runtime is on purpose, to permit skipping the test case.
76+
from crate.client.result import BulkResponse
77+
78+
with self.assertRaises(ValueError) as cm:
79+
BulkResponse(records=None, results=None)
80+
self.assertEqual(
81+
str(cm.exception),
82+
"Processing a bulk response without records is an invalid operation")
83+
84+
with self.assertRaises(ValueError) as cm:
85+
BulkResponse(records=[], results=None)
86+
self.assertEqual(
87+
str(cm.exception),
88+
"Processing a bulk response without results is an invalid operation")

tests/client/tests.py

+5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
TestDefaultSchemaHeader
99
from .layer import makeSuite, setUpWithHttps, HttpsTestServerLayer, setUpCrateLayerBaseline, \
1010
tearDownDropEntitiesBaseline, ensure_cratedb_layer
11+
from .test_result import BulkOperationTest
1112

1213

1314
def test_suite():
@@ -51,6 +52,10 @@ def test_suite():
5152
# Integration tests.
5253
layer = ensure_cratedb_layer()
5354

55+
s = makeSuite(BulkOperationTest)
56+
s.layer = layer
57+
suite.addTest(s)
58+
5459
s = doctest.DocFileSuite(
5560
'docs/by-example/http.rst',
5661
'docs/by-example/client.rst',

0 commit comments

Comments
 (0)