Module sinbadflow.executor
Main execution part of Sinbadflow library
Expand source code
'''Main execution part of Sinbadflow library'''
from .utils import Logger, LogLevel
from .utils import StatusHandler, Status
from concurrent.futures import ThreadPoolExecutor, wait
from .element import Element
class Sinbadflow():
    '''Sinbadflow pipeline runner. Named after the famous cartoon "Sinbad: Legend of the Seven Seas", it provides
    the ability to run pipelines made of agents with specific triggers and conditional functions in parallel
    (using ThreadPoolExecutor) or in single mode.

    Args:
        logging_option: object - selects preferred option of logging (print/logging supported), print by default
        status_handler: StatusHandler - object used for status to trigger comparison and result retrieval, None by default
        log_errors: boolean - flag to set explicit error logging with preferred logging_option, False by default

    Methods:
        run(pipeline: BaseAgent) - runs the input pipeline \n
        get_head_from_pipeline(pipeline: BaseAgent) -> BaseAgent - returns the head element from the pipeline \n
        print_pipeline(pipeline: BaseAgent) - logs the full pipeline

    Usage example:
        elem_x = DatabricksAgent('/path/to/notebook', Trigger.OK_PREV)
        elem_y = DatabricksAgent('/path/to/notebook', Trigger.FAIL_PREV)
        pipeline = elem_x >> elem_y
        sf = Sinbadflow()
        sf.run(pipeline)
    '''

    def __init__(self, logging_option=print, status_handler=None, log_errors=False):
        # A falsy status_handler (None by default) is replaced with a fresh StatusHandler.
        self.status_handler = status_handler if status_handler else StatusHandler()
        self.logger = Logger(logging_option)
        self.log_errors = log_errors
        # Head (first element) of the pipeline currently being run or printed.
        self.head = None

    def run(self, pipeline):
        '''Runs the input pipeline

        Args:
            pipeline: BaseAgent object

        Example usage:
            pipeline = element1 >> element2
            sinbadflow_instance.run(pipeline)
        '''
        pipeline = self.__wrap_element_if_single(pipeline)
        self.head = self.get_head_from_pipeline(pipeline)
        self.logger.log('Pipeline run started')
        self.__traverse_pipeline(self.__run_elements)
        self.logger.log('\nPipeline run finished')
        self.status_handler.print_results(self.logger)

    def __wrap_element_if_single(self, pipeline):
        '''Wraps a list of agents, or a lone agent with no neighbours, into an Element.

        Anything that is already linked to other elements is returned unchanged.
        '''
        if isinstance(pipeline, list):
            return Element(pipeline)
        if pipeline.prev_elem is None and pipeline.next_elem is None:
            return Element([pipeline])
        return pipeline

    def get_head_from_pipeline(self, pipeline):
        '''Returns head element from the pipeline

        Also stores the result in self.head.

        Args:
            pipeline: BaseAgent object

        Returns:
            BaseAgent (head element), or None when pipeline is None
        '''
        # Reset first so a head found by a previous call cannot leak into
        # this result when the traversal visits no elements.
        self.head = None
        self.__traverse_pipeline(self.__set_head_element, pipeline, False)
        return self.head

    def __traverse_pipeline(self, func, pipeline=None, forward=True):
        '''Calls func on every element of the linked pipeline.

        Forward traversal starts at self.head and follows next_elem;
        backward traversal starts at pipeline and follows prev_elem.
        '''
        pointer = self.head if forward else pipeline
        while pointer is not None:
            func(pointer)
            pointer = pointer.next_elem if forward else pointer.prev_elem

    def __set_head_element(self, elem):
        '''Records elem as the head when it has no predecessor.'''
        if elem.prev_elem is None:
            self.head = elem

    def __run_elements(self, elem):
        '''Executes all non-empty agents stored in one pipeline element.'''
        triggered_elements = self.__get_non_empty_elements_to_execute(elem)
        self.__execute_elements(triggered_elements)

    def __get_non_empty_elements_to_execute(self, element):
        '''Returns the agents of element whose data is not None.'''
        return [elem for elem in element.data if elem.data is not None]

    def __execute_elements(self, element_list):
        '''Runs the given agents through a thread pool and records their statuses.'''
        if not element_list:
            return
        self.logger.log('\n-----------PIPELINE STEP-----------')
        self.logger.log(
            f' Executing pipeline element(s): {[elem.data for elem in element_list]}')
        with ThreadPoolExecutor(max_workers=None) as executor:
            # executor.map yields results in the order of element_list.
            result_statuses = list(executor.map(self.__execute, element_list))
        self.status_handler.add_status(result_statuses)

    def __execute(self, element):
        '''Runs a single agent and returns its resulting Status.

        The agent is skipped when its trigger is not mapped to the current
        status or when its conditional function returns a falsy value.
        Any exception raised by element.run() results in Status.FAIL.
        '''
        if not self.__is_trigger_initiated(element.trigger) or not element.conditional_func():
            result_status = Status.SKIPPED
        else:
            try:
                element.run()
                result_status = Status.OK
            except Exception as e:
                # The error itself is only logged when explicitly requested.
                if self.log_errors:
                    self.logger.log(e, LogLevel.CRITICAL)
                result_status = Status.FAIL
        return self.__log_and_return_result(result_status, element)

    @staticmethod
    def __callable_name(func):
        '''Returns func.__name__, falling back to the type name for callables without one.'''
        # functools.partial objects and callable instances have no __name__.
        return getattr(func, '__name__', type(func).__name__)

    def __log_and_return_result(self, status, element):
        '''Logs the outcome of one agent run and returns status unchanged.'''
        if status == Status.SKIPPED:
            cond_name = self.__callable_name(element.conditional_func)
            # The conditional function is mentioned only when it is not the default one.
            if cond_name != 'default_func':
                conditional_part = ' or conditional function'
                func_name = f', conditional_func -> {cond_name}()'
            else:
                conditional_part, func_name = '', ''
            self.logger.log(f' SKIPPED: Trigger rule{conditional_part} failed for element {element.data}: Element trigger rule -> {element.trigger.name}' +
                            f' and previous run status -> {self.status_handler.last_status.name}{func_name}', LogLevel.WARNING)
            return status
        level = LogLevel.CRITICAL if status == Status.FAIL else LogLevel.INFO
        self.logger.log(
            f' Element "{element.data}" run status: {status.name.replace("_PREV","")}', level)
        return status

    def print_pipeline(self, pipeline):
        '''Prints full pipeline

        Args:
            pipeline: BaseAgent
        '''
        self.head = self.get_head_from_pipeline(pipeline)
        self.logger.log('↓ -----START-----')
        self.__traverse_pipeline(self.__print_element)
        self.logger.log('■ -----END-----')

    def __print_element(self, elem):
        '''Logs a one-line description of every agent in one pipeline element.'''
        descriptions = [
            '• name: ' + type(el).__name__ +
            ', data: ' + str(el.data) +
            ', trigger: ' + el.trigger.name +
            ', conditional_func: ' + self.__callable_name(el.conditional_func) + '()'
            for el in elem.data
        ]
        self.logger.log(f'↓ Agent(s) to run: {descriptions}')

    def __is_trigger_initiated(self, trigger):
        '''Asks the status handler whether trigger matches the current status.'''
        return self.status_handler.is_status_mapped_to_trigger(trigger)
