gc3libs.workflow

Implementation of task collections.

Tasks can be grouped into collections, which are tasks themselves and can therefore be controlled (started/stopped/cancelled) as a single whole. Collection classes provided in this module implement the basic patterns of job group execution; they can be combined to form more complex workflows. Hook methods are provided so that derived classes can implement problem-specific job control policies.

class gc3libs.workflow.AbortOnError

Mix-in class to make a SequentialTaskCollection turn to TERMINATED state as soon as one of the tasks fails.

A second effect of mixing this class in is that the self.execution.returncode mirrors the return code of the last finished task.

Note

For the mix-in to take effect, this class should be listed before the base task collection class, e.g.:

# this works
class MyTaskCollection(AbortOnError, SequentialTaskCollection):
  pass

# this *does not* work
class MyOtherTaskCollection(SequentialTaskCollection, AbortOnError):
  pass
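
The ordering requirement follows from Python's method resolution order (MRO): a mix-in can only override a method of the base collection if it is listed first among the base classes. The sketch below illustrates this with hypothetical stand-in classes (`BaseCollection` and `AbortingMixin` are not gc3libs classes, and the string return values are illustrative only):

```python
# Hypothetical stand-ins (not gc3libs classes) illustrating why base-class
# order matters for a mix-in that overrides a method of the base collection.

class BaseCollection:
    def next(self, done):
        return "RUNNING"          # base policy: always continue


class AbortingMixin:
    def next(self, done):
        return "TERMINATED"       # mix-in policy: stop immediately


class Works(AbortingMixin, BaseCollection):
    pass


class DoesNotWork(BaseCollection, AbortingMixin):
    pass


# The first class in the MRO that defines `next` wins.
print(Works().next(0))         # TERMINATED (the mix-in's method is used)
print(DoesNotWork().next(0))   # RUNNING (the base method shadows the mix-in)
```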

See SequentialTaskCollection.next() and GitHub issue #512 for some caveats on applying this to dynamically-built task collections.

class gc3libs.workflow.ChunkedParameterSweep(min_value, max_value, step, chunk_size, **extra_args)
new_task(param, **extra_args)

Return the Task corresponding to the parameter value param.

This method must be overridden in subclasses to generate tasks.

update_state(**extra_args)

Like ParallelTaskCollection.update_state(), but also creates new tasks if fewer than chunk_size are running.
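
The chunking idea can be sketched with a small stand-in class. This is not the gc3libs implementation: `ChunkedSweepSketch`, its `finish()` helper, and the choice to treat `max_value` as exclusive (as `range()` does) are all assumptions made for illustration.

```python
# Hypothetical stand-in, not the gc3libs implementation: keep at most
# `chunk_size` parameter values "in flight" and top up as tasks finish.

class ChunkedSweepSketch:
    def __init__(self, min_value, max_value, step, chunk_size):
        # Assumption: max_value is exclusive, as with range().
        self._pending = iter(range(min_value, max_value, step))
        self.chunk_size = chunk_size
        self.running = []     # parameter values currently being processed
        self.finished = []

    def new_task(self, param):
        # A subclass would build a real task here; the sketch simply
        # returns the parameter value itself.
        return param

    def finish(self, param):
        # Simulate one running task reaching a terminal state.
        self.running.remove(param)
        self.finished.append(param)

    def update_state(self):
        # Create new tasks while fewer than chunk_size are running.
        while len(self.running) < self.chunk_size:
            try:
                param = next(self._pending)
            except StopIteration:
                break         # the whole sweep has been handed out
            self.running.append(self.new_task(param))


sweep = ChunkedSweepSketch(min_value=0, max_value=10, step=2, chunk_size=2)
sweep.update_state()
print(sweep.running)   # [0, 2]
sweep.finish(0)
sweep.update_state()
print(sweep.running)   # [2, 4]
```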

class gc3libs.workflow.DependentTaskCollection(tasks=None, **extra_args)

Run a set of tasks, respecting inter-dependencies between them.

Each task can list a number of tasks that need to be run before it; upon submission, a DependentTaskCollection creates a directed acyclic graph from that dependency information and ensures that no task is run before its dependencies have been successfully executed.

The collection state is set to TERMINATED once all tasks have reached the same terminal status.

add(task, after=None)

Add a task to the collection.

The task will be run after any tasks referenced in the after sequence have terminated their run. Alternatively, a task can list tasks it depends upon in its .after attribute; i.e., the following two syntaxes are equivalent:

>>> coll.add(task1, after=[task2])

>>> task1.after = [task2]
>>> coll.add(task1)

Note: tasks can only be added to a DependentTaskCollection while it’s in state NEW.
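
To illustrate the ordering guarantee, the sketch below uses only the standard library (`graphlib`, Python 3.9+). It is not how gc3libs builds its graph, and the task names are hypothetical; it only shows that a valid run order places every task after its dependencies.

```python
# Sketch using only the standard library (Python 3.9+); this is not how
# gc3libs builds its graph, just an illustration of dependency ordering.
from graphlib import TopologicalSorter

# Map each (hypothetical) task name to the tasks that must finish before it,
# mirroring `coll.add(task, after=[...])`.
after = {
    "prepare": [],
    "simulate": ["prepare"],
    "analyze": ["simulate"],
    "report": ["analyze", "prepare"],
}

order = list(TopologicalSorter(after).static_order())
print(order)   # ['prepare', 'simulate', 'analyze', 'report']

# Every task appears later than all of its dependencies.
for task, deps in after.items():
    assert all(order.index(dep) < order.index(task) for dep in deps)
```

With the standard-library sorter, a cyclic dependency makes `static_order()` raise `graphlib.CycleError`, which is why the dependency graph has to be acyclic.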

submit(resubmit=False, targets=None, **extra_args)

Start the current task in the collection.

class gc3libs.workflow.ParallelTaskCollection(tasks=None, **extra_args)

A ParallelTaskCollection runs all of its tasks concurrently.

The collection state is set to TERMINATED once all tasks have reached the same terminal status.

add(task)

Add a task to the collection.

attach(controller)

Use the given Controller interface for operations on the job associated with this task.

kill(**extra_args)

Terminate all tasks in the collection, and set collection state to TERMINATED.

progress()

Try to advance all jobs in the collection to the next state in a normal lifecycle.

redo(*args, **kwargs)

Reset collection and all included tasks to state NEW.

If not all included tasks are in a terminal state or NEW, an AssertionError exception will be thrown. See also Task.redo() for a listing of allowed run states when redo() is called.

submit(resubmit=False, targets=None, **extra_args)

Start all tasks in the collection.

terminated()

Set exitcode based on termination status of sub-tasks.

update_state(**extra_args)

Update state of all tasks in the collection.

class gc3libs.workflow.RetryableTask(task, max_retries=0, **extra_args)

Wrap a Task instance and re-submit it until a specified termination condition is met.

By default, the re-submission upon failure happens iff execution terminated with nonzero return code; the failed task is retried up to self.max_retries times (indefinitely if self.max_retries is 0).

Override the retry method to implement a different retry policy.

Note: The resubmission code is implemented in the terminated() method, so be sure to call it if you override that method in derived classes.

attach(controller)

Use the given Grid interface for operations on the job associated with this task.

changed

Evaluates to True if this task or any of its subtasks has been modified and should be saved to persistent storage.

detach()

Remove any reference to the current grid interface. After this, calling any method other than attach() results in an exception TaskDetachedFromControllerError being thrown.

fetch_output(*args, **extra_args)

Retrieve the outputs of the computational job associated with this task into directory output_dir, or, if that is None, into the directory whose path is stored in instance attribute .output_dir.

If the execution state is TERMINATING, transition the state to TERMINATED (which runs the appropriate hook).

See gc3libs.Core.fetch_output() for a full explanation.

Returns: Path to the directory where the job output has been collected.

free(**extra_args)

Release any remote resources associated with this task.

See gc3libs.Core.free() for a full explanation.

kill(**extra_args)

Terminate the computational job associated with this task.

See gc3libs.Core.kill() for a full explanation.

peek(*args, **extra_args)

Download size bytes (at offset offset from the start) from the associated job standard output or error stream, and write them into a local file. Return a file-like object from which the downloaded contents can be read.

See gc3libs.Core.peek() for a full explanation.

retry()

Return True or False, depending on whether the failed task should be re-submitted or not.

The default behavior is to retry a task iff its execution terminated with nonzero returncode and the maximum retry limit has not been reached. If self.max_retries is 0, then the dependent task is retried indefinitely.

Override this method in subclasses to implement a different policy.
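
The default policy described above can be written as a small predicate. The function below is a hypothetical stand-alone sketch: `should_retry` and its parameter names are illustrative and are not gc3libs attributes.

```python
# Hypothetical stand-alone function mirroring the documented default policy;
# the parameter names are illustrative, not gc3libs attributes.

def should_retry(returncode, retried, max_retries):
    """Return True if a failed task should be resubmitted."""
    if returncode == 0:
        return False                  # success: nothing to retry
    if max_retries == 0:
        return True                   # 0 means "retry indefinitely"
    return retried < max_retries      # otherwise respect the limit


print(should_retry(returncode=1, retried=0, max_retries=3))    # True
print(should_retry(returncode=1, retried=3, max_retries=3))    # False
print(should_retry(returncode=0, retried=0, max_retries=3))    # False
print(should_retry(returncode=1, retried=99, max_retries=0))   # True
```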

submit(resubmit=False, targets=None, **extra_args)

Start the computational job associated with this Task instance.

update_state()

Update the state of the dependent task, then resubmit it if it’s TERMINATED and self.retry() is True.

class gc3libs.workflow.SequentialTaskCollection(tasks, **extra_args)

A SequentialTaskCollection runs its tasks one at a time.

After a task has completed, the next method is called with the index of the finished task in the self.tasks list; the return value of next is then assigned to the collection's execution.state. If the returned state is RUNNING, the subsequent task is started; otherwise no action is performed.

The default next implementation just runs the tasks in the order they were given to the constructor, and sets the state to TERMINATED when all tasks have been run.

add(task)

Add a task to the collection.

attach(controller)

Use the given Controller interface for operations on the job associated with this task.

kill(**extra_args)

Stop execution of this sequence. Kill currently-running task (if any), then set collection state to TERMINATED.

next(done)

Return collection state or task to run after step number done is terminated.

This method is called when a task is finished; the done argument contains the index number of the just-finished task into the self.tasks list. In other words, the task that just completed is available as self.tasks[done].

The return value from next can be either a task state (i.e., an instance of Run.State), or a valid index number for self.tasks. In the first case:

  • if the return value is Run.State.TERMINATED, then no other jobs will be run;
  • otherwise, the return value is assigned to execution.state and the next job in the self.tasks list is executed.

If instead the return value is a (nonnegative) number, then tasks in the sequence will be re-run starting from that index.

The default implementation runs tasks in the order they were given to the constructor, and sets the state to TERMINATED when all tasks have been run. This method can (and should) be overridden in derived classes to implement policies for serial job execution.
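
The following sketch shows one way a return value from `next(done)` could be interpreted, following the description above. It is a hypothetical driver loop, not gc3libs code: `run_sequence`, the string constants standing in for `Run.State` values, and the example policies are all assumptions.

```python
# Hypothetical driver loop, not gc3libs code: it shows how a return value
# from `next(done)` could be interpreted, per the description above.
TERMINATED, RUNNING = "TERMINATED", "RUNNING"   # stand-ins for Run.State


def run_sequence(tasks, next_fn):
    """Run callables in `tasks`; `next_fn(done)` decides what follows."""
    log, current = [], 0
    while True:
        log.append(tasks[current]())
        result = next_fn(current)
        if isinstance(result, int):
            current = result          # re-run starting from this index
        elif result == TERMINATED:
            return log                # no other jobs will be run
        else:
            current += 1              # continue with the following task


tasks = [lambda: "a", lambda: "b", lambda: "c"]


def default_next(done):
    # Default policy: run in order, terminate after the last task.
    return RUNNING if done < len(tasks) - 1 else TERMINATED


print(run_sequence(tasks, default_next))   # ['a', 'b', 'c']

retried = []


def retry_b_once(done):
    # Custom policy: jump back to index 1 the first time it finishes.
    if done == 1 and not retried:
        retried.append(done)
        return 1
    return default_next(done)


print(run_sequence(tasks, retry_b_once))   # ['a', 'b', 'b', 'c']
```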

progress()

Advance the associated job through all states of a regular lifecycle. In detail:

  1. If execution.state is NEW, the associated job is started.
  2. The state is updated until it reaches TERMINATED.
  3. Output is collected and the final returncode is returned.

An exception TaskError is raised if the job hits state STOPPED or UNKNOWN during an update in phase 2.

When the job reaches TERMINATING state, the output is retrieved; if this operation is successful, state is advanced to TERMINATED.

Once the job reaches TERMINATED state, the return code (stored also in .returncode) is returned; if the job is not yet in TERMINATED state, calling progress returns None.

Raises: exception UnexpectedStateError if the associated job goes into state STOPPED or UNKNOWN
Returns: final returncode, or None if the execution state is not TERMINATED.

redo(from_stage=0, *args, **kwargs)

Rewind the sequence to a given stage and reset its state to NEW.

In addition, when called with from_stage set to the total number of tasks in the collection, redo() tries to continue the sequence by (ultimately) calling self.next() to get a new task.

stage()

Return the Task that is currently executing, or None (if finished or not yet started).

submit(resubmit=False, targets=None, **extra_args)

Start the current task in the collection.

update_state(**extra_args)

Update state of the collection, based on the jobs’ statuses.

class gc3libs.workflow.StagedTaskCollection(**extra_args)

Simplified interface for creating a sequence of Tasks. This can be used when the number of Tasks to run is fixed and known at program writing time.

A StagedTaskCollection subclass should define methods stage0, stage1, … up to stageN (for some arbitrary positive integer N). Each of these stageN methods must return a Task instance; the task returned by the stage0 method will be executed first, followed by the task returned by stage1, and so on. The sequence stops at the first N such that stageN is not defined.

The exit status of the whole sequence is the exit status of the last Task instance run. However, if any of the stageN methods returns an integer value instead of a Task instance, then the sequence stops and that number is used as the sequence exit code.
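
The stage-discovery convention can be sketched as follows. `StagedSketch` is a hypothetical stand-in, not the gc3libs class: strings play the role of Task instances, and the sketch returns `None` as the exit code when no stage returns an integer (in gc3libs the exit status would come from the last Task run).

```python
# Hypothetical stand-in (not the gc3libs class): discover stage0, stage1, ...
# methods by name and stop at the first N for which stageN is undefined.

class StagedSketch:
    def run(self):
        results, n = [], 0
        while hasattr(self, f"stage{n}"):
            outcome = getattr(self, f"stage{n}")()
            if isinstance(outcome, int):
                # An integer ends the sequence and becomes its exit code.
                return results, outcome
            results.append(outcome)   # otherwise treat it as a "task"
            n += 1
        return results, None          # sketch only: no exit code computed


class ThreeStages(StagedSketch):
    def stage0(self):
        return "download"

    def stage1(self):
        return "process"

    def stage2(self):
        return 7          # integer: stop here with exit code 7

    def stage3(self):
        return "never reached"


print(ThreeStages().run())   # (['download', 'process'], 7)
```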

next(done)

Return collection state or task to run after step number done is terminated.

This method is called when a task is finished; the done argument contains the index number of the just-finished task into the self.tasks list. In other words, the task that just completed is available as self.tasks[done].

The return value from next can be either a task state (i.e., an instance of Run.State), or a valid index number for self.tasks. In the first case:

  • if the return value is Run.State.TERMINATED, then no other jobs will be run;
  • otherwise, the return value is assigned to execution.state and the next job in the self.tasks list is executed.

If instead the return value is a (nonnegative) number, then tasks in the sequence will be re-run starting from that index.

The default implementation runs tasks in the order they were given to the constructor, and sets the state to TERMINATED when all tasks have been run. This method can (and should) be overridden in derived classes to implement policies for serial job execution.

class gc3libs.workflow.StopOnError

Mix-in class to make a SequentialTaskCollection turn to STOPPED state as soon as one of the tasks fails.

A second effect of mixing this class in is that the self.execution.returncode mirrors the return code of the last finished task.

Note

For the mix-in to take effect, this class should be listed before the base task collection class, e.g.:

# this works
class MyTaskCollection(StopOnError, SequentialTaskCollection):
  pass

# this *does not* work
class MyOtherTaskCollection(SequentialTaskCollection, StopOnError):
  pass

See SequentialTaskCollection.next() and GitHub issue #512 for some caveats on applying this to dynamically-built task collections.

class gc3libs.workflow.TaskCollection(tasks=None, **extra_args)

Base class for all task collections. A “task collection” is a group of tasks that can be managed collectively as a single one.

A task collection implements the same interface as the Task class, so you can use a TaskCollection everywhere a Task is required. A task collection has a state attribute, which is an instance of gc3libs.Run.State; each concrete collection class decides how to deduce a collective state based on the individual task states.

add(task)

Add a task to the collection.

attach(controller)

Use the given Controller interface for operations on the job associated with this task.

changed

Evaluates to True if this task or any of its subtasks has been modified and should be saved to persistent storage.

detach()

Remove any reference to the current grid interface. After this, calling any method other than attach() results in an exception TaskDetachedFromControllerError being thrown.

fetch_output(output_dir=None, overwrite=False, changed_only=True, **extra_args)

Retrieve the outputs of the computational job associated with this task into directory output_dir, or, if that is None, into the directory whose path is stored in instance attribute .output_dir.

If the execution state is TERMINATING, transition the state to TERMINATED (which runs the appropriate hook).

See gc3libs.Core.fetch_output() for a full explanation.

Returns: Path to the directory where the job output has been collected.

free()

This method just asks the Engine to free the contained tasks.

iter_tasks()

Iterate over non-collection tasks enclosed in this collection.

iter_workflow()

Returns an iterator that will traverse the whole tree of tasks.
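
The difference between the two iterators can be sketched with nested lists. Everything here is a hypothetical stand-in: lists play the role of collections, strings play the role of plain tasks, and the depth-first, parent-before-children order is an assumption rather than documented gc3libs behavior.

```python
# Hypothetical stand-ins: nested lists play the role of collections and
# strings play the role of plain (non-collection) tasks.

def iter_workflow_sketch(node):
    """Yield every node in the tree, collections included."""
    yield node
    if isinstance(node, list):
        for child in node:
            yield from iter_workflow_sketch(child)


def iter_tasks_sketch(node):
    """Yield only the leaves, i.e. non-collection tasks."""
    for item in iter_workflow_sketch(node):
        if not isinstance(item, list):
            yield item


workflow = ["t1", ["t2", "t3"], "t4"]
print(list(iter_tasks_sketch(workflow)))           # ['t1', 't2', 't3', 't4']
print(len(list(iter_workflow_sketch(workflow))))   # 6
```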

kill(**extra_args)

Terminate the computational job associated with this task.

See gc3libs.Core.kill() for a full explanation.

peek(what, offset=0, size=None, **extra_args)

Raise a gc3libs.exceptions.InvalidOperation error, as there is no meaningful semantics that can be defined for peek into a generic collection of tasks.

remove(task)

Remove a task from the collection.

stats(only=None)

Return a dictionary mapping each state name to the count of tasks in that state. In addition, the following keys are defined:

  • ok: count of TERMINATED tasks with return code 0
  • failed: count of TERMINATED tasks with nonzero return code
  • total: count of managed tasks, whatever their state

If the optional argument only is not None, tasks whose class is not contained in only are ignored.

Parameters: only (tuple) – Restrict counting to tasks of these classes.

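
The shape of the returned dictionary can be sketched as follows. This is not the gc3libs implementation: tasks are represented by hypothetical `(state, returncode)` pairs, and filtering by the `only` argument is left out for brevity.

```python
from collections import Counter

# Hypothetical (state, returncode) pairs standing in for managed tasks.
tasks = [
    ("RUNNING", None),
    ("TERMINATED", 0),
    ("TERMINATED", 2),
    ("NEW", None),
    ("TERMINATED", 0),
]


def stats_sketch(tasks):
    """Count tasks per state, plus the extra ok/failed/total keys."""
    counts = Counter(state for state, _ in tasks)
    done = [rc for state, rc in tasks if state == "TERMINATED"]
    counts["ok"] = sum(1 for rc in done if rc == 0)
    counts["failed"] = sum(1 for rc in done if rc != 0)
    counts["total"] = len(tasks)
    return dict(counts)


result = stats_sketch(tasks)
print(result["TERMINATED"], result["ok"], result["failed"], result["total"])
# 3 2 1 5
```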
submit(resubmit=False, targets=None, **extra_args)

Start the computational job associated with this Task instance.

terminated()

Called when the job state transitions to TERMINATED, i.e., the job has finished execution (with whatever exit status, see returncode) and the final output has been retrieved.

Default implementation for TaskCollection is to set the exitcode to the maximum of the exit codes of its tasks, or None if no task has a numeric exit code.

If no tasks were run, the exitcode is set to 0.
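
The exit code rule can be sketched as a small function. `collective_exitcode` is a hypothetical helper, not part of gc3libs, and it assumes that a task without a numeric exit code reports `None`.

```python
def collective_exitcode(exitcodes):
    """Sketch of the documented rule; `exitcodes` holds one entry per task."""
    if not exitcodes:
        return 0                       # no tasks were run
    # Assumption: tasks without a numeric exit code report None.
    numeric = [code for code in exitcodes if isinstance(code, int)]
    return max(numeric) if numeric else None


print(collective_exitcode([0, 3, 1]))      # 3
print(collective_exitcode([None, None]))   # None
print(collective_exitcode([]))             # 0
```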

update_state(**extra_args)

Update the running state of all managed tasks.