gc3libs.core

Top-level classes for task execution and control.

class gc3libs.core.Core(cfg, matchmaker=<gc3libs.core.MatchMaker object>, resource_errors_are_fatal=None)

Core operations: submit, update state, retrieve (a snapshot of) output, cancel job.

Core operations are blocking, i.e., they return only after the operation has successfully completed, or an error has been detected.

Operations are always performed by a Core object. Core implements an overlay Grid on the resources specified in the configuration file.

Initialization of a Core instance also initializes all resources in the passed Configuration instance. By default, GC3Pie’s Core objects ignore errors in initializing resources, and only raise an exception if no resources can be initialized. This can be changed either by passing the optional argument resource_errors_are_fatal=True, or by setting the environment variable GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL to yes or 1.
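The initialization policy described above can be sketched in plain Python as follows. The helpers errors_are_fatal and init_resources are hypothetical, not part of gc3libs. The sketch assumes that an explicit constructor argument takes precedence over the environment variable, and it raises RuntimeError as a stand-in for whatever exception GC3Pie actually uses when no resource can be initialized.

```python
import os


def errors_are_fatal(explicit=None, environ=None):
    """Decide whether resource initialization errors should be fatal.

    Hypothetical helper: an explicit argument (if not None) is assumed to
    win over the GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL environment variable.
    """
    if explicit is not None:
        return bool(explicit)
    if environ is None:
        environ = os.environ
    return environ.get("GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL", "") in ("yes", "1")


def init_resources(factories, fatal=False):
    """Call each resource factory; skip failures unless `fatal` is set."""
    initialized = {}
    for name, factory in factories.items():
        try:
            initialized[name] = factory()
        except Exception:
            # Non-fatal mode silently drops this resource and keeps going.
            if fatal:
                raise
    if not initialized:
        # Stand-in exception type; GC3Pie's actual exception may differ.
        raise RuntimeError("no resources could be initialized")
    return initialized
```

With the default (non-fatal) policy, a broken resource is simply left out of the result; only an empty result is an error.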

add(task)

This method is here just to allow Core and Engine objects to be used interchangeably. It’s effectively a no-op, as it makes no sense in the synchronous/blocking semantics implemented by Core.

close()

Explicitly invoke the destructor on managed objects, e.g., LRMS instances.

fetch_output(app, download_dir=None, overwrite=False, changed_only=True, **extra_args)

Retrieve output into local directory app.output_dir.

If the task is not expected to produce any output (i.e., app.would_output == False) then the only effect of this is to advance the state of TERMINATING tasks to TERMINATED.

Optional argument download_dir overrides the download location.

The download directory is created if it does not exist. If it already exists and the optional argument overwrite is False (default), it is renamed with a .NUMBER suffix and a new empty one is created in its place. Otherwise, if overwrite is True, files are downloaded over the ones already present; in this case, the changed_only argument controls which files are overwritten:

  • if changed_only is True (default), then only files for which the source has a different size or has been modified more recently than the destination are copied;
  • if changed_only is False, then all files in source will be copied into destination, unconditionally.

Source files that do not exist at the destination are always copied, regardless of the overwrite and changed_only settings.
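The download-directory rules above can be sketched with the standard library alone. The function names are hypothetical, and the sketch assumes that the .NUMBER suffix starts at 1 and uses the first unused number; GC3Pie's actual numbering scheme may differ.

```python
import os


def move_aside(path):
    """Rename existing `path` to `path.N`, using the first free N >= 1."""
    n = 1
    while os.path.exists(f"{path}.{n}"):
        n += 1
    backup = f"{path}.{n}"
    os.rename(path, backup)
    return backup


def prepare_download_dir(path, overwrite=False):
    """Ensure `path` exists; move an existing one aside unless overwriting."""
    if os.path.exists(path) and not overwrite:
        move_aside(path)
    os.makedirs(path, exist_ok=True)


def should_copy(src, dst, changed_only=True):
    """Decide whether `src` must be copied over `dst` when overwriting."""
    if not os.path.exists(dst):
        return True  # files missing at the destination are always copied
    if not changed_only:
        return True  # unconditional overwrite
    s, d = os.stat(src), os.stat(dst)
    # Copy only if the size differs or the source is more recent.
    return s.st_size != d.st_size or s.st_mtime > d.st_mtime
```

should_copy only matters when overwrite is True; with overwrite False the destination directory starts out empty, so every file is copied.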

If the task is in TERMINATING state, the state is changed to TERMINATED, attribute output_dir is set to the absolute path to the directory where files were downloaded, and the terminated transition method is called on the app object.

Task output cannot be retrieved when app.execution is in one of the states NEW or SUBMITTED; an OutputNotAvailableError exception is thrown in these cases.

Raise: gc3libs.exceptions.OutputNotAvailableError if no output can be fetched from the remote job (e.g., the Application/Task object is in NEW or SUBMITTED state, indicating the remote job has not started running).

free(app, **extra_args)

Free up any remote resources used for the execution of app. In particular, this should delete any remote directories and files.

It is an error to call this method if app.execution.state is anything other than TERMINATED: an InvalidOperation exception will be raised in this case.

Raise: gc3libs.exceptions.InvalidOperation if app.execution.state differs from Run.State.TERMINATED.

get_resources(**extra_args)

Return list of resources configured into this Core instance.

kill(app, **extra_args)

Terminate a job.

Terminating a job in RUNNING, SUBMITTED, or STOPPED state entails canceling the job with the remote execution system; terminating a job in the NEW or TERMINATED state is a no-op.

peek(app, what='stdout', offset=0, size=None, **extra_args)

Download size bytes (at offset bytes from the start) from the remote job standard output or error stream, and write them into a local file. Return file-like object from which the downloaded contents can be read.

If size is None (default), then download all available contents of the remote stream from offset to the end.

The only allowed values for the what argument are the strings ‘stdout’ and ‘stderr’, indicating that the relevant section of the job’s standard output or standard error, respectively, should be downloaded.
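The offset/size semantics of peek() can be illustrated locally. In this sketch a local directory containing files named stdout and stderr stands in for the remote job, and ValueError is an assumed stand-in for whatever GC3Pie raises on an invalid what value; the function is not the gc3libs implementation.

```python
import os
import tempfile


def peek(job_dir, what="stdout", offset=0, size=None):
    """Copy a byte range of `job_dir/what` into a temporary file and return it."""
    if what not in ("stdout", "stderr"):
        raise ValueError(f"'what' must be 'stdout' or 'stderr', not {what!r}")
    with open(os.path.join(job_dir, what), "rb") as stream:
        stream.seek(offset)
        # size=None means: read from `offset` to the end of the stream.
        data = stream.read() if size is None else stream.read(size)
    local = tempfile.TemporaryFile()  # opened in "w+b" mode
    local.write(data)
    local.seek(0)  # rewind so the caller reads from the start
    return local
```

Returning an open, rewound file object lets callers treat the snapshot like any other readable stream.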

remove(task)

This method is here just to allow Core and Engine objects to be used interchangeably. It’s effectively a no-op, as it makes no sense in the synchronous/blocking semantics implemented by Core.

select_resource(match)

Disable resources that do not satisfy predicate match. Return number of enabled resources.

Argument match can be:

  • either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
  • or it can be a string: only resources whose name matches (wildcards * and ? are allowed) are retained.

Note

Calling this method modifies the configured list of resources in-place.
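The two accepted forms of match can be sketched as below. Resource is a hypothetical stand-in with an enabled flag; the sketch disables non-matching resources by clearing that flag in place, and uses case-sensitive shell-style wildcard matching via fnmatch, which is an assumption about how GC3Pie compares names.

```python
import fnmatch
from dataclasses import dataclass


@dataclass
class Resource:
    """Hypothetical stand-in for a configured resource."""
    name: str
    enabled: bool = True


def select_resource(resources, match):
    """Disable resources not satisfying `match`; return how many stay enabled."""
    if isinstance(match, str):
        pattern = match

        def predicate(res):
            # Shell-style wildcards * and ?, compared case-sensitively.
            return fnmatch.fnmatchcase(res.name, pattern)
    else:
        predicate = match
    for res in resources.values():
        if not predicate(res):
            res.enabled = False  # modified in place
    return sum(1 for res in resources.values() if res.enabled)
```

Because resources are modified in place, successive calls narrow the selection further.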

submit(app, resubmit=False, targets=None, **extra_args)

Submit a job running an instance of the given task app.

Upon successful submission, call the submitted method on the app object. If targets are given, submission of the task is attempted to the resources in the order given; the submit method returns after the first successful attempt. If targets is None (default), a brokering procedure is run to determine the best resource among the configured ones.
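The "try targets in order, stop at the first success" rule can be sketched as follows. The callback try_submit is hypothetical (it is assumed to raise on failure), brokering for targets=None is not modelled, and RuntimeError is a stand-in for the error GC3Pie reports when every target fails.

```python
def submit_to_targets(task, targets, try_submit):
    """Try `targets` in order; return the first resource that accepts `task`.

    `try_submit(task, resource)` is a hypothetical callback that raises an
    exception when submission to `resource` fails.
    """
    failures = []
    for resource in targets:
        try:
            try_submit(task, resource)
        except Exception as err:
            failures.append((resource, err))
            continue  # try the next target
        return resource  # stop after the first successful attempt
    # Stand-in exception type; GC3Pie's own error may differ.
    raise RuntimeError(f"all {len(failures)} targets rejected the task: {failures!r}")
```

Resources after the first successful one are never contacted.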

At the beginning of the submission process, the app.execution state is reset to NEW; if submission is successful, the task will be in SUBMITTED or RUNNING state when this call returns.

Raise:

gc3libs.exceptions.InputFileError if an input file does not exist or cannot otherwise be read.

Parameters:
  • app (Task) – A GC3Pie Task instance to be submitted.
  • resubmit – If True, submit task regardless of its execution state; if False (default), submission is a no-op if task is not in NEW state.
  • targets – A list of Resource objects to submit the task to; resources are tried in the order given. If None (default), perform brokering among all the configured resources.

update_job_state(*apps, **extra_args)

Update state of all applications passed in as arguments.

If keyword argument update_on_error is False (default), then application execution state is not changed in case a backend error happens; it is changed to UNKNOWN otherwise.
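The effect of update_on_error can be sketched as below. App and query_backend are hypothetical stand-ins, and the sketch assumes backend errors are handled per application rather than aborting the whole update; the real method may propagate some errors (see the exceptions listed below).

```python
from dataclasses import dataclass


@dataclass
class App:
    """Hypothetical stand-in for an Application with an execution state."""
    name: str
    state: str = "RUNNING"


def update_states(apps, query_backend, update_on_error=False):
    """Refresh each app's state via the hypothetical `query_backend` callback."""
    for app in apps:
        try:
            app.state = query_backend(app)
        except Exception:
            # Backend error: keep the old state unless asked to mark it UNKNOWN.
            if update_on_error:
                app.state = "UNKNOWN"
```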

Note that if the state of a job changes, the Run.state property setter calls the appropriate handler method on the application/task object.

Raise: gc3libs.exceptions.InvalidArgument in case one of the passed Application or Task objects is invalid. This can stop updating the state of other objects in the argument list.
Raise: gc3libs.exceptions.ConfigurationError if the configuration of this Core object is invalid or otherwise inconsistent (e.g., a resource references a non-existing auth section).

update_resources(resources=<built-in function all>, **extra_args)

Update the state of a given set of resources.

Each resource object in the returned list will have its updated attribute set to True if the update operation succeeded, or False if it failed.

Optional argument resources should be a subset of the resources configured in this Core instance (the actual Lrms objects, not the resource names). By default, all configured resources are updated.

class gc3libs.core.Engine(controller, tasks=[], store=None, can_submit=True, can_retrieve=True, max_in_flight=0, max_submitted=0, output_dir=None, scheduler=<gc3libs.core.scheduler object>, retrieve_running=False, retrieve_overwrites=False, retrieve_changed_only=True, forget_terminated=False)

Submit tasks in a collection, and update their state until a terminal state is reached. Specifically:

  • tasks in NEW state are submitted;
  • the state of tasks in SUBMITTED, RUNNING, STOPPED or UNKNOWN state is updated;
  • when a task reaches TERMINATING state, its output is downloaded.

The behavior of Engine instances can be further customized by setting the following instance attributes:

can_submit
Boolean value: if False, no task will be submitted.
can_retrieve
Boolean value: if False, no output will ever be retrieved.
max_in_flight
If >0, limit the number of tasks in SUBMITTED or RUNNING state: if the number of tasks in SUBMITTED, RUNNING or STOPPED state is greater than max_in_flight, then no new submissions will be attempted.
max_submitted
If >0, limit the number of tasks in SUBMITTED state: if the number of tasks in SUBMITTED, RUNNING or STOPPED state is greater than max_submitted, then no new submissions will be attempted.
output_dir
Base directory for job output; if not None, each task’s results will be downloaded into a subdirectory named after the task’s permanent_id.
scheduler
A factory function for creating objects that conform to the Scheduler interface to control task submission; see the Scheduler documentation for details. The default value implements a first-come, first-served algorithm: tasks are submitted in the order they have been added to the Engine.
retrieve_running
If True, snapshot output from RUNNING jobs at every invocation of progress().
retrieve_overwrites
If True, overwrite files in the output directory of any job (as opposed to moving the destination away and downloading a fresh copy). See Core.fetch_output() for details.
retrieve_changed_only
If both this and retrieve_overwrites are True, then only changed files are downloaded. See Core.fetch_output() for details.
forget_terminated

When True, Engine.remove() is automatically called on tasks when their state turns to TERMINATED.

Warning

For historical reasons, the default for this option is False, but this can (and should!) be changed in future releases.

Any of the above can also be set by passing a keyword argument to the constructor (assume g is a Core instance):

>>> e = Engine(g, can_submit=False)
>>> e.can_submit
False

add(task)

Add task to the list of tasks managed by this Engine. Adding a task that has already been added to this Engine instance results in a no-op.

close()

Explicitly call finalize methods on relevant objects, e.g., LRMS instances.

fetch_output(task, output_dir=None, overwrite=False, changed_only=True, **extra_args)

Enqueue task for later output retrieval.

Warning

FIXME

The output_dir, overwrite, and changed_only parameters are currently ignored.

find_task_by_id(task_id)

Return the task with the given persistent ID added to this Engine instance. If no task has that ID, raise a KeyError.

free(task, **extra_args)

Proxy for Core.free (which see).

get_resources()

Return the list of resources configured into the Core instance used by this Engine.

kill(task, **extra_args)

Schedule a task for killing on the next progress run.

peek(task, what='stdout', offset=0, size=None, **extra_args)

Proxy for Core.peek (which see).

progress()

Update state of all registered tasks and take appropriate action. Specifically:

  • tasks in NEW state are submitted;
  • the state of tasks in SUBMITTED, RUNNING, STOPPED or UNKNOWN state is updated;
  • when a task reaches TERMINATING state, its output is downloaded;
  • tasks in TERMINATED state are simply ignored.

The max_in_flight and max_submitted limits (if >0) are taken into account when attempting submission of tasks.
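The per-state dispatch of one progress() pass can be sketched as a pure function over state names. This is a simplified model, not Engine's code: tasks are plain state strings, only max_in_flight is modelled, and the sketch stops submitting once the count of SUBMITTED, RUNNING or STOPPED tasks reaches the limit; the exact counting and comparison GC3Pie uses may differ.

```python
IN_FLIGHT = {"SUBMITTED", "RUNNING", "STOPPED"}
TO_UPDATE = IN_FLIGHT | {"UNKNOWN"}


def plan_progress(states, max_in_flight=0):
    """Map each task index to the action one progress() pass would take."""
    in_flight = sum(1 for state in states if state in IN_FLIGHT)
    actions = {}
    for index, state in enumerate(states):
        if state == "NEW":
            if max_in_flight > 0 and in_flight >= max_in_flight:
                actions[index] = "wait"  # limit reached: retry on a later pass
            else:
                actions[index] = "submit"
                in_flight += 1
        elif state in TO_UPDATE:
            actions[index] = "update"
        elif state == "TERMINATING":
            actions[index] = "fetch_output"
        else:  # TERMINATED
            actions[index] = "ignore"
    return actions
```

Tasks held back with "wait" stay in NEW state and are considered again on the next call.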

redo(task, *args, **kwargs)

Reset task’s state to NEW so that it will be re-run.

Any additional arguments will be forwarded to the task’s own .redo() method; this is useful, e.g., to perform partial re-runs of SequentialTaskCollection instances.

remove(task)

Remove a task from the list of tasks managed by this Engine.

select_resource(match)

Disable resources that do not satisfy predicate match. Return number of enabled resources.

Argument match can be:

  • either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
  • or it can be a string: only resources whose name matches (wildcards * and ? are allowed) are retained.

Note

Calling this method modifies the configured list of resources in-place.

stats(only=None)

Return a dictionary mapping each state name into the count of tasks in that state. In addition, the following keys are defined:

  • ok: count of TERMINATED tasks with return code 0
  • failed: count of TERMINATED tasks with nonzero return code
  • total: total count of managed tasks, whatever their state

If the optional argument only is not None, tasks whose class is not contained in only are ignored.

Parameters:
  • only (tuple) – Restrict counting to tasks of these classes.
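The counting rules above can be sketched as follows. Task is a hypothetical stand-in with a state name and a return code; following the wording "whose class is not contained in only", the sketch compares exact classes rather than using isinstance(), which is an interpretation, not confirmed library behaviour.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    """Hypothetical stand-in: a task with a state name and a return code."""
    state: str
    returncode: Optional[int] = None


def stats(tasks, only=None):
    """Count tasks per state, plus the 'ok', 'failed' and 'total' keys."""
    counts = {"ok": 0, "failed": 0, "total": 0}
    for task in tasks:
        if only is not None and type(task) not in only:
            continue  # class not listed in `only`: ignore this task
        counts[task.state] = counts.get(task.state, 0) + 1
        counts["total"] += 1
        if task.state == "TERMINATED":
            counts["ok" if task.returncode == 0 else "failed"] += 1
    return counts
```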

submit(task, resubmit=False, targets=None, **extra_args)

Submit task at the next invocation of progress.

The task state is reset using the task’s own .redo() method, and then the task is added to the collection of managed tasks. Note that the use of redo() implies that only tasks in a terminal state can be resubmitted!

The targets argument is only present for interface compatibility with Core.submit() but is otherwise ignored.

update_job_state(*tasks, **extra_args)

Return list of current states of the given tasks. States will only be updated at the next invocation of progress; in particular, no state-change handlers are called as a result of calling this method.

class gc3libs.core.MatchMaker

Select and sort resources for attempting submission of a Task.

A match-making algorithm must implement two methods:

  • filter: given a task and a list of resources, return the list of resources that the given task could be submitted to.
  • rank: given a task and a list of resources, return a list of resources sorted in preference order, i.e., submission of the given task will be attempted to the first returned resource, then the next one, etc.

This class implements the default match-making algorithm in GC3Pie, which operates as follows:

  • filter phase: if task has a compatible_resources method (as instances of Application do), retain only those resources where it evaluates to True. Otherwise, return the resources list unchanged.
  • rank phase: sort resources according to the task’s rank_resources method, or retain the given order if task does not define such a method.

filter(task, resources)

Return the subset of resources to which task could be submitted.

Note that the resulting subset could be empty (no resource can accommodate the task’s requirements).

The default implementation uses the task’s compatible_resources method to retain only the resources that satisfy the task’s requirements. If task does not provide such a method, the resource list is returned unchanged.

rank(task, resources)

Sort the list of resources in the preferred order for submitting task.

Unless overridden in a derived class, this calls the task’s rank_resources method to sort the list. If the task does not provide such a method, the resources list is returned unchanged.
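The default filter and rank phases described above can be sketched as a small class. SketchMatchMaker is a hypothetical name, and the sketch assumes that compatible_resources and rank_resources each take the resource list and return a list.

```python
class SketchMatchMaker:
    """Filter and rank resources by delegating to optional task methods."""

    def filter(self, task, resources):
        # Assumed signature: compatible_resources(resources) -> list
        if hasattr(task, "compatible_resources"):
            return task.compatible_resources(resources)
        return resources  # no such method: keep every resource

    def rank(self, task, resources):
        # Assumed signature: rank_resources(resources) -> sorted list
        if hasattr(task, "rank_resources"):
            return task.rank_resources(resources)
        return resources  # no such method: keep the given order
```

Calling rank() on the output of filter() gives the ordered list of candidates that submission is attempted on.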

class gc3libs.core.Scheduler(tasks, resources)

Instances of the Scheduler class are used in Engine.progress() to determine what tasks (among those in Run.State.NEW state) are to be submitted.

A Scheduler object must implement both the context protocol and the iterator protocol.

The way a Scheduler instance is actually used within Engine is as follows:

  1. A Scheduler instance is created, passing it two arguments: a list of tasks in NEW state, and a dictionary of configured resources (keys are resource names, values are actual resource objects).
  2. When a new submission cycle starts, the __enter__() method is called.
  3. The Engine iterates by repeatedly calling the next() method to receive tasks to be submitted. The send() and throw() methods are used to notify the scheduler of the outcome of the submission attempt.
  4. When the submission cycle ends, the __exit__() method is called.

The Scheduler.schedule generator is the heart of the submission process and has basically complete control over it. It is initialized with the list of tasks in NEW state and the configured resources. The next() method should yield pairs (task index, resource name), where the task index is the position of the task to be submitted next in the given list and, similarly, the resource name is the name of the resource to which the task should be submitted.

For each pair yielded, submission of that task to the selected resource is attempted; the state of the task object after submission is sent back (via the send() method) to the Scheduler instance; if an exception is raised, that exception is thrown (via the throw() method) into the scheduler object instead. Submission stops when the next() call raises a StopIteration exception.
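The next()/send()/throw() exchange described above can be sketched with a bare generator and a driver loop. Both function names are hypothetical; the generator is not wrapped in the context protocol, and its fallback-to-the-next-resource behaviour on failure is an illustrative choice, not a description of GC3Pie's default scheduler.

```python
def first_come_first_served(tasks, resources):
    """Yield (task index, resource name) pairs; fall back on failure."""
    names = list(resources)
    for index in range(len(tasks)):
        for name in names:
            try:
                state = yield index, name
            except Exception:
                continue  # submission raised: try the next resource
            if state in ("SUBMITTED", "RUNNING"):
                break  # task placed; move on to the next task


def run_submission_cycle(schedule, tasks, resources, submit):
    """Drive a schedule generator through one submission cycle."""
    placed = []
    gen = schedule(tasks, resources)
    try:
        index, name = next(gen)
        while True:
            try:
                state = submit(tasks[index], resources[name])
            except Exception as err:
                index, name = gen.throw(err)  # report failure, get next pair
            else:
                placed.append((index, name, state))
                index, name = gen.send(state)  # report outcome, get next pair
    except StopIteration:
        pass  # the schedule has no more pairs to offer
    return placed
```

Both send() and throw() return the generator's next yielded pair, so the driver never needs a separate next() call after the first one.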

class gc3libs.core.scheduler(fn)

Decorate a generator function for use as a Scheduler object.
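One way such a decorator could adapt a generator function to the context and iterator protocols is sketched below. This is not GC3Pie's code: GeneratorScheduler and round_robin are hypothetical names, and creating a fresh generator in __enter__ is an assumption.

```python
import functools


class GeneratorScheduler:
    """Adapt a generator to the context + iterator protocols of a Scheduler."""

    def __init__(self, gen_fn, tasks, resources):
        self._gen_fn = gen_fn
        self._tasks = tasks
        self._resources = resources
        self._gen = None

    def __enter__(self):
        # Start a fresh generator at the beginning of each submission cycle.
        self._gen = self._gen_fn(self._tasks, self._resources)
        return self

    def __exit__(self, *exc_info):
        self._gen.close()  # end of the submission cycle
        return False  # do not suppress exceptions

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._gen)

    next = __next__  # expose the next() method named in the text

    def send(self, value):
        return self._gen.send(value)

    def throw(self, exc):
        return self._gen.throw(exc)


def scheduler(gen_fn):
    """Turn generator function `gen_fn(tasks, resources)` into a factory."""

    @functools.wraps(gen_fn)
    def factory(tasks, resources):
        return GeneratorScheduler(gen_fn, tasks, resources)

    return factory


@scheduler
def round_robin(tasks, resources):
    """Example schedule: spread tasks over resources in turn."""
    names = list(resources)
    for index in range(len(tasks)):
        yield index, names[index % len(names)]
```

Usage: `with round_robin(tasks, resources) as sched:` then call next(sched) once and sched.send(state) after each submission attempt.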