# diskimage-builder/diskimage_builder/graph/digraph.py

# Copyright 2016 Andreas Florath (andreas@florath.net)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
class Digraph(object):
    """Implements a directed graph.

    Each node of the digraph must have a unique name.  The graph keeps
    its nodes in a dictionary that maps the node name to the node
    object.
    """

    class Node(object):
        """Directed graph node.

        This holds the incoming and outgoing edges as well as the
        node's name.
        """

        def __init__(self, name):
            """Initializes a node that has no edges yet.

            Incoming and outgoing edges are held as sets of nodes.
            Both start empty and are filled by add_incoming() and
            add_outgoing().
            """
            self.__name = name
            self.__incoming = set()
            self.__outgoing = set()

        def get_name(self):
            """Returns the name of the node."""
            return self.__name

        def add_incoming(self, node):
            """Add node to the set of incoming nodes."""
            self.__incoming.add(node)

        def add_outgoing(self, node):
            """Add node to the set of outgoing nodes."""
            self.__outgoing.add(node)

        def get_iter_outgoing(self):
            """Return an iterator over the outgoing nodes."""
            return iter(self.__outgoing)

        @staticmethod
        def __as_named_list(inlist):
            """Return the names of the given nodes as a list.

            A real list is built on purpose: map() returns a lazy,
            single-use iterator on Python 3, which can neither be
            indexed nor compared nor iterated twice.
            """
            return [node.get_name() for node in inlist]

        def get_outgoing_as_named_list(self):
            """Return the names of all outgoing nodes as a list.

            The outgoing nodes are stored in a set, therefore the
            order of the names in the result is not defined.
            """
            return self.__as_named_list(self.__outgoing)

    def __init__(self):
        """Create an empty digraph."""
        # Maps node name -> node object.
        self._named_nodes = {}

    def create_from_dict(self, init_dgraph, node_gen_func=Node):
        """Fills this digraph based on the given information.

        :param init_dgraph: dictionary that maps each node name to an
            iterable holding the names of the nodes its outgoing
            edges point to.
        :param node_gen_func: callable that gets a node name and
            returns the node object to add (default: Digraph.Node).
        :raises RuntimeError: if an edge references a node name that
            is not a key of init_dgraph, or if a node name already
            exists in this graph.
        """
        # First run: create all nodes.  This must be complete before
        # any edge is created, because an edge can point to a node
        # that appears later in the dictionary.
        for node_name in init_dgraph:
            self.add_node(node_gen_func(node_name))

        # Second run: run through all nodes and create the edges.
        for node_name, outs in init_dgraph.items():
            node_from = self.find(node_name)
            for onode in outs:
                node_to = self.find(onode)
                if node_to is None:
                    raise RuntimeError("Node '%s' is referenced "
                                       "but not specified" % onode)
                self.create_edge(node_from, node_to)

    def add_node(self, anode):
        """Adds a new node to the graph.

        :param anode: the node to add; must be a Digraph.Node (or an
            instance of a subclass).
        :raises RuntimeError: if a node with the same name already
            exists in the graph.
        """
        assert issubclass(anode.__class__, Digraph.Node)
        name = anode.get_name()
        # The dictionary is keyed by node name, so a membership test
        # is enough - there is no need to scan all stored nodes.
        if name in self._named_nodes:
            raise RuntimeError("Node with name [%s] already "
                               "exists" % name)
        self._named_nodes[name] = anode

    def create_edge(self, anode, bnode):
        """Creates an edge from a to b - both must be nodes.

        Both nodes must already be part of this graph.  The edge is
        recorded on both ends: as outgoing on anode and as incoming
        on bnode.
        """
        assert issubclass(anode.__class__, Digraph.Node)
        assert issubclass(bnode.__class__, Digraph.Node)
        assert anode.get_name() in self._named_nodes
        assert anode == self._named_nodes[anode.get_name()]
        assert bnode.get_name() in self._named_nodes
        assert bnode == self._named_nodes[bnode.get_name()]
        anode.add_outgoing(bnode)
        bnode.add_incoming(anode)

    def get_iter_nodes_values(self):
        """Returns an iterator over all nodes of the graph.

        Note: the iterator only hands out the node objects; it offers
        no way to add nodes to or remove nodes from the graph.
        """
        return iter(self._named_nodes.values())

    def find(self, name):
        """Get the node with the given name.

        Return None if not available.
        """
        return self._named_nodes.get(name)

    def as_dict(self):
        """Outputs this digraph as a dictionary.

        :return: dictionary that maps each node name to the list of
            names of its outgoing nodes (the inverse operation of
            create_from_dict()).
        """
        return {node.get_name(): node.get_outgoing_as_named_list()
                for node in self._named_nodes.values()}
def topological_sort(dg):
    """Digraph topological sort.

    This algorithm is based upon a depth first search with
    'marking' the nodes that were already visited.

    No cycle detection is done: every node is emitted exactly once,
    even if the graph contains a cycle.

    :param dg: the digraph; it must provide get_iter_nodes_values()
        and its nodes must provide get_iter_outgoing().
    :return: the topological sorted list of nodes: each node is
        placed before the nodes its outgoing edges point to.
    """
    # Nodes in the order in which their depth first search finished
    # (post-order).  Reversed once at the end; this gives the same
    # result as inserting each node at the front of the list, but
    # without shifting the whole list for every node.
    finished = []

    # Set of nodes already visited.
    # (This is held here - local to the algorithm - to not modify the
    # nodes themselves.)  A set gives a constant time membership
    # test; the nodes must therefore be hashable, which they already
    # have to be for being stored in the edge sets of a node.
    visited = set()

    def visit(node):
        """Recursive depth first search function."""
        if node in visited:
            return
        visited.add(node)
        for onode in node.get_iter_outgoing():
            visit(onode)
        # All successors are handled: the node itself is finished.
        finished.append(node)

    # The 'main' function of the topological sort
    for node in dg.get_iter_nodes_values():
        visit(node)

    finished.reverse()
    return finished
# Utility functions
def digraph_create_from_dict(init_dgraph, node_gen_func=Digraph.Node):
    """Build a fresh digraph from a dictionary description.

    A new, empty Digraph is created and then filled by handing both
    arguments unchanged to its create_from_dict() method.

    :param init_dgraph: the description of the graph, as accepted by
        Digraph.create_from_dict().
    :param node_gen_func: callable used to create a node object from
        a node name (default: Digraph.Node).
    :return: the newly created digraph.
    """
    new_graph = Digraph()
    new_graph.create_from_dict(init_dgraph, node_gen_func)
    return new_graph
def node_list_to_node_name_list(node_list):
    """Map every node of the given list to its name.

    :param node_list: iterable of objects that provide get_name().
    :return: a new list holding the node names, in the same order as
        the nodes in node_list.
    """
    return [node.get_name() for node in node_list]