feat(parser): add a basic annotation parser
This commit is contained in:
53
core/ast/annotations.py
Normal file
53
core/ast/annotations.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Generic, Optional, TypeVar
|
||||||
|
|
||||||
|
from lexer.token import Token
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Expr(ABC):
|
||||||
|
@abstractmethod
|
||||||
|
def accept(self, visitor: Visitor[T]) -> T: ...
|
||||||
|
|
||||||
|
class Visitor(ABC, Generic[T]):
|
||||||
|
@abstractmethod
|
||||||
|
def visit_type_expr(self, expr: TypeExpr) -> T: ...
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def visit_schema_expr(self, expr: SchemaExpr) -> T: ...
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def visit_schema_element_expr(self, expr: SchemaElementExpr) -> T: ...
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class TypeExpr(Expr):
|
||||||
|
name: Token
|
||||||
|
schema: Optional[SchemaExpr]
|
||||||
|
|
||||||
|
def accept(self, visitor: Expr.Visitor[T]) -> T:
|
||||||
|
return visitor.visit_type_expr(self)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class SchemaExpr(Expr):
|
||||||
|
left: Token
|
||||||
|
elements: list[Expr]
|
||||||
|
right: Token
|
||||||
|
|
||||||
|
def accept(self, visitor: Expr.Visitor[T]) -> T:
|
||||||
|
return visitor.visit_schema_expr(self)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class SchemaElementExpr(Expr):
|
||||||
|
name: Optional[Token]
|
||||||
|
type: Optional[Expr]
|
||||||
|
|
||||||
|
def accept(self, visitor: Expr.Visitor[T]) -> T:
|
||||||
|
return visitor.visit_schema_element_expr(self)
|
||||||
64
parser/annotations.py
Normal file
64
parser/annotations.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from core.ast.annotations import Expr, SchemaElementExpr, SchemaExpr, TypeExpr
|
||||||
|
from lexer.token import Token, TokenType
|
||||||
|
from parser.base import Parser
|
||||||
|
from parser.errors import ParsingError
|
||||||
|
|
||||||
|
|
||||||
|
class AnnotationParser(Parser):
|
||||||
|
SYNC_BOUNDARY: set[TokenType] = set()
|
||||||
|
|
||||||
|
def parse(self) -> Optional[Expr]:
|
||||||
|
expression: Optional[Expr] = self.annotation()
|
||||||
|
if not self.is_at_end():
|
||||||
|
self.error(self.peek(), "Extra tokens")
|
||||||
|
return expression
|
||||||
|
|
||||||
|
def synchronize(self):
|
||||||
|
self.advance()
|
||||||
|
while not self.is_at_end():
|
||||||
|
if self.peek().type in self.SYNC_BOUNDARY:
|
||||||
|
return
|
||||||
|
self.advance()
|
||||||
|
|
||||||
|
def annotation(self) -> Optional[Expr]:
|
||||||
|
try:
|
||||||
|
return self.type()
|
||||||
|
except ParsingError:
|
||||||
|
self.synchronize()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def type(self) -> TypeExpr:
|
||||||
|
name: Token = self.consume(TokenType.IDENTIFIER, "Expected type identifier")
|
||||||
|
schema: Optional[SchemaExpr] = None
|
||||||
|
if self.match(TokenType.LEFT_BRACKET):
|
||||||
|
schema = self.schema()
|
||||||
|
return TypeExpr(name=name, schema=schema)
|
||||||
|
|
||||||
|
def schema(self) -> SchemaExpr:
|
||||||
|
left: Token = self.previous()
|
||||||
|
elements: list[Expr] = []
|
||||||
|
while not self.check(TokenType.RIGHT_BRACKET) and not self.is_at_end():
|
||||||
|
elements.append(self.schema_element())
|
||||||
|
if not self.check(TokenType.RIGHT_BRACKET):
|
||||||
|
self.consume(TokenType.COMMA, "Expected ',' between schema elements")
|
||||||
|
|
||||||
|
right: Token = self.consume(TokenType.RIGHT_BRACKET, "Unclosed schema")
|
||||||
|
return SchemaExpr(left=left, elements=elements, right=right)
|
||||||
|
|
||||||
|
def schema_element(self) -> Expr:
|
||||||
|
if self.match(TokenType.UNDERSCORE):
|
||||||
|
return SchemaElementExpr(name=None, type=None)
|
||||||
|
|
||||||
|
if not self.check(TokenType.IDENTIFIER):
|
||||||
|
raise self.error(self.peek(), "Expected schema element")
|
||||||
|
|
||||||
|
name: Optional[Token] = None
|
||||||
|
type: Optional[TypeExpr] = None
|
||||||
|
if self.check_next(TokenType.COLON):
|
||||||
|
name = self.advance()
|
||||||
|
self.advance()
|
||||||
|
if not self.match(TokenType.UNDERSCORE):
|
||||||
|
type = self.type()
|
||||||
|
return SchemaElementExpr(name=name, type=type)
|
||||||
@@ -54,7 +54,7 @@ class Parser(ABC, Generic[T]):
|
|||||||
)
|
)
|
||||||
self.current: int = 0
|
self.current: int = 0
|
||||||
self.length: int = len(self.tokens)
|
self.length: int = len(self.tokens)
|
||||||
self.errors: list[TokenError]
|
self.errors: list[TokenError] = []
|
||||||
|
|
||||||
def error(self, token: Token, message: str):
|
def error(self, token: Token, message: str):
|
||||||
"""Record an error
|
"""Record an error
|
||||||
@@ -120,6 +120,26 @@ class Parser(ABC, Generic[T]):
|
|||||||
return False
|
return False
|
||||||
return self.peek().type == token_type
|
return self.peek().type == token_type
|
||||||
|
|
||||||
|
def check_next(self, token_type: TokenType) -> bool:
|
||||||
|
"""Check whether the next token is of the given type
|
||||||
|
|
||||||
|
This function always returns False if the parser is at the EOF token
|
||||||
|
|
||||||
|
Args:
|
||||||
|
token_type (TokenType): the type of token to check
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the current token is of the given type and not EOF
|
||||||
|
"""
|
||||||
|
if self.is_at_end():
|
||||||
|
return False
|
||||||
|
if self.current + 1 >= self.length:
|
||||||
|
return False
|
||||||
|
token: Token = self.tokens[self.current + 1]
|
||||||
|
if token.type == TokenType.EOF:
|
||||||
|
return False
|
||||||
|
return token.type == token_type
|
||||||
|
|
||||||
def advance(self) -> Token:
|
def advance(self) -> Token:
|
||||||
"""Consume and return the current token, if not at the EOF
|
"""Consume and return the current token, if not at the EOF
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user