Module sinbadflow.element

Base building block of pipelines

Expand source code
'''Base building block of pipelines'''
from .utils import Trigger
from forbiddenfruit import curse

def rshift_list(self, elem):
    '''Override list __rshift__ method with the __rshift__ functionality using forbiddenfruit library'''
    return Element(self) >> elem
curse(list, "__rshift__", rshift_list)


class Element():
    '''Base component of BaseAgent which are executed by Sinbadflow. Element object follows a linked list implementation
    with __rshift__ override for connection to one another.

    Args:
        data: string - payload of the element (notebook path)
        trigger: Trigger - element run trigger (see supported triggers in Status IntEnum)

    Usage example:
    
        For pipeline creation use ">>" symbols between the elements
            pipeline = Element() >> Element()

        For parallel run use list of BaseAgents followed by ">>" symbol
            pipeline = [Element(),Element()] >> Element()

        For pipeline concatenation use the same ">>" symbol
            pipeline_x >> pipeline_y
    '''

    def __init__(self, data, trigger=Trigger.DEFAULT):
        self.data = data
        self.trigger = trigger
        self.next_elem = None
        self.prev_elem = None

    def __wrap_element(self, elem):
        # elem >> [elem] case
        if type(elem) == list:
            return Element(elem)
        #[elem] >> elem
        elif type(elem.data) == list:
            return elem
        #elem >> elem
        return Element([elem])

    def __get_head_element(self, elem):
        while elem.prev_elem is not None:
            elem = elem.prev_elem
        return elem

    def __rshift__(self, elem):
        # This is the first elem in pipeline, let's wrap you (head)
        if self.prev_elem == None:
            self = self.__wrap_element(self)
        # Wrap the elem object for single/parallel use, get the head
        wrapped_elem = self.__get_head_element(self.__wrap_element(elem))
        self.next_elem = wrapped_elem
        wrapped_elem.prev_elem = self
        return wrapped_elem

Functions

def rshift_list(self, elem)

Override list rshift method with the rshift functionality using forbiddenfruit library

Expand source code
def rshift_list(self, elem):
    '''Override list __rshift__ method with the __rshift__ functionality using forbiddenfruit library'''
    return Element(self) >> elem

Classes

class Element (data, trigger=Trigger.DEFAULT)

Base component of BaseAgent which are executed by Sinbadflow. Element object follows a linked list implementation with rshift override for connection to one another.

Args

data
string - payload of the element (notebook path)
trigger
Trigger - element run trigger (see supported triggers in Status IntEnum)

Usage example:

For pipeline creation use ">>" symbols between the elements
    pipeline = Element() >> Element()

For parallel run use list of BaseAgents followed by ">>" symbol
    pipeline = [Element(),Element()] >> Element()

For pipeline concatenation use the same ">>" symbol
    pipeline_x >> pipeline_y
Expand source code
class Element():
    '''Base component of BaseAgent which are executed by Sinbadflow. Element object follows a linked list implementation
    with __rshift__ override for connection to one another.

    Args:
        data: string - payload of the element (notebook path)
        trigger: Trigger - element run trigger (see supported triggers in Status IntEnum)

    Usage example:
    
        For pipeline creation use ">>" symbols between the elements
            pipeline = Element() >> Element()

        For parallel run use list of BaseAgents followed by ">>" symbol
            pipeline = [Element(),Element()] >> Element()

        For pipeline concatenation use the same ">>" symbol
            pipeline_x >> pipeline_y
    '''

    def __init__(self, data, trigger=Trigger.DEFAULT):
        self.data = data
        self.trigger = trigger
        self.next_elem = None
        self.prev_elem = None

    def __wrap_element(self, elem):
        # elem >> [elem] case
        if type(elem) == list:
            return Element(elem)
        #[elem] >> elem
        elif type(elem.data) == list:
            return elem
        #elem >> elem
        return Element([elem])

    def __get_head_element(self, elem):
        while elem.prev_elem is not None:
            elem = elem.prev_elem
        return elem

    def __rshift__(self, elem):
        # This is the first elem in pipeline, let's wrap you (head)
        if self.prev_elem == None:
            self = self.__wrap_element(self)
        # Wrap the elem object for single/parallel use, get the head
        wrapped_elem = self.__get_head_element(self.__wrap_element(elem))
        self.next_elem = wrapped_elem
        wrapped_elem.prev_elem = self
        return wrapped_elem

Subclasses