Classes
class Sinbadflow (logging_option=<built-in function print>, status_handler=None, log_errors=False)
-
Sinbadflow pipeline runner. Named after the famous cartoon "Sinbad: Legend of the Seven Seas", it provides the ability to run pipelines made of agents with specific triggers and conditional functions in parallel (using ThreadPoolExecutor) or in single mode.
Args
logging_option
- object - selects preferred option of logging (print/logging supported), print by default
status_handler
- StatusHandler - object used for status to trigger comparison and result retrieval, None by default
log_errors
- boolean - flag to set explicit error logging with preferred logging_option, False by default
Methods
run(pipeline: BaseAgent) - runs the input pipeline
get_head_from_pipeline(pipeline: BaseAgent) -> BaseAgent - returns the head element from the pipeline
print_pipeline(pipeline: BaseAgent) - logs the full pipeline
Usage example:
elem_x = DatabricksAgent('/path/to/notebook', Trigger.OK_PREV)
elem_y = DatabricksAgent('/path/to/notebook', Trigger.FAIL_PREV)
pipeline = elem_x >> elem_y
sf = Sinbadflow()
sf.run(pipeline)
Expand source code
class Sinbadflow():
    '''Sinbadflow pipeline runner. Named after the famous cartoon "Sinbad: Legend of the Seven Seas", it provides
    the ability to run pipelines made of agents with specific triggers and conditional functions in parallel
    (using ThreadPoolExecutor) or in single mode.

    Args:
        logging_option: object - selects preferred option of logging (print/logging supported), print by default
        status_handler: StatusHandler - object used for status to trigger comparison and result retrieval, None by default
        log_errors: boolean - flag to set explicit error logging with preferred logging_option, False by default

    Methods:
        run(pipeline: BaseAgent) - runs the input pipeline \n
        get_head_from_pipeline(pipeline: BaseAgent) -> BaseAgent - returns the head element from the pipeline \n
        print_pipeline(pipeline: BaseAgent) - logs the full pipeline

    Usage example:
        elem_x = DatabricksAgent('/path/to/notebook', Trigger.OK_PREV)
        elem_y = DatabricksAgent('/path/to/notebook', Trigger.FAIL_PREV)
        pipeline = elem_x >> elem_y
        sf = Sinbadflow()
        sf.run(pipeline)
    '''

    def __init__(self, logging_option=print, status_handler=None, log_errors=False):
        # A falsy status_handler (None by default) is replaced with a fresh StatusHandler.
        self.status_handler = status_handler if status_handler else StatusHandler()
        self.logger = Logger(logging_option)
        self.log_errors = log_errors
        # Head (first element) of the pipeline currently being run or printed.
        self.head = None

    def run(self, pipeline):
        '''Runs the input pipeline

        Args:
            pipeline: BaseAgent object

        Example usage:
            pipeline = element1 >> element2
            sinbadflow_instance.run(pipeline)
        '''
        pipeline = self.__wrap_element_if_single(pipeline)
        self.head = self.get_head_from_pipeline(pipeline)
        self.logger.log('Pipeline run started')
        self.__traverse_pipeline(self.__run_elements)
        self.logger.log('\nPipeline run finished')
        self.status_handler.print_results(self.logger)

    def __wrap_element_if_single(self, pipeline):
        '''Wraps a list of agents, or a lone agent with no neighbours, into an Element.

        Anything that is already linked to other elements is returned unchanged.
        '''
        if isinstance(pipeline, list):
            return Element(pipeline)
        if pipeline.prev_elem is None and pipeline.next_elem is None:
            return Element([pipeline])
        return pipeline

    def get_head_from_pipeline(self, pipeline):
        '''Returns head element from the pipeline

        Also stores the result in self.head.

        Args:
            pipeline: BaseAgent object

        Returns:
            BaseAgent (head element), or None when pipeline is None
        '''
        # Reset first so a head found by a previous call cannot leak into
        # this result when the traversal visits no elements.
        self.head = None
        self.__traverse_pipeline(self.__set_head_element, pipeline, False)
        return self.head

    def __traverse_pipeline(self, func, pipeline=None, forward=True):
        '''Calls func on every element of the linked pipeline.

        Forward traversal starts at self.head and follows next_elem;
        backward traversal starts at pipeline and follows prev_elem.
        '''
        pointer = self.head if forward else pipeline
        while pointer is not None:
            func(pointer)
            pointer = pointer.next_elem if forward else pointer.prev_elem

    def __set_head_element(self, elem):
        '''Records elem as the head when it has no predecessor.'''
        if elem.prev_elem is None:
            self.head = elem

    def __run_elements(self, elem):
        '''Executes all non-empty agents stored in one pipeline element.'''
        triggered_elements = self.__get_non_empty_elements_to_execute(elem)
        self.__execute_elements(triggered_elements)

    def __get_non_empty_elements_to_execute(self, element):
        '''Returns the agents of element whose data is not None.'''
        return [elem for elem in element.data if elem.data is not None]

    def __execute_elements(self, element_list):
        '''Runs the given agents through a thread pool and records their statuses.'''
        if not element_list:
            return
        self.logger.log('\n-----------PIPELINE STEP-----------')
        self.logger.log(
            f' Executing pipeline element(s): {[elem.data for elem in element_list]}')
        with ThreadPoolExecutor(max_workers=None) as executor:
            # executor.map yields results in the order of element_list.
            result_statuses = list(executor.map(self.__execute, element_list))
        self.status_handler.add_status(result_statuses)

    def __execute(self, element):
        '''Runs a single agent and returns its resulting Status.

        The agent is skipped when its trigger is not mapped to the current
        status or when its conditional function returns a falsy value.
        Any exception raised by element.run() results in Status.FAIL.
        '''
        if not self.__is_trigger_initiated(element.trigger) or not element.conditional_func():
            result_status = Status.SKIPPED
        else:
            try:
                element.run()
                result_status = Status.OK
            except Exception as e:
                # The error itself is only logged when explicitly requested.
                if self.log_errors:
                    self.logger.log(e, LogLevel.CRITICAL)
                result_status = Status.FAIL
        return self.__log_and_return_result(result_status, element)

    @staticmethod
    def __callable_name(func):
        '''Returns func.__name__, falling back to the type name for callables without one.'''
        # functools.partial objects and callable instances have no __name__.
        return getattr(func, '__name__', type(func).__name__)

    def __log_and_return_result(self, status, element):
        '''Logs the outcome of one agent run and returns status unchanged.'''
        if status == Status.SKIPPED:
            cond_name = self.__callable_name(element.conditional_func)
            # The conditional function is mentioned only when it is not the default one.
            if cond_name != 'default_func':
                conditional_part = ' or conditional function'
                func_name = f', conditional_func -> {cond_name}()'
            else:
                conditional_part, func_name = '', ''
            self.logger.log(f' SKIPPED: Trigger rule{conditional_part} failed for element {element.data}: Element trigger rule -> {element.trigger.name}' +
                            f' and previous run status -> {self.status_handler.last_status.name}{func_name}', LogLevel.WARNING)
            return status
        level = LogLevel.CRITICAL if status == Status.FAIL else LogLevel.INFO
        self.logger.log(
            f' Element "{element.data}" run status: {status.name.replace("_PREV","")}', level)
        return status

    def print_pipeline(self, pipeline):
        '''Prints full pipeline

        Args:
            pipeline: BaseAgent
        '''
        self.head = self.get_head_from_pipeline(pipeline)
        self.logger.log('↓ -----START-----')
        self.__traverse_pipeline(self.__print_element)
        self.logger.log('■ -----END-----')

    def __print_element(self, elem):
        '''Logs a one-line description of every agent in one pipeline element.'''
        descriptions = [
            '• name: ' + type(el).__name__ +
            ', data: ' + str(el.data) +
            ', trigger: ' + el.trigger.name +
            ', conditional_func: ' + self.__callable_name(el.conditional_func) + '()'
            for el in elem.data
        ]
        self.logger.log(f'↓ Agent(s) to run: {descriptions}')

    def __is_trigger_initiated(self, trigger):
        '''Asks the status handler whether trigger matches the current status.'''
        return self.status_handler.is_status_mapped_to_trigger(trigger)
