Module sinbadflow.utils.dbr_job
Expand source code
import requests
import json
import time
import logging
from ..settings.dbr_vars import *
class RunStatusError(Exception):
'''Custom exception class used in JobSubmitter class'''
pass
class RunStatusError(Exception):
'''Custom exception class used in JobSubmitter class'''
pass
class WrongModeSelected(Exception):
'''Custom exception class used in JobSubmitter class'''
pass
class NoTokenError(Exception):
'''Custom exception class used in JobSubmitter class'''
pass
# Native Databricks variable setup - will only work in Databricks environment
try:
spark = get_spark()
dbutils = get_dbutils(spark)
except:
logging.warning(
'!!! Failed to set dbutils variable which is used to run notebooks on interactive cluster. Make sure you are inside Databricks environment !!!')
# Native Databricks variable setup - will only work in Databricks environment
class JobSubmitter():
'''JobSubmitter object runs databricks notebook on job or interactive cluster.
Attributes:
cluster_mode: string - Databricks cluster mode to run the job (interactive/job supported)
job_args: dictionary - Databricks notebook arguments
Methods:
set_access_token (token: string) (class method) - sets up access token for cluster creation \n
submit_notebook(notebook_path: string, timeout: int, args:dict) - submits notebook to job cluster \n
get_job_info(run_id: int) - gets the info about specific run_id'''
__access_token = None
DATABRICKS_INSTANCE = 'https://westeurope.azuredatabricks.net'
safety_timeout = None
def __init__(self, cluster_mode, input_job_args):
if cluster_mode in ['interactive', 'job']:
self.cluster_mode = cluster_mode
else:
raise WrongModeSelected(
f'Wrong cluster_mode selected, Dbr object supports "interactive" or "job" modes, {self.cluster_mode} was passed')
self.__job_args = {
"new_cluster": {
"spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
'spark_version': '6.4.x-scala2.11',
'node_type_id': 'Standard_DS3_v2',
'driver_node_type_id': 'Standard_DS3_v2',
'num_workers': 1},
"timeout_seconds": None,
"notebook_task": {
"notebook_path": None},
"notebook_params": {}}
self.__job_args['new_cluster'].update(input_job_args)
@classmethod
def set_access_token(cls, token):
'''Set up Databricks access token
Args:
token: string - Databricks access token
'''
cls.__access_token = token
def submit_notebook(self, notebook_path, timeout, args):
'''Submits notebook to run with timeout and arguments
Args:
notebook_path: string
timeout: int
args: dict
'''
if self.cluster_mode == 'interactive':
dbutils.notebook.run(notebook_path, timeout, args)
return
if self.__access_token == None:
raise NoTokenError(
'\n !!! Access token missing. Use class method JobSubmitter.set_access_token(<TOKEN>) to set class method !!! \n')
self.__set_notebook_job_args(notebook_path, timeout, args)
post_resp = self.__submit_job()
self.safety_timeout = time.time() + timeout * 1.1
run_status = self.__get_notebook_status(post_resp.json())
get_resp = self.get_job_info(post_resp.json().get('run_id'))
if run_status in ['FAILED', 'TIMEDOUT', 'CANCELED', 'SKIPPED', 'INTERNAL_ERROR']:
raise RunStatusError(
f'Run {get_resp.json().get("run_id")} FAILED, status: {run_status}, run notebook: {get_resp.json().get("run_page_url")}')
return
def __set_notebook_job_args(self, notebook_path, timeout, args):
self.__job_args['notebook_task']['notebook_path'] = notebook_path
self.__job_args['notebook_params'] = args
self.__job_args['timeout_seconds'] = timeout
def __submit_job(self):
return requests.post(f'{self.DATABRICKS_INSTANCE}/api/2.0/jobs/runs/submit', json=self.__job_args, headers={'Authorization': f'Bearer {self.__access_token}'})
def get_job_info(self, run_id):
'''Get info from the job cluster with specific run_id
Args:
run_id: int
Returns:
dict'''
return requests.get(f'{self.DATABRICKS_INSTANCE}/api/2.0/jobs/runs/get?run_id={run_id}', headers={'Authorization': f'Bearer {self.__access_token}'})
def __get_notebook_status(self, response):
while self.get_job_info(response.get('run_id')).json().get('state').get('life_cycle_state') in ['PENDING', 'RUNNING', 'TERMINATING']:
time.sleep(10)
if time.time() > self.safety_timeout:
return 'TIMEDOUT'
state = self.get_job_info(response.get('run_id')).json().get('state')
if state.get('life_cycle_state') in ['SKIPPED', 'INTERNAL_ERROR']:
return state.get('life_cycle_state')
return state.get('result_state')
Classes
class JobSubmitter (cluster_mode, input_job_args)
-
JobSubmitter object runs databricks notebook on job or interactive cluster.
Attributes
cluster_mode
- string - Databricks cluster mode to run the job (interactive/job supported)
job_args
- dictionary - Databricks notebook arguments
Methods
set_access_token (token: string) (class method) - sets up access token for cluster creation
submit_notebook(notebook_path: string, timeout: int, args:dict) - submits notebook to job cluster
get_job_info(run_id: int) - gets the info about specific run_id
Expand source code
class JobSubmitter(): '''JobSubmitter object runs databricks notebook on job or interactive cluster. Attributes: cluster_mode: string - Databricks cluster mode to run the job (interactive/job supported) job_args: dictionary - Databricks notebook arguments Methods: set_access_token (token: string) (class method) - sets up access token for cluster creation \n submit_notebook(notebook_path: string, timeout: int, args:dict) - submits notebook to job cluster \n get_job_info(run_id: int) - gets the info about specific run_id''' __access_token = None DATABRICKS_INSTANCE = 'https://westeurope.azuredatabricks.net' safety_timeout = None def __init__(self, cluster_mode, input_job_args): if cluster_mode in ['interactive', 'job']: self.cluster_mode = cluster_mode else: raise WrongModeSelected( f'Wrong cluster_mode selected, Dbr object supports "interactive" or "job" modes, {self.cluster_mode} was passed') self.__job_args = { "new_cluster": { "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"}, 'spark_version': '6.4.x-scala2.11', 'node_type_id': 'Standard_DS3_v2', 'driver_node_type_id': 'Standard_DS3_v2', 'num_workers': 1}, "timeout_seconds": None, "notebook_task": { "notebook_path": None}, "notebook_params": {}} self.__job_args['new_cluster'].update(input_job_args) @classmethod def set_access_token(cls, token): '''Set up Databricks access token Args: token: string - Databricks access token ''' cls.__access_token = token def submit_notebook(self, notebook_path, timeout, args): '''Submits notebook to run with timeout and arguments Args: notebook_path: string timeout: int args: dict ''' if self.cluster_mode == 'interactive': dbutils.notebook.run(notebook_path, timeout, args) return if self.__access_token == None: raise NoTokenError( '\n !!! Access token missing. Use class method JobSubmitter.set_access_token(<TOKEN>) to set class method !!! \n') self.__set_notebook_job_args(notebook_path, timeout, args) post_resp = self.__submit_job() self.safety_timeout = time.time() + timeout * 1.1 run_status = self.__get_notebook_status(post_resp.json()) get_resp = self.get_job_info(post_resp.json().get('run_id')) if run_status in ['FAILED', 'TIMEDOUT', 'CANCELED', 'SKIPPED', 'INTERNAL_ERROR']: raise RunStatusError( f'Run {get_resp.json().get("run_id")} FAILED, status: {run_status}, run notebook: {get_resp.json().get("run_page_url")}') return def __set_notebook_job_args(self, notebook_path, timeout, args): self.__job_args['notebook_task']['notebook_path'] = notebook_path self.__job_args['notebook_params'] = args self.__job_args['timeout_seconds'] = timeout def __submit_job(self): return requests.post(f'{self.DATABRICKS_INSTANCE}/api/2.0/jobs/runs/submit', json=self.__job_args, headers={'Authorization': f'Bearer {self.__access_token}'}) def get_job_info(self, run_id): '''Get info from the job cluster with specific run_id Args: run_id: int Returns: dict''' return requests.get(f'{self.DATABRICKS_INSTANCE}/api/2.0/jobs/runs/get?run_id={run_id}', headers={'Authorization': f'Bearer {self.__access_token}'}) def __get_notebook_status(self, response): while self.get_job_info(response.get('run_id')).json().get('state').get('life_cycle_state') in ['PENDING', 'RUNNING', 'TERMINATING']: time.sleep(10) if time.time() > self.safety_timeout: return 'TIMEDOUT' state = self.get_job_info(response.get('run_id')).json().get('state') if state.get('life_cycle_state') in ['SKIPPED', 'INTERNAL_ERROR']: return state.get('life_cycle_state') return state.get('result_state')
Class variables
var DATABRICKS_INSTANCE
var safety_timeout
Static methods
def set_access_token(token)
-
Set up Databricks access token
Args
token
- string - Databricks access token
Expand source code
@classmethod def set_access_token(cls, token): '''Set up Databricks access token Args: token: string - Databricks access token ''' cls.__access_token = token
Methods
def get_job_info(self, run_id)
-
Get info from the job cluster with specific run_id
Args
run_id
- int
Returns
dict
Expand source code
def get_job_info(self, run_id): '''Get info from the job cluster with specific run_id Args: run_id: int Returns: dict''' return requests.get(f'{self.DATABRICKS_INSTANCE}/api/2.0/jobs/runs/get?run_id={run_id}', headers={'Authorization': f'Bearer {self.__access_token}'})
def submit_notebook(self, notebook_path, timeout, args)
-
Submits notebook to run with timeout and arguments
Args
notebook_path
- string
timeout
- int
args
- dict
Expand source code
def submit_notebook(self, notebook_path, timeout, args): '''Submits notebook to run with timeout and arguments Args: notebook_path: string timeout: int args: dict ''' if self.cluster_mode == 'interactive': dbutils.notebook.run(notebook_path, timeout, args) return if self.__access_token == None: raise NoTokenError( '\n !!! Access token missing. Use class method JobSubmitter.set_access_token(<TOKEN>) to set class method !!! \n') self.__set_notebook_job_args(notebook_path, timeout, args) post_resp = self.__submit_job() self.safety_timeout = time.time() + timeout * 1.1 run_status = self.__get_notebook_status(post_resp.json()) get_resp = self.get_job_info(post_resp.json().get('run_id')) if run_status in ['FAILED', 'TIMEDOUT', 'CANCELED', 'SKIPPED', 'INTERNAL_ERROR']: raise RunStatusError( f'Run {get_resp.json().get("run_id")} FAILED, status: {run_status}, run notebook: {get_resp.json().get("run_page_url")}') return
class NoTokenError (...)
-
Custom exception class used in JobSubmitter class
Expand source code
class NoTokenError(Exception): '''Custom exception class used in JobSubmitter class''' pass
Ancestors
- builtins.Exception
- builtins.BaseException
class RunStatusError (...)
-
Custom exception class used in JobSubmitter class
Expand source code
class RunStatusError(Exception): '''Custom exception class used in JobSubmitter class''' pass
Ancestors
- builtins.Exception
- builtins.BaseException
class WrongModeSelected (...)
-
Custom exception class used in JobSubmitter class
Expand source code
class WrongModeSelected(Exception): '''Custom exception class used in JobSubmitter class''' pass
Ancestors
- builtins.Exception
- builtins.BaseException