diff --git a/jsonpath_rfc9535/exceptions.py b/jsonpath_rfc9535/exceptions.py index 4aef798..86fa735 100644 --- a/jsonpath_rfc9535/exceptions.py +++ b/jsonpath_rfc9535/exceptions.py @@ -27,6 +27,7 @@ def __str__(self) -> str: if not self.token: return msg + # TODO: Pretty error messages with current line and visual pointer. line, column = self.token.position() return f"{msg}, line {line}, column {column}" diff --git a/jsonpath_rfc9535/lex.py b/jsonpath_rfc9535/lex.py index 5aa2417..61a2bd7 100644 --- a/jsonpath_rfc9535/lex.py +++ b/jsonpath_rfc9535/lex.py @@ -14,6 +14,8 @@ from .tokens import Token from .tokens import TokenType +# ruff: noqa: D102 + RE_WHITESPACE = re.compile(r"[ \n\r\t]+") RE_PROPERTY = re.compile(r"[\u0080-\uFFFFa-zA-Z_][\u0080-\uFFFFa-zA-Z0-9_-]*") RE_INDEX = re.compile(r"-?[0-9]+") @@ -24,6 +26,9 @@ ESCAPES = frozenset(["b", "f", "n", "r", "t", "u", "/", "\\"]) +StateFn = Callable[[], Optional["StateFn"]] + + class Lexer: """JSONPath expression lexical scanner.""" @@ -65,9 +70,9 @@ def __init__(self, query: str) -> None: def run(self) -> None: """Start scanning this lexer's JSONPath expression.""" - state: Optional[StateFn] = lex_root + state: Optional[StateFn] = self.lex_root while state is not None: - state = state(self) + state = state() def emit(self, t: TokenType) -> None: """Append a token of type _t_ to the output tokens list.""" @@ -126,6 +131,38 @@ def accept_match(self, pattern: Pattern[str]) -> bool: return True return False + def accept_string_literal(self, quote: str, token_type: TokenType) -> bool: + """Scan and emit a string literal token. + + Assumes the next character is equal to `quote`. + + Return `True` is successful or `False` otherwise, in which case an error token + will have been emitted. The caller should treat `False` as an error condition. + """ + self.ignore() # ignore opening quote + + while True: + c = self.next() + + if c == "\\": + peeked = self.peek() + if peeked in ESCAPES or peeked == quote: + self.next() + else: + self.error("invalid escape") + return False + + if not c: + self.error(f"unclosed string starting at index {self.start}") + return False + + if c == quote: + self.backup() # don't emit the closing quote + self.emit(token_type) + self.next() + self.ignore() # ignore closing quote + return True + def ignore_whitespace(self) -> bool: """Move the pointer past any whitespace.""" if self.pos != self.start: @@ -144,7 +181,6 @@ def ignore_whitespace(self) -> bool: def error(self, msg: str) -> None: """Emit an error token.""" - # TODO: better error messages. self.tokens.append( Token( TokenType.ERROR, @@ -155,348 +191,291 @@ def error(self, msg: str) -> None: ) ) + def lex_root(self) -> Optional[StateFn]: + c = self.next() -StateFn = Callable[[Lexer], Optional["StateFn"]] - - -def lex_root(l: Lexer) -> Optional[StateFn]: # noqa: D103 - c = l.next() - - if c != "$": - l.error(f"expected '$', found {c!r}") - return None + if c != "$": + self.error(f"expected '$', found {c!r}") + return None - l.emit(TokenType.ROOT) - return lex_segment + self.emit(TokenType.ROOT) + return self.lex_segment + def lex_segment(self) -> Optional[StateFn]: # noqa: PLR0911 + if self.ignore_whitespace() and not self.peek(): + self.error("unexpected trailing whitespace") + return None -def lex_segment(l: Lexer) -> Optional[StateFn]: # noqa: D103, PLR0911 - if l.ignore_whitespace() and not l.peek(): - l.error("unexpected trailing whitespace") - return None + c = self.next() - c = l.next() + if c == "": + self.emit(TokenType.EOF) + return None - if c == "": - l.emit(TokenType.EOF) + if c == ".": + if self.peek() == ".": + self.next() + self.emit(TokenType.DOUBLE_DOT) + return self.lex_descendant_segment + return self.lex_shorthand_selector + + if c == "[": + self.emit(TokenType.LBRACKET) + self.bracket_stack.append((c, self.pos - 1)) + return self.lex_inside_bracketed_segment + + if self.filter_depth: + self.backup() + return self.lex_inside_filter + + self.error(f"expected '.', '..' or a bracketed selection, found {c!r}") return None - if c == ".": - if l.peek() == ".": - l.next() - l.emit(TokenType.DOUBLE_DOT) - return lex_descendant_segment - return lex_shorthand_selector + def lex_descendant_segment(self) -> Optional[StateFn]: + c = self.next() - if c == "[": - l.emit(TokenType.LBRACKET) - l.bracket_stack.append((c, l.pos - 1)) - return lex_inside_bracketed_segment + if c == "": + self.error("bald descendant segment") + return None - if l.filter_depth: - l.backup() - return lex_inside_filter + if c == "*": + self.emit(TokenType.WILD) + return self.lex_segment - l.error(f"expected '.', '..' or a bracketed selection, found {c!r}") - return None + if c == "[": + self.emit(TokenType.LBRACKET) + self.bracket_stack.append((c, self.pos - 1)) + return self.lex_inside_bracketed_segment + self.backup() -def lex_descendant_segment(l: Lexer) -> Optional[StateFn]: # noqa: D103 - c = l.next() + if self.accept_match(RE_PROPERTY): + self.emit(TokenType.PROPERTY) + return self.lex_segment - if c == "": - l.error("bald descendant segment") + self.next() + self.error(f"unexpected descendant selection token {c!r}") return None - if c == "*": - l.emit(TokenType.WILD) - return lex_segment + def lex_shorthand_selector(self) -> Optional[StateFn]: + self.ignore() # ignore dot - if c == "[": - l.emit(TokenType.LBRACKET) - l.bracket_stack.append((c, l.pos - 1)) - return lex_inside_bracketed_segment - - l.backup() + if self.accept_match(RE_WHITESPACE): + self.error("unexpected whitespace after dot") + return None - if l.accept_match(RE_PROPERTY): - l.emit(TokenType.PROPERTY) - return lex_segment + c = self.next() - l.next() - l.error(f"unexpected descendant selection token {c!r}") - return None + if c == "*": + self.emit(TokenType.WILD) + return self.lex_segment + self.backup() -def lex_shorthand_selector(l: Lexer) -> Optional[StateFn]: # noqa: D103 - l.ignore() # ignore dot + if self.accept_match(RE_PROPERTY): + self.emit(TokenType.PROPERTY) + return self.lex_segment - if l.accept_match(RE_WHITESPACE): - l.error("unexpected whitespace after dot") + self.error(f"unexpected shorthand selector {c!r}") return None - c = l.next() - - if c == "*": - l.emit(TokenType.WILD) - return lex_segment - - l.backup() - - if l.accept_match(RE_PROPERTY): - l.emit(TokenType.PROPERTY) - return lex_segment - - l.error(f"unexpected shorthand selector {c!r}") - return None + def lex_inside_bracketed_segment(self) -> Optional[StateFn]: # noqa: PLR0911, PLR0912 + while True: + self.ignore_whitespace() + c = self.next() + if c == "]": + if not self.bracket_stack or self.bracket_stack[-1][0] != "[": + self.backup() + self.error("unbalanced brackets") + return None -def lex_inside_bracketed_segment(l: Lexer) -> Optional[StateFn]: # noqa: PLR0911, D103 - while True: - l.ignore_whitespace() - c = l.next() + self.bracket_stack.pop() + self.emit(TokenType.RBRACKET) + return self.lex_segment - if c == "]": - if not l.bracket_stack or l.bracket_stack[-1][0] != "[": - l.backup() - l.error("unbalanced brackets") + if c == "": + self.error("unbalanced brackets") return None - l.bracket_stack.pop() - l.emit(TokenType.RBRACKET) - return lex_segment - - if c == "": - l.error("unbalanced brackets") - return None - - if c == "*": - l.emit(TokenType.WILD) - continue - - if c == "?": - l.emit(TokenType.FILTER) - l.filter_depth += 1 - return lex_inside_filter - - if c == ",": - l.emit(TokenType.COMMA) - continue - - if c == ":": - l.emit(TokenType.COLON) - continue + if c == "*": + self.emit(TokenType.WILD) + continue - if c == "'": - # Quoted dict/object key/property name - return lex_single_quoted_string_inside_bracket_segment + if c == "?": + self.emit(TokenType.FILTER) + self.filter_depth += 1 + return self.lex_inside_filter - if c == '"': - # Quoted dict/object key/property name - return lex_double_quoted_string_inside_bracket_segment + if c == ",": + self.emit(TokenType.COMMA) + continue - # default - l.backup() + if c == ":": + self.emit(TokenType.COLON) + continue - if l.accept_match(RE_INDEX): - # Index selector or part of a slice selector. - l.emit(TokenType.INDEX) - continue + if c == "'": + # Quoted dict/object key/property name + if self.accept_string_literal( + c, token_type=TokenType.SINGLE_QUOTE_STRING + ): + continue + return None - l.error(f"unexpected token {c!r} in bracketed selection") - return None + if c == '"': + # Quoted dict/object key/property name + if self.accept_string_literal( + c, token_type=TokenType.DOUBLE_QUOTE_STRING + ): + continue + return None + # default + self.backup() -def lex_inside_filter(l: Lexer) -> Optional[StateFn]: # noqa: D103, PLR0915, PLR0912, PLR0911 - while True: - l.ignore_whitespace() - c = l.next() + if self.accept_match(RE_INDEX): + # Index selector or part of a slice selector. + self.emit(TokenType.INDEX) + continue - if c == "": - l.error("unclosed bracketed selection") + self.error(f"unexpected token {c!r} in bracketed selection") return None - if c == "]": - l.filter_depth -= 1 - l.backup() - return lex_inside_bracketed_segment + def lex_inside_filter(self) -> Optional[StateFn]: # noqa: PLR0911, PLR0912, PLR0915 + while True: + self.ignore_whitespace() + c = self.next() - if c == ",": - l.emit(TokenType.COMMA) - # If we have unbalanced parens, we are inside a function call and a - # comma separates arguments. Otherwise a comma separates selectors. - if l.func_call_stack: - continue - l.filter_depth -= 1 - return lex_inside_bracketed_segment - - if c == "'": - return lex_single_quoted_string_inside_filter_expression - - if c == '"': - return lex_double_quoted_string_inside_filter_expression - - if c == "(": - l.emit(TokenType.LPAREN) - l.bracket_stack.append((c, l.pos - 1)) - # Are we in a function call? If so, a function argument contains parens. - if l.func_call_stack: - l.func_call_stack[-1] += 1 - continue - - if c == ")": - if not l.bracket_stack or l.bracket_stack[-1][0] != "(": - l.backup() - l.error("unbalanced parentheses") + if c == "": + self.error("unclosed bracketed selection") return None - l.bracket_stack.pop() - l.emit(TokenType.RPAREN) - # Are we closing a function call or a parenthesized expression? - if l.func_call_stack: - if l.func_call_stack[-1] == 1: - l.func_call_stack.pop() - else: - l.func_call_stack[-1] -= 1 - continue - - if c == "$": - l.emit(TokenType.ROOT) - return lex_segment + if c == "]": + self.filter_depth -= 1 + self.backup() + return self.lex_inside_bracketed_segment + + if c == ",": + self.emit(TokenType.COMMA) + # If we have unbalanced parens, we are inside a function call and a + # comma separates arguments. Otherwise a comma separates selectors. + if self.func_call_stack: + continue + self.filter_depth -= 1 + return self.lex_inside_bracketed_segment + + if c == "'": + if self.accept_string_literal( + c, token_type=TokenType.SINGLE_QUOTE_STRING + ): + continue + return None - if c == "@": - l.emit(TokenType.CURRENT) - return lex_segment + if c == '"': + if self.accept_string_literal( + c, token_type=TokenType.DOUBLE_QUOTE_STRING + ): + continue + return None - if c == ".": - l.backup() - return lex_segment + if c == "(": + self.emit(TokenType.LPAREN) + self.bracket_stack.append((c, self.pos - 1)) + # Are we in a function call? If so, a function argument contains parens. + if self.func_call_stack: + self.func_call_stack[-1] += 1 + continue - if c == "!": - if l.peek() == "=": - l.next() - l.emit(TokenType.NE) - else: - l.emit(TokenType.NOT) - continue + if c == ")": + if not self.bracket_stack or self.bracket_stack[-1][0] != "(": + self.backup() + self.error("unbalanced parentheses") + return None - if c == "=": - if l.peek() == "=": - l.next() - l.emit(TokenType.EQ) + self.bracket_stack.pop() + self.emit(TokenType.RPAREN) + # Are we closing a function call or a parenthesized expression? + if self.func_call_stack: + if self.func_call_stack[-1] == 1: + self.func_call_stack.pop() + else: + self.func_call_stack[-1] -= 1 continue - l.backup() - l.error(f"unexpected filter selector token {c!r}") - return None - - if c == "<": - if l.peek() == "=": - l.next() - l.emit(TokenType.LE) - else: - l.emit(TokenType.LT) - continue + if c == "$": + self.emit(TokenType.ROOT) + return self.lex_segment - if c == ">": - if l.peek() == "=": - l.next() - l.emit(TokenType.GE) - else: - l.emit(TokenType.GT) - continue - - l.backup() - - if l.accept("&&"): - l.emit(TokenType.AND) - elif l.accept("||"): - l.emit(TokenType.OR) - elif l.accept("true"): - l.emit(TokenType.TRUE) - elif l.accept("false"): - l.emit(TokenType.FALSE) - elif l.accept("null"): - l.emit(TokenType.NULL) - elif l.accept_match(RE_FLOAT): - l.emit(TokenType.FLOAT) - elif l.accept_match(RE_INT): - l.emit(TokenType.INT) - elif l.accept_match(RE_FUNCTION_NAME) and l.peek() == "(": - # Keep track of parentheses for this function call. - l.func_call_stack.append(1) - l.emit(TokenType.FUNCTION) - l.bracket_stack.append(("(", l.pos)) - l.next() - l.ignore() # ignore LPAREN - else: - l.error(f"unexpected filter selector token {c!r}") - return None + if c == "@": + self.emit(TokenType.CURRENT) + return self.lex_segment + if c == ".": + self.backup() + return self.lex_segment -def lex_string_factory(quote: str, state: StateFn) -> StateFn: - """Return a state function for scanning string literals. - - Arguments: - quote: One of `'` or `"`. The string delimiter. - state: The state function to return control to after scanning the string. - """ - tt = ( - TokenType.SINGLE_QUOTE_STRING if quote == "'" else TokenType.DOUBLE_QUOTE_STRING - ) + if c == "!": + if self.peek() == "=": + self.next() + self.emit(TokenType.NE) + else: + self.emit(TokenType.NOT) + continue - def _lex_string(l: Lexer) -> Optional[StateFn]: - l.ignore() # ignore opening quote + if c == "=": + if self.peek() == "=": + self.next() + self.emit(TokenType.EQ) + continue - if l.peek() == "": - # an empty string - l.emit(tt) - l.next() - l.ignore() - return state + self.backup() + self.error(f"unexpected filter selector token {c!r}") + return None - while True: - c = l.next() + if c == "<": + if self.peek() == "=": + self.next() + self.emit(TokenType.LE) + else: + self.emit(TokenType.LT) + continue - if c == "\\": - peeked = l.peek() - if peeked in ESCAPES or peeked == quote: - l.next() + if c == ">": + if self.peek() == "=": + self.next() + self.emit(TokenType.GE) else: - l.error("invalid escape") - return None + self.emit(TokenType.GT) + continue - if not c: - l.error(f"unclosed string starting at index {l.start}") + self.backup() + + if self.accept("&&"): + self.emit(TokenType.AND) + elif self.accept("||"): + self.emit(TokenType.OR) + elif self.accept("true"): + self.emit(TokenType.TRUE) + elif self.accept("false"): + self.emit(TokenType.FALSE) + elif self.accept("null"): + self.emit(TokenType.NULL) + elif self.accept_match(RE_FLOAT): + self.emit(TokenType.FLOAT) + elif self.accept_match(RE_INT): + self.emit(TokenType.INT) + elif self.accept_match(RE_FUNCTION_NAME) and self.peek() == "(": + # Keep track of parentheses for this function call. + self.func_call_stack.append(1) + self.emit(TokenType.FUNCTION) + self.bracket_stack.append(("(", self.pos)) + self.next() + self.ignore() # ignore LPAREN + else: + self.error(f"unexpected filter selector token {c!r}") return None - if c == quote: - l.backup() - l.emit(tt) - l.next() - l.ignore() # ignore closing quote - return state - - return _lex_string - - -lex_single_quoted_string_inside_bracket_segment = lex_string_factory( - "'", lex_inside_bracketed_segment -) - -lex_double_quoted_string_inside_bracket_segment = lex_string_factory( - '"', lex_inside_bracketed_segment -) - - -lex_single_quoted_string_inside_filter_expression = lex_string_factory( - "'", lex_inside_filter -) - -lex_double_quoted_string_inside_filter_expression = lex_string_factory( - '"', lex_inside_filter -) - def lex(query: str) -> Tuple[Lexer, List[Token]]: """Return a lexer for _query_ and an array to be populated with Tokens.""" diff --git a/pyproject.toml b/pyproject.toml index e616d40..439715c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -161,7 +161,17 @@ select = [ "YTT", ] -ignore = ["S105", "S101", "D107", "D105", "PLR0913", "SIM108", "PT001", "A005"] +ignore = [ + "S105", + "S101", + "D107", + "D105", + "PLR0913", + "SIM108", + "PT001", + "A005", + "PLW1641", +] fixable = ["I"] unfixable = []