gc3libs.core
Top-level interface to Grid functionality.
-
class gc3libs.core.Core(cfg, matchmaker=<gc3libs.core.MatchMaker object>)
Core operations: submit, update state, retrieve (a snapshot of) output, cancel job.
Core operations are blocking, i.e., they return only after the operation has successfully completed, or an error has been detected.
Operations are always performed by a Core object. Core implements an overlay Grid on the resources specified in the configuration file.
-
add(task)
This method is here just to allow Core and Engine objects to be used interchangeably. It’s effectively a no-op, as it makes no sense in the synchronous/blocking semantics implemented by Core.
-
close()
Used to explicitly invoke the destructor on objects, e.g., LRMS.
-
fetch_output(app, download_dir=None, overwrite=False, changed_only=True, **extra_args)
Retrieve output into local directory app.output_dir.
If the task is not expected to produce any output (i.e., app.would_output == False), then the only effect of this is to advance the state of TERMINATING tasks to TERMINATED.
Optional argument download_dir overrides the download location.
The download directory is created if it does not exist. If it already exists and the optional argument overwrite is False (default), it is renamed with a .NUMBER suffix and a new empty one is created in its place. Otherwise, if overwrite is True, files are downloaded over the ones already present; in this case, the changed_only argument controls which files are overwritten:
- if changed_only is True (default), then only files for which the source has a different size or has been modified more recently than the destination are copied;
- if changed_only is False, then all files in source will be copied into destination, unconditionally.
Source files that do not exist at destination will be copied, independently of the overwrite and changed_only settings.
If the task is in TERMINATING state, the state is changed to TERMINATED, attribute output_dir is set to the absolute path to the directory where files were downloaded, and the terminated transition method is called on the app object.
Task output cannot be retrieved when app.execution is in one of the states NEW or SUBMITTED; an OutputNotAvailableError exception is raised in these cases.
Raise: gc3libs.exceptions.OutputNotAvailableError if no output can be fetched from the remote job (e.g., the Application/Task object is in NEW or SUBMITTED state, indicating the remote job has not started running).
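The overwrite and changed_only rules above come down to a per-file decision. Here is a minimal standard-library sketch of that decision, plus the .NUMBER renaming used when overwrite is False. should_copy and backup_name are hypothetical helpers, not gc3libs functions. Starting the suffix numbering at 1 is an assumption.

```python
import os


def should_copy(src: str, dst: str, changed_only: bool = True) -> bool:
    """Decide whether `src` should be copied over `dst` when overwrite=True.

    Hypothetical helper illustrating the documented rules; not part of gc3libs.
    """
    if not os.path.exists(dst):
        # Files missing at the destination are always copied.
        return True
    if not changed_only:
        # changed_only=False: copy unconditionally.
        return True
    src_stat = os.stat(src)
    dst_stat = os.stat(dst)
    # Copy only if the size differs or the source is newer than the destination.
    return (src_stat.st_size != dst_stat.st_size
            or src_stat.st_mtime > dst_stat.st_mtime)


def backup_name(path: str) -> str:
    """Return the first unused `path.NUMBER` name (overwrite=False case).

    Hypothetical helper; numbering from 1 is an assumption.
    """
    n = 1
    while os.path.exists(f"{path}.{n}"):
        n += 1
    return f"{path}.{n}"
```

When overwrite is False the download directory starts out empty, so every file is copied and should_copy is never consulted.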
-
free(app, **extra_args)
Free up any remote resources used for the execution of app. In particular, this should delete any remote directories and files.
It is an error to call this method if app.execution.state is anything other than TERMINATED: an InvalidOperation exception will be raised in this case.
Raise: gc3libs.exceptions.InvalidOperation if app.execution.state differs from Run.State.TERMINATED.
-
get_resources(**extra_args)
Return list of resources configured into this Core instance.
-
kill(app, **extra_args)
Terminate a job.
Terminating a job in RUNNING, SUBMITTED, or STOPPED state entails canceling the job with the remote execution system; terminating a job in the NEW or TERMINATED state is a no-op.
-
peek(app, what='stdout', offset=0, size=None, **extra_args)
Download size bytes (starting at offset bytes from the beginning) from the remote job's standard output or error stream, and write them into a local file. Return a file-like object from which the downloaded contents can be read.
If size is None (default), then download all available contents of the remote stream from offset to the end.
The only allowed values for the what argument are the strings ‘stdout’ and ‘stderr’, indicating that the relevant section of the job’s standard output or standard error, respectively, should be downloaded.
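The offset/size semantics of peek can be illustrated locally. The sketch below lets a local file stand in for the remote stream. peek_local is a hypothetical helper, not the gc3libs implementation. Raising ValueError for a bad what value is also an assumption, since the real method may raise a different exception.

```python
import tempfile


def peek_local(stream_path: str, what: str = "stdout", offset: int = 0, size=None):
    """Copy `size` bytes starting at `offset` from a local stand-in for the
    remote stream into a temporary file, and return that file rewound.

    Hypothetical helper: a local file plays the role of the remote
    stdout/stderr stream.
    """
    if what not in ("stdout", "stderr"):
        raise ValueError("what must be 'stdout' or 'stderr'")
    with open(stream_path, "rb") as src:
        src.seek(offset)
        # size=None means "everything from offset to the end of the stream".
        data = src.read() if size is None else src.read(size)
    local = tempfile.TemporaryFile()
    local.write(data)
    local.seek(0)  # rewind so the caller can read from the start
    return local
```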
-
remove(task)
This method is here just to allow Core and Engine objects to be used interchangeably. It’s effectively a no-op, as it makes no sense in the synchronous/blocking semantics implemented by Core.
-
select_resource(match)
Alter the configured list of resources, and retain only those that satisfy predicate match. Return the number of enabled resources.
Argument match can be:
- either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
- or a string: only resources whose name matches it (wildcards * and ? are allowed) are retained.
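The two forms of match can be mimicked with the standard fnmatch module, which handles the * and ? wildcards (it also accepts [seq] patterns). The sketch below is a hypothetical stand-in that works on a name-to-resource dict. It assumes nothing about how gc3libs matches names internally. The number of enabled resources that select_resource returns corresponds to len() of the result.

```python
from fnmatch import fnmatch


def select_resources(resources: dict, match) -> dict:
    """Return the subset of `resources` (name -> resource) kept by `match`.

    Hypothetical helper mirroring the documented semantics of
    select_resource(); gc3libs itself may implement this differently.
    """
    if callable(match):
        # Predicate: called on each resource object in turn.
        return {name: r for name, r in resources.items() if match(r)}
    # String: shell-style wildcards matched against resource names.
    return {name: r for name, r in resources.items() if fnmatch(name, match)}
```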
-
submit(app, resubmit=False, targets=None, **extra_args)
Submit a job running an instance of the given task app.
Upon successful submission, call the submitted method on the app object. If targets are given, submission of the task is attempted to the resources in the order given; the submit method returns after the first successful attempt. If targets is None (default), a brokering procedure is run to determine the best resource among the configured ones.
At the beginning of the submission process, the app.execution state is reset to NEW; if submission is successful, the task will be in SUBMITTED or RUNNING state when this call returns.
Raise: gc3libs.exceptions.InputFileError if an input file does not exist or cannot otherwise be read.
Parameters:
- app (Task) – A GC3Pie Task instance to be submitted.
- resubmit – If True, submit task regardless of its execution state; if False (default), submission is a no-op if task is not in NEW state.
- targets (list) – A list of Resource objects to submit the task to; resources are tried in the order given. If None (default), perform brokering among all the configured resources.
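The "try targets in order, stop at the first success" behavior can be sketched as a plain loop. submit_in_order and try_submit are hypothetical names; try_submit stands in for the backend submission call and is assumed to raise on failure. Raising RuntimeError when every target fails is also an assumption, since the documentation does not say what Core.submit does in that case.

```python
def submit_in_order(task, targets, try_submit):
    """Try each target in order; return the first one that accepts the task.

    `try_submit(task, target)` is a hypothetical callable that raises an
    exception on failure; it stands in for the backend submission call.
    """
    errors = []
    for target in targets:
        try:
            try_submit(task, target)
            return target  # stop after the first successful attempt
        except Exception as err:
            errors.append((target, err))
    # Assumed behavior when every attempt fails.
    raise RuntimeError(f"all {len(errors)} submission attempts failed: {errors}")
```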
-
update_job_state(*apps, **extra_args)
Update state of all applications passed in as arguments.
If keyword argument update_on_error is False (default), then application execution state is not changed in case a backend error happens; it is changed to UNKNOWN otherwise.
Note that if the state of a job changes, setting Run.state calls the appropriate handler method on the application/task object.
Raise: gc3libs.exceptions.InvalidArgument in case one of the passed Application or Task objects is invalid. This can stop updating the state of other objects in the argument list.
Raise: gc3libs.exceptions.ConfigurationError if the configuration of this Core object is invalid or otherwise inconsistent (e.g., a resource references a non-existing auth section).
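The effect of update_on_error on a single job can be shown in isolation. This sketch uses a hypothetical State enum limited to a few states and a hypothetical query_backend callable; neither is the gc3libs Run.State class nor its backend API.

```python
from enum import Enum


class State(Enum):
    # Subset of execution states, for illustration only.
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    UNKNOWN = "UNKNOWN"


def update_state(current, query_backend, update_on_error=False):
    """Return the new state of a job after polling its backend.

    `query_backend` is a hypothetical zero-argument callable returning the
    job's State, or raising an exception on a backend error.
    """
    try:
        return query_backend()
    except Exception:
        # Backend error: mark UNKNOWN only if update_on_error is set,
        # otherwise keep the previously known state.
        return State.UNKNOWN if update_on_error else current
```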
-
update_resources(**extra_args)
Update the state of resources configured into this Core instance.
Each resource object in the returned list will have its updated attribute set to True if the update operation succeeded, or False if it failed.
-
class gc3libs.core.Engine(controller, tasks=[], store=None, can_submit=True, can_retrieve=True, max_in_flight=0, max_submitted=0, output_dir=None, fetch_output_overwrites=False, scheduler=<gc3libs.core.scheduler object>, retrieve_running=False, retrieve_overwrites=False, retrieve_changed_only=True)
Submit tasks in a collection, and update their state until a terminal state is reached. Specifically:
- tasks in NEW state are submitted;
- the state of tasks in SUBMITTED, RUNNING or STOPPED state is updated;
- when a task reaches TERMINATING state, its output is downloaded.
The behavior of Engine instances can be further customized by setting the following instance attributes:
- can_submit
- Boolean value: if False, no task will be submitted.
- can_retrieve
- Boolean value: if False, no output will ever be retrieved.
- max_in_flight
- If >0, limit the number of tasks in SUBMITTED or RUNNING state: if the number of tasks in SUBMITTED, RUNNING or STOPPED state is greater than max_in_flight, then no new submissions will be attempted.
- max_submitted
- If >0, limit the number of tasks in SUBMITTED state: if the number of tasks in SUBMITTED, RUNNING or STOPPED state is greater than max_submitted, then no new submissions will be attempted.
- output_dir
- Base directory for job output; if not None, each task’s results will be downloaded in a subdirectory named after the task’s permanent_id.
- fetch_output_overwrites
- Default value to pass as the overwrite argument to Core.fetch_output() when retrieving results of a terminated task.
- scheduler
- A factory function for creating objects that conform to the Scheduler interface to control task submission; see the Scheduler documentation for details. The default value implements a first-come first-serve algorithm: tasks are submitted in the order they have been added to the Engine.
- retrieve_running
- If True, snapshot output from RUNNING jobs at every invocation of progress().
- retrieve_overwrites
- If True, overwrite files in the output directory of any job (as opposed to moving the destination away and downloading a fresh copy). See Core.fetch_output() for details.
- retrieve_changed_only
- If both this and overwrite are True, then only changed files are downloaded. See Core.fetch_output() for details.
Any of the above can also be set by passing a keyword argument to the constructor (assume g is a Core instance):
>>> e = Engine(g, can_submit=False)
>>> e.can_submit
False
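Here is one way to read the max_in_flight limit as a submission budget. It is a minimal sketch that counts tasks in SUBMITTED, RUNNING or STOPPED state as in flight and treats 0 as "no limit", as documented. It also assumes the cap should never be exceeded, which is slightly stricter than the "greater than" wording above. submission_budget is a hypothetical helper that works on plain state strings.

```python
def submission_budget(states, max_in_flight=0):
    """Return how many NEW tasks may be submitted in this cycle.

    Sketch assuming max_in_flight caps the number of tasks in SUBMITTED,
    RUNNING or STOPPED state; 0 (or less) means "no limit".
    """
    new = sum(1 for s in states if s == "NEW")
    if max_in_flight <= 0:
        return new
    in_flight = sum(1 for s in states if s in ("SUBMITTED", "RUNNING", "STOPPED"))
    # Never go below zero, and never submit more tasks than are waiting.
    return min(new, max(0, max_in_flight - in_flight))
```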
-
add(task)
Add task to the list of tasks managed by this Engine. Adding a task that has already been added to this Engine instance results in a no-op.
-
close()
Explicitly call the finalization methods on relevant objects, e.g., LRMS.
-
fetch_output(task, output_dir=None, overwrite=False, changed_only=True, **extra_args)
Enqueue task for later output retrieval.
Warning
FIXME
The output_dir, overwrite, and changed_only parameters are currently ignored.
-
free(task, **extra_args)
Proxy for Core.free (which see).
-
get_resources()
Return the list of resources configured into the underlying Core instance.
-
kill(task, **extra_args)
Schedule a task for killing on the next progress run.
-
peek(task, what='stdout', offset=0, size=None, **extra_args)
Proxy for Core.peek (which see).
-
progress()
Update state of all registered tasks and take appropriate action. Specifically:
- tasks in NEW state are submitted;
- the state of tasks in SUBMITTED, RUNNING, STOPPED or UNKNOWN state is updated;
- when a task reaches TERMINATING state, its output is downloaded.
- tasks in TERMINATED status are simply ignored.
The max_in_flight and max_submitted limits (if >0) are taken into account when attempting submission of tasks.
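The per-state rules of a progress() pass can be summarized as a dispatch table. This sketch maps a hypothetical task-id-to-state dict to the action one pass takes. The action names are illustrative labels, not gc3libs method names, and the max_in_flight/max_submitted limits are left out.

```python
def plan_actions(states):
    """Map each task's state to the action one progress() pass takes on it.

    Illustrative only: task states are plain strings, and the returned
    action names are hypothetical labels, not gc3libs method names.
    """
    actions = {}
    for task_id, state in states.items():
        if state == "NEW":
            actions[task_id] = "submit"
        elif state in ("SUBMITTED", "RUNNING", "STOPPED", "UNKNOWN"):
            actions[task_id] = "update"
        elif state == "TERMINATING":
            actions[task_id] = "fetch_output"
        # TERMINATED tasks are ignored: no entry is added.
    return actions
```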
-
remove(task)
Remove a task from the list of tasks managed by this Engine.
-
select_resource(match)
Alter the configured list of resources, and retain only those that satisfy predicate match. Return the number of enabled resources.
Argument match can be:
- either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
- or a string: only resources whose name matches it (wildcards * and ? are allowed) are retained.
-
stats(only=None)
Return a dictionary mapping each state name into the count of tasks in that state. In addition, the following keys are defined:
- ok: count of TERMINATED tasks with return code 0
- failed: count of TERMINATED tasks with nonzero return code
- total: total count of managed tasks, whatever their state
If the optional argument only is not None, tasks whose class is not contained in only are ignored.
Parameters: only (tuple) – Restrict counting to tasks of these classes.
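The counting rules of stats can be sketched with collections.Counter. Tasks here are hypothetical objects with a state string and an integer returncode. Filtering with isinstance() is an assumption: it also accepts subclasses of the classes listed in only.

```python
from collections import Counter


def task_stats(tasks, only=None):
    """Count tasks per state, plus 'ok', 'failed' and 'total'.

    Each task is assumed (hypothetically) to expose `.state` (a string) and
    `.returncode` (an int, meaningful once the task is TERMINATED).
    """
    tasks = list(tasks)
    if only is not None:
        # Assumption: isinstance() filtering, so subclasses also count.
        tasks = [t for t in tasks if isinstance(t, only)]
    result = Counter(t.state for t in tasks)
    terminated = [t for t in tasks if t.state == "TERMINATED"]
    result["ok"] = sum(1 for t in terminated if t.returncode == 0)
    result["failed"] = sum(1 for t in terminated if t.returncode != 0)
    result["total"] = len(tasks)
    return dict(result)
```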
-
submit(task, resubmit=False, targets=None, **extra_args)
Submit task at the next invocation of progress.
The task state is reset to NEW and the task is then added to the collection of managed tasks. The targets argument is only present for interface compatibility with Core.submit() but is otherwise ignored.
-
update_job_state(*tasks, **extra_args)
Return list of current states of the given tasks. States will only be updated at the next invocation of progress; in particular, no state-change handlers are called as a result of calling this method.
-
class gc3libs.core.MatchMaker
Select and sort resources for attempting submission of a Task.
A match-making algorithm must implement two methods:
- filter: given a task and a list of resources, return the list of resources that the given task could be submitted to.
- rank: given a task and a list of resources, return a list of resources sorted in preference order, i.e., submission of the given task will be attempted to the first returned resource, then the next one, etc.
This class implements the default match-making algorithm in GC3Pie, which operates as follows:
- filter phase: if task has a compatible_resources method (as instances of Application do), retain only those resources where it evaluates to True. Otherwise, return the resources list unchanged.
- rank phase: sort resources according to the task’s rank_resources method, or retain the given order if task does not define such method.
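The default filter/rank algorithm can be sketched with getattr() lookups. SketchMatchMaker is a hypothetical stand-in, not gc3libs.core.MatchMaker. It assumes that compatible_resources(resources) returns the compatible subset and that rank_resources(resources) returns a sorted list. Both signatures are assumptions.

```python
class SketchMatchMaker:
    """Minimal matchmaker following the documented default algorithm.

    Hypothetical stand-in: it only relies on the task optionally providing
    `compatible_resources(resources)` and `rank_resources(resources)`.
    """

    def filter(self, task, resources):
        method = getattr(task, "compatible_resources", None)
        if method is None:
            return list(resources)  # no requirements: keep everything
        return list(method(resources))

    def rank(self, task, resources):
        method = getattr(task, "rank_resources", None)
        if method is None:
            return list(resources)  # keep the given order
        return list(method(resources))
```

Typical use is to rank the filtered list, e.g. mm.rank(task, mm.filter(task, resources)), and then try submission in the returned order.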
-
filter(task, resources)
Return the subset of resources to which task could be submitted.
Note that the resulting subset could be empty (no resource can accommodate the task’s requirements).
The default implementation uses the task’s compatible_resources method to retain only the resources that satisfy the task’s requirements. If task does not provide such a method, the resource list is returned unchanged.
-
rank(task, resources)
Sort the list of resources in the preferred order for submitting task.
Unless overridden in a derived class, this calls the task’s rank_resources method to sort the list. If the task does not provide such a method, the resources list is returned unchanged.
-
class gc3libs.core.Scheduler(tasks, resources)
Instances of the Scheduler class are used in Engine.progress() to determine what tasks (among those in Run.State.NEW state) are to be submitted.
A Scheduler object must implement both the context protocol and the iterator protocol.
The way a Scheduler instance is actually used within Engine is as follows:
- A Scheduler instance is created, passing it two arguments: a list of tasks in NEW state, and a dictionary of configured resources (keys are resource names, values are actual resource objects).
- When a new submission cycle starts, the __enter__() method is called.
- The Engine iterates by repeatedly calling the next() method to receive tasks to be submitted. The send() and throw() methods are used to notify the scheduler of the outcome of the submission attempt.
- When the submission cycle ends, the __exit__() method is called.
The Scheduler.schedule generator is the heart of the submission process and has basically complete control over it. It is initialized with the list of tasks in NEW state, and the list of configured resources. The next() method should yield pairs (task index, resource name), where the task index is the position of the task to be submitted next in the given list and, similarly, the resource name is the name of the resource to which the task should be submitted.
For each pair yielded, submission of that task to the selected resource is attempted; the state of the task object after submission is sent back (via the send() method) to the Scheduler instance; if an exception is raised, that exception is thrown (via the throw() method) into the scheduler object instead. Submission stops when the next() call raises a StopIteration exception.
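The send()/throw() handshake is easier to follow in code. Below is a minimal first-come first-serve sketch of a schedule generator together with an Engine-side loop that drives it. fcfs_schedule and drive are hypothetical names, tasks and resources are plain placeholders, and only the iterator half of the protocol is shown; the context-protocol part (__enter__/__exit__) is omitted.

```python
def fcfs_schedule(tasks, resources):
    """First-come first-serve: try each NEW task, in order, on each resource
    name, in order, until one attempt leaves it SUBMITTED or RUNNING.

    Hypothetical sketch; `resources` is a dict keyed by resource name.
    """
    for index, _task in enumerate(tasks):
        for name in resources:
            try:
                # The driver sends back the task state after submission...
                state = yield (index, name)
            except Exception:
                # ...or throws the submission error in here: try next resource.
                continue
            if state in ("SUBMITTED", "RUNNING"):
                break  # submitted: move on to the next task


def drive(schedule, submit):
    """Engine-side loop: `submit(index, name)` returns a state or raises."""
    attempts = []
    try:
        index, name = next(schedule)
        while True:
            attempts.append((index, name))
            try:
                state = submit(index, name)
            except Exception as err:
                index, name = schedule.throw(err)
            else:
                index, name = schedule.send(state)
    except StopIteration:
        pass  # the scheduler has no more tasks to offer
    return attempts
```

Having the generator catch errors thrown into it keeps the retry policy inside the scheduler. The Engine-side loop then only has to report outcomes.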