123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589 |
- __all__ = ['Parser', 'ParserError']
- from .error import MarkedYAMLError
- from .tokens import *
- from .events import *
- from .scanner import *
class ParserError(MarkedYAMLError):
    """Error raised by ``Parser`` when the token stream cannot be parsed.

    Adds nothing to ``MarkedYAMLError``; it exists so callers can catch
    parser failures specifically.
    """
class Parser:
    """Turn a stream of tokens into a stream of events.

    The parser is a state machine.  ``self.state`` is the bound method that
    will produce the next event, ``self.states`` is a stack of states to
    return to once a nested node has been fully parsed, and ``self.marks``
    is a stack of start marks of the collections currently open (used only
    for error reporting).

    This class calls ``self.check_token``, ``self.peek_token`` and
    ``self.get_token`` but does not define them, so it must be combined with
    a class that does (presumably the scanner imported by this module --
    confirm against the loader that assembles the pieces).
    """

    # Tag handles that are always available, even without %TAG directives.
    DEFAULT_TAGS = {
        '!': '!',
        '!!': 'tag:yaml.org,2002:',
    }

    def __init__(self):
        """Initialise an idle parser positioned at the start of the stream."""
        self.current_event = None   # one-event look-ahead buffer
        self.yaml_version = None    # value of the %YAML directive, if any
        self.tag_handles = {}       # handle -> prefix for the current document
        self.states = []            # stack of states to resume later
        self.marks = []             # start marks of open collections
        self.state = self.parse_stream_start

    def dispose(self):
        """Drop the state references held by the parser.

        ``self.states`` and ``self.state`` hold bound methods of ``self``;
        clearing them breaks those self-references.
        """
        self.states = []
        self.state = None

    def check_event(self, *choices):
        """Return whether the next event exists and matches ``choices``.

        With no ``choices`` the method only reports whether a next event is
        available.  Otherwise it returns ``True`` when the next event is an
        instance of at least one of the given classes.  The event is not
        consumed.
        """
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        if self.current_event is None:
            return False
        if not choices:
            return True
        # isinstance accepts a tuple of classes, which is exactly ``choices``.
        return isinstance(self.current_event, choices)

    def peek_event(self):
        """Return the next event without consuming it (``None`` if none)."""
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        return self.current_event

    def get_event(self):
        """Return the next event and consume it (``None`` if none)."""
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        value = self.current_event
        self.current_event = None
        return value

    # ------------------------------------------------------------------
    # Stream and document level states.
    # ------------------------------------------------------------------

    def parse_stream_start(self):
        """Consume the first token and emit ``StreamStartEvent``."""
        token = self.get_token()
        event = StreamStartEvent(token.start_mark, token.end_mark,
                encoding=token.encoding)

        # Prepare the next state.
        self.state = self.parse_implicit_document_start
        return event

    def parse_implicit_document_start(self):
        """Start a document that has no directives or explicit start marker.

        If the next token is a directive, a document start or the stream
        end, the work is delegated to ``parse_document_start`` instead.
        """
        if not self.check_token(DirectiveToken, DocumentStartToken,
                StreamEndToken):
            # Copy instead of aliasing the class-level dict so that nothing
            # done through ``self.tag_handles`` can alter the shared defaults.
            self.tag_handles = dict(self.DEFAULT_TAGS)
            token = self.peek_token()
            # The implicit start is zero-width: both marks are the start of
            # the first content token.
            start_mark = end_mark = token.start_mark
            event = DocumentStartEvent(start_mark, end_mark,
                    explicit=False)

            # Prepare the next state.
            self.states.append(self.parse_document_end)
            self.state = self.parse_block_node
            return event
        else:
            return self.parse_document_start()

    def parse_document_start(self):
        """Emit an explicit ``DocumentStartEvent`` or the ``StreamEndEvent``.

        Raises:
            ParserError: if directives are not followed by a document start
                token.
        """
        # Skip any extra document end indicators.
        while self.check_token(DocumentEndToken):
            self.get_token()

        if not self.check_token(StreamEndToken):
            # An explicit document.
            token = self.peek_token()
            start_mark = token.start_mark
            version, tags = self.process_directives()
            if not self.check_token(DocumentStartToken):
                raise ParserError(None, None,
                        "expected '<document start>', but found %r"
                        % self.peek_token().id,
                        self.peek_token().start_mark)
            token = self.get_token()
            end_mark = token.end_mark
            event = DocumentStartEvent(start_mark, end_mark,
                    explicit=True, version=version, tags=tags)
            self.states.append(self.parse_document_end)
            self.state = self.parse_document_content
        else:
            # The end of the stream.
            token = self.get_token()
            event = StreamEndEvent(token.start_mark, token.end_mark)
            # Internal invariants: every pushed state/mark must have been
            # popped by the time the stream ends.
            assert not self.states
            assert not self.marks
            self.state = None
        return event

    def parse_document_end(self):
        """Emit ``DocumentEndEvent``; explicit if a document end token follows."""
        token = self.peek_token()
        start_mark = end_mark = token.start_mark
        explicit = False
        if self.check_token(DocumentEndToken):
            token = self.get_token()
            end_mark = token.end_mark
            explicit = True
        event = DocumentEndEvent(start_mark, end_mark,
                explicit=explicit)

        # Prepare the next state.
        self.state = self.parse_document_start
        return event

    def parse_document_content(self):
        """Parse the root node of an explicit document.

        A document with no content yields an empty scalar.
        """
        if self.check_token(DirectiveToken,
                DocumentStartToken, DocumentEndToken, StreamEndToken):
            event = self.process_empty_scalar(self.peek_token().start_mark)
            self.state = self.states.pop()
            return event
        else:
            return self.parse_block_node()

    def process_directives(self):
        """Consume directive tokens and set up version and tag handles.

        Returns:
            A pair ``(yaml_version, tags)``.  ``tags`` is a copy of the
            handles declared by %TAG directives, or ``None`` when none were
            declared; it never includes the default handles.

        Raises:
            ParserError: on a duplicate %YAML directive, a major version
                other than 1, or a duplicate tag handle.
        """
        self.yaml_version = None
        self.tag_handles = {}
        while self.check_token(DirectiveToken):
            token = self.get_token()
            if token.name == 'YAML':
                if self.yaml_version is not None:
                    raise ParserError(None, None,
                            "found duplicate YAML directive", token.start_mark)
                # Only the major version is validated.
                major, _minor = token.value
                if major != 1:
                    raise ParserError(None, None,
                            "found incompatible YAML document (version 1.* is required)",
                            token.start_mark)
                self.yaml_version = token.value
            elif token.name == 'TAG':
                handle, prefix = token.value
                if handle in self.tag_handles:
                    raise ParserError(None, None,
                            "duplicate tag handle %r" % handle,
                            token.start_mark)
                self.tag_handles[handle] = prefix
        # Snapshot the explicitly declared handles *before* the defaults are
        # merged in, so the returned value reflects the directives only.
        if self.tag_handles:
            value = self.yaml_version, self.tag_handles.copy()
        else:
            value = self.yaml_version, None
        # Fill in default handles that the document did not override.
        for handle, prefix in self.DEFAULT_TAGS.items():
            self.tag_handles.setdefault(handle, prefix)
        return value

    # ------------------------------------------------------------------
    # Node level states.
    # ------------------------------------------------------------------

    def parse_block_node(self):
        """Parse a node in block context."""
        return self.parse_node(block=True)

    def parse_flow_node(self):
        """Parse a node in flow context."""
        return self.parse_node()

    def parse_block_node_or_indentless_sequence(self):
        """Parse a block node, also accepting an indentless sequence."""
        return self.parse_node(block=True, indentless_sequence=True)

    def parse_node(self, block=False, indentless_sequence=False):
        """Parse one node and return its first event.

        Args:
            block: allow block sequences and block mappings as content.
            indentless_sequence: treat a block entry token in this position
                as the start of a sequence.

        Returns:
            An ``AliasEvent``, ``ScalarEvent``, ``SequenceStartEvent`` or
            ``MappingStartEvent``.

        Raises:
            ParserError: if a tag uses an undefined handle, or if no node
                content is found and the node has neither anchor nor tag.
        """
        if self.check_token(AliasToken):
            token = self.get_token()
            event = AliasEvent(token.value, token.start_mark, token.end_mark)
            self.state = self.states.pop()
        else:
            anchor = None
            tag = None
            start_mark = end_mark = tag_mark = None
            # Properties may come as anchor-then-tag or tag-then-anchor.
            if self.check_token(AnchorToken):
                token = self.get_token()
                start_mark = token.start_mark
                end_mark = token.end_mark
                anchor = token.value
                if self.check_token(TagToken):
                    token = self.get_token()
                    tag_mark = token.start_mark
                    end_mark = token.end_mark
                    tag = token.value
            elif self.check_token(TagToken):
                token = self.get_token()
                start_mark = tag_mark = token.start_mark
                end_mark = token.end_mark
                tag = token.value
                if self.check_token(AnchorToken):
                    token = self.get_token()
                    end_mark = token.end_mark
                    anchor = token.value
            if tag is not None:
                # The tag token value is a (handle, suffix) pair; resolve the
                # handle to its prefix when there is one.
                handle, suffix = tag
                if handle is not None:
                    if handle not in self.tag_handles:
                        raise ParserError("while parsing a node", start_mark,
                                "found undefined tag handle %r" % handle,
                                tag_mark)
                    tag = self.tag_handles[handle]+suffix
                else:
                    tag = suffix
            if start_mark is None:
                # No properties: the node starts where its content starts.
                start_mark = end_mark = self.peek_token().start_mark
            event = None
            implicit = (tag is None or tag == '!')
            if indentless_sequence and self.check_token(BlockEntryToken):
                end_mark = self.peek_token().end_mark
                event = SequenceStartEvent(anchor, tag, implicit,
                        start_mark, end_mark)
                self.state = self.parse_indentless_sequence_entry
            elif self.check_token(ScalarToken):
                token = self.get_token()
                end_mark = token.end_mark
                # For scalars ``implicit`` becomes a pair of flags.
                # NOTE(review): presumably (resolvable as plain, resolvable
                # as non-plain) -- confirm against ScalarEvent's consumers.
                if (token.plain and tag is None) or tag == '!':
                    implicit = (True, False)
                elif tag is None:
                    implicit = (False, True)
                else:
                    implicit = (False, False)
                event = ScalarEvent(anchor, tag, implicit, token.value,
                        start_mark, end_mark, style=token.style)
                self.state = self.states.pop()
            elif self.check_token(FlowSequenceStartToken):
                end_mark = self.peek_token().end_mark
                event = SequenceStartEvent(anchor, tag, implicit,
                        start_mark, end_mark, flow_style=True)
                self.state = self.parse_flow_sequence_first_entry
            elif self.check_token(FlowMappingStartToken):
                end_mark = self.peek_token().end_mark
                event = MappingStartEvent(anchor, tag, implicit,
                        start_mark, end_mark, flow_style=True)
                self.state = self.parse_flow_mapping_first_key
            elif block and self.check_token(BlockSequenceStartToken):
                end_mark = self.peek_token().start_mark
                event = SequenceStartEvent(anchor, tag, implicit,
                        start_mark, end_mark, flow_style=False)
                self.state = self.parse_block_sequence_first_entry
            elif block and self.check_token(BlockMappingStartToken):
                end_mark = self.peek_token().start_mark
                event = MappingStartEvent(anchor, tag, implicit,
                        start_mark, end_mark, flow_style=False)
                self.state = self.parse_block_mapping_first_key
            elif anchor is not None or tag is not None:
                # Properties without content: the node is an empty scalar.
                event = ScalarEvent(anchor, tag, (implicit, False), '',
                        start_mark, end_mark)
                self.state = self.states.pop()
            else:
                if block:
                    node = 'block'
                else:
                    node = 'flow'
                token = self.peek_token()
                raise ParserError("while parsing a %s node" % node, start_mark,
                        "expected the node content, but found %r" % token.id,
                        token.start_mark)
        return event

    # ------------------------------------------------------------------
    # Block sequences.
    # ------------------------------------------------------------------

    def parse_block_sequence_first_entry(self):
        """Consume the sequence start token and parse the first entry."""
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_sequence_entry()

    def parse_block_sequence_entry(self):
        """Parse one block sequence entry or the end of the sequence.

        Raises:
            ParserError: if neither a block entry nor a block end follows.
        """
        if self.check_token(BlockEntryToken):
            token = self.get_token()
            if not self.check_token(BlockEntryToken, BlockEndToken):
                self.states.append(self.parse_block_sequence_entry)
                return self.parse_block_node()
            else:
                # An entry marker with nothing after it is an empty scalar.
                self.state = self.parse_block_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        if not self.check_token(BlockEndToken):
            token = self.peek_token()
            raise ParserError("while parsing a block collection", self.marks[-1],
                    "expected <block end>, but found %r" % token.id, token.start_mark)
        token = self.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_indentless_sequence_entry(self):
        """Parse one entry of an indentless sequence, or end the sequence.

        The sequence has no end token of its own: it ends at the first token
        that is not a block entry, and that token is left unconsumed.
        """
        if self.check_token(BlockEntryToken):
            token = self.get_token()
            if not self.check_token(BlockEntryToken,
                    KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_indentless_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_indentless_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        token = self.peek_token()
        event = SequenceEndEvent(token.start_mark, token.start_mark)
        self.state = self.states.pop()
        return event

    # ------------------------------------------------------------------
    # Block mappings.
    # ------------------------------------------------------------------

    def parse_block_mapping_first_key(self):
        """Consume the mapping start token and parse the first key."""
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_mapping_key()

    def parse_block_mapping_key(self):
        """Parse one block mapping key or the end of the mapping.

        Raises:
            ParserError: if neither a key nor a block end follows.
        """
        if self.check_token(KeyToken):
            token = self.get_token()
            if not self.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_value)
                return self.parse_block_node_or_indentless_sequence()
            else:
                # A key marker with no key node is an empty scalar key.
                self.state = self.parse_block_mapping_value
                return self.process_empty_scalar(token.end_mark)
        if not self.check_token(BlockEndToken):
            token = self.peek_token()
            raise ParserError("while parsing a block mapping", self.marks[-1],
                    "expected <block end>, but found %r" % token.id, token.start_mark)
        token = self.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_block_mapping_value(self):
        """Parse the value of the current block mapping key.

        A missing value, with or without a value token, is an empty scalar.
        """
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_key)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_block_mapping_key
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)

    # ------------------------------------------------------------------
    # Flow sequences.
    # ------------------------------------------------------------------

    def parse_flow_sequence_first_entry(self):
        """Consume the flow sequence start token and parse the first entry."""
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_sequence_entry(first=True)

    def parse_flow_sequence_entry(self, first=False):
        """Parse one flow sequence entry or the end of the sequence.

        Args:
            first: ``True`` for the first entry, which is not preceded by a
                flow entry token.

        A key token inside a flow sequence opens a mapping that holds a
        single key/value pair.

        Raises:
            ParserError: if a non-first entry is not preceded by a flow
                entry token.
        """
        if not self.check_token(FlowSequenceEndToken):
            if not first:
                if self.check_token(FlowEntryToken):
                    self.get_token()
                else:
                    token = self.peek_token()
                    raise ParserError("while parsing a flow sequence", self.marks[-1],
                            "expected ',' or ']', but got %r" % token.id, token.start_mark)

            if self.check_token(KeyToken):
                # The key token is only peeked here; it is consumed by
                # parse_flow_sequence_entry_mapping_key.
                token = self.peek_token()
                event = MappingStartEvent(None, None, True,
                        token.start_mark, token.end_mark,
                        flow_style=True)
                self.state = self.parse_flow_sequence_entry_mapping_key
                return event
            elif not self.check_token(FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry)
                return self.parse_flow_node()
        token = self.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_sequence_entry_mapping_key(self):
        """Consume the key token and parse the key of a single-pair mapping."""
        token = self.get_token()
        if not self.check_token(ValueToken,
                FlowEntryToken, FlowSequenceEndToken):
            self.states.append(self.parse_flow_sequence_entry_mapping_value)
            return self.parse_flow_node()
        else:
            self.state = self.parse_flow_sequence_entry_mapping_value
            return self.process_empty_scalar(token.end_mark)

    def parse_flow_sequence_entry_mapping_value(self):
        """Parse the value of a single-pair mapping inside a flow sequence."""
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(FlowEntryToken, FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry_mapping_end)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_sequence_entry_mapping_end
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_sequence_entry_mapping_end
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_sequence_entry_mapping_end(self):
        """Close a single-pair mapping; no token is consumed."""
        self.state = self.parse_flow_sequence_entry
        token = self.peek_token()
        return MappingEndEvent(token.start_mark, token.start_mark)

    # ------------------------------------------------------------------
    # Flow mappings.
    # ------------------------------------------------------------------

    def parse_flow_mapping_first_key(self):
        """Consume the flow mapping start token and parse the first key."""
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_mapping_key(first=True)

    def parse_flow_mapping_key(self, first=False):
        """Parse one flow mapping key or the end of the mapping.

        Args:
            first: ``True`` for the first key, which is not preceded by a
                flow entry token.

        Raises:
            ParserError: if a non-first key is not preceded by a flow entry
                token.
        """
        if not self.check_token(FlowMappingEndToken):
            if not first:
                if self.check_token(FlowEntryToken):
                    self.get_token()
                else:
                    token = self.peek_token()
                    raise ParserError("while parsing a flow mapping", self.marks[-1],
                            "expected ',' or '}', but got %r" % token.id, token.start_mark)
            if self.check_token(KeyToken):
                token = self.get_token()
                if not self.check_token(ValueToken,
                        FlowEntryToken, FlowMappingEndToken):
                    self.states.append(self.parse_flow_mapping_value)
                    return self.parse_flow_node()
                else:
                    self.state = self.parse_flow_mapping_value
                    return self.process_empty_scalar(token.end_mark)
            elif not self.check_token(FlowMappingEndToken):
                # A node without a key token is a key whose value is
                # an empty scalar.
                self.states.append(self.parse_flow_mapping_empty_value)
                return self.parse_flow_node()
        token = self.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_mapping_value(self):
        """Parse the value of the current flow mapping key.

        A missing value, with or without a value token, is an empty scalar.
        """
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(FlowEntryToken, FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_key)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_mapping_key
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_mapping_empty_value(self):
        """Emit the empty value for a flow mapping key given without a key token."""
        self.state = self.parse_flow_mapping_key
        return self.process_empty_scalar(self.peek_token().start_mark)

    def process_empty_scalar(self, mark):
        """Return a zero-width, untagged, empty ``ScalarEvent`` at ``mark``."""
        return ScalarEvent(None, None, (True, False), '', mark, mark)
|