gc3libs.core

Top-level interface to Grid functionality.

class gc3libs.core.Core(cfg)

Core operations: submit, update state, retrieve (a snapshot of) output, cancel job.

Core operations are blocking, i.e., they return only after the operation has successfully completed, or an error has been detected.

Operations are always performed by a Core object. Core implements an overlay Grid on the resources specified in the configuration file.

add(task)

This method is here just to allow Core and Engine objects to be used interchangeably. It’s effectively a no-op, as it makes no sense in the synchronous/blocking semantics implemented by Core.

close()

Explicitly invoke the destructor on held objects, e.g., LRMS objects.

fetch_output(app, download_dir=None, overwrite=False, **extra_args)

Retrieve output into local directory app.output_dir; optional argument download_dir overrides this.

The download directory is created if it does not exist. If it already exists and the optional argument overwrite is False (default), it is renamed with a .NUMBER suffix and a new empty directory is created in its place. Otherwise, if overwrite is True, files are downloaded over the ones already present.
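The directory-handling rules above can be sketched in plain Python. This is not GC3Pie code: prepare_download_dir is a hypothetical helper, and numbering the backup suffix from .1 upwards until a free name is found is an assumption about how the .NUMBER suffix is chosen.

```python
import os


def prepare_download_dir(path, overwrite=False):
    """Hypothetical helper mirroring the download-directory rules of fetch_output."""
    if os.path.exists(path) and not overwrite:
        # Assumption: backup suffixes start at .1 and increase until a free name is found.
        n = 1
        while os.path.exists("%s.%d" % (path, n)):
            n += 1
        os.rename(path, "%s.%d" % (path, n))
    # Create the directory if it is missing; with overwrite=True an existing
    # directory is reused as-is, so downloaded files land on top of old ones.
    os.makedirs(path, exist_ok=True)
    return path
```

Calling it twice on the same path without overwrite leaves the first run's files in path.1 and an empty path ready for the new download.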

If the task is in TERMINATING state, the state is changed to TERMINATED, attribute output_dir is set to the absolute path to the directory where files were downloaded, and the terminated transition method is called on the app object.

Task output cannot be retrieved when app.execution is in one of the states NEW or SUBMITTED; an OutputNotAvailableError exception is raised in these cases.
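The state bookkeeping of fetch_output can be sketched as a small function. The State enum, the OutputNotAvailableError class and state_after_fetch below are hypothetical stand-ins, not the gc3libs API; the sketch models only the state rules described above, not the download itself, and it assumes that states other than TERMINATING are left unchanged by a snapshot fetch.

```python
from enum import Enum


class State(Enum):
    # Stand-in for the GC3Pie run states named in this document.
    NEW = "NEW"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    TERMINATING = "TERMINATING"
    TERMINATED = "TERMINATED"
    UNKNOWN = "UNKNOWN"


class OutputNotAvailableError(Exception):
    """Stand-in for gc3libs.exceptions.OutputNotAvailableError."""


def state_after_fetch(state):
    """Return the state a task is left in after a successful output fetch."""
    if state in (State.NEW, State.SUBMITTED):
        # The remote job has not started running: nothing to fetch.
        raise OutputNotAvailableError("no output available in state %s" % state.name)
    if state is State.TERMINATING:
        # The real Core would also set output_dir and call the app's
        # `terminated` transition method at this point.
        return State.TERMINATED
    # Assumption: any other state just yields a snapshot and is kept.
    return state
```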

Raise: gc3libs.exceptions.OutputNotAvailableError if no output can be fetched from the remote job (e.g., the Application/Task object is in NEW or SUBMITTED state, indicating that the remote job has not started running).

free(app, **extra_args)

Free up any remote resources used for the execution of app. In particular, this should delete any remote directories and files.

It is an error to call this method if app.execution.state is anything other than TERMINATED: an InvalidOperation exception will be raised in this case.

Raise: gc3libs.exceptions.InvalidOperation if app.execution.state differs from Run.State.TERMINATED.

get_resources(**extra_args)

Return list of resources configured into this Core instance.

kill(app, **extra_args)

Terminate a job.

Terminating a job in RUNNING, SUBMITTED, or STOPPED state entails canceling the job with the remote execution system; terminating a job in the NEW or TERMINATED state is a no-op.

peek(app, what='stdout', offset=0, size=None, **extra_args)

Download size bytes (starting at offset bytes from the beginning) from the remote job’s standard output or error stream, and write them into a local file. Return a file-like object from which the downloaded contents can be read.

If size is None (default), then read all available contents of the remote stream from offset up to the end.

The only allowed values for the what argument are the strings ‘stdout’ and ‘stderr’, selecting the job’s standard output or standard error stream, respectively.
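The offset/size semantics of peek can be illustrated with a local file standing in for the remote stream. peek_local and its streams argument are hypothetical: the real method fetches the bytes from the remote job, whereas this sketch reads them from a local path and returns them in a temporary file.

```python
import tempfile


def peek_local(streams, what="stdout", offset=0, size=None):
    """Hypothetical local analogue of Core.peek.

    `streams` maps 'stdout'/'stderr' to paths of local files that stand in
    for the remote job's output streams.
    """
    if what not in ("stdout", "stderr"):
        raise ValueError("what must be 'stdout' or 'stderr', got %r" % (what,))
    with open(streams[what], "rb") as src:
        src.seek(offset)
        # size=None means: read everything from offset up to the end.
        data = src.read() if size is None else src.read(size)
    # Copy the bytes into a local temporary file and rewind it, so the
    # caller gets a file-like object ready for reading.
    local = tempfile.TemporaryFile()
    local.write(data)
    local.seek(0)
    return local
```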

remove(task)

This method is here just to allow Core and Engine objects to be used interchangeably. It’s effectively a no-op, as it makes no sense in the synchronous/blocking semantics implemented by Core.

select_resource(match)

Alter the configured list of resources, and retain only those that satisfy predicate match.

Argument match can be:

  • either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
  • or a string: only resources whose name matches it (wildcards * and ? are allowed) are retained.
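Both forms of the match argument can be handled by turning a string into a predicate. This is a sketch, not the gc3libs implementation: select_resources is a hypothetical function that returns a new list (the real method alters the Core's configured list), resources are assumed to expose a name attribute, and case-sensitive matching via fnmatch.fnmatchcase is an assumption.

```python
import fnmatch


def select_resources(resources, match):
    """Hypothetical helper: keep only the resources accepted by `match`.

    `resources` is any iterable of objects with a `name` attribute.
    """
    if isinstance(match, str):
        pattern = match

        def predicate(res):
            # Shell-style wildcards: '*' matches any run of characters,
            # '?' matches exactly one (case-sensitive, by assumption).
            return fnmatch.fnmatchcase(res.name, pattern)
    else:
        predicate = match
    return [res for res in resources if predicate(res)]
```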
submit(app, resubmit=False, **extra_args)

Submit a job running an instance of the given app. Upon successful submission, call the submitted method on the app object.

At the beginning of the submission process, the app.execution state is reset to NEW; if submission is successful, the task will be in SUBMITTED or RUNNING state when this call returns.

Raise: gc3libs.exceptions.InputFileError if an input file does not exist or cannot otherwise be read.

update_job_state(*apps, **extra_args)

Update state of all applications passed in as arguments.

If keyword argument update_on_error is False (default), then application execution state is not changed in case a backend error happens; it is changed to UNKNOWN otherwise.

Note that if the state of a job changes, Run.state calls the appropriate handler method on the application/task object.
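The effect of the update_on_error keyword can be sketched with plain data. BackendError, update_states, the apps dictionary (mapping an id to a state string) and the query_backend callable are all hypothetical stand-ins; the sketch does not model the handler calls triggered by Run.state.

```python
class BackendError(Exception):
    """Hypothetical stand-in for an error raised by a resource backend."""


def update_states(apps, query_backend, update_on_error=False):
    """Hypothetical sketch of the update_on_error rule.

    `apps` maps an application id to its current state string;
    `query_backend` returns the fresh state for an id or raises BackendError.
    """
    for app_id in apps:
        try:
            apps[app_id] = query_backend(app_id)
        except BackendError:
            if update_on_error:
                apps[app_id] = "UNKNOWN"
            # Otherwise keep the last known state untouched.
    return apps
```

With the default update_on_error=False, a transient backend failure leaves the last known state in place instead of degrading it to UNKNOWN.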

Raise: gc3libs.exceptions.InvalidArgument if one of the passed Application or Task objects is invalid. This can stop updating the state of other objects in the argument list.
Raise: gc3libs.exceptions.ConfigurationError if the configuration of this Core object is invalid or otherwise inconsistent (e.g., a resource references a non-existing auth section).

update_resources(**extra_args)

Update the state of resources configured into this Core instance.

Each resource object in the returned list will have its updated attribute set to True if the update operation succeeded, or False if it failed.

class gc3libs.core.Engine(controller, tasks=[], store=None, can_submit=True, can_retrieve=True, max_in_flight=0, max_submitted=0, output_dir=None, fetch_output_overwrites=False)

Submit tasks in a collection, and update their state until a terminal state is reached. Specifically:

  • tasks in NEW state are submitted;
  • the state of tasks in SUBMITTED, RUNNING or STOPPED state is updated;
  • when a task reaches TERMINATING state, its output is downloaded.

The behavior of Engine instances can be further customized by setting the following instance attributes:

can_submit
Boolean value: if False, no task will be submitted.
can_retrieve
Boolean value: if False, no output will ever be retrieved.
max_in_flight
If >0, limit the number of tasks in SUBMITTED or RUNNING state: if the number of tasks in SUBMITTED, RUNNING or STOPPED state is greater than max_in_flight, then no new submissions will be attempted.
max_submitted
If >0, limit the number of tasks in SUBMITTED state: if the number of tasks in SUBMITTED state is greater than max_submitted, then no new submissions will be attempted.
output_dir
Base directory for job output; if not None, each task’s results will be downloaded in a subdirectory named after the task’s permanent_id.
fetch_output_overwrites
Default value to pass as the overwrite argument to Core.fetch_output() when retrieving results of a terminated task.

Any of the above can also be set by passing a keyword argument to the constructor (assume g is a Core instance):

    >>> e = Engine(g, can_submit=False)
    >>> e.can_submit
    False

add(task)

Add task to the list of tasks managed by this Engine. Adding a task that has already been added to this Engine instance results in a no-op.

close()

Explicitly call the finalize methods of relevant objects, e.g., LRMS objects.

fetch_output(task, output_dir=None, overwrite=False, **extra_args)

Enqueue task for later output retrieval.

Warning (FIXME): the output_dir and overwrite parameters are currently ignored.

free(task, **extra_args)

Proxy for Core.free; see that method for details.

get_resources()

Return the list of resources configured into the Core instance controlled by this Engine.

kill(task, **extra_args)

Schedule task for killing at the next invocation of progress.

peek(task, what='stdout', offset=0, size=None, **extra_args)

Proxy for Core.peek; see that method for details.

progress()

Update state of all registered tasks and take appropriate action. Specifically:

  • tasks in NEW state are submitted;
  • the state of tasks in SUBMITTED, RUNNING, STOPPED or UNKNOWN state is updated;
  • when a task reaches TERMINATING state, its output is downloaded;
  • tasks in TERMINATED state are simply ignored.

The max_in_flight and max_submitted limits (if >0) are taken into account when attempting submission of tasks.
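The progress() rules, including both submission limits, can be sketched as a single pass over a task list. Everything here is hypothetical: progress_once, the string states and the submit/update/fetch_output callables are illustrative stand-ins, the update-then-submit-then-fetch order is an assumption, and the sketch stops submitting once a count reaches its limit (>=) so the limit is never exceeded; GC3Pie's exact boundary test may differ.

```python
IN_FLIGHT = ("SUBMITTED", "RUNNING", "STOPPED")


def progress_once(tasks, submit, update, fetch_output,
                  max_in_flight=0, max_submitted=0):
    """Hypothetical single pass of an Engine-like progress() loop.

    `tasks` are objects with a mutable `state` string; the three callables
    act on one task and change its state as a real backend would.
    """
    # 1. Refresh tasks that are known to the remote system.
    for task in tasks:
        if task.state in IN_FLIGHT + ("UNKNOWN",):
            update(task)
    # 2. Submit NEW tasks while under the limits (0 means "no limit").
    for task in tasks:
        if task.state != "NEW":
            continue
        in_flight = sum(t.state in IN_FLIGHT for t in tasks)
        submitted = sum(t.state == "SUBMITTED" for t in tasks)
        if max_in_flight > 0 and in_flight >= max_in_flight:
            break
        if max_submitted > 0 and submitted >= max_submitted:
            break
        submit(task)
    # 3. Retrieve output of tasks that finished running; the callable is
    #    expected to move them to TERMINATED. TERMINATED tasks are skipped.
    for task in tasks:
        if task.state == "TERMINATING":
            fetch_output(task)
```

Because the counts are recomputed before each submission, a single pass never pushes the number of in-flight or submitted tasks past its limit.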

remove(task)

Remove a task from the list of tasks managed by this Engine.

select_resource(match)

Alter the configured list of resources, and retain only those that satisfy predicate match.

Argument match can be:

  • either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
  • or a string: only resources whose name matches it (wildcards * and ? are allowed) are retained.

stats(only=None)

Return a dictionary mapping each state name into the count of tasks in that state. In addition, the following keys are defined:

  • ok: count of TERMINATED tasks with return code 0
  • failed: count of TERMINATED tasks with nonzero return code
  • total: total count of managed tasks, whatever their state

If the optional argument only is not None, tasks whose class is not contained in only are ignored.

Parameter only (tuple): restrict counting to tasks of these classes.
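The counting rules of stats can be sketched with collections.Counter, which conveniently reads a missing state as 0. The stats function below is a hypothetical stand-in operating on objects with state and returncode attributes; filtering with isinstance (so subclasses of a listed class are also counted) is an assumption about how "contained in only" is checked.

```python
from collections import Counter


def stats(tasks, only=None):
    """Hypothetical sketch of Engine.stats.

    `tasks` are objects with `state` and `returncode` attributes;
    `only` is an optional tuple of classes to restrict counting to.
    """
    counts = Counter()
    for task in tasks:
        # Assumption: isinstance-based filtering, so subclasses count too.
        if only is not None and not isinstance(task, only):
            continue
        counts[task.state] += 1
        counts["total"] += 1
        if task.state == "TERMINATED":
            counts["ok" if task.returncode == 0 else "failed"] += 1
    return counts
```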

submit(task, resubmit=False, **extra_args)

Submit task at the next invocation of progress.

The task state is reset to NEW, and the task is then added to the collection of managed tasks.

update_job_state(*tasks, **extra_args)

Return list of current states of the given tasks. States will only be updated at the next invocation of progress; in particular, no state-change handlers are called as a result of calling this method.