diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 00000000..341e4255 --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,32 @@ +codecov: + branch: master + +coverage: + range: "95..100" + + status: + project: no + +flags: + library: + paths: + - aiobotocore/ + configs: + paths: + - requirements/ + - ".git*" + - "*.toml" + - "*.yml" + changelog: + paths: + - CHANGES/ + - CHANGES.rst + docs: + paths: + - docs/ + - "*.md" + - "*.rst" + - "*.txt" + tests: + paths: + - tests/ diff --git a/tests/botocore/helpers.py b/tests/botocore/helpers.py new file mode 100644 index 00000000..1872c9af --- /dev/null +++ b/tests/botocore/helpers.py @@ -0,0 +1,49 @@ +from contextlib import asynccontextmanager, AsyncExitStack + +import aiobotocore.session +from botocore.stub import Stubber + + +class StubbedSession(aiobotocore.session.AioSession): + def __init__(self, *args, **kwargs): + super(StubbedSession, self).__init__(*args, **kwargs) + self._cached_clients = {} + self._client_stubs = {} + + @asynccontextmanager + async def create_client(self, service_name, *args, **kwargs): + async with AsyncExitStack() as es: + es: AsyncExitStack + if service_name not in self._cached_clients: + client = await es.enter_async_context( + self._create_stubbed_client(service_name, *args, **kwargs)) + self._cached_clients[service_name] = client + yield self._cached_clients[service_name] + + @asynccontextmanager + async def _create_stubbed_client(self, service_name, *args, **kwargs): + async with AsyncExitStack() as es: + es: AsyncExitStack + client = await es.enter_async_context( + super(StubbedSession, self).create_client( + service_name, *args, **kwargs)) + stubber = Stubber(client) + self._client_stubs[service_name] = stubber + yield client + + @asynccontextmanager + async def stub(self, service_name, *args, **kwargs): + async with AsyncExitStack() as es: + es: AsyncExitStack + if service_name not in self._client_stubs: + await es.enter_async_context( + self.create_client(service_name, *args, **kwargs)) + yield self._client_stubs[service_name] + + def activate_stubs(self): + for stub in self._client_stubs.values(): + stub.activate() + + def verify_stubs(self): + for stub in self._client_stubs.values(): + stub.assert_no_pending_responses() diff --git a/tests/botocore/test_credentials.py b/tests/botocore/test_credentials.py index eec3deff..ece85539 100644 --- a/tests/botocore/test_credentials.py +++ b/tests/botocore/test_credentials.py @@ -4,6 +4,12 @@ and adapted to work with asyncio and pytest """ import asyncio +import binascii +import os +import sys +import tempfile +import uuid +from contextlib import asynccontextmanager from datetime import datetime, timedelta import json import subprocess @@ -21,9 +27,18 @@ from aiobotocore.session import AioSession from aiobotocore import credentials + +from botocore.credentials import Credentials, JSONFileCache, ReadOnlyCredentials from botocore.configprovider import ConfigValueStore from botocore.utils import FileWebIdentityTokenLoader, SSOTokenLoader -from aiobotocore.credentials import AioSSOCredentialFetcher, AioSSOProvider +from aiobotocore.credentials import AioSSOCredentialFetcher, AioSSOProvider, \ + AioInstanceMetadataProvider, AioEnvProvider, AioContainerProvider, \ + AioAssumeRoleProvider, AioProfileProviderBuilder, AioCanonicalNameCredentialSourcer +from tests.botocore.helpers import StubbedSession + + +def random_chars(num_chars): + return binascii.hexlify(os.urandom(int(num_chars / 2))).decode('ascii') # From class TestCredentials(BaseEnvVar): @@ -335,6 +350,38 @@ async def test_assumerolefetcher_mfa(): assert call_kwargs['TokenCode'] == 'token-code' +@pytest.mark.moto +@pytest.mark.asyncio +async def test_recursive_assume_role(assume_role_setup): + self = assume_role_setup + + config = ( + '[profile A]\n' + 'role_arn = arn:aws:iam::123456789:role/RoleA\n' + 'source_profile = B\n\n' + '[profile B]\n' + 'role_arn = arn:aws:iam::123456789:role/RoleB\n' + 'source_profile = C\n\n' + '[profile C]\n' + 'aws_access_key_id = abc123\n' + 'aws_secret_access_key = def456\n' + ) + self.write_config(config) + + profile_b_creds = self.create_random_credentials() + profile_b_response = self.create_assume_role_response(profile_b_creds) + profile_a_creds = self.create_random_credentials() + profile_a_response = self.create_assume_role_response(profile_a_creds) + + async with self.create_session(profile='A') as (session, stubber): + stubber.add_response('assume_role', profile_b_response) + stubber.add_response('assume_role', profile_a_response) + + actual_creds = await session.get_credentials() + await self.assert_creds_equal(actual_creds, profile_a_creds) + stubber.assert_no_pending_responses() + + # From class TestAssumeRoleWithWebIdentityCredentialFetcher(BaseEnvVar): def assume_role_web_identity_client_creator(with_response): class _Client(object): @@ -1201,6 +1248,168 @@ async def ssl_credential_fetcher_setup(): yield self +@pytest.fixture +def base_env_var_setup(): + self = Self() + self.environ = {} + with mock.patch('os.environ', self.environ): + yield self + + +def _some_future_time(): + timeobj = datetime.now(tzlocal()) + return timeobj + timedelta(hours=24) + + +def _create_assume_role_response(credentials, expiration=None): + if expiration is None: + expiration = _some_future_time() + + response = { + 'Credentials': { + 'AccessKeyId': credentials.access_key, + 'SecretAccessKey': credentials.secret_key, + 'SessionToken': credentials.token, + 'Expiration': expiration + }, + 'AssumedRoleUser': { + 'AssumedRoleId': 'myroleid', + 'Arn': 'arn:aws:iam::1234567890:user/myuser' + } + } + + return response + + +def _create_random_credentials(): + return Credentials( + 'fake-%s' % random_chars(15), + 'fake-%s' % random_chars(35), + 'fake-%s' % random_chars(45) + ) + + +async def _assert_creds_equal(c1, c2): + c1_frozen = c1 + if not isinstance(c1_frozen, ReadOnlyCredentials): + c1_frozen = await c1.get_frozen_credentials() + c2_frozen = c2 + if not isinstance(c2_frozen, ReadOnlyCredentials): + c2_frozen = c2.get_frozen_credentials() + assert c1_frozen == c2_frozen + + +def _write_config(self, config): + with open(self.config_file, 'w') as f: + f.write(config) + + +@pytest.fixture +def base_assume_role_test_setup(base_env_var_setup): + self = base_env_var_setup + with tempfile.TemporaryDirectory() as td_name: + self.tempdir = td_name + self.config_file = os.path.join(self.tempdir, 'config') + self.environ['AWS_CONFIG_FILE'] = self.config_file + self.environ['AWS_SHARED_CREDENTIALS_FILE'] = str(uuid.uuid4()) + + self.some_future_time = _some_future_time + self.create_assume_role_response = _create_assume_role_response + self.create_random_credentials = _create_random_credentials + self.assert_creds_equal = _assert_creds_equal + self.write_config = partial(_write_config, self) + + yield self + + +def _mock_provider(provider_cls): + mock_instance = mock.Mock(spec=provider_cls) + mock_instance.load.return_value = None + mock_instance.METHOD = provider_cls.METHOD + mock_instance.CANONICAL_NAME = provider_cls.CANONICAL_NAME + return mock_instance + + +@asynccontextmanager +async def _create_session(self, profile=None): + session = StubbedSession(profile=profile) + + # We have to set bogus credentials here or otherwise we'll trigger + # an early credential chain resolution. + async with session.create_client( + 'sts', + aws_access_key_id='spam', + aws_secret_access_key='eggs', + ) as sts: + self.mock_client_creator.return_value = sts + assume_role_provider = AioAssumeRoleProvider( + load_config=lambda: session.full_config, + client_creator=self.mock_client_creator, + cache={}, + profile_name=profile, + credential_sourcer=AioCanonicalNameCredentialSourcer([ + self.env_provider, self.container_provider, + self.metadata_provider + ]), + profile_provider_builder=AioProfileProviderBuilder( + session, + sso_token_cache=JSONFileCache(self.tempdir), + ), + ) + async with session.stub('sts') as stubber: + stubber.activate() + + component_name = 'credential_provider' + resolver = session.get_component(component_name) + available_methods = [p.METHOD for p in resolver.providers] + replacements = { + 'env': self.env_provider, + 'iam-role': self.metadata_provider, + 'container-role': self.container_provider, + 'assume-role': assume_role_provider + } + for name, provider in replacements.items(): + try: + index = available_methods.index(name) + except ValueError: + # The provider isn't in the session + continue + + resolver.providers[index] = provider + + session.register_component( + 'credential_provider', resolver + ) + yield session, stubber + + +@pytest.fixture +def assume_role_setup(base_assume_role_test_setup): + self = base_assume_role_test_setup + + self.environ['AWS_ACCESS_KEY_ID'] = 'access_key' + self.environ['AWS_SECRET_ACCESS_KEY'] = 'secret_key' + + self.mock_provider = _mock_provider + self.create_session = partial(_create_session, self) + + self.metadata_provider = self.mock_provider(AioInstanceMetadataProvider) + self.env_provider = self.mock_provider(AioEnvProvider) + self.container_provider = self.mock_provider(AioContainerProvider) + self.mock_client_creator = mock.Mock(spec=AioSession.create_client) + self.actual_client_region = None + + current_dir = os.path.dirname(os.path.abspath(__file__)) + credential_process = os.path.join( + current_dir, 'utils', 'credentialprocess.py' + ) + self.credential_process = '%s %s' % ( + sys.executable, credential_process + ) + + yield self + + @pytest.mark.moto @pytest.mark.asyncio async def test_sso_credential_fetcher_can_fetch_credentials(