Methods
def get_head_from_pipeline(self, pipeline)
-
Returns head element from the pipeline
Args
pipeline
- BaseAgent object
Returns
BaseAgent (head element)
Expand source code
def get_head_from_pipeline(self, pipeline):
    '''Returns head element from the pipeline

    Also stores the result in self.head.

    Args:
        pipeline: BaseAgent object

    Returns:
        BaseAgent (head element), or None when pipeline is None
    '''
    # Reset first so a head found by a previous call cannot leak into
    # this result when the traversal visits no elements.
    self.head = None
    self.__traverse_pipeline(self.__set_head_element, pipeline, False)
    return self.head
def print_pipeline(self, pipeline)
-
Prints full pipeline
Args
pipeline
- BaseAgent
Expand source code
def print_pipeline(self, pipeline):
    '''Logs every element of the pipeline, from its head to its end.

    Args:
        pipeline: BaseAgent - any element of the pipeline to print
    '''
    head = self.get_head_from_pipeline(pipeline)
    self.head = head
    self.logger.log('↓ -----START-----')
    self.__traverse_pipeline(self.__print_element)
    self.logger.log('■ -----END-----')
def run(self, pipeline)
-
Runs the input pipeline
Args
pipeline
- BaseAgent object
Example usage:
pipeline = element1 >> element2
sinbadflow_instance.run(pipeline)
Expand source code
def run(self, pipeline):
    '''Executes the given pipeline from its head element onwards.

    Args:
        pipeline: BaseAgent object

    Example usage:
        pipeline = element1 >> element2
        sinbadflow_instance.run(pipeline)
    '''
    wrapped = self.__wrap_element_if_single(pipeline)
    self.head = self.get_head_from_pipeline(wrapped)

    log = self.logger.log
    log('Pipeline run started')
    self.__traverse_pipeline(self.__run_elements)
    log('\nPipeline run finished')

    # Results are reported through the same logger used for the run.
    self.status_handler.print_results(self.logger)