Skip to content

Commit 8daaf25

Browse files
authored
Merge pull request #161 from poissoncorp/v5.2
Bulk Insert
2 parents 510a12d + ee53910 commit 8daaf25

File tree

19 files changed

+790
-57
lines changed

19 files changed

+790
-57
lines changed

ravendb/documents/bulk_insert_operation.py

Lines changed: 443 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import json
2+
from typing import Optional
3+
4+
import requests
5+
6+
from ravendb.http.server_node import ServerNode
7+
from ravendb.http.raven_command import RavenCommand, VoidRavenCommand
8+
9+
10+
class GetNextOperationIdCommand(RavenCommand[int]):
11+
def __init__(self):
12+
super(GetNextOperationIdCommand, self).__init__(int)
13+
self._node_tag = 0
14+
15+
def is_read_request(self) -> bool:
16+
return False # disable caching
17+
18+
def create_request(self, node: ServerNode) -> requests.Request:
19+
return requests.Request("GET", f"{node.url}/databases/{node.database}/operations/next-operation-id")
20+
21+
def set_response(self, response: Optional[str], from_cache: bool) -> None:
22+
json_node = json.loads(response)
23+
self.result = json_node.get("Id", None)
24+
self._node_tag = json_node.get("NodeTag", None)
25+
26+
27+
class KillOperationCommand(VoidRavenCommand):
28+
def __init__(self, operation_id: int, node_tag: Optional[str] = None):
29+
super(KillOperationCommand, self).__init__()
30+
self._id = operation_id
31+
self._selected_node_tag = node_tag
32+
33+
def create_request(self, node: ServerNode) -> requests.Request:
34+
return requests.Request("POST", f"{node.url}/databases/{node.database}/operations/kill?id={self._id}")

ravendb/documents/conventions.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,10 @@ def json_default(o):
219219
elif isinstance(o, (int, float)):
220220
return str(o)
221221
else:
222-
raise TypeError(repr(o) + " is not JSON serializable (Try add a json default method to convention)")
222+
raise TypeError(
223+
repr(o) + " is not JSON serializable (Try add a json default method to convention"
224+
" or try to add methods - to_json & classmethod from_json - to object class)"
225+
)
223226

224227
@staticmethod
225228
def default_transform_plural(name):

