Skip to content

Commit

Permalink
add test and codecov (#859)
Browse files Browse the repository at this point in the history
  • Loading branch information
thehesiod authored Mar 11, 2021
1 parent f629456 commit 40427e6
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 1 deletion.
32 changes: 32 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
codecov:
branch: master

coverage:
range: "95..100"

status:
project: no

flags:
library:
paths:
- aiobotocore/
configs:
paths:
- requirements/
- ".git*"
- "*.toml"
- "*.yml"
changelog:
paths:
- CHANGES/
- CHANGES.rst
docs:
paths:
- docs/
- "*.md"
- "*.rst"
- "*.txt"
tests:
paths:
- tests/
49 changes: 49 additions & 0 deletions tests/botocore/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from contextlib import asynccontextmanager, AsyncExitStack

import aiobotocore.session
from botocore.stub import Stubber


class StubbedSession(aiobotocore.session.AioSession):
def __init__(self, *args, **kwargs):
super(StubbedSession, self).__init__(*args, **kwargs)
self._cached_clients = {}
self._client_stubs = {}

@asynccontextmanager
async def create_client(self, service_name, *args, **kwargs):
async with AsyncExitStack() as es:
es: AsyncExitStack
if service_name not in self._cached_clients:
client = await es.enter_async_context(
self._create_stubbed_client(service_name, *args, **kwargs))
self._cached_clients[service_name] = client
yield self._cached_clients[service_name]

@asynccontextmanager
async def _create_stubbed_client(self, service_name, *args, **kwargs):
async with AsyncExitStack() as es:
es: AsyncExitStack
client = await es.enter_async_context(
super(StubbedSession, self).create_client(
service_name, *args, **kwargs))
stubber = Stubber(client)
self._client_stubs[service_name] = stubber
yield client

@asynccontextmanager
async def stub(self, service_name, *args, **kwargs):
async with AsyncExitStack() as es:
es: AsyncExitStack
if service_name not in self._client_stubs:
await es.enter_async_context(
self.create_client(service_name, *args, **kwargs))
yield self._client_stubs[service_name]

def activate_stubs(self):
for stub in self._client_stubs.values():
stub.activate()

def verify_stubs(self):
for stub in self._client_stubs.values():
stub.assert_no_pending_responses()
211 changes: 210 additions & 1 deletion tests/botocore/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
and adapted to work with asyncio and pytest
"""
import asyncio
import binascii
import os
import sys
import tempfile
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
import json
import subprocess
Expand All @@ -21,9 +27,18 @@

from aiobotocore.session import AioSession
from aiobotocore import credentials

from botocore.credentials import Credentials, JSONFileCache, ReadOnlyCredentials
from botocore.configprovider import ConfigValueStore
from botocore.utils import FileWebIdentityTokenLoader, SSOTokenLoader
from aiobotocore.credentials import AioSSOCredentialFetcher, AioSSOProvider
from aiobotocore.credentials import AioSSOCredentialFetcher, AioSSOProvider, \
AioInstanceMetadataProvider, AioEnvProvider, AioContainerProvider, \
AioAssumeRoleProvider, AioProfileProviderBuilder, AioCanonicalNameCredentialSourcer
from tests.botocore.helpers import StubbedSession


def random_chars(num_chars):
return binascii.hexlify(os.urandom(int(num_chars / 2))).decode('ascii')


# From class TestCredentials(BaseEnvVar):
Expand Down Expand Up @@ -335,6 +350,38 @@ async def test_assumerolefetcher_mfa():
assert call_kwargs['TokenCode'] == 'token-code'


@pytest.mark.moto
@pytest.mark.asyncio
async def test_recursive_assume_role(assume_role_setup):
self = assume_role_setup

config = (
'[profile A]\n'
'role_arn = arn:aws:iam::123456789:role/RoleA\n'
'source_profile = B\n\n'
'[profile B]\n'
'role_arn = arn:aws:iam::123456789:role/RoleB\n'
'source_profile = C\n\n'
'[profile C]\n'
'aws_access_key_id = abc123\n'
'aws_secret_access_key = def456\n'
)
self.write_config(config)

profile_b_creds = self.create_random_credentials()
profile_b_response = self.create_assume_role_response(profile_b_creds)
profile_a_creds = self.create_random_credentials()
profile_a_response = self.create_assume_role_response(profile_a_creds)

async with self.create_session(profile='A') as (session, stubber):
stubber.add_response('assume_role', profile_b_response)
stubber.add_response('assume_role', profile_a_response)

actual_creds = await session.get_credentials()
await self.assert_creds_equal(actual_creds, profile_a_creds)
stubber.assert_no_pending_responses()


# From class TestAssumeRoleWithWebIdentityCredentialFetcher(BaseEnvVar):
def assume_role_web_identity_client_creator(with_response):
class _Client(object):
Expand Down Expand Up @@ -1201,6 +1248,168 @@ async def ssl_credential_fetcher_setup():
yield self


@pytest.fixture
def base_env_var_setup():
self = Self()
self.environ = {}
with mock.patch('os.environ', self.environ):
yield self


def _some_future_time():
timeobj = datetime.now(tzlocal())
return timeobj + timedelta(hours=24)


def _create_assume_role_response(credentials, expiration=None):
if expiration is None:
expiration = _some_future_time()

response = {
'Credentials': {
'AccessKeyId': credentials.access_key,
'SecretAccessKey': credentials.secret_key,
'SessionToken': credentials.token,
'Expiration': expiration
},
'AssumedRoleUser': {
'AssumedRoleId': 'myroleid',
'Arn': 'arn:aws:iam::1234567890:user/myuser'
}
}

return response


def _create_random_credentials():
return Credentials(
'fake-%s' % random_chars(15),
'fake-%s' % random_chars(35),
'fake-%s' % random_chars(45)
)


async def _assert_creds_equal(c1, c2):
c1_frozen = c1
if not isinstance(c1_frozen, ReadOnlyCredentials):
c1_frozen = await c1.get_frozen_credentials()
c2_frozen = c2
if not isinstance(c2_frozen, ReadOnlyCredentials):
c2_frozen = c2.get_frozen_credentials()
assert c1_frozen == c2_frozen


def _write_config(self, config):
with open(self.config_file, 'w') as f:
f.write(config)


@pytest.fixture
def base_assume_role_test_setup(base_env_var_setup):
self = base_env_var_setup
with tempfile.TemporaryDirectory() as td_name:
self.tempdir = td_name
self.config_file = os.path.join(self.tempdir, 'config')
self.environ['AWS_CONFIG_FILE'] = self.config_file
self.environ['AWS_SHARED_CREDENTIALS_FILE'] = str(uuid.uuid4())

self.some_future_time = _some_future_time
self.create_assume_role_response = _create_assume_role_response
self.create_random_credentials = _create_random_credentials
self.assert_creds_equal = _assert_creds_equal
self.write_config = partial(_write_config, self)

yield self


def _mock_provider(provider_cls):
mock_instance = mock.Mock(spec=provider_cls)
mock_instance.load.return_value = None
mock_instance.METHOD = provider_cls.METHOD
mock_instance.CANONICAL_NAME = provider_cls.CANONICAL_NAME
return mock_instance


@asynccontextmanager
async def _create_session(self, profile=None):
session = StubbedSession(profile=profile)

# We have to set bogus credentials here or otherwise we'll trigger
# an early credential chain resolution.
async with session.create_client(
'sts',
aws_access_key_id='spam',
aws_secret_access_key='eggs',
) as sts:
self.mock_client_creator.return_value = sts
assume_role_provider = AioAssumeRoleProvider(
load_config=lambda: session.full_config,
client_creator=self.mock_client_creator,
cache={},
profile_name=profile,
credential_sourcer=AioCanonicalNameCredentialSourcer([
self.env_provider, self.container_provider,
self.metadata_provider
]),
profile_provider_builder=AioProfileProviderBuilder(
session,
sso_token_cache=JSONFileCache(self.tempdir),
),
)
async with session.stub('sts') as stubber:
stubber.activate()

component_name = 'credential_provider'
resolver = session.get_component(component_name)
available_methods = [p.METHOD for p in resolver.providers]
replacements = {
'env': self.env_provider,
'iam-role': self.metadata_provider,
'container-role': self.container_provider,
'assume-role': assume_role_provider
}
for name, provider in replacements.items():
try:
index = available_methods.index(name)
except ValueError:
# The provider isn't in the session
continue

resolver.providers[index] = provider

session.register_component(
'credential_provider', resolver
)
yield session, stubber


@pytest.fixture
def assume_role_setup(base_assume_role_test_setup):
self = base_assume_role_test_setup

self.environ['AWS_ACCESS_KEY_ID'] = 'access_key'
self.environ['AWS_SECRET_ACCESS_KEY'] = 'secret_key'

self.mock_provider = _mock_provider
self.create_session = partial(_create_session, self)

self.metadata_provider = self.mock_provider(AioInstanceMetadataProvider)
self.env_provider = self.mock_provider(AioEnvProvider)
self.container_provider = self.mock_provider(AioContainerProvider)
self.mock_client_creator = mock.Mock(spec=AioSession.create_client)
self.actual_client_region = None

current_dir = os.path.dirname(os.path.abspath(__file__))
credential_process = os.path.join(
current_dir, 'utils', 'credentialprocess.py'
)
self.credential_process = '%s %s' % (
sys.executable, credential_process
)

yield self


@pytest.mark.moto
@pytest.mark.asyncio
async def test_sso_credential_fetcher_can_fetch_credentials(
Expand Down

0 comments on commit 40427e6

Please sign in to comment.