diff --git a/README.md b/README.md
index 571be54..82b1448 100644
--- a/README.md
+++ b/README.md
@@ -8,21 +8,48 @@ Simply execute `pip install pricecypher-sdk`
 
 ### Dataset SDK
 ```python
+import asyncio
 from pricecypher import Datasets
 
-datasets = Datasets(BEARER_TOKEN)
-datasets.index()
-datasets.get_meta(DATASET_ID)
-datasets.get_scopes(DATASET_ID)
-datasets.get_scope_values(DATASET_ID, SCOPE_ID)
-datasets.get_transaction_summary(DATASET_ID)
+async def handle_page(page, page_nr, last_page):
+    """ This function will be called for each individual page of transactions that is received."""
+    print(f'Handling page {page_nr}/{last_page}')
+    print(f'Nr of transactions in page: {len(page)}')
+    print(f'Handling page {page_nr}/{last_page} done')
+
+
+async def main():
+    async with Datasets(BEARER_TOKEN) as ds:
+        # Specify the desired columns to be requested for transactions.
+        columns = [
+            {'name_dataset': 'cust_group', 'filter': ['Big', 'Small'], 'key': 'group'},
+            {'representation': 'cost_price', 'aggregate': 'sum', 'key': 'cost_price'}
+        ]
+
+        index = asyncio.create_task(ds.index())
+        meta = asyncio.create_task(ds.get_meta(DATASET_ID))
+        scopes = asyncio.create_task(ds.get_scopes(DATASET_ID))
+        values = asyncio.create_task(ds.get_scope_values(DATASET_ID, SCOPE_ID))
+        summary = asyncio.create_task(ds.get_transaction_summary(DATASET_ID))
+        transactions = asyncio.create_task(ds.get_transactions(DATASET_ID, AGGREGATE, columns, page_cb=handle_page))
+
+        print('datasets', await index)
+        print('transactions', await transactions)
+        print('meta', await meta)
+        print('summary', await summary)
+        print('scopes', await scopes)
+        print('scope values', await values)
+
+asyncio.run(main())
+```
+
+### Printing debug logs
+By default, debug log messages are not printed. Debug logs can be enabled as follows.
+```python
+import logging
 
-columns = [
-    {'name_dataset': 'cust_group', 'filter': ['Big', 'Small'], 'key': 'group'},
-    {'representation': 'cost_price', 'aggregate': 'sum', 'key': 'cost_price'}
-]
-datasets.get_transactions(DATASET_ID, AGGREGATE, columns)
+logging.basicConfig(level=logging.DEBUG)
 ```
 
 ### Contracts
@@ -51,8 +78,14 @@ Similarly, each file in the `models` module defines the models that are provided
 The SDK that this package provides is contained in the top-level package contents.
 
 ## Deployment
-1. Execute `python3 -m build` to build the source archive and a built distribution.
-2. Execute `python3 -m twine upload dist/*` to upload the package to PyPi.
+1. Execute `python -m build` to build the source archive and a built distribution.
+2. Execute `python -m twine upload dist/*` to upload the package to PyPI.
+
+### Snapshot
+To deploy a snapshot release, follow these steps instead.
+1. Add `-pre` to the version in `setup.cfg`.
+2. Execute `python -m build -C--global-option=egg_info -C--global-option=--tag-build=dev`.
+3. Execute `python -m twine upload dist/*`.
 
 ## Authors
diff --git a/setup.cfg b/setup.cfg
index 297d18a..c4a5dee 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
 [metadata]
 name = pricecypher-sdk
-version = 0.5.0
+version = 0.6.0
 author = Deloitte Consulting B.V.
 description = Python wrapper around the different PriceCypher APIs
 long_description = file: README.md
@@ -25,6 +25,7 @@ install_requires =
     pandas>=1.4.1
     numpy>=1.18.5
     typeguard>=2.13.3
+    aiohttp>=3.8.3
 
 [options.packages.find]
 where = src
diff --git a/src/pricecypher/datasets.py b/src/pricecypher/datasets.py
index cb8099d..e14d51b 100644
--- a/src/pricecypher/datasets.py
+++ b/src/pricecypher/datasets.py
@@ -1,8 +1,13 @@
+import asyncio
+import logging
+import time
+
 from datetime import datetime
 from pandas import DataFrame
 
 from pricecypher.collections import ScopeCollection, ScopeValueCollection
 from pricecypher.endpoints import DatasetsEndpoint, UsersEndpoint
+from pricecypher.rest import RestClient
 
 
 class Datasets(object):
@@ -35,8 +40,15 @@ def __init__(self, bearer_token, users_base=None, dss_base=None, rest_options=No
         self._dss_base = dss_base
         self._rest_options = rest_options
         self._all_meta = None
+        self._client = RestClient(jwt=bearer_token, options=rest_options)
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self._client.close()
 
-    def _get_dss_base(self, dataset_id):
+    async def _get_dss_base(self, dataset_id):
         """
         Get dataset service url base for the given dataset ID. Will be fetched from dataset META if no dss_base
         present.
@@ -49,9 +61,10 @@
         if self._dss_base is not None:
             return self._dss_base
 
-        return self.get_meta(dataset_id).dss_url
+        meta = await self.get_meta(dataset_id)
+        return meta.dss_url
 
-    def index(self):
+    async def index(self):
         """
         List all available datasets the user has access to.
         Response is cached in this instance for as long as this instance lives.
@@ -60,20 +73,20 @@
         :rtype list[Dataset]
         """
         if self._all_meta is None:
-            self._all_meta = UsersEndpoint(self._bearer, self._users_base, self._rest_options).datasets().index()
+            self._all_meta = await UsersEndpoint(self._client, self._users_base).datasets().index()
 
         return self._all_meta
 
-    def get_meta(self, dataset_id):
+    async def get_meta(self, dataset_id):
         """
         Get metadata like the dataset service url and time of creation of a dataset
 
         :param dataset_id: Dataset to get metadata for.
         :rtype: Dataset
         """
-        return next((d for d in self.index() if d.id == dataset_id), None)
+        return next((d for d in await self.index() if d.id == dataset_id), None)
 
-    def get_scopes(self, dataset_id, bc_id='all'):
+    async def get_scopes(self, dataset_id, bc_id='all'):
         """
         Get all scopes for the given dataset.
 
@@ -83,14 +96,15 @@
         :return: Collection of scopes for the given dataset.
         :rtype: ScopeCollection
         """
+        dss_base = await self._get_dss_base(dataset_id)
         return ScopeCollection(
-            DatasetsEndpoint(self._bearer, dataset_id, self._get_dss_base(dataset_id), self._rest_options)
+            await DatasetsEndpoint(self._client, dataset_id, dss_base)
             .business_cell(bc_id)
             .scopes()
             .index()
         )
 
-    def get_scope_values(self, dataset_id, scope_id, bc_id='all'):
+    async def get_scope_values(self, dataset_id, scope_id, bc_id='all'):
         """
         Get all scopes values for the given scope within the given dataset.
 
@@ -101,15 +115,15 @@
         :return: Collection of scope values for the given scope within the given dataset.
         :rtype: ScopeValueCollection
         """
-        dss_base = self._get_dss_base(dataset_id)
+        dss_base = await self._get_dss_base(dataset_id)
         return ScopeValueCollection(
-            DatasetsEndpoint(self._bearer, dataset_id, dss_base, self._rest_options)
+            await DatasetsEndpoint(self._client, dataset_id, dss_base)
             .business_cell(bc_id)
             .scopes()
             .scope_values(scope_id)
         )
 
-    def get_transaction_summary(self, dataset_id, bc_id='all', intake_status=None):
+    async def get_transaction_summary(self, dataset_id, bc_id='all', intake_status=None):
         """
         Get a summary of the transactions. Contains the first and last date of any transaction in the dataset.
 
@@ -122,13 +136,13 @@
         """
         if intake_status is None:
             intake_status = self.default_dss_intake_status
-        dss_base = self._get_dss_base(dataset_id)
-        return DatasetsEndpoint(self._bearer, dataset_id, dss_base, self._rest_options) \
+        dss_base = await self._get_dss_base(dataset_id)
+        return await DatasetsEndpoint(self._client, dataset_id, dss_base) \
             .business_cell(bc_id) \
             .transactions() \
             .summary(intake_status)
 
-    def get_transactions(
+    async def get_transactions(
         self,
         dataset_id,
         aggregate,
@@ -138,6 +152,7 @@
         bc_id='all',
         intake_status=None,
         filter_transaction_ids=None,
+        page_cb=None,
     ):
         """
         Display a listing of transactions as a dataframe. The transactions can be grouped or not, using the aggregate
@@ -159,12 +174,14 @@
             (defaults to 'all')
         :param str intake_status: (Optional) If specified, transactions are fetched from the last intake of this status.
         :param list filter_transaction_ids: (Optional) If specified, only transactions with these IDs are considered.
+        :param page_cb: (Optional) Async callback that is called for each individual page of transactions,
+            receiving the page as a DataFrame together with the page number and the last page number.
         :return: Dataframe of transactions.
         :rtype: DataFrame
         """
-        dss_base = self._get_dss_base(dataset_id)
+        dss_base = await self._get_dss_base(dataset_id)
         # Find scopes for the provided columns.
-        columns_with_scopes = self._add_scopes(dataset_id, columns, bc_id)
+        columns_with_scopes = await self._add_scopes(dataset_id, columns, bc_id)
         # Map each scope to the provided column key.
         scope_keys = self._find_scope_keys(columns_with_scopes)
         # Find the scope IDs that should be selected.
@@ -211,16 +228,38 @@
         elif end_date_time is not None:
             raise ValueError('end_date_time should be an instance of datetime.')
 
+        async def local_page_cb(page, page_nr, last_page):
+            """ This callback function will be executed for each received page of transactions. """
+            if page_cb is not None:
+                logging.debug(f'Scheduling transaction page handler for page {page_nr}/{last_page}.')
+                await page_cb(self._transactions_to_df(page, scope_keys), page_nr, last_page)
+
         # Fetch transactions from the dataset service.
-        transactions = DatasetsEndpoint(self._bearer, dataset_id, dss_base, self._rest_options) \
+        transactions = await DatasetsEndpoint(self._client, dataset_id, dss_base) \
             .business_cell(bc_id) \
             .transactions() \
-            .index(request_data)
+            .index(request_data, local_page_cb)
 
+        logging.debug(f"Received all transactions at {time.strftime('%X')}, creating data frame...")
         # Map transactions to dicts based on the provided column keys and convert to pandas dataframe.
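+        # Note: any page callbacks scheduled while fetching have already completed at this point,
+        # since TransactionsEndpoint.index() awaits them before returning.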
+        df = self._transactions_to_df(transactions, scope_keys)
+        logging.debug(f"Data frame created at {time.strftime('%X')}.")
+
+        return df
+
+    def _transactions_to_df(self, transactions, scope_keys):
+        """
+        Transform the given list of transactions to a DataFrame, using the given `scope_keys` mapping from scope IDs
+        to column keys.
+
+        :param list[Transaction] transactions: List of transactions as returned by the dataset service.
+        :param dict scope_keys: Dictionary of scope IDs to column keys.
+        :return: DataFrame containing the given transactions.
+        :rtype: DataFrame
+        """
         return DataFrame.from_records([t.to_dict(scope_keys) for t in transactions])
 
-    def _add_scopes(self, dataset_id, columns, bc_id='all'):
+    async def _add_scopes(self, dataset_id, columns, bc_id='all'):
         """
         Find the scope for each provided column and return new list of columns with scope information stored inside.
 
@@ -232,7 +271,7 @@
         :return: New list of columns, with for each column an added `scope` property.
         :rtype list[dict]
         """
-        all_scopes = self.get_scopes(dataset_id, bc_id)
+        all_scopes = await self.get_scopes(dataset_id, bc_id)
 
         def add_scope(column: dict):
             if ('scope_id' in column) + ('representation' in column) + ('name_dataset' in column) != 1:
diff --git a/src/pricecypher/endpoints/datasets.py b/src/pricecypher/endpoints/datasets.py
index 7e8a100..521e71f 100644
--- a/src/pricecypher/endpoints/datasets.py
+++ b/src/pricecypher/endpoints/datasets.py
@@ -1,24 +1,23 @@
+import asyncio
+import logging
+
 from pricecypher.endpoints.base_endpoint import BaseEndpoint
 from pricecypher.models import Scope, ScopeValue, TransactionSummary, TransactionsPage
-from pricecypher.rest import RestClient
 
 
 class DatasetsEndpoint(BaseEndpoint):
     """PriceCypher dataset endpoints in dataset service.
 
-    :param str bearer_token: Bearer token for PriceCypher (logical) API. Needs 'read:datasets' scope.
+    :param RestClient client: HTTP client for making API requests.
     :param int dataset_id: Dataset ID.
     :param str dss_base: (optional) Base URL for PriceCypher dataset service API.
         (defaults to https://datasets.pricecypher.com)
-    :param RestClientOptions rest_options: (optional) Set any additional options for the REST client, e.g. rate-limit.
-        (defaults to None)
     """
 
-    def __init__(self, bearer_token, dataset_id, dss_base='https://datasets.pricecypher.com', rest_options=None):
-        self.bearer_token = bearer_token
+    def __init__(self, client, dataset_id, dss_base='https://datasets.pricecypher.com'):
         self.dataset_id = dataset_id
         self.base_url = dss_base
-        self.client = RestClient(jwt=bearer_token, options=rest_options)
+        self.client = client
 
     def business_cell(self, bc_id='all'):
         """
@@ -63,20 +62,20 @@ def __init__(self, client, base):
         self.client = client
         self.base_url = base
 
-    def index(self):
+    async def index(self):
         """
         Show a list of all scopes of the dataset.
 
         :rtype: list[Scope]
         """
-        return self.client.get(self._url(), schema=Scope.Schema(many=True))
+        return await self.client.get(self._url(), schema=Scope.Schema(many=True))
 
-    def scope_values(self, scope_id):
+    async def scope_values(self, scope_id):
         """
         Get all scope values for the given scope of the dataset.
 
         :param scope_id: Scope to get scope values for.
         :rtype: list[ScopeValue]
         """
-        return self.client.get(self._url([scope_id, 'scope_values']), schema=ScopeValue.Schema(many=True))
+        return await self.client.get(self._url([scope_id, 'scope_values']), schema=ScopeValue.Schema(many=True))
 
 
 class TransactionsEndpoint(BaseEndpoint):
@@ -87,30 +86,50 @@ def __init__(self, client, base):
         self.client = client
         self.base_url = base
 
-    def index(self, data):
+    async def index(self, data, page_cb):
         """
         Display a listing of transactions. The given data will be passed directly to the dataset service.
 
+        :param page_cb: Async callback that is called for each individual page of transactions, receiving the page's
+            transactions, the page number, and the last page number.
         :param data: See documentation of dataset service on what data can be passed.
+        :return: A list of all returned transactions, potentially across multiple pages.
         :rtype: list[Transaction]
         """
+        # Keep track of all page callback tasks, so we can ensure all tasks are finished before returning.
+        callbacks = []
+
         # Perform initial request to retrieve first page of transactions and page metadata.
-        init_response = self.client.post(self._url(), data=data, schema=TransactionsPage.Schema())
+        logging.debug('Requesting first page of transactions...')
+        init_response = await self.client.post(self._url(), data=data, schema=TransactionsPage.Schema())
+        logging.debug('Received first page of transactions.')
+
+        # Collect the first page of transactions; later pages are appended to it below.
         transactions = init_response.transactions
         curr_page = init_response.meta.current_page
         last_page = init_response.meta.last_page
         request_path = init_response.meta.path
 
+        # Schedule async page callback to be handled by the caller.
+        callbacks.append(asyncio.create_task(page_cb(transactions, curr_page, last_page)))
+
+        # Loop over all remaining pages.
         for page_nr in range(curr_page + 1, last_page + 1):
+            logging.debug(f'Requesting transaction page {page_nr}/{last_page}...')
             page_path = f'{request_path}?page={page_nr}'
-            page_response = self.client.post(page_path, data=data, schema=TransactionsPage.Schema())
+            page_response = await self.client.post(page_path, data=data, schema=TransactionsPage.Schema())
+            logging.debug(f'Received transaction page {page_nr}/{last_page}.')
+
+            # Schedule async page callback function with this page of transactions.
+            callbacks.append(asyncio.create_task(page_cb(page_response.transactions, page_nr, last_page)))
+
             # Append transactions of the current page.
             transactions += page_response.transactions
 
+        # Wait for all callbacks to be done executing.
+        await asyncio.gather(*callbacks)
+
         return transactions
 
-    def summary(self, intake_status=None):
+    async def summary(self, intake_status=None):
        """
        Get a summary of the transactions. Contains the first and last date of any transaction in the dataset.
 
        :param intake_status: (Optional) intake status to fetch the summary for.
@@ -119,4 +138,4 @@
         params = {}
         if intake_status is not None:
             params['intake_status'] = intake_status
-        return self.client.get(self._url('summary'), params=params, schema=TransactionSummary.Schema())
+        return await self.client.get(self._url('summary'), params=params, schema=TransactionSummary.Schema())
diff --git a/src/pricecypher/endpoints/users.py b/src/pricecypher/endpoints/users.py
index 17e6a5d..55f4939 100644
--- a/src/pricecypher/endpoints/users.py
+++ b/src/pricecypher/endpoints/users.py
@@ -6,17 +6,14 @@
 class UsersEndpoint(BaseEndpoint):
     """PriceCypher endpoints in user tool.
-    :param str bearer_token: Bearer token for PriceCypher (logical) API. Needs 'read:datasets' scope.
+    :param RestClient client: HTTP client for making API requests.
     :param str users_base: (optional) Base URL for PriceCypher user tool API.
         (defaults to https://users.pricecypher.com)
-    :param RestClientOptions rest_options: (optional) Set any additional options for the REST client, e.g. rate-limit.
-        (defaults to None)
     """
 
-    def __init__(self, bearer_token, users_base='https://users.pricecypher.com', rest_options=None):
+    def __init__(self, client, users_base='https://users.pricecypher.com'):
         self.base_url = users_base
-        self.bearer_token = bearer_token
-        self.client = RestClient(jwt=bearer_token, options=rest_options)
+        self.client = client
 
     def datasets(self):
         """
@@ -34,10 +31,10 @@ def __init__(self, client, base):
         self.client = client
         self.base_url = base
 
-    def index(self) -> list[Dataset]:
+    async def index(self) -> list[Dataset]:
         """List all available datasets the user has access to.
 
         :return: list of datasets.
         :rtype list[Dataset]
         """
-        return self.client.get(self._url(), schema=Dataset.Schema(many=True))
+        return await self.client.get(self._url(), schema=Dataset.Schema(many=True))
diff --git a/src/pricecypher/rest.py b/src/pricecypher/rest.py
index a87bd20..197c256 100644
--- a/src/pricecypher/rest.py
+++ b/src/pricecypher/rest.py
@@ -1,6 +1,9 @@
+import asyncio
 import json
-import requests
-from time import sleep
+import logging
+
+import aiohttp
+from aiohttp import ClientSession, ClientTimeout, TCPConnector
 from random import randint
 from datetime import datetime
 from marshmallow import Schema
@@ -24,9 +27,10 @@ class RestClientOptions(object):
         (defaults to 3)
+    :param integer tcp_limit: (optional) Limit of simultaneous TCP connections kept open by the client session.
+        (defaults to 50)
     """
 
-    def __init__(self, timeout=None, retries=None):
+    def __init__(self, timeout=None, retries=None, tcp_limit=None):
         self.timeout = 300.0
         self.retries = 3
+        self.tcp_limit = 50
 
         if timeout is not None:
             self.timeout = timeout
@@ -34,6 +38,9 @@ def __init__(self, timeout=None, retries=None):
         if retries is not None:
             self.retries = retries
 
+        if tcp_limit is not None:
+            self.tcp_limit = tcp_limit
+
 
 class RestClient(object):
     """
@@ -48,6 +55,10 @@ def __init__(self, jwt, options=None):
         if options is None:
             options = RestClientOptions()
 
+        timeout = ClientTimeout(total=options.timeout)
+        connector = TCPConnector(limit=options.tcp_limit)
+
+        self.session = ClientSession(timeout=timeout, connector=connector)
         self.options = options
         self.jwt = jwt
@@ -60,6 +71,9 @@ def __init__(self, jwt, options=None):
             'Accept': 'application/json',
         }
 
+    async def close(self):
+        await self.session.close()
+
     # Returns the maximum amount of jitter to introduce in milliseconds (100ms)
     def MAX_REQUEST_RETRY_JITTER(self):
         return 100
@@ -72,7 +86,8 @@ def MAX_REQUEST_RETRY_DELAY(self):
     def MIN_REQUEST_RETRY_DELAY(self):
         return 100
 
-    def _retry(self, make_request):
+    async def _retry(self, make_request):
+        response = None
         # Track the API request attempt number
         attempt = 0
 
@@ -83,95 +98,110 @@
         retries = max(0, self.options.retries)
 
         while True:
+            wait = None
             # Increment attempt number
             attempt += 1
 
             # Issue the request
-            response = make_request()
+            try:
+                response = await make_request()
+
+                if response.status != 429:
+                    break
+
+                logging.debug('Got 429 Too Many Requests response.')
+
+                # Try to find the Retry-After header in the response, which specifies the number of seconds to wait.
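+                # Note: per RFC 7231 Retry-After may also be an HTTP-date; only the delta-seconds form is
+                # handled here, and a date value would make the int() conversion below raise a ValueError.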
+                wait = response.headers.get('Retry-After')
+            except aiohttp.ClientConnectionError:
+                logging.warning("Encountered a connection error.")
 
             # break iff no retry needed
-            if response.status_code != 429 or retries <= 0 or attempt > retries:
+            if retries <= 0 or attempt > retries:
                 break
 
-            # Retry the request. Apply an exponential backoff for subsequent attempts, using this formula:
-            # max(
-            #     MIN_REQUEST_RETRY_DELAY,
-            #     min(MAX_REQUEST_RETRY_DELAY, (100ms * (2 ** attempt - 1)) + random_between(1, MAX_REQUEST_RETRY_JITTER))
-            # )
+            if wait is not None:
+                # Wait at least 1 second and convert the Retry-After seconds to milliseconds.
+                wait = 1000 * (int(wait) + 1)
+            else:
+                # No Retry-After header. Apply an exponential backoff for subsequent attempts, using this formula:
+                # max(
+                #     MIN_REQUEST_RETRY_DELAY,
+                #     min(
+                #         MAX_REQUEST_RETRY_DELAY,
+                #         (100ms * (2 ** attempt - 1)) + random_between(1, MAX_REQUEST_RETRY_JITTER)
+                #     )
+                # )
 
-            # Increases base delay by (100ms * (2 ** attempt - 1))
-            wait = 100 * 2 ** (attempt - 1)
+                # Increases base delay by (100ms * (2 ** attempt - 1))
+                wait = 100 * 2 ** (attempt - 1)
 
-            # Introduces jitter to the base delay; increases delay between 1ms to MAX_REQUEST_RETRY_JITTER (100ms)
-            wait += randint(1, self.MAX_REQUEST_RETRY_JITTER())
+                # Introduces jitter to the base delay; increases delay between 1ms to MAX_REQUEST_RETRY_JITTER (100ms)
+                wait += randint(1, self.MAX_REQUEST_RETRY_JITTER())
 
-            # Is never more than MAX_REQUEST_RETRY_DELAY (1s)
-            wait = min(self.MAX_REQUEST_RETRY_DELAY(), wait)
+                # Is never more than MAX_REQUEST_RETRY_DELAY (1s)
+                wait = min(self.MAX_REQUEST_RETRY_DELAY(), wait)
 
-            # Is never less than MIN_REQUEST_RETRY_DELAY (100ms)
-            wait = max(self.MIN_REQUEST_RETRY_DELAY(), wait)
+                # Is never less than MIN_REQUEST_RETRY_DELAY (100ms)
+                wait = max(self.MIN_REQUEST_RETRY_DELAY(), wait)
+
+            logging.debug(f"Retrying in {wait} ms...")
 
             self._metrics['retries'] = attempt
             self._metrics['backoff'].append(wait)
 
             # Skip calling sleep() when running unit tests
             if self._skip_sleep is False:
-                # sleep() functions in seconds, so convert the milliseconds formula above accordingly
-                sleep(wait / 1000)
+                # asyncio.sleep() expects seconds, so convert the millisecond value above accordingly
+                await asyncio.sleep(wait / 1000)
 
-        # Return the final Response
+        # Return the final response, if available.
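+        # Note that `response` can still be None here if every attempt failed with a connection error,
+        # in which case the caller receives None instead of a response object.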
         return response
 
-    def get(self, url, params=None, schema: Schema = None):
+    async def get(self, url, params=None, schema: Schema = None):
         headers = self.base_headers.copy()
 
-        response = self._retry(lambda: requests.get(url, params=params, headers=headers, timeout=self.options.timeout))
+        response = await self._retry(lambda: self.session.get(url, params=params, headers=headers))
 
-        return self._process_response(response, schema)
+        return await self._process_response(response, schema)
 
-    def post(self, url, data=None, schema: Schema = None):
+    async def post(self, url, data=None, schema: Schema = None):
         headers = self.base_headers.copy()
-        j_data = json.dumps(data, cls=JsonEncoder)
 
-        response = self._retry(lambda: requests.post(url, data=j_data, headers=headers, timeout=self.options.timeout))
+        response = await self._retry(lambda: self.session.post(url, json=data, headers=headers))
 
-        return self._process_response(response, schema)
+        return await self._process_response(response, schema)
 
-    def file_post(self, url, data=None, files=None):
+    async def patch(self, url, data=None):
         headers = self.base_headers.copy()
-        headers.pop('Content-Type', None)
 
-        response = self._retry(
-            lambda: requests.post(url, data=data, files=files, headers=headers, timeout=self.options.timeout))
-        return self._process_response(response)
+        response = await self._retry(lambda: self.session.patch(url, json=data, headers=headers))
+        return await self._process_response(response)
 
-    def patch(self, url, data=None):
+    async def put(self, url, data=None):
         headers = self.base_headers.copy()
 
-        response = self._retry(lambda: requests.patch(url, json=data, headers=headers, timeout=self.options.timeout))
-        return self._process_response(response)
+        response = await self._retry(lambda: self.session.put(url, json=data, headers=headers))
+        return await self._process_response(response)
 
-    def put(self, url, data=None):
+    async def delete(self, url, params=None, data=None):
         headers = self.base_headers.copy()
 
-        response = self._retry(lambda: requests.put(url, json=data, headers=headers, timeout=self.options.timeout))
-        return self._process_response(response)
+        response = await self._retry(lambda: self.session.delete(url, headers=headers, params=params or {}, json=data))
+        return await self._process_response(response)
 
-    def delete(self, url, params=None, data=None):
-        headers = self.base_headers.copy()
+    async def _process_response(self, response, schema=None):
+        parsed = await self._parse(response, schema)
+        return parsed.content()
 
-        response = self._retry(
-            lambda: requests.delete(url, headers=headers, params=params or {}, json=data, timeout=self.options.timeout))
-        return self._process_response(response)
+    async def _parse(self, response, schema=None):
+        try:
+            content = await response.json()
 
-    def _process_response(self, response, schema=None):
-        return self._parse(response, schema).content()
+            if content is None:
+                return EmptyResponse(response.status)
 
-    def _parse(self, response, schema=None):
-        if not response.text:
-            return EmptyResponse(response.status_code)
-        try:
-            return JsonResponse(response, schema)
+            return JsonResponse(content, response, schema)
-        except ValueError:
+        except (ValueError, aiohttp.ContentTypeError):
-            return PlainResponse(response)
+            return PlainResponse(response, await response.text())
 
 
 class Response(object):
@@ -213,12 +243,11 @@ def default(self, obj):
 
 
 class JsonResponse(Response):
-    def __init__(self, response, schema: Schema = None):
-        if schema is not None and response.status_code < 400:
-            content = schema.loads(json_data=response.text)
-        else:
-            content = json.loads(response.text)
-        super(JsonResponse, self).__init__(response.status_code, content, response.headers)
+    def __init__(self, content, response, schema: Schema = None):
+        if schema is not None and response.status < 400:
+            content = schema.load(content)
+
+        super(JsonResponse, self).__init__(response.status, content, response.headers)
 
     def _error_code(self):
         if 'errorCode' in self._content:
@@ -236,8 +265,8 @@ def _error_message(self):
 
 
 class PlainResponse(Response):
-    def __init__(self, response):
-        super(PlainResponse, self).__init__(response.status_code, response.text, response.headers)
+    def __init__(self, response, text):
+        super(PlainResponse, self).__init__(response.status, text, response.headers)
 
     def _error_code(self):
         return UNKNOWN_ERROR
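
For reference, a minimal usage sketch of how the pieces introduced in this diff compose: the `tcp_limit` option on `RestClientOptions`, the `Datasets` async context manager, and the `page_cb` streaming callback. `BEARER_TOKEN`, `DATASET_ID`, and `AGGREGATE` are placeholders (as in the README example above), and the CSV file naming is illustrative only; note that `get_transactions` still returns the full combined `DataFrame` after all pages have been handled.

```python
import asyncio

from pricecypher import Datasets
from pricecypher.rest import RestClientOptions


async def store_page(page, page_nr, last_page):
    # Each page arrives as a pandas DataFrame; persist it as soon as it is received
    # rather than waiting for the full result set.
    page.to_csv(f'transactions_page_{page_nr}.csv', index=False)
    print(f'Stored page {page_nr}/{last_page}')


async def main():
    # Raise the total request timeout to 10 minutes, retry 429 responses up to 5 times,
    # and cap the number of concurrent TCP connections at 20.
    options = RestClientOptions(timeout=600, retries=5, tcp_limit=20)

    async with Datasets(BEARER_TOKEN, rest_options=options) as ds:
        columns = [{'representation': 'cost_price', 'aggregate': 'sum', 'key': 'cost_price'}]
        df = await ds.get_transactions(DATASET_ID, AGGREGATE, columns, page_cb=store_page)
        print(f'Fetched {len(df)} transactions in total')

asyncio.run(main())
```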