ravendb/documents/queries/query.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ def __init__(
8080
self.project_into: Union[None, bool] = None
8181
self.projection_behavior: Union[None, ProjectionBehavior] = None
8282

83+
@classmethod
84+
def custom_function(cls, alias: str, func: str) -> QueryData:
85+
return cls([func], [], alias, None, None, True)
86+
8387

8488
class QueryResultBase(Generic[_TResult, _TIncludes]):
8589
@abstractmethod

ravendb/documents/session/entity_to_json.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def convert_entity_to_json_internal_static(
5454
json_node = Utils.entity_to_dict(entity, conventions.json_default_method)
5555
EntityToJson.write_metadata(json_node, document_info)
5656
if remove_identity_property:
57-
EntityToJson.try_remove_identity_property(entity)
57+
EntityToJson.try_remove_identity_property_json(json_node)
5858
return json_node
5959

6060
@staticmethod
@@ -69,7 +69,7 @@ def _convert_entity_to_json_internal(
6969
json_node = Utils.entity_to_dict(entity, self._session.conventions.json_default_method)
7070
self.write_metadata(json_node, document_info)
7171
if remove_identity_property:
72-
self.try_remove_identity_property(json_node)
72+
self.try_remove_identity_property_json(json_node)
7373
return json_node
7474

7575
# todo: refactor this method, make it more useful/simple and less ugly (like this return...[0])
@@ -100,6 +100,14 @@ def try_remove_identity_property(document):
100100
except AttributeError:
101101
return False
102102

103+
@staticmethod
104+
def try_remove_identity_property_json(document: Dict) -> bool:
105+
try:
106+
del document["Id"]
107+
return True
108+
except KeyError:
109+
return False
110+
103111
@staticmethod
104112
def write_metadata(json_node: dict, document_info: DocumentInfo):
105113
if document_info is None:
@@ -169,7 +177,7 @@ def convert_to_entity_static(
169177
# todo: Separate it into two different functions and isolate the return statements from the first part
170178

171179
# I. Extract the object type
172-
metadata = document.pop("@metadata")
180+
metadata = document.get("@metadata")
173181
document_deepcopy = deepcopy(document)
174182

175183
# 1. Get type from metadata

ravendb/documents/session/operations/lazy.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from ravendb.tools.utils import Utils, CaseInsensitiveDict
2121
from ravendb.documents.queries.query import QueryResult
2222
from ravendb.extensions.json_extensions import JsonExtensions
23-
from ravendb.documents.commands.crud import GetDocumentsResult, ConditionalGetDocumentsCommand
23+
from ravendb.documents.commands.crud import GetDocumentsResult, ConditionalGetDocumentsCommand, ConditionalGetResult
2424
from ravendb.documents.session.operations.load_operation import LoadOperation
2525
from ravendb.documents.conventions import DocumentConventions
2626
from ravendb.documents.operations.lazy.lazy_operation import LazyOperation
@@ -166,7 +166,7 @@ def handle_response(self, response: "GetResponse") -> None:
166166
if response.result is not None:
167167
etag = response.headers.get(constants.Headers.ETAG)
168168

169-
res = ConditionalGetDocumentsCommand.ConditionalGetResult.from_json(json.loads(response.result))
169+
res = ConditionalGetResult.from_json(json.loads(response.result))
170170
document_info = DocumentInfo.get_new_document_info(res.results[0])
171171
r = self.__session.track_entity_document_info(self.__object_type, document_info)
172172

@@ -227,7 +227,7 @@ def load_starting_with(
227227
def conditional_load(
228228
self, key: str, change_vector: str, object_type: Type[_T] = None
229229
) -> Lazy[ConditionalLoadResult[_T]]:
230-
if not key.isspace():
230+
if not key or key.isspace():
231231
raise ValueError("key cannot be None or whitespace")
232232

233233
if self._delegate.is_loaded(key):
@@ -236,7 +236,7 @@ def __lazy_factory():
236236
entity = self._delegate.load(key, object_type)
237237
if entity is None:
238238
return ConditionalLoadResult.create(None, None)
239-
cv = self._delegate.get_change_vector_for(entity)
239+
cv = self._delegate.advanced.get_change_vector_for(entity)
240240
return ConditionalLoadResult.create(entity, cv)
241241

242242
return Lazy(__lazy_factory)

ravendb/documents/session/operations/load_operation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ def with_time_series(self, time_series: List[TimeSeriesRange]):
9898
self._time_series_to_include = time_series
9999
return self
100100

101-
def by_keys(self, keys: List[str]):
102-
distinct = CaseInsensitiveSet(filter(lambda key: key and key.strip(), keys))
101+
def by_keys(self, keys: List[Optional[str]]):
102+
distinct = CaseInsensitiveSet(filter(lambda x: x and not x.isspace(), keys))
103103
self._keys = list(distinct)
104104
return self
105105

ravendb/documents/store/definition.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from ravendb import constants, exceptions
1010
from ravendb.changes.database_changes import DatabaseChanges
11+
from ravendb.documents.bulk_insert_operation import BulkInsertOperation
1112
from ravendb.documents.operations.executor import MaintenanceOperationExecutor, OperationExecutor
1213
from ravendb.documents.operations.indexes import PutIndexesOperation
1314
from ravendb.documents.session.event_args import (
@@ -163,14 +164,10 @@ def maintenance(self) -> MaintenanceOperationExecutor:
163164
def operations(self) -> OperationExecutor:
164165
pass
165166

166-
# todo: changes
167-
168167
# todo: aggressive_caching
169168

170169
# todo: time_series
171170

172-
# todo: bulk_insert
173-
174171
@abstractmethod
175172
def open_session(self, database: Optional[str] = None, session_options: Optional = None):
176173
pass
@@ -294,26 +291,25 @@ def get_effective_database_static(store: DocumentStoreBase, database: str) -> st
294291
if database is None:
295292
database = store.database
296293

297-
if not database.isspace():
294+
if database and not database.isspace():
298295
return database
299296

300297
raise ValueError(
301298
"Cannot determine database to operate on. "
302299
"Please either specify 'database' directly as an action parameter "
303-
"or set the default database to operate on using 'DocumentStore.setDatabase' method. "
300+
"or set the default database to operate on using 'DocumentStore.database'. "
304301
"Did you forget to pass 'database' parameter?"
305302
)
306303

307304

308305
class DocumentStore(DocumentStoreBase):
309-
def __init__(self, urls: Optional[Union[str, List[str]]] = None, database: Optional[str] = None):
306+
def __init__(self, urls: Union[str, List[str]] = None, database: Optional[str] = None):
310307
super(DocumentStore, self).__init__()
311308
self.__subscriptions = DocumentSubscriptions(self)
312309
self.__thread_pool_executor = ThreadPoolExecutor()
313310
self.urls = [urls] if isinstance(urls, str) else urls
314311
self.database = database
315312
self.__request_executors: Dict[str, Lazy[RequestExecutor]] = CaseInsensitiveDict()
316-
# todo: database changes
317313
# todo: aggressive cache
318314
self.__maintenance_operation_executor: Union[None, MaintenanceOperationExecutor] = None
319315
self.__operation_executor: Union[None, OperationExecutor] = None
@@ -519,6 +515,10 @@ def initialize(self) -> DocumentStore:
519515

520516
# todo: aggressively cache
521517

518+
def bulk_insert(self, database_name: Optional[str] = None) -> BulkInsertOperation:
519+
self.assert_initialized()
520+
return BulkInsertOperation(self.get_effective_database(database_name), self)
521+
522522
def _assert_valid_configuration(self) -> None:
523523
if not self.urls:
524524
raise ValueError("Document URLs cannot be empty.")
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from typing import Optional
2+
3+
from ravendb.exceptions.raven_exceptions import RavenException
4+
5+
6+
class BulkInsertAbortedException(RavenException):
7+
def __init__(self, message: str, cause: Optional[Exception] = None):
8+
super(BulkInsertAbortedException, self).__init__(message, cause)
9+
10+
11+
class BulkInsertProtocolViolationException(RavenException):
12+
def __init__(self, message: str, cause: Optional[Exception] = None):
13+
super(BulkInsertProtocolViolationException, self).__init__(message, cause)

ravendb/http/request_executor.py

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import datetime
4+
import inspect
45
import json
56
import logging
67
import os
@@ -579,33 +580,21 @@ def __send_request_to_server(
579580
session_info: SessionInfo,
580581
request: requests.Request,
581582
url: str,
582-
) -> requests.Response:
583+
) -> Optional[requests.Response]:
583584
try:
584585
self.number_of_server_requests += 1
585586
timeout = command.timeout if command.timeout else self.__default_timeout
586-
if timeout:
587+
588+
if not timeout:
589+
return self.__send(chosen_node, command, session_info, request)
590+
591+
else:
587592
try:
588-
# todo: create Task from lines below and call it
589-
# AggressiveCacheOptions callingTheadAggressiveCaching = aggressiveCaching.get();
590-
# CompletableFuture<CloseableHttpResponse> sendTask = CompletableFuture.supplyAsync(() ->
591-
# AggressiveCacheOptions aggressiveCacheOptionsToRestore = aggressiveCaching.get();
592-
try:
593-
return self.__send(chosen_node, command, session_info, request)
594-
except IOError:
595-
# throw ExceptionsUtils.unwrapException(e);
596-
raise
597-
# finally aggressiveCaching.set(aggressiveCacheOptionsToRestore);
593+
return self.__send(chosen_node, command, session_info, request)
598594
except requests.Timeout as t:
599-
# request.abort()
600-
# net.ravendb.client.exceptions.TimeoutException timeoutException =
601-
# new net.ravendb.client.exceptions.TimeoutException(
602-
# "The request for " + request.getURI() + " failed with timeout after " +
603-
# TimeUtils.durationToTimeSpan(timeout), e);
604-
605595
if not should_retry:
606596
if command.failed_nodes is None:
607597
command.failed_nodes = {}
608-
609598
command.failed_nodes[chosen_node] = t
610599
raise t
611600

@@ -615,10 +604,6 @@ def __send_request_to_server(
615604
self.__throw_failed_to_contact_all_nodes(command, request)
616605

617606
return None
618-
except IOError as e:
619-
raise e
620-
else:
621-
return self.__send(chosen_node, command, session_info, request)
622607
except IOError as e:
623608
if not should_retry:
624609
raise
@@ -633,7 +618,7 @@ def __send_request_to_server(
633618
def __send(
634619
self, chosen_node: ServerNode, command: RavenCommand, session_info: SessionInfo, request: requests.Request
635620
) -> requests.Response:
636-
response: requests.Response = None
621+
response: Optional[requests.Response] = None
637622

638623
if self.should_execute_on_all(chosen_node, command):
639624
response = self.__execute_on_all_to_figure_out_the_fastest(chosen_node, command)
@@ -891,7 +876,8 @@ def __supply_async(
891876

892877
def __create_request(self, node: ServerNode, command: RavenCommand) -> requests.Request:
893878
request = command.create_request(node)
894-
if request.data and not isinstance(request.data, str):
879+
# todo: optimize that if - look for the way to make less ifs each time
880+
if request.data and not isinstance(request.data, str) and not inspect.isgenerator(request.data):
895881
request.data = json.dumps(request.data, default=self.conventions.json_default_method)
896882

897883
# todo: 1117 - 1133
@@ -1125,10 +1111,10 @@ def __handle_server_down(
11251111
self,
11261112
url: str,
11271113
chosen_node: ServerNode,
1128-
node_index: int,
1114+
node_index: Optional[int],
11291115
command: RavenCommand,
11301116
request: requests.Request,
1131-
response: requests.Response,
1117+
response: Optional[requests.Response],
11321118
e: Exception,
11331119
session_info: SessionInfo,
11341120
should_retry: bool,

0 commit comments

Comments
 (0)