API Reference
Below we document the interface classes and functions of pypeliner.
Scheduler Object
Job scheduling class
class pypeliner.scheduler.Scheduler
Job scheduling class for queueing a set of jobs and running those jobs according to their dependencies.
run(workflow_def, exec_queue, file_storage, runskip)
Run the pipeline.
Parameters: - workflow_def – workflow of jobs to be submitted.
- exec_queue – queue to which jobs will be submitted. The queues implemented in pypeliner.execqueue should suffice for most purposes.
- runskip – callable object returning a boolean, used to determine whether to run jobs.
Call this function after adding jobs to a workflow using pypeliner.scheduler.Scheduler.transform() etc. Jobs will be run locally or remotely using the provided exec_queue until completion. On failure, the function will wait for the remaining jobs to finish but will not submit new ones. The first interrupt (Ctrl-C) in this function stops the creation of new jobs, and a second interrupt attempts to cleanly cancel all jobs.
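The dependency-ordered execution described above can be shown with a small illustrative model. This is not pypeliner's scheduler. `run_jobs`, its job and dependency dictionaries, and the `runskip(job_name)` signature (True meaning "run this job") are hypothetical. The model runs jobs one at a time and ignores exec queues, remote execution and interrupt handling. It assumes Python 3.9+ for `graphlib`.

```python
from graphlib import TopologicalSorter


def run_jobs(jobs, dependencies, runskip):
    """Run jobs in dependency order (illustrative model, not pypeliner's code).

    jobs: dict mapping job name -> zero-argument callable
    dependencies: dict mapping job name -> set of job names it depends on
    runskip: hypothetical callable taking a job name, True if the job should run
    """
    graph = {name: dependencies.get(name, set()) for name in jobs}
    completed = []
    # static_order() yields a job only after all of its dependencies
    for name in TopologicalSorter(graph).static_order():
        if name not in jobs:
            continue  # a dependency name with no job attached
        if not runskip(name):
            continue  # runskip decided this job does not need to run
        # An exception propagates immediately, so no further jobs are started
        jobs[name]()
        completed.append(name)
    return completed


log = []
jobs = {
    'split': lambda: log.append('split'),
    'align': lambda: log.append('align'),
    'merge': lambda: log.append('merge'),
}
dependencies = {'align': {'split'}, 'merge': {'align'}}
print(run_jobs(jobs, dependencies, runskip=lambda name: name != 'split'))
# ['align', 'merge']
```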
Managed Objects
Interface classes used to describe objects managed by the pipeline system.
Objects of each type can be used as placeholder arguments to pypeliner.scheduler.Scheduler.transform() or
pypeliner.scheduler.Scheduler.commandline(). During pipeline execution, the placeholder argument will be
replaced by the appropriate file or object to create the function or command line arguments.
Managed classes have a common set of parameters:
- name - An identifier for the managed object
- axes - The axes on which the managed object is defined
Axes relate to parallelism. A managed object with an empty list for axes has a single instance in the system. A managed object with a single axis will have as many instances as there are chunks defined for that axis. Axes can also be nested to arbitrary depth.
For example, suppose we are running the same analysis on 2 datasets, thus our first axis is ‘dataset’ with 2 chunks ‘A’
and ‘B’. Each dataset is split by line, thus our second axis is ‘line’. Each dataset may have a different number of
lines, and as such the number of chunks for the ‘line’ axis may be different between dataset ‘A’ and ‘B’. Thus if
dataset ‘A’ has 2 lines and ‘B’ has 1 line, a managed object defined on the axes ‘dataset’, ‘line’ will have the
following instances: {'dataset':'A', 'line':1}, {'dataset':'A', 'line':2}, {'dataset':'B', 'line':1}.
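The instance list in this example can be reproduced with a short sketch. It expands each axis in turn and looks up the chunks of an inner axis from the enclosing instance. The helper `enumerate_instances` and the `chunks` callback are hypothetical and only illustrate nested axes. They are not part of pypeliner.

```python
def enumerate_instances(axes, chunks):
    """List every instance of a managed object defined on nested axes.

    axes: axis names from outermost to innermost, e.g. ['dataset', 'line']
    chunks: callable(axis, parent) returning the chunks of axis, where parent
            maps each enclosing axis to its chunk for the current instance
    """
    instances = [{}]  # no axes: a single instance
    for axis in axes:
        instances = [
            {**parent, axis: chunk}
            for parent in instances
            for chunk in chunks(axis, parent)
        ]
    return instances


def example_chunks(axis, parent):
    """Chunks from the example: dataset A has 2 lines, dataset B has 1 line."""
    if axis == 'dataset':
        return ['A', 'B']
    return {'A': [1, 2], 'B': [1]}[parent['dataset']]


print(enumerate_instances(['dataset', 'line'], example_chunks))
# [{'dataset': 'A', 'line': 1}, {'dataset': 'A', 'line': 2}, {'dataset': 'B', 'line': 1}]
```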
A managed object will resolve to a function or command line argument dependent on whether it is a regular input/output, merge input or split output. Regular inputs/outputs have the same axes as the job to which they are given. Merge inputs are inputs with a single additional axis, the merge axis. Split outputs are outputs with a single additional axis, the split axis.
class pypeliner.managed.Template(name, *axes, **kwargs)
Represents a name templated by axes.
Template objects will resolve the specified name templated by the given axes. name should be a format string, with named fields that match the names of the axes.
For instance, Template('{case}_details', 'case') will resolve to the strings ‘tumour_details’ and ‘normal_details’ if the case axis has chunks ‘tumour’ and ‘normal’.
Parameters: - name – The format string to be resolved by pypeliner. Each axis should appear at least once as a named field in the format string.
- axes – The axes to use to resolve name.
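Template resolution can be modelled as Python's `str.format` applied once per chunk. The sketch below assumes that model. `resolve_template` is a hypothetical helper, not a pypeliner function, and it handles a single axis only.

```python
def resolve_template(name, axis, chunks):
    """Model of Template resolution for a single axis.

    Hypothetical helper: fills the named field for axis in the format string
    name with each chunk of that axis.
    """
    return [name.format(**{axis: chunk}) for chunk in chunks]


print(resolve_template('{case}_details', 'case', ['tumour', 'normal']))
# ['tumour_details', 'normal_details']

# If the axis is missing from the format string, every chunk resolves to the
# same name, which is why each axis should appear at least once.
print(resolve_template('details', 'case', ['tumour', 'normal']))
# ['details', 'details']
```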
class pypeliner.managed.TempFile(name, *axes, **kwargs)
Interface class used to represent a managed temporary file.
class pypeliner.managed.InputFile(name, *axes, **kwargs)
Interface class used to represent a user specified managed file input.
InputFile objects will resolve the specified name templated by the given axes. name should be a format string, with named fields that match the names of the axes. The modification time of the file will be used to determine if the file has been modified more recently than a job’s outputs, in order to determine if a job must be run.
For instance, InputFile('{case}.bam', 'case') will resolve to the strings ‘tumour.bam’ and ‘normal.bam’ if the case axis has chunks ‘tumour’ and ‘normal’.
Parameters: - name – The name of the input file. Each axis should appear at least once as a named field in the filename.
- axes – The axes for the input file.
For a merge input, InputFile will resolve to a dictionary of filenames as specified above, with chunks of the merge axis as keys.
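Both InputFile behaviours can be sketched with the standard library. The sketch assumes that resolution is plain `str.format` and that staleness is a simple modification-time comparison. `resolve_merge_input` and `job_is_out_of_date` are hypothetical helpers. pypeliner's actual dependency checks may apply additional rules.

```python
import os


def resolve_merge_input(name, merge_axis, chunks):
    """Model of a merge InputFile: filenames keyed by chunk of the merge axis."""
    return {chunk: name.format(**{merge_axis: chunk}) for chunk in chunks}


def job_is_out_of_date(input_filenames, output_filenames):
    """Return True if an output is missing or any input is newer than an output.

    Illustrative modification-time check only; not pypeliner's actual logic.
    """
    if not output_filenames or not all(os.path.exists(p) for p in output_filenames):
        return True
    newest_input = max((os.path.getmtime(p) for p in input_filenames), default=0.0)
    oldest_output = min(os.path.getmtime(p) for p in output_filenames)
    return newest_input > oldest_output


print(resolve_merge_input('{case}.bam', 'case', ['tumour', 'normal']))
# {'tumour': 'tumour.bam', 'normal': 'normal.bam'}
```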
class pypeliner.managed.OutputFile(name, *axes, **kwargs)
Interface class used to represent a user specified managed file output.
OutputFile objects will resolve the specified filename templated by the given axes. name should be a format string, with named fields that match the names of the axes. An OutputFile of the given name and axes is associated with a single job that creates that file.
For instance, OutputFile('{case}.bam', 'case') will resolve to the strings ‘tumour.bam’ and ‘normal.bam’ if the case axis has chunks ‘tumour’ and ‘normal’.
Parameters: - name – The name of the output file. Each axis should appear at least once as a named field in the filename.
- axes – The axes for the output file.
For a split output, OutputFile will resolve to a callback function taking the chunk of the split axis as its only parameter and returning the filename for that chunk.
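From the job's point of view, a split OutputFile is a function from chunk to filename. The sketch below assumes `str.format`-style resolution. `make_split_callback` and the job function `split_lines` are hypothetical. The split axis is named 'line' and its chunks are numbered from 1.

```python
def make_split_callback(name, split_axis):
    """Model of a split OutputFile: maps a split-axis chunk to its filename."""
    def filename_for_chunk(chunk):
        return name.format(**{split_axis: chunk})
    return filename_for_chunk


def split_lines(input_filename, output_filename_callback):
    """Hypothetical job: write each line of the input to its own chunk file.

    The chunks of the 'line' split axis are the line numbers 1, 2, ...
    """
    with open(input_filename) as infile:
        for line_number, line in enumerate(infile, start=1):
            with open(output_filename_callback(line_number), 'w') as outfile:
                outfile.write(line)


callback = make_split_callback('lines/{line}.txt', 'line')
print(callback(2))  # lines/2.txt
```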
class pypeliner.managed.TempInputObj(name, *axes, **kwargs)
Interface class used to represent a managed object input.
TempInputObj objects will resolve to an object managed by the pipeline system. The contents of the object are used for dependency tracking, as described for pypeliner.managed.TempOutputObj.
Parameters: - name – The name of the object.
- axes – The axes for the object.
For a merge input, TempInputObj will resolve to a dictionary of objects with chunks of the merge axis as keys.
extract(func)
Resolve to the return value of the given function called on the object, rather than the object itself.
Parameters: func – The function to be executed on the object.
Warning about state: the function provided should be stateless, because any state it holds cannot be tracked by the dependency system. Appropriate uses are a lambda function that accesses a dictionary entry or performs a fixed calculation.
prop(prop_name)
Resolve to a property of the object instead of the object itself.
Parameters: prop_name – The name of the property.
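The model below shows what extract and prop change about the value a job function receives. `resolve_extract`, `resolve_prop` and the `SampleInfo` class are hypothetical. The functions passed to extract only read the object, through a dictionary lookup or a fixed calculation, and hold no state of their own.

```python
def resolve_extract(obj, func):
    """Model of extract(func): the job receives func(obj) instead of obj."""
    return func(obj)


def resolve_prop(obj, prop_name):
    """Model of prop(prop_name): the job receives an attribute of obj."""
    return getattr(obj, prop_name)


class SampleInfo:
    """Hypothetical object stored by an earlier transform job."""

    def __init__(self, read_counts, genome):
        self.read_counts = read_counts
        self.genome = genome


info = SampleInfo(read_counts={'tumour': 120, 'normal': 80}, genome='hg19')

# Stateless functions: a dictionary lookup and a fixed calculation
tumour_reads = resolve_extract(info, lambda x: x.read_counts['tumour'])  # 120
total_reads = resolve_extract(info, lambda x: sum(x.read_counts.values()))  # 200
genome = resolve_prop(info, 'genome')  # 'hg19'
```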
class pypeliner.managed.TempOutputObj(name, *axes, **kwargs)
Interface class used to represent a managed object output.
TempOutputObj objects are only appropriate as return values for calls to pypeliner.scheduler.Scheduler.transform(). The object returned by the function executed for a transform job will be stored by the pipeline using pickle. If returning a user specified type, it is advisable to add an __eq__ method. Dependency tracking for objects checks whether the object has changed since the last call that created it: the comparison uses __eq__, or defaults to comparing __dict__.
Parameters: - name – The name of the object.
- axes – The axes for the object.
For a split output, the pipeline system expects a dictionary of objects with chunks of the split axis as keys.
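The change-detection rule can be sketched as follows, assuming the stored value is a pickle of the previous result. `object_changed`, `Stats` and `Plain` are hypothetical names. The sketch only restates the documented rule: use `__eq__` where the type defines one, otherwise compare `__dict__`. pypeliner's internals may differ. Without the fallback, two equal `Plain` instances would compare unequal by identity.

```python
import pickle


class Stats:
    """Hypothetical user type with an explicit __eq__."""

    def __init__(self, mean, count):
        self.mean = mean
        self.count = count

    def __eq__(self, other):
        if not isinstance(other, Stats):
            return NotImplemented
        return (self.mean, self.count) == (other.mean, other.count)


class Plain:
    """Hypothetical user type without __eq__; compared via __dict__ below."""

    def __init__(self, value):
        self.value = value


def object_changed(previous_pickle, new_obj):
    """Return True if new_obj differs from the previously pickled result.

    Restates the documented rule: use __eq__ if the type defines one,
    otherwise fall back to comparing __dict__.
    """
    if previous_pickle is None:
        return True  # nothing stored yet
    previous = pickle.loads(previous_pickle)
    if type(new_obj).__eq__ is not object.__eq__:
        return not (previous == new_obj)
    return previous.__dict__ != new_obj.__dict__


stored = pickle.dumps(Stats(2.5, 10))
print(object_changed(stored, Stats(2.5, 10)))  # False
print(object_changed(stored, Stats(3.0, 10)))  # True
```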
class pypeliner.managed.TempInputFile(name, *axes, **kwargs)
Interface class used to represent a managed temporary file input.
TempInputFile objects will resolve to a filename in the temporary file space of the pipeline. Temporary files are subject to garbage collection.
Parameters: - name – The name of the temporary file, basename only (no path information).
- axes – The axes for the file.
For a merge input, TempInputFile will resolve to a dictionary of filenames, with chunks of the merge axis as keys.
class pypeliner.managed.TempOutputFile(name, *axes, **kwargs)
Interface class used to represent a managed temporary file output.
TempOutputFile objects will resolve to a filename in the temporary file space of the pipeline. Temporary files are subject to garbage collection.
Parameters: - name – The name of the temporary file, basename only (no path information).
- axes – The axes for the file.
For a split output, TempOutputFile will resolve to a callback function taking the chunk of the split axis as its only parameter and returning the filename for that chunk.
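One way to think about garbage collection of temporary files is reference counting over the jobs that read each file. The collector below is a hypothetical sketch of that assumed policy: a file becomes collectable once every job that consumes it has completed. Its `nocleanup` flag mirrors the nocleanup option of pypeliner.app.Pypeline. The class itself is not part of pypeliner.

```python
import os


class TempFileCollector:
    """Hypothetical garbage collector for temporary files.

    Assumed policy: a temporary file may be deleted once every job that reads
    it has completed, unless nocleanup is set.
    """

    def __init__(self, consumers, nocleanup=False):
        # consumers: dict mapping temp filename -> job names that read it
        self.pending = {path: set(jobs) for path, jobs in consumers.items()}
        self.nocleanup = nocleanup

    def job_completed(self, job_name):
        """Record a finished job; delete and return temp files no longer needed."""
        removed = []
        for path, readers in self.pending.items():
            readers.discard(job_name)
            if not readers and not self.nocleanup and os.path.exists(path):
                os.remove(path)
                removed.append(path)
        return removed
```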
class pypeliner.managed.InputInstance(axis)
class pypeliner.managed.InputChunks(*axes)
Interface class used to represent an input chunk list for a specific axis.
Parameters: axes – The axes of interest for which to obtain a list of chunks.
InputChunks acts similarly to a merge input. The specified axes should match the axes of its job, with a single additional axis as for a merge. Resolves to a list of chunks for the given ‘merge’ axis.
class pypeliner.managed.OutputChunks(*axes, **kwargs)
Interface class used to represent an output that defines the list of chunks for a specific axis.
Parameters: axes – The axes for which chunks will be set.
OutputChunks acts similarly to a split output. OutputChunks objects are only appropriate as return values for calls to pypeliner.scheduler.Scheduler.transform(). The specified axes should match the axes of its job, with a single additional axis as for a split. The pipeline system expects the job function to return a list, which is then interpreted as the list of chunks for the given ‘split’ axis.
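The two classes can be connected with plain functions. One job returns the list of chunks that OutputChunks would record, and a later job receives that list the way InputChunks would provide it. `split_job`, `count_chunks_job` and the `axis_chunks` dictionary are hypothetical. In a real workflow these functions would be attached to transform jobs, which is not shown here.

```python
def split_job(text):
    """Hypothetical job whose return value would be declared as OutputChunks('line').

    Returns the list of chunks for the 'line' axis: one chunk per line of text.
    """
    return list(range(1, len(text.splitlines()) + 1))


def count_chunks_job(chunks):
    """Hypothetical job receiving InputChunks('line'): a plain list of chunks."""
    return len(chunks)


# Stand-in for the pipeline's record of chunks per axis
axis_chunks = {}
axis_chunks['line'] = split_job('first\nsecond\nthird\n')  # what OutputChunks records
print(axis_chunks['line'])                    # [1, 2, 3]
print(count_chunks_job(axis_chunks['line']))  # 3, via InputChunks
```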
Pypeline Object
Pipeline application functionality
Provides classes and functions to help create a pipeline application. Using this module, it should be possible to create a pipeline application with the full functionality of pypeliner in only a few lines of code. Typically, the first few lines of a pipeline application would be as follows.
Import argparse and pypeliner:
import argparse
import pypeliner
Create an argparse.ArgumentParser object to handle command line arguments
including a config, then parse the arguments:
argparser = argparse.ArgumentParser()
pypeliner.app.add_arguments(argparser)
argparser.add_argument('arg1', help='Additional argument 1')
argparser.add_argument('arg2', help='Additional argument 2')
args = vars(argparser.parse_args())
Create a pypeliner.app.Pypeline object passing the arguments to the config parameter:
pyp = pypeliner.app.Pypeline(config=args)
Create a workflow and run:
workflow = pypeliner.workflow.Workflow()
workflow.transform(...)
workflow.commandline(...)
pyp.run(workflow)
The following options are supported via the config dictionary argument to
pypeliner.app.Pypeline or as command line arguments
by calling pypeliner.app.add_arguments() on an
argparse.ArgumentParser object and passing the argument dictionary to
pypeliner.app.Pypeline.
- tmpdir: Location of pypeliner temporary files.
- pipelinedir: Location of the pypeliner database and logs.
- loglevel: Logging level for console messages. Valid logging levels are:
  - CRITICAL
  - ERROR
  - WARNING
  - INFO
  - DEBUG
- submit: Type of exec queue to use for job submission. If available, a default value will be obtained from the $DEFAULT_SUBMITQUEUE environment variable.
- nativespec: Required for qsub based job submission; specifies how to request memory from the cluster system. nativespec is treated as a Python style format string and uses the job’s context to resolve named fields. If available, a default value will be obtained from the $DEFAULT_NATIVESPEC environment variable.
- maxjobs: The maximum number of jobs to execute in parallel, either on a cluster or using subprocess to create multiple processes.
- repopulate: Recreate all temporary files that may have been cleaned up during a previous run in which garbage collection was enabled. Files may subsequently be garbage collected after creation, depending on the nocleanup option.
- nocleanup: Do not clean up temporary files. Does not recreate files that were garbage collected in a previous run of the pipeline.
- interactive: Run the pipeline in interactive mode, prompting at each step as to whether the job should be rerun.
- sentinel_only: Run the pipeline in sentinel only mode. File timestamps are not checked to decide whether jobs are out of date, and jobs are rerun based only on whether they have already been run.
- context_config: Run jobs within a specific type of container, either docker or singularity. The config contains credentials for docker, or the path to a directory containing singularity containers.
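The environment-variable defaults and nativespec resolution described above can be sketched as follows. `default_config` and `resolve_nativespec` are hypothetical helpers. The 'local' fallback for submit, the `h_vmem` resource and the `mem` context field are assumptions chosen for illustration. Substitute whatever your cluster and jobs actually use.

```python
import os


def default_config(environ=None):
    """Hypothetical helper: take submit/nativespec defaults from the environment.

    The 'local' fallback for submit is an assumption for this example.
    """
    environ = os.environ if environ is None else environ
    return {
        'submit': environ.get('DEFAULT_SUBMITQUEUE', 'local'),
        'nativespec': environ.get('DEFAULT_NATIVESPEC', ''),
    }


def resolve_nativespec(nativespec, job_context):
    """Fill the named fields of a nativespec format string from a job's context."""
    return nativespec.format(**job_context)


config = default_config({'DEFAULT_NATIVESPEC': '-l h_vmem={mem}G'})
print(resolve_nativespec(config['nativespec'], {'mem': 8}))  # -l h_vmem=8G
```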
pypeliner.app.add_arguments(argparser)
Add pypeliner arguments to an argparser object.
Parameters: argparser – argparse.ArgumentParser object to which pypeliner specific arguments should be added. See pypeliner.app for available options.
Add arguments to an argparse.ArgumentParser object to allow command line control of pypeliner. The options should be extracted after calling argparse.ArgumentParser.parse_args, converted to a dictionary using vars, and provided to the initializer of a pypeliner.app.Pypeline object.
class pypeliner.app.Pypeline(modules=(), config=None)
Pipeline application helper class.
Parameters: - modules – list of modules pypeliner must import before running functions for this pipeline. These modules will be imported prior to calling any of the user functions added to the pipeline.
- config – dictionary of configuration options. See pypeliner.app for available options.
The Pypeline class sets up logging, creates an execution queue, and creates the scheduler with the options provided by the config argument.
Command Line Helper
class pypeliner.commandline.Callable(func, args, kwargs)
Callable function and arguments for pypeliner_delegate, used when running in docker containers.
Parameters: - func – function to run in docker
- args – arguments
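Callable follows a general pattern: a function is bundled with its positional and keyword arguments so that the bundle can be pickled and executed elsewhere. A few lines are enough to sketch it. `DelegatedCall` is a hypothetical stand-in, not pypeliner's class, and the sketch does not involve docker.

```python
import pickle


class DelegatedCall:
    """Hypothetical func/args/kwargs bundle, similar in spirit to Callable."""

    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self):
        return self.func(*self.args, **self.kwargs)


# The bundle is pickled on one side and executed on the other; func must be
# importable by name (here the builtin sorted) for pickling to work.
payload = pickle.dumps(DelegatedCall(sorted, ([3, 1, 2],), {'reverse': True}))
result = pickle.loads(payload)()
print(result)  # [3, 2, 1]
```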
exception pypeliner.commandline.CommandLineException(args, command, returncode)
A command produced a non-zero exit code.
Parameters: - args – full set of arguments in failed command line
- command – command that failed
- returncode – exit code of failed command
exception pypeliner.commandline.CommandNotFoundException(args, command)
A command was not found on the path.
Parameters: - args – full set of arguments in failed command line
- command – command that could not be found
pypeliner.commandline.execute(*args)
Execute a command line.
Parameters: - args – executable and command line arguments
- kwargs – container keyword arguments
Execute a command line, handling pipes between processes and to and from files. The ‘|’, ‘>’ and ‘<’ characters are interpreted as pipes between processes, to files and from files, in the same way as in bash. Each process is checked for successful completion, and a meaningful exception is raised in the case of an error.
Raises: CommandLineException, CommandNotFoundException
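The pipe handling described above can be approximated with `subprocess`. The sketch below is a simplified, hypothetical re-implementation, not pypeliner's code. It defines local stand-ins for the two exceptions with simplified constructors. It supports at most one '<' input file and one '>' output file, and it checks every command with `shutil.which` before starting any process.

```python
import shlex
import shutil
import subprocess


class CommandLineException(Exception):
    """Local stand-in: a command produced a non-zero exit code."""

    def __init__(self, command, returncode):
        super().__init__(f'{shlex.join(command)} exited with code {returncode}')
        self.command = command
        self.returncode = returncode


class CommandNotFoundException(Exception):
    """Local stand-in: a command was not found on the path."""


def execute(*args):
    """Run args like a bash pipeline, honouring '|', '<' and '>' tokens."""
    commands, current = [], []
    stdin_path = stdout_path = None
    tokens = iter(args)
    for token in tokens:
        if token == '|':
            commands.append(current)
            current = []
        elif token == '<':
            stdin_path = next(tokens)  # input file for the first command
        elif token == '>':
            stdout_path = next(tokens)  # output file for the last command
        else:
            current.append(str(token))
    commands.append(current)

    # Fail early, before starting anything, if an executable is missing
    for command in commands:
        if shutil.which(command[0]) is None:
            raise CommandNotFoundException(command[0])

    stdin = open(stdin_path, 'rb') if stdin_path else None
    stdout = open(stdout_path, 'wb') if stdout_path else None
    processes = []
    try:
        upstream = stdin
        for index, command in enumerate(commands):
            is_last = index == len(commands) - 1
            process = subprocess.Popen(
                command,
                stdin=upstream,
                stdout=stdout if is_last else subprocess.PIPE,
            )
            if processes:
                # Close our copy of the pipe so the upstream process gets
                # SIGPIPE if the downstream process exits early
                processes[-1].stdout.close()
            processes.append(process)
            upstream = process.stdout
        # Wait for every process before reporting any failure
        returncodes = [process.wait() for process in processes]
    finally:
        for handle in (stdin, stdout):
            if handle is not None:
                handle.close()
    for command, returncode in zip(commands, returncodes):
        if returncode != 0:
            raise CommandLineException(command, returncode)
```

With this sketch, `execute('sort', '<', 'in.txt', '|', 'uniq', '>', 'out.txt')` behaves like the bash command `sort < in.txt | uniq > out.txt`.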