diff --git a/core/ast/annotations.py b/core/ast/annotations.py new file mode 100644 index 0000000..78a7ce6 --- /dev/null +++ b/core/ast/annotations.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Generic, Optional, TypeVar + +from lexer.token import Token + +T = TypeVar("T") + + +@dataclass(frozen=True) +class Expr(ABC): + @abstractmethod + def accept(self, visitor: Visitor[T]) -> T: ... + + class Visitor(ABC, Generic[T]): + @abstractmethod + def visit_type_expr(self, expr: TypeExpr) -> T: ... + + @abstractmethod + def visit_schema_expr(self, expr: SchemaExpr) -> T: ... + + @abstractmethod + def visit_schema_element_expr(self, expr: SchemaElementExpr) -> T: ... + + +@dataclass(frozen=True) +class TypeExpr(Expr): + name: Token + schema: Optional[SchemaExpr] + + def accept(self, visitor: Expr.Visitor[T]) -> T: + return visitor.visit_type_expr(self) + + +@dataclass(frozen=True) +class SchemaExpr(Expr): + left: Token + elements: list[Expr] + right: Token + + def accept(self, visitor: Expr.Visitor[T]) -> T: + return visitor.visit_schema_expr(self) + + +@dataclass(frozen=True) +class SchemaElementExpr(Expr): + name: Optional[Token] + type: Optional[Expr] + + def accept(self, visitor: Expr.Visitor[T]) -> T: + return visitor.visit_schema_element_expr(self) diff --git a/parser/annotations.py b/parser/annotations.py new file mode 100644 index 0000000..4b1228d --- /dev/null +++ b/parser/annotations.py @@ -0,0 +1,64 @@ +from typing import Optional + +from core.ast.annotations import Expr, SchemaElementExpr, SchemaExpr, TypeExpr +from lexer.token import Token, TokenType +from parser.base import Parser +from parser.errors import ParsingError + + +class AnnotationParser(Parser): + SYNC_BOUNDARY: set[TokenType] = set() + + def parse(self) -> Optional[Expr]: + expression: Optional[Expr] = self.annotation() + if not self.is_at_end(): + self.error(self.peek(), "Extra tokens") + 
return expression + + def synchronize(self): + self.advance() + while not self.is_at_end(): + if self.peek().type in self.SYNC_BOUNDARY: + return + self.advance() + + def annotation(self) -> Optional[Expr]: + try: + return self.type() + except ParsingError: + self.synchronize() + return None + + def type(self) -> TypeExpr: + name: Token = self.consume(TokenType.IDENTIFIER, "Expected type identifier") + schema: Optional[SchemaExpr] = None + if self.match(TokenType.LEFT_BRACKET): + schema = self.schema() + return TypeExpr(name=name, schema=schema) + + def schema(self) -> SchemaExpr: + left: Token = self.previous() + elements: list[Expr] = [] + while not self.check(TokenType.RIGHT_BRACKET) and not self.is_at_end(): + elements.append(self.schema_element()) + if not self.check(TokenType.RIGHT_BRACKET): + self.consume(TokenType.COMMA, "Expected ',' between schema elements") + + right: Token = self.consume(TokenType.RIGHT_BRACKET, "Unclosed schema") + return SchemaExpr(left=left, elements=elements, right=right) + + def schema_element(self) -> Expr: + if self.match(TokenType.UNDERSCORE): + return SchemaElementExpr(name=None, type=None) + + if not self.check(TokenType.IDENTIFIER): + raise self.error(self.peek(), "Expected schema element") + + name: Optional[Token] = None + type: Optional[TypeExpr] = None + if self.check_next(TokenType.COLON): + name = self.advance() + self.advance() + if not self.match(TokenType.UNDERSCORE): + type = self.type() + return SchemaElementExpr(name=name, type=type) diff --git a/parser/base.py b/parser/base.py index 2195f72..74962db 100644 --- a/parser/base.py +++ b/parser/base.py @@ -54,7 +54,7 @@ class Parser(ABC, Generic[T]): ) self.current: int = 0 self.length: int = len(self.tokens) - self.errors: list[TokenError] + self.errors: list[TokenError] = [] def error(self, token: Token, message: str): """Record an error @@ -120,6 +120,26 @@ class Parser(ABC, Generic[T]): return False return self.peek().type == token_type + def check_next(self, 
token_type: TokenType) -> bool: + """Check whether the next token is of the given type + + This function always returns False if the parser is at the EOF token + + Args: + token_type (TokenType): the type of token to check + + Returns: + bool: True if the next token is of the given type and not EOF + """ + if self.is_at_end(): + return False + if self.current + 1 >= self.length: + return False + token: Token = self.tokens[self.current + 1] + if token.type == TokenType.EOF: + return False + return token.type == token_type + def advance(self) -> Token: """Consume and return the current token, if not at the EOF