Commit

Merge branch 'master' into #118
laughingman7743 authored Aug 2, 2020
2 parents 54862bd + e1f01bf commit e00d3fd
Showing 34 changed files with 3,846 additions and 2,188 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -13,6 +13,8 @@ matrix:
env: TOXENV=py36
- python: 3.7
env: TOXENV=py37
- python: 3.8
env: TOXENV=py38
install:
- pip install tox codecov awscli
before_script:
9 changes: 9 additions & 0 deletions Makefile
@@ -0,0 +1,9 @@
.PHONY: fmt
fmt:
	pipenv run isort -rc .
	pipenv run black .

.PHONY: chk
chk:
	pipenv run isort -c -rc .
	pipenv run black --check --diff .
5 changes: 5 additions & 0 deletions Pipfile
@@ -3,6 +3,9 @@ url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"

[pipenv]
allow_prereleases = true

[packages]
e1839a8 = {path = ".",extras = ["sqlalchemy", "pandas"],editable = true}

@@ -14,4 +17,6 @@ wheel = "*"
pytest = ">=3.5"
pytest-cov = "*"
pytest-flake8 = ">=1.0.1"
pytest-black = "*"
pytest-isort = "*"
pytest-xdist = "*"
739 changes: 482 additions & 257 deletions Pipfile.lock

Large diffs are not rendered by default.

83 changes: 78 additions & 5 deletions README.rst
@@ -10,9 +10,11 @@
.. image:: https://img.shields.io/pypi/l/PyAthena.svg
   :target: https://github.com/laughingman7743/PyAthena/blob/master/LICENSE

.. image:: https://img.shields.io/pypi/dm/PyAthena.svg
   :target: https://pypistats.org/packages/pyathena
.. image:: https://pepy.tech/badge/pyathena/month
   :target: https://pepy.tech/project/pyathena/month

.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
   :target: https://github.com/psf/black

PyAthena
========
@@ -27,7 +29,7 @@ Requirements

* Python

- CPython 2.7, 3.5, 3.6, 3.7
- CPython 2.7, 3.5, 3.6, 3.7, 3.8

Installation
------------
@@ -237,7 +239,56 @@ The ``pyathena.util`` package also has helper methods.
    to_sql(df, 'YOUR_TABLE', conn, 's3://YOUR_S3_BUCKET/path/to/',
           schema='YOUR_SCHEMA', index=False, if_exists='replace')

This helper method supports partitioning.

.. code:: python

    import pandas as pd
    from datetime import date
    from pyathena import connect
    from pyathena.util import to_sql

    conn = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
                   aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                   s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                   region_name='us-west-2')
    df = pd.DataFrame({
        'a': [1, 2, 3, 4, 5],
        'dt': [
            date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1),
            date(2020, 1, 2),
            date(2020, 1, 3)
        ],
    })
    to_sql(df, 'YOUR_TABLE', conn, 's3://YOUR_S3_BUCKET/path/to/',
           schema='YOUR_SCHEMA', partitions=['dt'])

    cursor = conn.cursor()
    cursor.execute('SHOW PARTITIONS YOUR_TABLE')
    print(cursor.fetchall())

Conversion to Parquet and upload to S3 use `ThreadPoolExecutor`_ by default.
It is also possible to use `ProcessPoolExecutor`_.

.. code:: python

    import pandas as pd
    from concurrent.futures.process import ProcessPoolExecutor
    from pyathena import connect
    from pyathena.util import to_sql

    conn = connect(aws_access_key_id='YOUR_ACCESS_KEY_ID',
                   aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
                   s3_staging_dir='s3://YOUR_S3_BUCKET/path/to/',
                   region_name='us-west-2')
    df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
    to_sql(df, 'YOUR_TABLE', conn, 's3://YOUR_S3_BUCKET/path/to/',
           schema='YOUR_SCHEMA', index=False, if_exists='replace',
           chunksize=1, executor_class=ProcessPoolExecutor, max_workers=5)

.. _`pandas.DataFrame.to_sql`: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_sql.html
.. _`ThreadPoolExecutor`: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
.. _`ProcessPoolExecutor`: https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor

AsynchronousCursor
~~~~~~~~~~~~~~~~~~
@@ -755,7 +806,7 @@ Depends on the following environment variables:
    $ export AWS_DEFAULT_REGION=us-west-2
    $ export AWS_ATHENA_S3_STAGING_DIR=s3://YOUR_S3_BUCKET/path/to/

And you need to create a workgroup named ``test-pyathena``.
And you need to create a workgroup named ``test-pyathena`` with the ``Query result location`` configuration.
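
Below is a sketch (not part of this diff) of creating that workgroup with boto3, which PyAthena already builds on; the ``ResultConfiguration`` value is an assumption, substitute your own bucket:

.. code:: python

    import boto3

    client = boto3.client("athena", region_name="us-west-2")
    # Create the workgroup the test suite expects, with a query result location.
    client.create_work_group(
        Name="test-pyathena",
        Configuration={
            "ResultConfiguration": {
                "OutputLocation": "s3://YOUR_S3_BUCKET/path/to/"
            }
        },
    )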

Run test
~~~~~~~~
@@ -776,6 +827,28 @@ Run test multiple Python versions
    $ pip install pipenv
    $ pipenv install --dev
    $ pipenv run scripts/test_data/upload_test_data.sh
    $ pyenv local 3.7.2 3.6.8 3.5.7 2.7.16
    $ pyenv local 3.8.2 3.7.2 3.6.8 3.5.7 2.7.16
    $ pipenv run tox
    $ pipenv run scripts/test_data/delete_test_data.sh

Code formatting
---------------

The code is formatted with `black`_ and `isort`_.

Apply format
~~~~~~~~~~~~

.. code:: bash

    $ make fmt

Check format
~~~~~~~~~~~~

.. code:: bash

    $ make chk

.. _`black`: https://github.com/psf/black
.. _`isort`: https://github.com/timothycrosley/isort
63 changes: 34 additions & 29 deletions benchmarks/benchmark.py
@@ -4,16 +4,17 @@
import sys
import time

from pyathena import connect
from pyathenajdbc import connect as jdbc_connect

from pyathena import connect
from pyathena.pandas_cursor import PandasCursor

LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler(sys.stdout))
LOGGER.setLevel(logging.INFO)

S3_STAGING_DIR = 's3://YOUR_BUCKET/path/to/'
REGION_NAME = 'us-west-2'
S3_STAGING_DIR = "s3://YOUR_BUCKET/path/to/"
REGION_NAME = "us-west-2"
COUNT = 5

SMALL_RESULT_SET_QUERY = """
@@ -31,70 +32,74 @@


def run_pyathen_pandas_cursor(query):
    LOGGER.info('PyAthena PandasCursor =========================')
    cursor = connect(s3_staging_dir=S3_STAGING_DIR,
                     region_name=REGION_NAME,
                     cursor_class=PandasCursor).cursor()
    LOGGER.info("PyAthena PandasCursor =========================")
    cursor = connect(
        s3_staging_dir=S3_STAGING_DIR,
        region_name=REGION_NAME,
        cursor_class=PandasCursor,
    ).cursor()
    avgs = []
    for i in range(0, COUNT):
        start = time.time()
        df = cursor.execute(query).as_pandas()
        end = time.time()
        elapsed = end - start
        LOGGER.info('loop:{0}\tcount:{1}\telasped:{2}'.format(i, df.shape[0], elapsed))
        LOGGER.info("loop:{0}\tcount:{1}\telasped:{2}".format(i, df.shape[0], elapsed))
        avgs.append(elapsed)
    avg = sum(avgs) / COUNT
    LOGGER.info('Avg: {0}'.format(avg))
    LOGGER.info('===============================================')
    LOGGER.info("Avg: {0}".format(avg))
    LOGGER.info("===============================================")


def run_pyathena_cursor(query):
    LOGGER.info('PyAthena Cursor ===============================')
    cursor = connect(s3_staging_dir=S3_STAGING_DIR,
                     region_name=REGION_NAME).cursor()
    LOGGER.info("PyAthena Cursor ===============================")
    cursor = connect(s3_staging_dir=S3_STAGING_DIR, region_name=REGION_NAME).cursor()
    avgs = []
    for i in range(0, COUNT):
        start = time.time()
        result = cursor.execute(query).fetchall()
        end = time.time()
        elapsed = end - start
        LOGGER.info('loop:{0}\tcount:{1}\telasped:{2}'.format(i, len(result), elapsed))
        LOGGER.info("loop:{0}\tcount:{1}\telasped:{2}".format(i, len(result), elapsed))
        avgs.append(elapsed)
    avg = sum(avgs) / COUNT
    LOGGER.info('Avg: {0}'.format(avg))
    LOGGER.info('===============================================')
    LOGGER.info("Avg: {0}".format(avg))
    LOGGER.info("===============================================")


def run_pyathenajdbc_cursor(query):
    LOGGER.info('PyAthenaJDBC Cursor ===========================')
    cursor = jdbc_connect(s3_staging_dir=S3_STAGING_DIR,
                          region_name=REGION_NAME).cursor()
    LOGGER.info("PyAthenaJDBC Cursor ===========================")
    cursor = jdbc_connect(
        s3_staging_dir=S3_STAGING_DIR, region_name=REGION_NAME
    ).cursor()
    avgs = []
    for i in range(0, COUNT):
        start = time.time()
        cursor.execute(query)
        result = cursor.fetchall()
        end = time.time()
        elapsed = end - start
        LOGGER.info('loop:{0}\tcount:{1}\telasped:{2}'.format(i, len(result), elapsed))
        LOGGER.info("loop:{0}\tcount:{1}\telasped:{2}".format(i, len(result), elapsed))
        avgs.append(elapsed)
    avg = sum(avgs) / COUNT
    LOGGER.info('Avg: {0}'.format(avg))
    LOGGER.info('===============================================')
    LOGGER.info("Avg: {0}".format(avg))
    LOGGER.info("===============================================")


def main():
    for query in [SMALL_RESULT_SET_QUERY,
                  MEDIUM_RESULT_SET_QUERY,
                  LARGE_RESULT_SET_QUERY]:
    for query in [
        SMALL_RESULT_SET_QUERY,
        MEDIUM_RESULT_SET_QUERY,
        LARGE_RESULT_SET_QUERY,
    ]:
        LOGGER.info(query)
        run_pyathenajdbc_cursor(query)
        LOGGER.info('')
        LOGGER.info("")
        run_pyathena_cursor(query)
        LOGGER.info('')
        LOGGER.info("")
        run_pyathen_pandas_cursor(query)
        LOGGER.info('')
        LOGGER.info("")


if __name__ == '__main__':
if __name__ == "__main__":
    main()
38 changes: 24 additions & 14 deletions pyathena/__init__.py
@@ -1,24 +1,32 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals

import datetime

from pyathena.error import * # noqa

__version__ = '1.10.0'
try:
    from multiprocessing import cpu_count
except ImportError:
    # Fallback when multiprocessing is unavailable on the platform.

    def cpu_count():
        return None

__version__ = "1.11.0"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel = '2.0'
apilevel = "2.0"
threadsafety = 3
paramstyle = 'pyformat'
paramstyle = "pyformat"


class DBAPITypeObject:
    """Type Objects and Constructors
    https://www.python.org/dev/peps/pep-0249/#type-objects-and-constructors
    """

    def __init__(self, *values):
        self.values = values

@@ -35,15 +43,16 @@ def __eq__(self, other):


# https://docs.aws.amazon.com/athena/latest/ug/data-types.html
STRING = DBAPITypeObject('char', 'varchar', 'map', 'array', 'row')
BINARY = DBAPITypeObject('varbinary')
BOOLEAN = DBAPITypeObject('boolean')
NUMBER = DBAPITypeObject('tinyint', 'smallint', 'bigint', 'integer',
                         'real', 'double', 'float', 'decimal')
DATE = DBAPITypeObject('date')
TIME = DBAPITypeObject('time', 'time with time zone')
DATETIME = DBAPITypeObject('timestamp', 'timestamp with time zone')
JSON = DBAPITypeObject('json')
STRING = DBAPITypeObject("char", "varchar", "map", "array", "row")
BINARY = DBAPITypeObject("varbinary")
BOOLEAN = DBAPITypeObject("boolean")
NUMBER = DBAPITypeObject(
    "tinyint", "smallint", "bigint", "integer", "real", "double", "float", "decimal"
)
DATE = DBAPITypeObject("date")
TIME = DBAPITypeObject("time", "time with time zone")
DATETIME = DBAPITypeObject("timestamp", "timestamp with time zone")
JSON = DBAPITypeObject("json")

Date = datetime.date
Time = datetime.time
@@ -52,4 +61,5 @@ def __eq__(self, other):

def connect(*args, **kwargs):
    from pyathena.connection import Connection

    return Connection(*args, **kwargs)
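
A brief usage sketch (illustrative only, not part of this commit): per PEP 249, a DBAPITypeObject compares equal to any of its member type codes, and paramstyle = "pyformat" means queries bind named parameters via %(name)s placeholders.

    # Hypothetical snippet; assumes only the definitions shown above.
    from pyathena import NUMBER, STRING, connect

    assert STRING == "varchar"  # a type object matches each of its type codes
    assert NUMBER == "bigint"
    assert not (NUMBER == "varchar")

    # pyformat parameter binding: named placeholders filled from a dict.
    cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                     region_name="us-west-2").cursor()
    cursor.execute("SELECT %(name)s", {"name": "hello"})
    print(cursor.fetchall())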