Reference
yapp.core
yapp core classes
AttrDict
Bases: dict
Extends dict so that elements can be accessed as attributes
__init__(other=None, **kwargs)
Create a new AttrDict
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| other | None, mapping, iterable | Initial contents: a mapping, an iterable of key/value pairs, or None | None |
recursive_convert(obj)
staticmethod
Converts every dict inside obj into an AttrDict
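A minimal usage sketch (the import path is assumed from this reference, and recursive_convert is assumed to return the converted object):

```python
from yapp.core import AttrDict  # import path assumed from this reference

raw = {"db": {"host": "localhost", "port": 5432}}
cfg = AttrDict.recursive_convert(raw)  # nested dicts become AttrDicts too (assumed return value)
print(cfg.db.host)        # attribute access instead of cfg["db"]["host"]
print(cfg["db"]["port"])  # normal dict access still works
```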
InputAdapter
Bases: ABC
Abstract Input Adapter
An input adapter represents a type of input from a specific source
get(key)
abstractmethod
Returns the requested input
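A hedged sketch of a concrete adapter (the class name and constructor are hypothetical; only get(key) is documented as required):

```python
from yapp.core import InputAdapter  # import path assumed from this reference

class DictInput(InputAdapter):
    """Hypothetical adapter that serves inputs from an in-memory dict."""

    def __init__(self, data):
        self.data = data

    def get(self, key):
        # Return the requested input for `key`
        return self.data[key]
```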
Inputs
Bases: dict
Inputs implementation (just a dict with some utility methods)
expose(source, internal_name, name)
Expose input attribute using another name
register(name, adapter)
Registers a new input adapter (just adds a new dict item)
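A hedged sketch of registering and exposing an input; the reading of expose's arguments (source adapter name, name inside the adapter, exposed name) is assumed from the signature, and DictInput is the hypothetical adapter sketched above:

```python
from yapp.core import Inputs  # import path assumed from this reference

inputs = Inputs()
inputs.register("memory", DictInput({"users": [{"name": "ada", "active": True}]}))
# Expose the adapter's "users" entry to jobs under the name "user_table"
inputs.expose("memory", "users", "user_table")
```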
Job
Bases: ABC
Job represents a step in our pipeline
completed()
property
True if job successfully run, False otherwise
config()
property
Shortcut for self.pipeline.config
execute(*inputs, **params)
abstractmethod
Job entrypoint
name()
property
Helper to return the name of the class
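A hedged Job sketch (how positional inputs are matched to execute's parameter names is assumed rather than documented here; the job and input names are hypothetical):

```python
from yapp.core import Job  # import path assumed from this reference

class CleanUsers(Job):
    """Hypothetical job: filters an input table and returns data for the outputs."""

    def execute(self, user_table):
        cleaned = [user for user in user_table if user.get("active")]
        # Returning a dict lets each key be saved separately by the output adapters
        return {"clean_users": cleaned}
```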
Monitor
Pipeline status monitoring class. Wrapper class used to group hooks: just define your hooks as class methods and they will be called automatically when needed
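A hedged Monitor sketch; the hook names come from the run_hook documentation below, each hook method is assumed to receive the pipeline as its single argument (as run_hook describes for plain hook functions), and pipeline.name is an assumed attribute:

```python
import logging

from yapp.core import Monitor  # import path assumed from this reference

class LoggingMonitor(Monitor):
    """Hypothetical monitor grouping two hooks."""

    def on_pipeline_start(self, pipeline):
        logging.info("pipeline %s started", pipeline.name)

    def on_job_start(self, pipeline):
        logging.info("running job %s", pipeline.job_name)
```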
OutputAdapter
Bases: ABC
Abstract output Adapter
An output adapter represents a specific output destination
empty(job_name)
Override this if you wish to save something when a Job returns nothing; leave it as is if you prefer to ignore it.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_name | str | Name of the job returning None | required |
name()
property
Helper to return the name of the class
save(key, data)
abstractmethod
Save intermediate data here
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Name used as key in the dict returned from the Job, or the Job's name if it didn't return a dictionary | required |
| data | dict \| Any | Data returned from Job execution | required |
save_result(key, data)
Save final results here. Leave as is if you just use save.
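A hedged sketch of a concrete output adapter (only save is required; the class name and file naming are made up):

```python
import json

from yapp.core import OutputAdapter  # import path assumed from this reference

class JsonFileOutput(OutputAdapter):
    """Hypothetical adapter writing each piece of data to a JSON file."""

    def save(self, key, data):
        # `key` is the dict key returned by the Job (or the Job's name)
        with open(f"{key}.json", "w", encoding="utf-8") as fp:
            json.dump(data, fp)
```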
Pipeline
yapp Pipeline object
Pipeline implementation. Collects jobs, hooks, and input and output adapters, and runs the pipeline.
Attributes:

| Name | Type | Description |
|---|---|---|
| OK_LOGLEVEL | int | Log level used for pipeline and job completed-execution status messages |
| VALID_HOOKS | list | List of valid hooks that can be used in a pipeline |
| `__nested_timed_calls` | int | Level of nested calls to `timed` |
__call__(save_results=None)
Pipeline entrypoint
Sets up inputs, outputs and config (if specified) and runs the pipeline
__init__(job_list, name='', inputs=None, outputs=None, monitor=None, **hooks)
init.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_list | Sequence[type[Job]] | List of Job classes to run (in correct order) inside the pipeline | required |
| name | str | Pipeline name | '' |
| inputs | Union[Inputs, None] | Inputs for the pipeline | None |
| outputs | Union[Sequence[type[OutputAdapter]], Set[type[OutputAdapter]], type[OutputAdapter], None] | Outputs for the pipeline | None |
| monitor | Union[Monitor, None] | Monitor for the pipeline | None |
| `**hooks` | | Hooks to attach to the pipeline | {} |
_run()
Runs all Pipeline's jobs
_run_job(job)
Execution of a single job
completed()
property
True if pipeline successfully run, False otherwise
config()
property
Shortcut for configuration from inputs
job_name()
property
Shortcut for self.current_job.name, handling the case where there is no current_job
run_hook(hook_name)
Run all hooks for current event
A hook is just a function taking a pipeline as single argument
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| hook_name | str | Name of the hook to run ("on_pipeline_start", "on_job_start", etc.) | required |
save_output(name, data, results=False)
Save data to each output adapter
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name to pass to the output adapters when saving the data | required |
| data | Any | Data to save | required |
timed(typename, name, func, *args, _update_object=None, **kwargs)
Runs a timed execution of a function, logging times
The first two parameters are used to specify the type and name of the entity to run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| typename | str | Name of the type of the component to run ("pipeline", "job", "hook", etc.) | required |
| name | str | Name of the component to run | required |
| func | callable | Function to run | required |
| `*args` | | Positional arguments passed to func | () |
| `**kwargs` | | Keyword arguments passed to func | {} |

Returns:

| Type | Description |
|---|---|
| Any | The output of the provided function |
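Putting the core pieces together, a hedged end-to-end sketch of building and running a Pipeline; DictInput, CleanUsers and JsonFileOutput are the hypothetical classes sketched in the sections above, the on_pipeline_start hook name comes from the run_hook documentation, and import paths are assumed from this reference:

```python
from yapp.core import Inputs, Pipeline  # import paths assumed from this reference

# DictInput, CleanUsers and JsonFileOutput are the hypothetical classes sketched above
inputs = Inputs()
inputs.register("memory", DictInput({"users": [{"name": "ada", "active": True}]}))
inputs.expose("memory", "users", "user_table")

pipeline = Pipeline(
    [CleanUsers],                 # Job classes, in execution order
    name="users_pipeline",
    inputs=inputs,
    outputs=[JsonFileOutput],     # OutputAdapter classes
    on_pipeline_start=lambda p: print(f"starting {p.name}"),  # hook: a function taking the pipeline
)
pipeline()                        # sets up inputs, outputs and config, then runs the jobs
print(pipeline.completed)         # True if the pipeline ran successfully
```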
attr_dict
AttrDict
Bases: dict
Extends dict so that elements can be accessed as attributes
__init__(other=None, **kwargs)
Create a new AttrDict
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| other | None, mapping, iterable | Initial contents: a mapping, an iterable of key/value pairs, or None | None |
recursive_convert(obj)
staticmethod
Converts every dict inside obj into an AttrDict
errors
ConfigurationError
EmptyConfiguration
ImportedCodeFailed
MissingConfiguration
MissingEnv
Bases: YappFatalError
Exception raised when an environment variable requested in config is not defined
MissingPipeline
YappFatalError
Bases: RuntimeError, ABC
Generic fatal exception
log()
abstractmethod
Used to log specific error messages
log_and_exit()
Calls log and exits with the relevant error exit_code
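A hedged sketch of a custom fatal error (the import path and the exit_code attribute name are assumed, the latter from log_and_exit's description):

```python
import logging

from yapp.core.errors import YappFatalError  # import path assumed from this reference

class MissingCredentials(YappFatalError):
    """Hypothetical fatal error with its own log message."""

    exit_code = 3  # assumed attribute used by log_and_exit

    def log(self):
        logging.error("Credentials not found in the environment")
```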
input_adapter
InputAdapter
Bases: ABC
Abstract Input Adapter
An input adapter represents a type of input from a specific source
get(key)
abstractmethod
Returns the requested input
inputs
Inputs
Bases: dict
Inputs implementation (just a dict with some utility methods)
expose(source, internal_name, name)
Expose input attribute using another name
register(name, adapter)
Registers a new input adapter (just adds a new dict item)
job
Job
Bases: ABC
Job represents a step in our pipeline
completed()
property
True if job successfully run, False otherwise
config()
property
Shortcut for self.pipeline.config
execute(*inputs, **params)
abstractmethod
Job entrypoint
name()
property
Helper to return the name of the class
monitor
Monitor
Pipeline status monitoring class. Wrapper class used to group hooks: just define your hooks as class methods and they will be called automatically when needed
output_adapter
OutputAdapter
Bases: ABC
Abstract output Adapter
An output adapter represents a specific output destination
empty(job_name)
Override this if you wish to save something when a Job returns nothing; leave it as is if you prefer to ignore it.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_name | str | Name of the job returning None | required |
name()
property
Helper to return the name of the class
save(key, data)
abstractmethod
Save intermediate data here
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Name used as key in the dict returned from the Job, or the Job's name if it didn't return a dictionary | required |
| data | dict \| Any | Data returned from Job execution | required |
save_result(key, data)
Save final results here. Leave as is if you just use save.
pipeline
Pipeline
yapp Pipeline object
Pipeline implementation. Collects jobs, hooks, and input and output adapters, and runs the pipeline.
Attributes:

| Name | Type | Description |
|---|---|---|
| OK_LOGLEVEL | int | Log level used for pipeline and job completed-execution status messages |
| VALID_HOOKS | list | List of valid hooks that can be used in a pipeline |
| `__nested_timed_calls` | int | Level of nested calls to `timed` |
__call__(save_results=None)
Pipeline entrypoint
Sets up inputs, outputs and config (if specified) and runs the pipeline
__init__(job_list, name='', inputs=None, outputs=None, monitor=None, **hooks)
init.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_list | Sequence[type[Job]] | List of Job classes to run (in correct order) inside the pipeline | required |
| name | str | Pipeline name | '' |
| inputs | Union[Inputs, None] | Inputs for the pipeline | None |
| outputs | Union[Sequence[type[OutputAdapter]], Set[type[OutputAdapter]], type[OutputAdapter], None] | Outputs for the pipeline | None |
| monitor | Union[Monitor, None] | Monitor for the pipeline | None |
| `**hooks` | | Hooks to attach to the pipeline | {} |
_run()
Runs all Pipeline's jobs
_run_job(job)
Execution of a single job
completed()
property
True if pipeline successfully run, False otherwise
config()
property
Shortcut for configuration from inputs
job_name()
property
Shortcut for self.current_job.name, handling the case where there is no current_job
run_hook(hook_name)
Run all hooks for current event
A hook is just a function taking a pipeline as single argument
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| hook_name | str | Name of the hook to run ("on_pipeline_start", "on_job_start", etc.) | required |
save_output(name, data, results=False)
Save data to each output adapter
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name to pass to the output adapters when saving the data | required |
| data | Any | Data to save | required |
timed(typename, name, func, *args, _update_object=None, **kwargs)
Runs a timed execution of a function, logging times
The first two parameters are used to specify the type and name of the entity to run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| typename | str | Name of the type of the component to run ("pipeline", "job", "hook", etc.) | required |
| name | str | Name of the component to run | required |
| func | callable | Function to run | required |
| `*args` | | Positional arguments passed to func | () |
| `**kwargs` | | Keyword arguments passed to func | {} |

Returns:

| Type | Description |
|---|---|
| Any | The output of the provided function |
enforce_list(value)
Makes sure the argument can be treated as a list
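A hedged illustration of the expected behaviour (the import path and the exact handling of scalars are assumed):

```python
from yapp.core.pipeline import enforce_list  # import path assumed from this reference

enforce_list(["a", "b"])  # -> ["a", "b"]
enforce_list("a")         # -> ["a"] (assumed: a non-list value is wrapped in a list)
```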
yapp.cli
yapp cli parsing
main()
yapp cli entrypoint
logs
LogFormatter
Bases: logging.Formatter
Custom LogFormatter, probably not the best way at all to do this but was fun doing it this way.
get_color(loglevel=None)
Returns the color escape characters to print
add_logging_level(level_name, level_num, method_name=None)
Comprehensively adds a new logging level to the logging module and the currently configured logging class.
level_name becomes an attribute of the logging module with the value level_num. method_name becomes a convenience method for both logging itself and the class returned by logging.getLoggerClass() (usually just logging.Logger). If method_name is not specified, level_name.lower() is used.
To avoid accidental clobberings of existing attributes, this method will raise an AttributeError if the level name is already an attribute of the logging module or if the method name is already present.
Taken from this great answer on StackOverflow
Example

```python
>>> add_logging_level('TRACE', logging.DEBUG - 5)
>>> logging.getLogger(__name__).setLevel("TRACE")
>>> logging.getLogger(__name__).trace('that worked')
>>> logging.trace('so did this')
>>> logging.TRACE
5
```
setup_logging(loglevel, color=False, logfile='', show_lineno=False)
Setup logging for yapp
parsing
ConfigParser
Parses config files and builds a pipeline accordingly
build_hooks(cfg_hooks)
Sets up hooks from the `hooks` field in YAML files
build_inputs(cfg_inputs, config=None)
Sets up inputs from the `inputs` and `expose` fields in YAML files
build_job(step, params)
Create Job given pipeline and step name
build_monitor(cfg_monitor)
Sets up the monitor from the `monitor` field in YAML files
build_new_job_class(step, module, func_name, params)
Build new Job subclass at runtime
build_outputs(cfg_outputs)
Sets up outputs from the `outputs` field in YAML files
build_pipeline(pipeline_cfg, inputs=None, outputs=None, hooks=None, monitor=None)
Creates pipeline from pipeline and config definition dicts
create_adapter(adapter_name, params)
Loads the relevant module and instantiates an adapter from it
do_validation(pipelines_yaml)
Performs validation on a dict read from a pipelines.yml file
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| pipelines_yaml | dict | Dict read from a pipelines.yml file | required |
load_module(module_name)
Loads a python module from a .py file or yapp modules
make_hook(single_hook)
Create a single hook from its dict Configuration
make_input(single_input)
Create a single input from its dict Configuration
make_output(single_output)
Create a single output from its dict Configuration
parse(skip_validation=False)
Reads and parses pipelines.yml, creates a pipeline object
switch_workdir(workdir=None)
Switches to the pipeline workdir that jobs and hooks expect
camel_to_snake(name)
Returns snake_case version of a CamelCase string
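For example (expected output shown, assuming standard CamelCase-to-snake_case handling; the module path is assumed from this reference):

```python
from yapp.cli.parsing import camel_to_snake  # module path assumed

camel_to_snake("MyCustomJob")  # expected: "my_custom_job"
```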
do_nothing_constructor(self, node)
Constructor just returning the string for the node
env_constructor(loader, node)
Constructor that automatically looks up environment variables
yaml_read(path)
Read YAML from path
validation
ErrorHandler
Bases: BasicErrorHandler
Cerberus custom ErrorHandler to print config errors the way I want
check_code_reference(field, value, error)
Check if a string can be a valid python module or function reference
validate(definitions)
Validate schema for definitions from YAML file
yapp.adapters
yapp input and output adapters
file
CsvInput
Bases: InputAdapter
CSV Input adapter
An input adapter for CSV files; input is read into a pandas DataFrame
pandas
FunctionWrapperInputAdapter
pgsql
PgSqlInput
PgSqlOutput
make_pgsql_connection(username, password, host, port, database)
Create PostgreSQL connection using SQLAlchemy create_engine
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| username | | PostgreSQL username | required |
| password | | PostgreSQL password | required |
| host | | Database host | required |
| port | | Database port | required |
| database | | Database name | required |
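A hedged usage sketch (the module path is assumed from this reference; the returned object is whatever SQLAlchemy's create_engine builds, as described above):

```python
from yapp.adapters.pgsql import make_pgsql_connection  # module path assumed

engine = make_pgsql_connection("user", "secret", "db.example.com", 5432, "analytics")
```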
snowflake
SnowflakeInput
sql
SqlInput
Bases: InputAdapter
SQL Input adapter
An input adapter for SQL databases; input is read into a pandas DataFrame
SqlOutput
Bases: OutputAdapter
SQL output adapter
Output adapter for SQL databases; a pandas DataFrame is written to a table