59 changes: 46 additions & 13 deletions README.md
@@ -8,21 +8,48 @@ Simply execute `pip install pricecypher-sdk`

### Dataset SDK
```python
import asyncio
from pricecypher import Datasets

datasets = Datasets(BEARER_TOKEN)

datasets.index()
datasets.get_meta(DATASET_ID)
datasets.get_scopes(DATASET_ID)
datasets.get_scope_values(DATASET_ID, SCOPE_ID)
datasets.get_transaction_summary(DATASET_ID)
async def handle_page(page, page_nr, last_page):
    """This function is called for each individual page of transactions that is received."""
    print(f'Handling page {page_nr}/{last_page}')
    print(f'Nr of transactions in page: {len(page)}')
    print(f'Handling page {page_nr}/{last_page} done')


async def main():
    async with Datasets(BEARER_TOKEN) as ds:
        # Specify desired columns to be requested for transactions
        columns = [
            {'name_dataset': 'cust_group', 'filter': ['Big', 'Small'], 'key': 'group'},
            {'representation': 'cost_price', 'aggregate': 'sum', 'key': 'cost_price'}
        ]

        index = asyncio.create_task(ds.index())
        meta = asyncio.create_task(ds.get_meta(DATASET_ID))
        scopes = asyncio.create_task(ds.get_scopes(DATASET_ID))
        values = asyncio.create_task(ds.get_scope_values(DATASET_ID, SCOPE_ID))
        summary = asyncio.create_task(ds.get_transaction_summary(DATASET_ID))
        transactions = asyncio.create_task(ds.get_transactions(DATASET_ID, AGGREGATE, columns))

        print('datasets', await index)
        print('transactions', await transactions)
        print('meta', await meta)
        print('summary', await summary)
        print('scopes', await scopes)
        print('scope values', await values)

asyncio.run(main())
```
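
The `handle_page` callback above is defined but not yet wired up. A minimal sketch of passing it to `get_transactions` through the new `page_cb` parameter introduced in this change (assuming the same `BEARER_TOKEN`, `DATASET_ID`, and `AGGREGATE` placeholders as above) could look like this:

```python
import asyncio
from pricecypher import Datasets

async def handle_page(page, page_nr, last_page):
    # Each page arrives as a pandas DataFrame holding one page of transactions.
    print(f'Page {page_nr}/{last_page}: {len(page)} transactions')

async def main():
    async with Datasets(BEARER_TOKEN) as ds:
        columns = [{'representation': 'cost_price', 'aggregate': 'sum', 'key': 'cost_price'}]
        # The callback is awaited for every page as soon as it is received;
        # the returned dataframe still contains all transactions combined.
        df = await ds.get_transactions(DATASET_ID, AGGREGATE, columns, page_cb=handle_page)
        print(f'Total transactions: {len(df)}')

asyncio.run(main())
```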

### Printing debug logs
By default, debug log messages are not printed. Debug logs can be enabled as follows.
```python
import logging

columns = [
{'name_dataset': 'cust_group', 'filter': ['Big', 'Small'], 'key': 'group'},
{'representation': 'cost_price', 'aggregate': 'sum', 'key': 'cost_price'}
]
datasets.get_transactions(DATASET_ID, AGGREGATE, columns)
logging.basicConfig(level=logging.DEBUG)
```

### Contracts
@@ -51,8 +78,14 @@ Similarly, each file in the `models` module defines the models that are provided
The SDK that this package provides is contained in the top-level package contents.

## Deployment
1. Execute `python3 -m build` to build the source archive and a built distribution.
2. Execute `python3 -m twine upload dist/*` to upload the package to PyPi.
1. Execute `python -m build` to build the source archive and a built distribution.
2. Execute `python -m twine upload dist/*` to upload the package to PyPI.

### Snapshot
To deploy a snapshot release, follow these steps instead.
1. Add `-pre` to the version in `setup.cfg`.
2. Execute `python -m build -C--global-option=egg_info -C--global-option=--tag-build=dev`.
3. Execute `python -m twine upload dist/*`.

## Authors

3 changes: 2 additions & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = pricecypher-sdk
version = 0.5.0
version = 0.6.0
author = Deloitte Consulting B.V.
description = Python wrapper around the different PriceCypher APIs
long_description = file: README.md
@@ -25,6 +25,7 @@ install_requires =
pandas>=1.4.1
numpy>=1.18.5
typeguard>=2.13.3
aiohttp>=3.8.3

[options.packages.find]
where = src
81 changes: 60 additions & 21 deletions src/pricecypher/datasets.py
@@ -1,8 +1,13 @@
import asyncio
import logging
import time

from datetime import datetime
from pandas import DataFrame

from pricecypher.collections import ScopeCollection, ScopeValueCollection
from pricecypher.endpoints import DatasetsEndpoint, UsersEndpoint
from pricecypher.rest import RestClient


class Datasets(object):
@@ -35,8 +40,15 @@ def __init__(self, bearer_token, users_base=None, dss_base=None, rest_options=None):
self._dss_base = dss_base
self._rest_options = rest_options
self._all_meta = None
self._client = RestClient(jwt=bearer_token, options=rest_options)

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._client.close()
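    # Note: entering the instance with `async with Datasets(...)` (see the README
    # example) ensures the underlying RestClient, which presumably wraps an
    # aiohttp session, is closed even when an exception is raised inside the block.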

def _get_dss_base(self, dataset_id):
async def _get_dss_base(self, dataset_id):
"""
Get dataset service url base for the given dataset ID.
Will be fetched from the dataset metadata if no dss_base is present.
@@ -49,9 +61,10 @@ def _get_dss_base(self, dataset_id):
if self._dss_base is not None:
return self._dss_base

return self.get_meta(dataset_id).dss_url
meta = await self.get_meta(dataset_id)
return meta.dss_url

def index(self):
async def index(self):
"""
List all available datasets the user has access to.
Response is cached in this instance for as long as this instance lives.
@@ -60,20 +73,20 @@ def index(self):
:rtype list[Dataset]
"""
if self._all_meta is None:
self._all_meta = UsersEndpoint(self._bearer, self._users_base, self._rest_options).datasets().index()
self._all_meta = await UsersEndpoint(self._client, self._users_base).datasets().index()

return self._all_meta

def get_meta(self, dataset_id):
async def get_meta(self, dataset_id):
"""
Get metadata of a dataset, such as the dataset service URL and its time of creation.

:param dataset_id: Dataset to get metadata for.
:rtype: Dataset
"""
return next((d for d in self.index() if d.id == dataset_id), None)
return next((d for d in await self.index() if d.id == dataset_id), None)

def get_scopes(self, dataset_id, bc_id='all'):
async def get_scopes(self, dataset_id, bc_id='all'):
"""
Get all scopes for the given dataset.

@@ -83,14 +96,15 @@ def get_scopes(self, dataset_id, bc_id='all'):
:return: Collection of scopes for the given dataset.
:rtype: ScopeCollection
"""
dss_base = await self._get_dss_base(dataset_id)
return ScopeCollection(
DatasetsEndpoint(self._bearer, dataset_id, self._get_dss_base(dataset_id), self._rest_options)
await DatasetsEndpoint(self._client, dataset_id, dss_base)
.business_cell(bc_id)
.scopes()
.index()
)

def get_scope_values(self, dataset_id, scope_id, bc_id='all'):
async def get_scope_values(self, dataset_id, scope_id, bc_id='all'):
"""
Get all scopes values for the given scope within the given dataset.

@@ -101,15 +115,15 @@ def get_scope_values(self, dataset_id, scope_id, bc_id='all'):
:return: Collection of scope values for the given scope within the given dataset.
:rtype: ScopeValueCollection
"""
dss_base = self._get_dss_base(dataset_id)
dss_base = await self._get_dss_base(dataset_id)
return ScopeValueCollection(
DatasetsEndpoint(self._bearer, dataset_id, dss_base, self._rest_options)
await DatasetsEndpoint(self._client, dataset_id, dss_base)
.business_cell(bc_id)
.scopes()
.scope_values(scope_id)
)

def get_transaction_summary(self, dataset_id, bc_id='all', intake_status=None):
async def get_transaction_summary(self, dataset_id, bc_id='all', intake_status=None):
"""
Get a summary of the transactions. Contains the first and last date of any transaction in the dataset.

@@ -122,13 +136,13 @@ def get_transaction_summary(self, dataset_id, bc_id='all', intake_status=None):
"""
if intake_status is None:
intake_status = self.default_dss_intake_status
dss_base = self._get_dss_base(dataset_id)
return DatasetsEndpoint(self._bearer, dataset_id, dss_base, self._rest_options) \
dss_base = await self._get_dss_base(dataset_id)
return await DatasetsEndpoint(self._client, dataset_id, dss_base) \
.business_cell(bc_id) \
.transactions() \
.summary(intake_status)

def get_transactions(
async def get_transactions(
self,
dataset_id,
aggregate,
@@ -138,6 +152,7 @@ def get_transactions(
bc_id='all',
intake_status=None,
filter_transaction_ids=None,
page_cb=None,
):
"""
Display a listing of transactions as a dataframe. The transactions can be grouped or not, using the aggregate
@@ -159,12 +174,14 @@
(defaults to 'all')
:param str intake_status: (Optional) If specified, transactions are fetched from the last intake of this status.
:param list filter_transaction_ids: (Optional) If specified, only transactions with these IDs are considered.
:param page_cb: (Optional) Callback function that is called for each individual page of transactions,
receiving that page as input.
:return: Dataframe of transactions.
:rtype: DataFrame
"""
dss_base = self._get_dss_base(dataset_id)
dss_base = await self._get_dss_base(dataset_id)
# Find scopes for the provided columns.
columns_with_scopes = self._add_scopes(dataset_id, columns, bc_id)
columns_with_scopes = await self._add_scopes(dataset_id, columns, bc_id)
# Map each scope to the provided column key.
scope_keys = self._find_scope_keys(columns_with_scopes)
# Find the scope IDs that should be selected.
@@ -211,16 +228,38 @@
elif end_date_time is not None:
raise ValueError('end_date_time should be an instance of datetime.')

async def local_page_cb(page, page_nr, last_page):
""" This callback function will be executed for each received page of transactions. """
if page_cb is not None:
logging.debug(f'Scheduling transaction page handler for page {page_nr}/{last_page}.')
await page_cb(self._transactions_to_df(page, scope_keys), page_nr, last_page)
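        # Note: the user-supplied `page_cb` receives each page already converted
        # to a pandas DataFrame, not the raw list of Transaction models.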

# Fetch transactions from the dataset service.
transactions = DatasetsEndpoint(self._bearer, dataset_id, dss_base, self._rest_options) \
transactions = await DatasetsEndpoint(self._client, dataset_id, dss_base) \
.business_cell(bc_id) \
.transactions() \
.index(request_data)
.index(request_data, local_page_cb)

logging.debug(f"Received all transactions at {time.strftime('%X')}, creating data frame...")
# Map transactions to dicts based on the provided column keys and convert to pandas dataframe.
df = self._transactions_to_df(transactions, scope_keys)
logging.debug(f"Data frame created at {time.strftime('%X')}.")

return df

def _transactions_to_df(self, transactions, scope_keys):
"""
Transform the given list of transactions to a DataFrame, using the given `scope_keys` mapping from scope IDs to
column names.

:param list[Transaction] transactions: List of transactions as received from the dataset service.
:param dict scope_keys: Dictionary of scope IDs to column keys.
:return: DataFrame containing the given transactions.
:rtype: DataFrame
"""
return DataFrame.from_records([t.to_dict(scope_keys) for t in transactions])

def _add_scopes(self, dataset_id, columns, bc_id='all'):
async def _add_scopes(self, dataset_id, columns, bc_id='all'):
"""
Find the scope for each provided column and return new list of columns with scope information stored inside.

@@ -232,7 +271,7 @@ def _add_scopes(self, dataset_id, columns, bc_id='all'):
:return: New list of columns, with an added `scope` property for each column.
:rtype list[dict]
"""
all_scopes = self.get_scopes(dataset_id, bc_id)
all_scopes = await self.get_scopes(dataset_id, bc_id)

def add_scope(column: dict):
if ('scope_id' in column) + ('representation' in column) + ('name_dataset' in column) != 1: