diff --git a/splitgpg2/__init__.py b/splitgpg2/__init__.py index 7fc50a7..9264ce2 100755 --- a/splitgpg2/__init__.py +++ b/splitgpg2/__init__.py @@ -115,7 +115,6 @@ class OptionHandlingType(enum.Enum): # pylint: disable=invalid-name fake = 1 verify = 2 - override = 3 class HashAlgo: @@ -172,7 +171,8 @@ def extract_args(untrusted_line: bytes, sep: bytes = b' ') -> Tuple[bytes, Optio return untrusted_line, None # none of our uses allow 0, so do not allow it -_int_re = re.compile(rb'\A[1-9][0-9]*\Z') +_int_re: re.Pattern[bytes] = re.compile(rb'\A[1-9][0-9]*\Z') +_hash_regex = re.compile(rb'\A[0-9A-F]+\Z') def sanitize_int(untrusted_arg: bytes, min_value: int, max_value: int) -> int: """ @@ -218,6 +218,7 @@ class GpgServer: commands: Dict[bytes, 'NoneCallback'] seen_data: bool config_loaded: bool + agent_unrestricted_socket_path: Optional[str] agent_socket_path: Optional[str] agent_reader: Optional[asyncio.StreamReader] agent_writer: Optional[asyncio.StreamWriter] @@ -225,6 +226,8 @@ class GpgServer: log: logging.Logger cache_nonce_regex: re.Pattern[bytes] = re.compile(rb'\A[0-9A-F]{24}\Z') + # Any command argument ever sent to the agent should match this pattern. + command_argument_regex: re.Pattern[bytes] = re.compile(rb'\A[0-9A-Za-z_=. -]*\Z') __slots__ = ('verbose_notifications', 'timer_delay', @@ -243,6 +246,7 @@ class GpgServer: 'seen_data', 'config_loaded', 'agent_socket_path', + 'agent_unrestricted_socket_path', 'agent_reader', 'agent_writer', 'source_keyring_dir', @@ -273,6 +277,7 @@ def __init__(self, reader: asyncio.StreamReader, self.log = logging.getLogger('splitgpg2.Server') self.agent_socket_path = None + self.agent_unrestricted_socket_path = None self.agent_reader: Optional[asyncio.StreamReader] = None self.agent_writer: Optional[asyncio.StreamWriter] = None @@ -454,14 +459,20 @@ async def connect_agent(self) -> None: dirs = subprocess.check_output( ['gpgconf', *self.homedir_opts(), '--list-dirs', '-o/dev/stdout']) - if self.allow_keygen: - socket_field = b'agent-socket:' - else: - socket_field = b'agent-extra-socket:' + unrestricted_socket_field = b'agent-socket' + socket_field = unrestricted_socket_field if self.allow_keygen else b'agent-extra-socket' # search for agent-socket:/run/user/1000/gnupg/S.gpg-agent - agent_socket_path = [d.split(b':', 1)[1] for d in dirs.splitlines() - if d.startswith(socket_field)][0] - self.agent_socket_path = agent_socket_path.decode() + for d in dirs.splitlines(): + key, value = d.split(b':') + if key == socket_field: + self.agent_socket_path = value.decode("UTF-8", "surrogateescape") + if key == unrestricted_socket_field: + self.agent_unrestricted_socket_path = value.decode("UTF-8", "surrogateescape") + if ((self.agent_unrestricted_socket_path is not None) and + (self.agent_socket_path is not None)): + break + else: + raise RuntimeError("bad output from gpgconf") self.agent_reader, self.agent_writer = await asyncio.open_unix_connection( path=self.agent_socket_path) @@ -470,7 +481,7 @@ async def connect_agent(self) -> None: self.notify('connected') # wait for agent hello - await self.handle_agent_response() + self.client_write(await self.read_hello(self.agent_reader)) def close(self, reason: str, log_level: int = logging.ERROR) -> None: self.log.log(log_level, '%s; Closing!', reason) @@ -544,6 +555,7 @@ def default_commands(self) -> Dict[bytes, 'NoneCallback']: b'BYE': self.command_BYE, b'SCD': self.command_SCD, b'READKEY': self.command_READKEY, + b'NOP': self.command_NOP, } @staticmethod @@ -551,13 +563,13 @@ def default_options() -> Dict[bytes, Tuple[OptionHandlingType, Optional[bytes]]] return { b'ttyname': (OptionHandlingType.fake, b'OK'), b'ttytype': (OptionHandlingType.fake, b'OK'), - b'display': (OptionHandlingType.override, b':0'), + b'display': (OptionHandlingType.fake, b'OK'), b'lc-ctype': (OptionHandlingType.fake, b'OK'), b'lc-messages': (OptionHandlingType.fake, b'OK'), b'putenv': (OptionHandlingType.fake, b'OK'), b'pinentry-mode': (OptionHandlingType.fake, b'ERR 67108924 Not supported '), - b'allow-pinentry-notify': (OptionHandlingType.verify, None), - b'agent-awareness': (OptionHandlingType.verify, b'2.1.0') + b'allow-pinentry-notify': (OptionHandlingType.fake, b'OK'), + b'agent-awareness': (OptionHandlingType.verify, b'2.1.0'), } @staticmethod @@ -640,15 +652,24 @@ def fake_respond(self, response: bytes) -> None: @staticmethod def verify_keygrip_arguments(min_count: int, max_count: int, - untrusted_args: Optional[bytes]) -> bytes: + untrusted_args: Optional[bytes], + allow_list: bool) -> bytes: if untrusted_args is None: raise Filtered - args_regex = re.compile(rb'\A[0-9A-F]{40}( [0-9A-F]{40}){%d,%d}\Z' % - (min_count-1, max_count-1)) - - if args_regex.match(untrusted_args) is None: + if allow_list and untrusted_args.startswith(b'--list'): + if untrusted_args == b'--list': + return b'--list' + if untrusted_args[6] == 61: # ASCII '=' + # 1000 is the default value used by gpg2 + return b'--list=%d' % sanitize_int(untrusted_args[7:], 1, 1000) + raise Filtered + untrusted_args_list: List[bytes] = untrusted_args.split(b' ') + if not (min_count <= len(untrusted_args_list) <= max_count): raise Filtered - return untrusted_args + for untrusted_arg in untrusted_args_list: + if len(untrusted_arg) != 40 or not _hash_regex.match(untrusted_arg): + raise Filtered + return b' '.join(untrusted_args_list) def sanitize_key_desc(self, untrusted_args: bytes) -> bytes: untrusted_args = untrusted_args.replace(b'+', b' ') @@ -678,6 +699,11 @@ async def command_OPTION(self, untrusted_args: Optional[bytes]) -> None: if not untrusted_args: raise Filtered + if untrusted_args == b'pinentry-mode=ask': + # This is the default and a no-op + self.fake_respond(b'OK') + return + untrusted_name, untrusted_value = extract_args(untrusted_args, b'=') try: action, opts = self.options[untrusted_name] @@ -685,12 +711,7 @@ async def command_OPTION(self, untrusted_args: Optional[bytes]) -> None: except KeyError as e: raise Filtered from e - if action == OptionHandlingType.override: - if opts is not None: - option_arg = b'%s=%s' % (name, opts) - else: - option_arg = name - elif action == OptionHandlingType.verify: + if action == OptionHandlingType.verify: if callable(opts): verified = opts(untrusted_value=untrusted_value) elif isinstance(opts, Pattern): @@ -725,22 +746,14 @@ async def command_HAVEKEY(self, untrusted_args: Optional[bytes]) -> None: if untrusted_args is None: raise Filtered # upper keygrip limit is arbitary - if untrusted_args.startswith(b'--list'): - if b'=' in untrusted_args: - # 1000 is the default value used by gpg2 - limit = sanitize_int(untrusted_args[len(b'--list='):], 1, 1000) - args = b'--list=%d' % limit - else: - if untrusted_args != b'--list': - raise Filtered - args = b'--list' - else: - args = self.verify_keygrip_arguments(1, 200, untrusted_args) - await self.send_agent_command(b'HAVEKEY', args) + args = self.verify_keygrip_arguments(1, 200, untrusted_args, True) + unrestricted = args.startswith(b'--list') and not self.allow_keygen + await self.send_agent_command(b'HAVEKEY', args, unrestricted) async def command_KEYINFO(self, untrusted_args: Optional[bytes]) -> None: - args = self.verify_keygrip_arguments(1, 1, untrusted_args) - await self.send_agent_command(b'KEYINFO', args) + args = self.verify_keygrip_arguments(1, 1, untrusted_args, True) + unrestricted = args.startswith(b'--list') and not self.allow_keygen + await self.send_agent_command(b'KEYINFO', args, unrestricted) async def command_GENKEY(self, untrusted_args: Optional[bytes]) -> None: if not self.allow_keygen: @@ -779,12 +792,12 @@ async def command_GENKEY(self, untrusted_args: Optional[bytes]) -> None: await self.send_agent_command(b'GENKEY', b' '.join(args)) async def command_SIGKEY(self, untrusted_args: Optional[bytes]) -> None: - args = self.verify_keygrip_arguments(1, 1, untrusted_args) + args = self.verify_keygrip_arguments(1, 1, untrusted_args, False) await self.send_agent_command(b'SIGKEY', args) await self.setkeydesc(args) async def command_SETKEY(self, untrusted_args: Optional[bytes]) -> None: - args = self.verify_keygrip_arguments(1, 1, untrusted_args) + args = self.verify_keygrip_arguments(1, 1, untrusted_args, False) await self.send_agent_command(b'SETKEY', args) await self.setkeydesc(args) @@ -809,11 +822,14 @@ async def setkeydesc(self, keygrip: bytes) -> None: assert key is not None, 'no key?' desc = b'%s\nFingerprint: %s%s' % ( - (b'UID: ' + key.first_uid.split(b'\n')[0]) if key.first_uid is not None else '', + (b'UID: ' + key.first_uid.split(b'\n')[0]) + if key.first_uid is not None + else b'', key.fingerprint, subkey_desc) - self.agent_write(b'SETKEYDESC %s\n' % self.percent_plus_escape(desc)) + assert self.agent_writer is not None, "no writer?" + self.agent_write(b'SETKEYDESC %s\n' % self.percent_plus_escape(desc), self.agent_writer) assert self.agent_reader is not None untrusted_line = await self.agent_reader.readline() @@ -909,6 +925,11 @@ async def command_SETKEYDESC(self, untrusted_args: Optional[bytes]) -> None: # pylint: disable=unused-argument self.fake_respond(b'OK') + async def command_NOP(self, untrusted_args: Optional[bytes]) -> None: + # Ignores all arguments. + # pylint: disable=unused-argument + self.fake_respond(b'OK') + async def command_PKDECRYPT(self, untrusted_args: Optional[bytes]) -> None: if untrusted_args is not None: raise Filtered @@ -930,30 +951,27 @@ async def command_SETHASH(self, untrusted_args: Optional[bytes]) -> None: except KeyError as e: raise Filtered from e - if not untrusted_hash: + if len(untrusted_hash) != alg_param.len: raise Filtered - hash_regex = re.compile(rb'\A[0-9A-F]{%d}\Z' % alg_param.len) - if hash_regex.match(untrusted_hash) is None: + if not _hash_regex.match(untrusted_hash): raise Filtered hash_value = untrusted_hash - await self.send_agent_command( - b'SETHASH', b'%d %s' % (alg, hash_value)) + # Hash values and ASCII decimal numbers are safe to pass. + await self.send_agent_command(b'SETHASH', b'%d %s' % (alg, hash_value)) async def command_PKSIGN(self, untrusted_args: Optional[bytes]) -> None: if untrusted_args is not None: if not untrusted_args.startswith(b'-- '): raise Filtered - untrusted_args = untrusted_args[3:] - if self.cache_nonce_regex.match(untrusted_args) is None: + if self.cache_nonce_regex.match(untrusted_args[3:]) is None: raise Filtered - args = b'-- ' + untrusted_args - else: - args = None + args = untrusted_args self.request_timer('PKSIGN') + # String checked to be '-- ' followed by a cache nonce await self.send_agent_command(b'PKSIGN', args) async def command_GETINFO(self, untrusted_args: Optional[bytes]) -> None: @@ -985,7 +1003,7 @@ async def command_READKEY(self, untrusted_args: Optional[bytes]) -> None: raise Filtered if untrusted_args.startswith(b'-- '): untrusted_args = untrusted_args[3:] - args = self.verify_keygrip_arguments(1, 1, untrusted_args) + args = self.verify_keygrip_arguments(1, 1, untrusted_args, False) await self.send_agent_command(b'READKEY', b'-- ' + args) @@ -1008,46 +1026,73 @@ def get_inquires_for_command(self, command: bytes) -> Dict[bytes, 'ArgCallback'] } return {} - async def send_agent_command(self, command: bytes, args: Optional[bytes]) -> None: + async def send_agent_command(self, command: bytes, args: Optional[bytes], + unrestricted: bool=False) -> None: """ Sends command to local gpg agent and handle the response """ expected_inquires = self.get_inquires_for_command(command) - if args: - cmd_with_args = command + b' ' + args + b'\n' + assert self.agent_reader is not None, "no reader?" + assert self.agent_writer is not None, "no writer?" + if unrestricted and not self.allow_keygen: + reader, writer = await asyncio.open_unix_connection( + self.agent_unrestricted_socket_path) + await self.read_hello(reader) else: - cmd_with_args = command + b'\n' - self.agent_write(cmd_with_args) - while True: - more_expected = await self.handle_agent_response( - expected_inquires=expected_inquires) - if not more_expected: - break + reader, writer = self.agent_reader, self.agent_writer + try: + if args: + if not self.command_argument_regex.match(args): + raise AssertionError("BUG: corrupt command about to be sent to agent!") + cmd_with_args = command + b' ' + args + b'\n' + else: + cmd_with_args = command + b'\n' + self.agent_write(cmd_with_args, writer) + while True: + more_expected = await self.handle_agent_response(expected_inquires, reader) + if not more_expected: + break + finally: + if reader is not self.agent_reader: + writer.close() - def agent_write(self, data: bytes) -> None: - writer = self.agent_writer + async def read_hello(self, agent_reader: asyncio.StreamReader) -> bytes: + while True: + line = await agent_reader.readline() + if not line.endswith(b'\n'): + raise ProtocolError("premature EOF from agent connection") + if b'\n' in line[:-1]: + raise ProtocolError("newline in readline() result???") + if line.startswith(b'#'): + continue + if line == b'OK' or line.startswith(b'OK '): + return line + raise ProtocolError("agent responded with something other than 'OK'" + " to initial connection") + + def agent_write(self, data: bytes, writer: asyncio.StreamWriter) -> None: assert writer is not None, 'agent_write called with no agent writer?' self.log_io('A <<<', data) writer.write(data) async def handle_agent_response(self, - expected_inquires: Optional[Dict[bytes, 'ArgCallback']] = None) \ - -> bool: + expected_inquires: Dict[bytes, 'ArgCallback'], + agent_reader: asyncio.StreamReader) -> bool: """ Receive and handle one agent response. Return whether there are more expected """ - assert self.agent_reader is not None assert self.client_writer is not None if self.client_writer.is_closing(): # If something went wrong, agent might send back junk. # Discard all remaining data from agent and return. - while await self.agent_reader.read(1024): + while await agent_reader.read(1024): pass return False - expected_inquires = (expected_inquires if expected_inquires is not None - else {}) # We generally consider the agent as trusted. But since the client can # determine part of the response we handle this here as untrusted. - untrusted_line = await self.agent_reader.readline() + untrusted_line = await agent_reader.readline() untrusted_line = untrusted_line.rstrip(b'\n') self.log_io('A >>>', untrusted_line) + if untrusted_line.startswith(b'#'): + # Comment, ignore + return True untrusted_res, untrusted_args = extract_args(untrusted_line) if untrusted_res in (b'D', b'S'): # passthrough to the client @@ -1269,7 +1314,7 @@ def validate_keygen_sexp(*, untrusted_sexp: 'SExpr') -> None: untrusted_args=untrusted_args) async def inquire_command_D(self, validate_sexp: 'SExprValidator', *, - untrusted_args: bytes) -> bool: + untrusted_args: bytes) -> bool: # We parse and then reserialize the sexpr. Currently we assume that the # sexpr fits in one assuan line. This line length also implicitly # limits the sexpr sizes. @@ -1283,7 +1328,9 @@ async def inquire_command_D(self, validate_sexp: 'SExprValidator', *, raise Filtered from e args = untrusted_sexp - self.agent_write(b'D ' + self.escape_D(self.serialize_sexpr(args)) + b'\n') + assert self.agent_writer is not None, "no writer?" + self.agent_write(b'D ' + self.escape_D(self.serialize_sexpr(args)) + b'\n', + self.agent_writer) self.seen_data = True return True @@ -1348,7 +1395,7 @@ def _parse_sexpr(cls, untrusted_arg: bytes, nesting: int) -> Tuple[List['SExpr'] rest: bytes value: Union[List['SExpr'], bytes] - if untrusted_arg[0] in range(0x30, 0x40): + if 0x30 <= untrusted_arg[0] <= 0x40: length_s, rest = untrusted_arg.split(b':', 1) length = sanitize_int(length_s, 1, len(rest)) value, rest = rest[0:length], rest[length:] @@ -1381,7 +1428,8 @@ def serialize_item(item: 'SExpr') -> bytes: async def inquire_command_END(self, *, untrusted_args: bytes) -> bool: if untrusted_args: raise Filtered('unexpected arguments to END') - self.agent_write(b'END\n') + assert self.agent_writer is not None, "no writer?" + self.agent_write(b'END\n', self.agent_writer) return False # endregion diff --git a/splitgpg2/test_server.py b/splitgpg2/test_server.py index 75a9a77..517c542 100644 --- a/splitgpg2/test_server.py +++ b/splitgpg2/test_server.py @@ -171,7 +171,7 @@ def generate_key(self, *, subkey_usage: str = 'encrypt', client: bool = True) -> Tuple[bytes, bytes]: - fpr_re = re.compile(rb'\A[0-9A-F]{40}\Z') + fpr_re = re.compile(rb'\A[0-9A-F]{40}(?:[0-9A-F]{24})?\Z') email = 'a' + str(self.counter) + self.key_uid self.counter += 1 handle = base64.b64encode(os.urandom(32)).decode('ascii', 'strict')