Source code for pyLambdaFlows.tree

import pickle
from collections import deque

from .utils import isIterable



class Tree:
    """Internal class that creates JSON from input data and a computational graph.

    This class is for internal use. It builds a lambda instantiation graph
    with all dependencies in order to create the final JSON for the AWS API
    call.
    """

    def __init__(self, target):
        """Default constructor.

        This constructor requires a target pyLambdaFlows node to deal with.
        That node is the one :meth:`compute` starts from and the one whose
        instances :meth:`getResultIdx` reports.

        :param pyLambdaFlows.Operation target: The node that will be treated.
        """
        self.depth = 0
        self.max_idx = 0
        # Initialised here (and not only in compute()) so that
        # gen_counter_values() is usable on a tree that was never computed.
        self.curr_idx = 0
        self.target = target
        self.aws_functions = set()
        self.bottoms = []
        self.treated = {}

    def compute(self, feed_dict):
        """Create the dependency graph from a feed dict.

        This method runs a recursive depth-first traversal of the
        computational graph, starting at the target, in order to figure out
        the number of lambda instances per operation. Any result of a
        previous call is discarded first.

        :param dict feed_dict: A dict with all the data: pyLambdaFlows source
            operations as keys and iterables of values as values.
        :raises RuntimeError: If a source operation is missing from
            ``feed_dict``.
        """
        self.aws_functions = set()
        self.bottoms = []
        self.treated = {}
        self.curr_idx = 0
        self._dfs(self.target, feed_dict)
        self.max_idx = self.curr_idx

    def _dfs(self, node, feed_dict):
        """Recursive DFS helper (internal use only).

        The method first descends to the not-yet-treated parents of ``node``
        and then, on the way back, creates the instances of ``node`` itself
        using the dispenser functions of the operation. The instances are
        stored in ``self.treated[node]`` and numbered with ``self.curr_idx``.

        :param node: The operation to treat.
        :param dict feed_dict: Data dictionary (see :meth:`compute`).
        :raises RuntimeError: If a source operation has no entry in
            ``feed_dict``, if the dispensers of a multi-parent operation
            return results of different lengths, or if the operation has an
            empty parent list.
        """
        if node.parent is None:
            # Source operation: one instance per provided value. The value is
            # pickled and hex-encoded to be carried in the instance arguments.
            if node not in feed_dict:
                raise RuntimeError("Some source op don't have provided values !")
            instances = []
            for value in feed_dict[node]:
                payload = pickle.dumps(value).hex()
                instances.append(InstanceNode(node.funct, payload, self.curr_idx, None))
                self.curr_idx += 1
            self.treated[node] = instances
            return

        # Parents must be treated first: their instance count feeds the
        # dispensers below.
        for parent in node.parent:
            if parent not in self.treated:
                self._dfs(parent, feed_dict)

        # One dispenser per parent; each receives the parent's instance count
        # and returns, for every instance to create, the parent indexes it
        # depends on.
        dependencies = [
            node.dispenser[position](len(self.treated[parent]))
            for position, parent in enumerate(node.parent)
        ]
        if len({len(dependency) for dependency in dependencies}) > 1:
            raise RuntimeError("Node get multiple parent with differents dim !")
        if len(dependencies) == 0:
            raise RuntimeError("Intern Eror")

        instances = []
        if len(dependencies) == 1:
            # Single parent: every instance gets a flat list of parents.
            parent_instances = self.treated[node.parent[0]]
            for indexes in dependencies[0]:
                curr_parents = [parent_instances[index] for index in indexes]
                instances.append(InstanceNode(node.funct, None, self.curr_idx, curr_parents))
                self.curr_idx += 1
        else:
            # Several parents: every instance gets one sub-list per parent.
            for per_parent in zip(*dependencies):
                curr_parents = [
                    [self.treated[node.parent[position]][index] for index in indexes]
                    for position, indexes in enumerate(per_parent)
                ]
                instances.append(InstanceNode(node.funct, None, self.curr_idx, curr_parents))
                self.curr_idx += 1
        self.treated[node] = instances

    def getNode(self, idx):
        """Return the operation associated with the given instance index.

        :param idx: Instance index; it is converted with ``int`` before the
            comparison.
        :return: The associated pyLambdaFlows operator, or ``None`` when no
            treated operation owns an instance with that index.
        """
        for key, items in self.treated.items():
            if any(item.idx == int(idx) for item in items):
                return key
        return None

    def generateJson(self, tableName="None"):
        """Generate the request JSON.

        This function computes the JSON with all children dependencies using
        a BFS over the operations and the precomputed lambda instance graph
        (see :meth:`compute`). Each operation is processed once, even when it
        is the parent of several operations.

        :param tableName: Value stored under the ``"table"`` key of every
            entry.
        :return: A dict mapping every instance index to its JSON description.
        """
        jsonData = {}
        queue = deque([self.target])
        # Operations are already used as keys of self.treated, so they are
        # hashable and can be tracked in a set.
        visited = {self.target}
        while queue:
            curr_node = queue.popleft()
            if curr_node.parent is not None:
                for par in curr_node.parent:
                    if par not in visited:
                        visited.add(par)
                        queue.append(par)

            for element in self.treated[curr_node]:
                element_idx = str(element.idx)
                curr_json = {
                    "idx": element_idx,
                    "func": curr_node.aws_lambda_name,
                    # Shared reference: entries registered later by the
                    # children of this instance show up here as well.
                    "children": element.childrenJson,
                    "data": [],
                    "table": tableName,
                }
                if element.parents is not None:
                    curr_json["source"] = "data"
                    for parent in element.parents:
                        if isIterable(parent):
                            curr_json["data"].append([str(sub.idx) for sub in parent])
                            for sub_parent in parent:
                                sub_parent.add_children_data(element_idx, curr_json)
                        else:
                            curr_json["data"].append(str(parent.idx))
                            parent.add_children_data(element_idx, curr_json)
                else:
                    curr_json["source"] = "direct"
                    curr_json["data"].append(element.args)
                jsonData[element.idx] = curr_json
        return jsonData

    def gen_counter_values(self):
        """Return the number of parents of each lambda instance.

        The count is computed from the precomputed graph representation;
        duplicated parents are counted once (per sub-list for multi-parent
        instances). Instances without parents count as 0.

        :return: A list whose item ``i`` is the parent count of the instance
            with index ``i``.
        """
        result = [0] * self.curr_idx
        for elements_list in self.treated.values():
            for element in elements_list:
                parents = element.parents
                if not parents:
                    # None (source instance) or an empty parent list.
                    continue
                if isIterable(parents[0]):
                    result[element.idx] = sum(len(set(sub_parents)) for sub_parents in parents)
                else:
                    result[element.idx] = len(set(parents))
        return result

    def getResultIdx(self):
        """Return all lambda indexes associated with the target operation.

        :return: List of the instance indexes of the final operation.
        """
        return [element.idx for element in self.treated[self.target]]
class InstanceNode:
    """One instance of an operation in the instantiation graph."""

    def __init__(self, funct, args, idx, parents=None):
        """Store the description of one instance.

        :param funct: Path function.
        :param args: Equals the data if the instance is a root, ``None``
            otherwise.
        :param idx: Index of the instance.
        :param parents: Instances this one depends on, or ``None``.
        """
        self.funct = funct
        self.args = args
        self.idx = idx
        self.parents = parents
        # JSON descriptions of the instances registered through
        # add_children_data(), keyed by the index given at registration.
        self.childrenJson = {}

    def __repr__(self):
        """Return a short debugging representation (function and index)."""
        return f"{type(self).__name__}(funct={self.funct!r}, idx={self.idx!r})"

    def add_children_data(self, idx, json):
        """Register (or replace) the JSON description stored under ``idx``.

        :param idx: Key under which the description is stored.
        :param json: The description to store; it is kept by reference.
        """
        self.childrenJson[idx] = json