summaryrefslogtreecommitdiff
path: root/lib/spack/external/ruamel/yaml/composer.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/spack/external/ruamel/yaml/composer.py')
-rw-r--r--lib/spack/external/ruamel/yaml/composer.py182
1 files changed, 182 insertions, 0 deletions
diff --git a/lib/spack/external/ruamel/yaml/composer.py b/lib/spack/external/ruamel/yaml/composer.py
new file mode 100644
index 0000000000..fb0a55c759
--- /dev/null
+++ b/lib/spack/external/ruamel/yaml/composer.py
@@ -0,0 +1,182 @@
+# coding: utf-8
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+
+try:
+ from .error import MarkedYAMLError
+ from .compat import utf8
+except (ImportError, ValueError): # for Jython
+ from ruamel.yaml.error import MarkedYAMLError
+ from ruamel.yaml.compat import utf8
+
+from ruamel.yaml.events import (
+ StreamStartEvent, StreamEndEvent, MappingStartEvent, MappingEndEvent,
+ SequenceStartEvent, SequenceEndEvent, AliasEvent, ScalarEvent,
+)
+from ruamel.yaml.nodes import (
+ MappingNode, ScalarNode, SequenceNode,
+)
+
+__all__ = ['Composer', 'ComposerError']
+
+
+class ComposerError(MarkedYAMLError):
+ pass
+
+
+class Composer(object):
+ def __init__(self):
+ self.anchors = {}
+
+ def check_node(self):
+ # Drop the STREAM-START event.
+ if self.check_event(StreamStartEvent):
+ self.get_event()
+
+ # If there are more documents available?
+ return not self.check_event(StreamEndEvent)
+
+ def get_node(self):
+ # Get the root node of the next document.
+ if not self.check_event(StreamEndEvent):
+ return self.compose_document()
+
+ def get_single_node(self):
+ # Drop the STREAM-START event.
+ self.get_event()
+
+ # Compose a document if the stream is not empty.
+ document = None
+ if not self.check_event(StreamEndEvent):
+ document = self.compose_document()
+
+ # Ensure that the stream contains no more documents.
+ if not self.check_event(StreamEndEvent):
+ event = self.get_event()
+ raise ComposerError(
+ "expected a single document in the stream",
+ document.start_mark, "but found another document",
+ event.start_mark)
+
+ # Drop the STREAM-END event.
+ self.get_event()
+
+ return document
+
+ def compose_document(self):
+ # Drop the DOCUMENT-START event.
+ self.get_event()
+
+ # Compose the root node.
+ node = self.compose_node(None, None)
+
+ # Drop the DOCUMENT-END event.
+ self.get_event()
+
+ self.anchors = {}
+ return node
+
+ def compose_node(self, parent, index):
+ if self.check_event(AliasEvent):
+ event = self.get_event()
+ alias = event.anchor
+ if alias not in self.anchors:
+ raise ComposerError(
+ None, None, "found undefined alias %r"
+ % utf8(alias), event.start_mark)
+ return self.anchors[alias]
+ event = self.peek_event()
+ anchor = event.anchor
+ if anchor is not None: # have an anchor
+ if anchor in self.anchors:
+ raise ComposerError(
+ "found duplicate anchor %r; first occurence"
+ % utf8(anchor), self.anchors[anchor].start_mark,
+ "second occurence", event.start_mark)
+ self.descend_resolver(parent, index)
+ if self.check_event(ScalarEvent):
+ node = self.compose_scalar_node(anchor)
+ elif self.check_event(SequenceStartEvent):
+ node = self.compose_sequence_node(anchor)
+ elif self.check_event(MappingStartEvent):
+ node = self.compose_mapping_node(anchor)
+ self.ascend_resolver()
+ return node
+
+ def compose_scalar_node(self, anchor):
+ event = self.get_event()
+ tag = event.tag
+ if tag is None or tag == u'!':
+ tag = self.resolve(ScalarNode, event.value, event.implicit)
+ node = ScalarNode(tag, event.value,
+ event.start_mark, event.end_mark, style=event.style,
+ comment=event.comment)
+ if anchor is not None:
+ self.anchors[anchor] = node
+ return node
+
+ def compose_sequence_node(self, anchor):
+ start_event = self.get_event()
+ tag = start_event.tag
+ if tag is None or tag == u'!':
+ tag = self.resolve(SequenceNode, None, start_event.implicit)
+ node = SequenceNode(tag, [],
+ start_event.start_mark, None,
+ flow_style=start_event.flow_style,
+ comment=start_event.comment, anchor=anchor)
+ if anchor is not None:
+ self.anchors[anchor] = node
+ index = 0
+ while not self.check_event(SequenceEndEvent):
+ node.value.append(self.compose_node(node, index))
+ index += 1
+ end_event = self.get_event()
+ if node.flow_style is True and end_event.comment is not None:
+ if node.comment is not None:
+ print('Warning: unexpected end_event commment in sequence '
+ 'node {0}'.format(node.flow_style))
+ node.comment = end_event.comment
+ node.end_mark = end_event.end_mark
+ self.check_end_doc_comment(end_event, node)
+ return node
+
+ def compose_mapping_node(self, anchor):
+ start_event = self.get_event()
+ tag = start_event.tag
+ if tag is None or tag == u'!':
+ tag = self.resolve(MappingNode, None, start_event.implicit)
+ node = MappingNode(tag, [],
+ start_event.start_mark, None,
+ flow_style=start_event.flow_style,
+ comment=start_event.comment, anchor=anchor)
+ if anchor is not None:
+ self.anchors[anchor] = node
+ while not self.check_event(MappingEndEvent):
+ # key_event = self.peek_event()
+ item_key = self.compose_node(node, None)
+ # if item_key in node.value:
+ # raise ComposerError("while composing a mapping",
+ # start_event.start_mark,
+ # "found duplicate key", key_event.start_mark)
+ item_value = self.compose_node(node, item_key)
+ # node.value[item_key] = item_value
+ node.value.append((item_key, item_value))
+ end_event = self.get_event()
+ if node.flow_style is True and end_event.comment is not None:
+ node.comment = end_event.comment
+ node.end_mark = end_event.end_mark
+ self.check_end_doc_comment(end_event, node)
+ return node
+
+ def check_end_doc_comment(self, end_event, node):
+ if end_event.comment and end_event.comment[1]:
+ # pre comments on an end_event, no following to move to
+ if node.comment is None:
+ node.comment = [None, None]
+ assert not isinstance(node, ScalarEvent)
+ # this is a post comment on a mapping node, add as third element
+ # in the list
+ node.comment.append(end_event.comment[1])
+ end_event.comment[1] = None