Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions dapr/aio/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def __init__(
] = None,
http_timeout_seconds: Optional[int] = None,
max_grpc_message_length: Optional[int] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime and via gRPC and HTTP.

Expand All @@ -79,8 +80,10 @@ def __init__(
http_timeout_seconds (int): specify a timeout for http connections
max_grpc_messsage_length (int, optional): The maximum grpc send and receive
message length in bytes.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
super().__init__(address, interceptors, max_grpc_message_length)
super().__init__(address, interceptors, max_grpc_message_length, api_token=api_token)
self.invocation_client = None

invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()
Expand All @@ -89,7 +92,7 @@ def __init__(
if http_timeout_seconds is None:
http_timeout_seconds = settings.DAPR_HTTP_TIMEOUT_SECONDS
self.invocation_client = DaprInvocationHttpClient(
headers_callback=headers_callback, timeout=http_timeout_seconds
headers_callback=headers_callback, timeout=http_timeout_seconds, api_token=api_token
)
elif invocation_protocol == 'GRPC':
pass
Expand Down
14 changes: 11 additions & 3 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def __init__(
] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime and initialize gRPC client stub.

Expand All @@ -152,8 +153,13 @@ def __init__(
StreamStreamClientInterceptor, optional): gRPC interceptors.
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
DaprHealth.wait_until_ready()
self._api_token = api_token
# For health check, use explicit token or fall back to global setting
health_token = api_token if api_token is not None else settings.DAPR_API_TOKEN
DaprHealth.wait_until_ready(api_token=health_token)
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
Expand Down Expand Up @@ -184,10 +190,12 @@ def __init__(
else:
interceptors.append(DaprClientTimeoutInterceptorAsync())

if settings.DAPR_API_TOKEN:
# Use explicit token if provided, otherwise fall back to global setting
token = self._api_token if self._api_token is not None else settings.DAPR_API_TOKEN
if token:
api_token_interceptor = DaprClientInterceptorAsync(
[
('dapr-api-token', settings.DAPR_API_TOKEN),
('dapr-api-token', token),
]
)
interceptors.append(api_token_interceptor)
Expand Down
7 changes: 5 additions & 2 deletions dapr/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
http_timeout_seconds: Optional[int] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime via gRPC and HTTP.

Expand All @@ -84,8 +85,10 @@ def __init__(
http_timeout_seconds (int): specify a timeout for http connections
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
super().__init__(address, interceptors, max_grpc_message_length, retry_policy)
super().__init__(address, interceptors, max_grpc_message_length, retry_policy, api_token)
self.invocation_client = None

invocation_protocol = settings.DAPR_API_METHOD_INVOCATION_PROTOCOL.upper()
Expand All @@ -94,7 +97,7 @@ def __init__(
if http_timeout_seconds is None:
http_timeout_seconds = settings.DAPR_HTTP_TIMEOUT_SECONDS
self.invocation_client = DaprInvocationHttpClient(
headers_callback=headers_callback, timeout=http_timeout_seconds
headers_callback=headers_callback, timeout=http_timeout_seconds, api_token=api_token
)
elif invocation_protocol == 'GRPC':
pass
Expand Down
14 changes: 11 additions & 3 deletions dapr/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def __init__(
] = None,
max_grpc_message_length: Optional[int] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Connects to Dapr Runtime and initializes gRPC client stub.

Expand All @@ -144,8 +145,13 @@ def __init__(
max_grpc_message_length (int, optional): The maximum grpc send and receive
message length in bytes.
retry_policy (RetryPolicy optional): Specifies retry behaviour
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
DaprHealth.wait_until_ready()
self._api_token = api_token
# For health check, use explicit token or fall back to global setting
health_token = api_token if api_token is not None else settings.DAPR_API_TOKEN
DaprHealth.wait_until_ready(api_token=health_token)
self.retry_policy = retry_policy or RetryPolicy()

useragent = f'dapr-sdk-python/{__version__}'
Expand Down Expand Up @@ -184,10 +190,12 @@ def __init__(

self._channel = grpc.intercept_channel(self._channel, DaprClientTimeoutInterceptor()) # type: ignore

if settings.DAPR_API_TOKEN:
# Use explicit token if provided, otherwise fall back to global setting
token = self._api_token if self._api_token is not None else settings.DAPR_API_TOKEN
if token:
api_token_interceptor = DaprClientInterceptor(
[
('dapr-api-token', settings.DAPR_API_TOKEN),
('dapr-api-token', token),
]
)
self._channel = grpc.intercept_channel( # type: ignore
Expand Down
13 changes: 7 additions & 6 deletions dapr/clients/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,23 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import urllib.request
import urllib.error
import time
import urllib.error
import urllib.request

from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, USER_AGENT_HEADER, DAPR_USER_AGENT
from dapr.clients.http.conf import DAPR_API_TOKEN_HEADER, DAPR_USER_AGENT, USER_AGENT_HEADER
from dapr.clients.http.helpers import get_api_url
from dapr.conf import settings


class DaprHealth:
@staticmethod
def wait_until_ready():
def wait_until_ready(api_token: str = None):
health_url = f'{get_api_url()}/healthz/outbound'
headers = {USER_AGENT_HEADER: DAPR_USER_AGENT}
if settings.DAPR_API_TOKEN is not None:
headers[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
token = api_token if api_token is not None else settings.DAPR_API_TOKEN
if token is not None:
headers[DAPR_API_TOKEN_HEADER] = token
timeout = float(settings.DAPR_HEALTH_TIMEOUT)

start = time.time()
Expand Down
14 changes: 11 additions & 3 deletions dapr/clients/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,21 @@ def __init__(
timeout: Optional[int] = 60,
headers_callback: Optional[Callable[[], Dict[str, str]]] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Invokes Dapr over HTTP.

Args:
message_serializer (Serializer): Dapr serializer.
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
DaprHealth.wait_until_ready()
self._api_token = api_token
# For health check, use explicit token or fall back to global setting
health_token = api_token if api_token is not None else settings.DAPR_API_TOKEN
DaprHealth.wait_until_ready(api_token=health_token)

self._timeout = aiohttp.ClientTimeout(total=timeout)
self._serializer = message_serializer
Expand All @@ -71,8 +77,10 @@ async def send_bytes(
if not headers_map.get(CONTENT_TYPE_HEADER):
headers_map[CONTENT_TYPE_HEADER] = DEFAULT_JSON_CONTENT_TYPE

if settings.DAPR_API_TOKEN is not None:
headers_map[DAPR_API_TOKEN_HEADER] = settings.DAPR_API_TOKEN
# Use explicit token if provided, otherwise fall back to global setting
token = self._api_token if self._api_token is not None else settings.DAPR_API_TOKEN
if token is not None:
headers_map[DAPR_API_TOKEN_HEADER] = token

if self._headers_callback is not None:
trace_headers = self._headers_callback()
Expand Down
7 changes: 6 additions & 1 deletion dapr/clients/http/dapr_actor_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
timeout: int = 60,
headers_callback: Optional[Callable[[], Dict[str, str]]] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Invokes Dapr Actors over HTTP.

Expand All @@ -44,8 +45,12 @@ def __init__(
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
retry_policy (RetryPolicy optional): Specifies retry behaviour
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
self._client = DaprHttpClient(message_serializer, timeout, headers_callback, retry_policy)
self._client = DaprHttpClient(
message_serializer, timeout, headers_callback, retry_policy, api_token=api_token
)

async def invoke_method(
self, actor_type: str, actor_id: str, method: str, data: Optional[bytes] = None
Expand Down
9 changes: 8 additions & 1 deletion dapr/clients/http/dapr_invocation_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,23 @@ def __init__(
timeout: int = 60,
headers_callback: Optional[Callable[[], Dict[str, str]]] = None,
retry_policy: Optional[RetryPolicy] = None,
api_token: Optional[str] = None,
):
"""Invokes Dapr's API for method invocation over HTTP.

Args:
timeout (int, optional): Timeout in seconds, defaults to 60.
headers_callback (lambda: Dict[str, str]], optional): Generates header for each request.
retry_policy (RetryPolicy optional): Specifies retry behaviour
api_token (str, optional): Dapr API token for authentication. If not provided,
falls back to DAPR_API_TOKEN environment variable.
"""
self._client = DaprHttpClient(
DefaultJSONSerializer(), timeout, headers_callback, retry_policy=retry_policy
DefaultJSONSerializer(),
timeout,
headers_callback,
retry_policy=retry_policy,
api_token=api_token,
)

async def invoke_method_async(
Expand Down
34 changes: 34 additions & 0 deletions tests/clients/test_dapr_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,40 @@ def test_dapr_api_token_insertion(self):
self.assertEqual(['value1'], resp.headers['hkey1'])
self.assertEqual(['test-token'], resp.headers['hdapr-api-token'])

def test_explicit_api_token(self):
"""Test that explicit api_token parameter is used in client"""
dapr = DaprGrpcClient(
f'{self.scheme}localhost:{self.grpc_port}', api_token='explicit-token'
)
resp = dapr.invoke_method(
app_id='targetId',
method_name='bytes',
data=b'test',
content_type='text/plain',
metadata=(('key1', 'value1'),),
)

self.assertEqual(b'test', resp.data)
self.assertEqual('text/plain', resp.content_type)
self.assertEqual(['explicit-token'], resp.headers['hdapr-api-token'])

@patch.object(settings, 'DAPR_API_TOKEN', 'global-token')
def test_explicit_api_token_overrides_global(self):
"""Test that explicit api_token parameter overrides global setting"""
dapr = DaprGrpcClient(
f'{self.scheme}localhost:{self.grpc_port}', api_token='override-token'
)
resp = dapr.invoke_method(
app_id='targetId',
method_name='bytes',
data=b'test',
content_type='text/plain',
)

self.assertEqual(b'test', resp.data)
# Should use explicit token, not global
self.assertEqual(['override-token'], resp.headers['hdapr-api-token'])

def test_get_save_delete_state(self):
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
key = 'key_1'
Expand Down
34 changes: 34 additions & 0 deletions tests/clients/test_dapr_grpc_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,40 @@ async def test_dapr_api_token_insertion(self):
self.assertEqual(['value1'], resp.headers['hkey1'])
self.assertEqual(['test-token'], resp.headers['hdapr-api-token'])

async def test_explicit_api_token(self):
"""Test that explicit api_token parameter is used in client"""
dapr = DaprGrpcClientAsync(
f'{self.scheme}localhost:{self.grpc_port}', api_token='explicit-token'
)
resp = await dapr.invoke_method(
app_id='targetId',
method_name='bytes',
data=b'test',
content_type='text/plain',
metadata=(('key1', 'value1'),),
)

self.assertEqual(b'test', resp.data)
self.assertEqual('text/plain', resp.content_type)
self.assertEqual(['explicit-token'], resp.headers['hdapr-api-token'])

@patch.object(settings, 'DAPR_API_TOKEN', 'global-token')
async def test_explicit_api_token_overrides_global(self):
"""Test that explicit api_token parameter overrides global setting"""
dapr = DaprGrpcClientAsync(
f'{self.scheme}localhost:{self.grpc_port}', api_token='override-token'
)
resp = await dapr.invoke_method(
app_id='targetId',
method_name='bytes',
data=b'test',
content_type='text/plain',
)

self.assertEqual(b'test', resp.data)
# Should use explicit token, not global
self.assertEqual(['override-token'], resp.headers['hdapr-api-token'])

async def test_get_save_delete_state(self):
dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')
key = 'key_1'
Expand Down
18 changes: 18 additions & 0 deletions tests/clients/test_heatlhcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ def test_wait_until_ready_success_with_api_token(self, mock_urlopen):
self.assertIn('Dapr-api-token', headers)
self.assertEqual(headers['Dapr-api-token'], 'mytoken')

@patch.object(settings, 'DAPR_HTTP_ENDPOINT', 'http://domain.com:3500')
@patch('urllib.request.urlopen')
def test_wait_until_ready_with_explicit_token(self, mock_urlopen):
"""Test that explicit api_token parameter overrides global setting"""
mock_urlopen.return_value.__enter__.return_value = MagicMock(status=200)

try:
DaprHealth.wait_until_ready(api_token='explicit-token')
except Exception as e:
self.fail(f'wait_until_ready() raised an exception unexpectedly: {e}')

mock_urlopen.assert_called_once()

# Check headers are properly set
headers = mock_urlopen.call_args[0][0].headers
self.assertIn('Dapr-api-token', headers)
self.assertEqual(headers['Dapr-api-token'], 'explicit-token')

@patch.object(settings, 'DAPR_HEALTH_TIMEOUT', '2.5')
@patch('urllib.request.urlopen')
def test_wait_until_ready_timeout(self, mock_urlopen):
Expand Down
Loading