Make compose() and load() ensure that the input stream contains a single document...
[pyyaml/python3.git] / lib / yaml / composer.py
blob06e5ac782f1ade899fe0f7e3c4f754784e498efe
2 __all__ = ['Composer', 'ComposerError']
4 from error import MarkedYAMLError
5 from events import *
6 from nodes import *
8 class ComposerError(MarkedYAMLError):
9 pass
11 class Composer(object):
13 def __init__(self):
14 self.anchors = {}
16 def check_node(self):
17 # Drop the STREAM-START event.
18 if self.check_event(StreamStartEvent):
19 self.get_event()
21 # If there are more documents available?
22 return not self.check_event(StreamEndEvent)
24 def get_node(self):
25 # Get the root node of the next document.
26 if not self.check_event(StreamEndEvent):
27 return self.compose_document()
29 def get_single_node(self):
30 # Drop the STREAM-START event.
31 self.get_event()
33 # Compose a document if the stream is not empty.
34 document = None
35 if not self.check_event(StreamEndEvent):
36 document = self.compose_document()
38 # Ensure that the stream contains no more documents.
39 if not self.check_event(StreamEndEvent):
40 event = self.get_event()
41 raise ComposerError("expected a single document in the stream",
42 document.start_mark, "but found another document",
43 event.start_mark)
45 # Drop the STREAM-END event.
46 self.get_event()
48 return document
50 def compose_document(self):
51 # Drop the DOCUMENT-START event.
52 self.get_event()
54 # Compose the root node.
55 node = self.compose_node(None, None)
57 # Drop the DOCUMENT-END event.
58 self.get_event()
60 self.anchors = {}
61 return node
63 def compose_node(self, parent, index):
64 if self.check_event(AliasEvent):
65 event = self.get_event()
66 anchor = event.anchor
67 if anchor not in self.anchors:
68 raise ComposerError(None, None, "found undefined alias %r"
69 % anchor.encode('utf-8'), event.start_mark)
70 return self.anchors[anchor]
71 event = self.peek_event()
72 anchor = event.anchor
73 if anchor is not None:
74 if anchor in self.anchors:
75 raise ComposerError("found duplicate anchor %r; first occurence"
76 % anchor.encode('utf-8'), self.anchors[anchor].start_mark,
77 "second occurence", event.start_mark)
78 self.descend_resolver(parent, index)
79 if self.check_event(ScalarEvent):
80 node = self.compose_scalar_node(anchor)
81 elif self.check_event(SequenceStartEvent):
82 node = self.compose_sequence_node(anchor)
83 elif self.check_event(MappingStartEvent):
84 node = self.compose_mapping_node(anchor)
85 self.ascend_resolver()
86 return node
88 def compose_scalar_node(self, anchor):
89 event = self.get_event()
90 tag = event.tag
91 if tag is None or tag == u'!':
92 tag = self.resolve(ScalarNode, event.value, event.implicit)
93 node = ScalarNode(tag, event.value,
94 event.start_mark, event.end_mark, style=event.style)
95 if anchor is not None:
96 self.anchors[anchor] = node
97 return node
99 def compose_sequence_node(self, anchor):
100 start_event = self.get_event()
101 tag = start_event.tag
102 if tag is None or tag == u'!':
103 tag = self.resolve(SequenceNode, None, start_event.implicit)
104 node = SequenceNode(tag, [],
105 start_event.start_mark, None,
106 flow_style=start_event.flow_style)
107 if anchor is not None:
108 self.anchors[anchor] = node
109 index = 0
110 while not self.check_event(SequenceEndEvent):
111 node.value.append(self.compose_node(node, index))
112 index += 1
113 end_event = self.get_event()
114 node.end_mark = end_event.end_mark
115 return node
117 def compose_mapping_node(self, anchor):
118 start_event = self.get_event()
119 tag = start_event.tag
120 if tag is None or tag == u'!':
121 tag = self.resolve(MappingNode, None, start_event.implicit)
122 node = MappingNode(tag, [],
123 start_event.start_mark, None,
124 flow_style=start_event.flow_style)
125 if anchor is not None:
126 self.anchors[anchor] = node
127 while not self.check_event(MappingEndEvent):
128 #key_event = self.peek_event()
129 item_key = self.compose_node(node, None)
130 #if item_key in node.value:
131 # raise ComposerError("while composing a mapping", start_event.start_mark,
132 # "found duplicate key", key_event.start_mark)
133 item_value = self.compose_node(node, item_key)
134 #node.value[item_key] = item_value
135 node.value.append((item_key, item_value))
136 end_event = self.get_event()
137 node.end_mark = end_event.end_mark
138 return node