Reference

yapp.core

yapp core classes

AttrDict

Bases: dict

Extends dict so that elements can be accessed as attributes

__init__(other=None, **kwargs)

Create a new AttrDict

Parameters:

Name Type Description Default
other None, mapping, or iterable

Mapping or iterable to initialize the new AttrDict from

None

recursive_convert(obj) staticmethod

Recursively converts every dict inside obj into an AttrDict
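
A minimal usage sketch (assuming yapp.core exports AttrDict as documented here, and that recursive_convert returns the converted object):

    from yapp.core import AttrDict

    d = AttrDict(answer=42)
    print(d.answer)  # 42, same as d["answer"]

    # Convert any nested plain dicts into AttrDicts as well
    nested = AttrDict.recursive_convert({"model": {"lr": 0.01}})
    print(nested.model.lr)  # 0.01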

InputAdapter

Bases: ABC

Abstract Input Adapter

An input adapter represents a type of input from a specific source

get(key) abstractmethod

Returns the requested input
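
A sketch of a concrete adapter; the class below is hypothetical, and the only documented requirement is implementing get:

    from yapp.core import InputAdapter

    class InMemoryInput(InputAdapter):
        """Hypothetical adapter serving values from a plain dict."""

        def __init__(self, data):
            self.data = data

        def get(self, key):
            # Return the requested input identified by `key`
            return self.data[key]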

Inputs

Bases: dict

Inputs implementation (just a dict with some utility methods)

expose(source, internal_name, name)

Expose input attribute using another name

register(name, adapter)

Register a new input adapter (just adds a new item)
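
How the two methods might be combined (a hedged sketch: register is assumed to take an adapter instance, and the argument roles of expose are inferred from its signature; InMemoryInput is the hypothetical adapter from the InputAdapter example above):

    from yapp.core import Inputs

    inputs = Inputs()
    inputs.register("memory", InMemoryInput({"users": ["ada", "bob"]}))
    # Expose the adapter's "users" entry to jobs under the name "user_list"
    inputs.expose("memory", "users", "user_list")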

Job

Bases: ABC

Job represents a step in our pipeline

completed() property

True if the job ran successfully, False otherwise

config() property

Shortcut for self.pipeline.config

execute(*inputs, **params) abstractmethod

Job entrypoint

name() property

Helper to return the name of the class
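
A minimal Job sketch (hypothetical; how inputs are wired to the positional parameters of execute is assumed from the *inputs signature):

    from yapp.core import Job

    class CleanData(Job):
        """Hypothetical step that drops missing values."""

        def execute(self, user_list):
            # Returning a dict lets each key be saved through the output adapters
            return {"clean_users": [u for u in user_list if u is not None]}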

Monitor

Pipeline status monitoring class. A wrapper class used to group hooks: define your hooks as class methods and they will be called automatically when needed
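
A sketch of a Monitor grouping hooks as class methods (the hook names come from the run_hook documentation below; passing the pipeline as the single argument follows the description of hooks, but the exact signature is an assumption):

    from yapp.core import Monitor

    class LoggingMonitor(Monitor):
        def on_pipeline_start(self, pipeline):
            print("pipeline started")

        def on_job_start(self, pipeline):
            # job_name is the documented Pipeline property
            print(f"running job '{pipeline.job_name}'")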

OutputAdapter

Bases: ABC

Abstract Output Adapter

An output adapter represents a specific output destination

empty(job_name)

Override this if you wish to save something when a Job returns nothing; leave it as is if you prefer to ignore such cases.

Parameters:

Name Type Description Default
job_name str

Name of the job returning None

required

name() property

Helper to return the name of the class

save(key, data) abstractmethod

Save intermediate data here

Parameters:

Name Type Description Default
key str

Key is the name used as key in the returned dict from the Job, or if it didn't return a dictionary, the Job's name.

required
data dict | Any

Data returned from Job execution

required

save_result(key, data)

Save the final result here. Leave it as is if you just use save.
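
A sketch of a concrete output adapter (hypothetical; only save must be implemented, while empty and save_result are optional overrides as described above):

    from yapp.core import OutputAdapter

    class PrintOutput(OutputAdapter):
        """Hypothetical adapter that just prints whatever it is asked to save."""

        def save(self, key, data):
            print(f"[{key}] {data}")

        def empty(self, job_name):
            print(f"{job_name} returned nothing")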

Pipeline

yapp Pipeline object

Pipeline implementation. Collects jobs, hooks, and input and output adapters, and runs the pipeline.

Attributes:

Name Type Description
OK_LOGLEVEL int

Log level to use for pipeline and job completion status messages

VALID_HOOKS list

list of valid hooks that can be used in a pipeline

__nested_timed_calls int

level of nested calls to timed, used to enhance logging

__call__(save_results=None)

Pipeline entrypoint

Sets up inputs, outputs and config (if specified) and runs the pipeline

__init__(job_list, name='', inputs=None, outputs=None, monitor=None, **hooks)

init.

Parameters:

Name Type Description Default
job_list Sequence[type[Job]]

List of Job classes to run (in order) inside the pipeline

required
name str

Pipeline name

''
inputs Union[Inputs, None]

Inputs for the pipeline

None
outputs Union[Sequence[type[OutputAdapter]], Set[type[OutputAdapter]], type[OutputAdapter], None]

Outputs for the pipeline

None
monitor Union[Monitor, None]

Monitor for the pipeline

None
**hooks

Hooks to attach to the pipeline

{}

_run()

Runs all Pipeline's jobs

_run_job(job)

Execution of a single job

completed() property

True if the pipeline ran successfully, False otherwise

config() property

Shortcut for configuration from inputs

job_name() property

Shortcut for self.current_job.name that handles the case where there is no current_job

run_hook(hook_name)

Run all hooks for current event

A hook is just a function taking a pipeline as its single argument

Parameters:

Name Type Description Default
hook_name str

name of the hook to run ("on_pipeline_start", "on_job_start", etc.)

required

save_output(name, data, results=False)

Save data to each output adapter

Parameters:

Name Type Description Default
name str

name to pass to the output adapters when saving the data

required
data Any

data to save

required

timed(typename, name, func, *args, _update_object=None, **kwargs)

Runs a timed execution of a function, logging times

The first two parameters are used to specify the type and name of the entity to run.

Parameters:

Name Type Description Default
typename str

name of the type of the component to run ("pipeline", "job", "hook", etc.)

required
name str

name of the component to run

required
func callable

function to run

required
*args ()
**kwargs {}

Returns:

Type Description

(Any) The output of the provided function
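
Putting the pieces together, a hedged sketch based on the __init__ and __call__ signatures above; CleanData, PrintOutput and inputs are the hypothetical objects from the earlier examples, and passing a hook as a keyword argument follows the **hooks parameter:

    from yapp.core import Pipeline

    pipeline = Pipeline(
        [CleanData],            # Job classes, in execution order
        name="example",
        inputs=inputs,          # an Inputs instance (see the Inputs sketch above)
        outputs=[PrintOutput],  # OutputAdapter classes
        on_pipeline_start=lambda p: print("pipeline starting"),
    )
    pipeline()                  # sets up inputs, outputs and config, then runs the jobs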

attr_dict

AttrDict

Bases: dict

Extends dict so that elements can be accessed as attributes

__init__(other=None, **kwargs)

Create a new AttrDict

Parameters:

Name Type Description Default
other None, mapping, or iterable

Mapping or iterable to initialize the new AttrDict from

None

recursive_convert(obj) staticmethod

Recursively converts every dict inside obj into an AttrDict

errors

ConfigurationError

Bases: YappFatalError

Exception raised when an invalid configuration file is found

EmptyConfiguration

Bases: YappFatalError

Exception raised when an empty configuration file is found

ImportedCodeFailed

Bases: YappFatalError

Exception raised when a module is found but importing it fails

MissingConfiguration

Bases: YappFatalError

Exception raised when no configuration file is found

MissingEnv

Bases: YappFatalError

Exception raised when an environment variable requested in config is not defined

MissingPipeline

Bases: YappFatalError

Exception raised when a user requests a pipeline name that is not in pipelines.yml

YappFatalError

Bases: RuntimeError, ABC

Generic fatal exception

log() abstractmethod

Used to log specific error messages

log_and_exit()

Calls log and exits with the relevant error exit_code

input_adapter

InputAdapter

Bases: ABC

Abstract Input Adapter

An input adapter represents a type of input from a specific source

get(key) abstractmethod

Returns the requested input

inputs

Inputs

Bases: dict

Inputs implementation (just a dict with some utility methods)

expose(source, internal_name, name)

Expose input attribute using another name

register(name, adapter)

Register a new input adapter (just adds a new item)

job

Job

Bases: ABC

Job represents a step in our pipeline

completed() property

True if the job ran successfully, False otherwise

config() property

Shortcut for self.pipeline.config

execute(*inputs, **params) abstractmethod

Job entrypoint

name() property

Helper to return the name of the class

monitor

Monitor

Pipeline status monitoring class. A wrapper class used to group hooks: define your hooks as class methods and they will be called automatically when needed

output_adapter

OutputAdapter

Bases: ABC

Abstract Output Adapter

An output adapter represents a specific output destination

empty(job_name)

Override this if you wish to save something when a Job returns nothing; leave it as is if you prefer to ignore such cases.

Parameters:

Name Type Description Default
job_name str

Name of the job returning None

required
name() property

Helper to return the name of the class

save(key, data) abstractmethod

Save intermediate data here

Parameters:

Name Type Description Default
key str

Key is the name used as key in the returned dict from the Job, or if it didn't return a dictionary, the Job's name.

required
data dict | Any

Data returned from Job execution

required
save_result(key, data)

Save the final result here. Leave it as is if you just use save.

pipeline

Pipeline

yapp Pipeline object

Pipeline implementation. Collects jobs, hooks, and input and output adapters, and runs the pipeline.

Attributes:

Name Type Description
OK_LOGLEVEL int

Log level to use for pipeline and job completion status messages

VALID_HOOKS list

list of valid hooks that can be used in a pipeline

__nested_timed_calls int

level of nested calls to timed, used to enhance logging

__call__(save_results=None)

Pipeline entrypoint

Sets up inputs, outputs and config (if specified) and runs the pipeline

__init__(job_list, name='', inputs=None, outputs=None, monitor=None, **hooks)

init.

Parameters:

Name Type Description Default
job_list Sequence[type[Job]]

List of Job classes to run (in order) inside the pipeline

required
name str

Pipeline name

''
inputs Union[Inputs, None]

Inputs for the pipeline

None
outputs Union[Sequence[type[OutputAdapter]], Set[type[OutputAdapter]], type[OutputAdapter], None]

Outputs for the pipeline

None
monitor Union[Monitor, None]

Monitor for the pipeline

None
**hooks

Hooks to attach to the pipeline

{}
_run()

Runs all Pipeline's jobs

_run_job(job)

Execution of a single job

completed() property

True if the pipeline ran successfully, False otherwise

config() property

Shortcut for configuration from inputs

job_name() property

Shortcut for self.current_job.name that handles the case where there is no current_job

run_hook(hook_name)

Run all hooks for current event

A hook is just a function taking a pipeline as its single argument

Parameters:

Name Type Description Default
hook_name str

name of the hook to run ("on_pipeline_start", "on_job_start", etc.)

required
save_output(name, data, results=False)

Save data to each output adapter

Parameters:

Name Type Description Default
name str

name to pass to the output adapters when saving the data

required
data Any

data to save

required
timed(typename, name, func, *args, _update_object=None, **kwargs)

Runs a timed execution of a function, logging times

The first two parameters are used to specify the type and name of the entity to run.

Parameters:

Name Type Description Default
typename str

name of the type of the component to run ("pipeline", "job", "hook", etc.)

required
name str

name of the component to run

required
func callable

function to run

required
*args ()
**kwargs {}

Returns:

Type Description

(Any) The output of the provided function

enforce_list(value)

Makes sure the argument can be treated as a list
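
For illustration (the import path and the wrapping of lone values are assumptions based on where the helper is documented and on its description):

    from yapp.core.pipeline import enforce_list

    enforce_list(["a", "b"])  # -> ["a", "b"], already a list
    enforce_list("single")    # -> ["single"], assuming lone values get wrapped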

yapp.cli

yapp cli parsing

main()

yapp cli entrypoint

logs

LogFormatter

Bases: logging.Formatter

Custom LogFormatter, probably not the best way at all to do this but was fun doing it this way.

get_color(loglevel=None)

Returns the color escape characters to print

add_logging_level(level_name, level_num, method_name=None)

Comprehensively adds a new logging level to the logging module and the currently configured logging class.

level_name becomes an attribute of the logging module with the value level_num. method_name becomes a convenience method for both logging itself and the class returned by logging.getLoggerClass() (usually just logging.Logger). If method_name is not specified, level_name.lower() is used.

To avoid accidental clobberings of existing attributes, this method will raise an AttributeError if the level name is already an attribute of the logging module or if the method name is already present

Taken from this great answer on StackOverflow

Example

    add_logging_level('TRACE', logging.DEBUG - 5)
    logging.getLogger(__name__).setLevel("TRACE")
    logging.getLogger(__name__).trace('that worked')
    logging.trace('so did this')
    logging.TRACE  # 5

setup_logging(loglevel, color=False, logfile='', show_lineno=False)

Setup logging for yapp
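
An illustrative call following the documented signature (the accepted loglevel format and the import path are assumptions):

    from yapp.cli.logs import setup_logging

    setup_logging("INFO", color=True, show_lineno=True)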

parsing

ConfigParser

Parses config files and builds a pipeline accordingly

build_hooks(cfg_hooks)

Sets up hooks from hooks field in YAML files

build_inputs(cfg_inputs, config=None)

Sets up inputs from inputs and expose fields in YAML files

build_job(step, params)

Create Job given pipeline and step name

build_monitor(cfg_monitor)

Sets up monitor from monitor field in YAML files

build_new_job_class(step, module, func_name, params)

Build new Job subclass at runtime

build_outputs(cfg_outputs)

Sets up outputs from outputs field in YAML files

build_pipeline(pipeline_cfg, inputs=None, outputs=None, hooks=None, monitor=None)

Creates pipeline from pipeline and config definition dicts

create_adapter(adapter_name, params)

Loads the relevant module and instantiates an adapter from it

do_validation(pipelines_yaml)

Performs validation on a dict read from a pipelines.yml file

Parameters:

Name Type Description Default
pipelines_yaml dict

pipelines_yaml

required
load_module(module_name)

Loads a python module from a .py file or yapp modules

make_hook(single_hook)

Create a single hook from its dict configuration

make_input(single_input)

Create a single input from its dict configuration

make_output(single_output)

Create a single output from its dict configuration

parse(skip_validation=False)

Reads and parses pipelines.yml, creates a pipeline object

switch_workdir(workdir=None)

Switches to the pipeline workdir that jobs and hooks expect

camel_to_snake(name)

Returns snake_case version of a CamelCase string

do_nothing_constructor(self, node)

Constructor just returning the string for the node

env_constructor(loader, node)

Constructor that automatically looks up environment variables

yaml_read(path)

Read YAML from path

validation

ErrorHandler

Bases: BasicErrorHandler

Cerberus custom ErrorHandler to print config errors the way I want

check_code_reference(field, value, error)

Check if a string can be a valid python module or function reference

validate(definitions)

Validate schema for definitions from YAML file

yapp.adapters

yapp input and output adapters

file

CsvInput

Bases: InputAdapter

CSV Input adapter

An input adapter for CSV files, input is read into a pandas DataFrame

pandas

FunctionWrapperInputAdapter

Bases: InputAdapter

Very hacky generic input adapter

pgsql

PgSqlInput

Bases: SqlInput

Very simple PostgreSQL input adapter

PgSqlOutput

Bases: SqlOutput

Very simple PostgreSQL output adapter

make_pgsql_connection(username, password, host, port, database)

Create PostgreSQL connection using SQLAlchemy create_engine

Parameters:

Name Type Description Default
username required
password required
host required
port required
database required
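
An illustrative call (the values are placeholders; per the description, the return value is produced via SQLAlchemy's create_engine):

    from yapp.adapters.pgsql import make_pgsql_connection

    engine = make_pgsql_connection(
        username="yapp_user",
        password="secret",      # placeholder credentials
        host="localhost",
        port=5432,
        database="analytics",
    )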

snowflake

SnowflakeInput

Bases: SqlInput

Very simple Snowflake input adapter

sql

SqlInput

Bases: InputAdapter

SQL Input adapter

An input adapter for SQL databases, input is read into a pandas DataFrame

SqlOutput

Bases: OutputAdapter

SQL output adapter

Output adapter for SQL databases, a pandas DataFrame is written to a table

utils

DummyInput

Bases: InputAdapter

Dummy input adapter that always returns an empty DataFrame

DummyOutput

Bases: OutputAdapter

Dummy output adapter that prints data it should save