1
1
import base64
2
- from langflow .custom import Component
3
- from langflow .inputs import MessageTextInput
4
- from langflow .template import Output
5
- from langflow .schema .message import Message
2
+
6
3
import jwt
7
4
import requests
8
5
from jwt import PyJWK
9
6
7
+ from langflow .custom import Component
8
+ from langflow .inputs import MessageTextInput
9
+ from langflow .schema .message import Message
10
+ from langflow .template import Output
11
+
10
12
11
13
class JWTValidatorComponent (Component ):
12
14
display_name = "JWT Validator"
@@ -33,7 +35,7 @@ class JWTValidatorComponent(Component):
33
35
]
34
36
35
37
def validate_auth (self ) -> Message :
36
- response = requests .get (self .jwks_url )
38
+ response = requests .get (self .jwks_url , timeout = 10 )
37
39
jwks = response .json ()
38
40
headers = jwt .get_unverified_header (self .jwt_token )
39
41
@@ -48,15 +50,16 @@ def validate_auth(self) -> Message:
48
50
payload = jwt .decode (self .jwt_token , public_key , algorithms = ["RS256" ])
49
51
return Message (content = payload ["sub" ])
50
52
except KeyError as e :
51
- raise KeyError (f"Missing key in JWT or JWKS: { str (e )} " )
53
+ error_message = f"Missing key in JWT or JWKS: { e !s} "
54
+ raise KeyError (error_message ) from e
52
55
except jwt .ExpiredSignatureError :
53
56
raise
54
57
except jwt .PyJWTError as e :
55
- raise jwt .InvalidTokenError (f"JWT validation failed: { str (e )} " )
58
+ error_message = f"JWT validation failed: { e !s} "
59
+ raise jwt .InvalidTokenError (error_message ) from e
56
60
57
61
def _int_to_base64url (self , value : int ) -> str :
58
62
"""Convert an integer to a Base64URL-encoded string."""
59
63
byte_length = (value .bit_length () + 7 ) // 8
60
64
value_bytes = value .to_bytes (byte_length , byteorder = "big" )
61
- encoded = base64 .urlsafe_b64encode (value_bytes ).rstrip (b"=" ).decode ("ascii" )
62
- return encoded
65
+ return base64 .urlsafe_b64encode (value_bytes ).rstrip (b"=" ).decode ("ascii" )
0 commit comments