gc3libs.core
Top-level classes for task execution and control.
-
class gc3libs.core.BgEngine(lib, *args, **kwargs)
Run a GC3Pie Engine instance in the background.
A BgEngine exposes the same interface as a regular Engine class, but proxies all operations for asynchronous execution by the wrapped Engine instance. In practice, this means that all invocations of Engine operations on a BgEngine always succeed: errors will only be visible in the background thread of execution.
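The proxying idea behind BgEngine can be illustrated with a small, generic sketch. This is not GC3Pie's implementation: `BackgroundProxy` is a hypothetical name, and the queue-plus-worker-thread design is an assumption chosen only to show why calls on the proxy "always succeed" while errors surface solely in the background thread.

```python
import logging
import queue
import threading

log = logging.getLogger(__name__)


class BackgroundProxy:
    """Hypothetical sketch: run method calls on `target` in a worker thread."""

    def __init__(self, target):
        self._target = target
        self._calls = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._calls.get()
            if item is None:  # sentinel enqueued by stop()
                break
            name, args, kwargs = item
            try:
                getattr(self._target, name)(*args, **kwargs)
            except Exception:
                # errors are only visible here, in the background thread
                log.exception("background call to %s() failed", name)

    def __getattr__(self, name):
        # Called only for names not defined on the proxy itself:
        # return a stub that enqueues the call and returns at once.
        def enqueue(*args, **kwargs):
            self._calls.put((name, args, kwargs))
        return enqueue

    def stop(self, wait=False):
        self._calls.put(None)
        if wait:
            self._thread.join()  # pending calls run before the sentinel
```

A call such as `proxy.submit(task)` returns immediately, even if the wrapped `submit` later raises; the exception is only logged by the worker.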
-
add(task)
Proxy to Engine.add() (which see).
-
static at_most_once_per_cycle(fn)
Ensure the decorated function is not executed more than once per poll interval.
Cached results are returned instead if Engine.progress() has not been called in between two separate invocations of the wrapped function.
Warning
Keyword arguments are ignored when looking up previously cached function results. This means that the following expressions might all return the same cached value:
f(), f(foo=1), f(bar=2, baz='a')
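A minimal sketch of such a per-cycle cache, assuming the owning object exposes a hypothetical integer attribute `cycle` that `progress()` increments; the real decorator may track poll cycles differently. The cache key deliberately omits keyword arguments, reproducing the caveat in the Warning.

```python
import functools


def at_most_once_per_cycle(fn):
    """Hypothetical sketch: cache fn's result until self.cycle changes."""
    cache = {}

    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        # Keyword arguments are deliberately left out of the key,
        # reproducing the caveat described in the Warning above.
        key = (id(self), args)
        if key in cache and cache[key][0] == self.cycle:
            return cache[key][1]
        result = fn(self, *args, **kwargs)
        cache[key] = (self.cycle, result)
        return result

    return wrapper


class Demo:
    """Hypothetical owner object; progress() starts a new poll cycle."""

    def __init__(self):
        self.cycle = 0
        self.calls = 0

    def progress(self):
        self.cycle += 1

    @at_most_once_per_cycle
    def snapshot(self, **kwargs):
        self.calls += 1
        return self.calls
```

Here `d.snapshot()` and `d.snapshot(foo=1)` return the same cached value until `d.progress()` is called.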
-
close()
Proxy to Engine.close() (which see).
-
counts(only=<class 'gc3libs.Task'>)
Proxy to Engine.counts() (which see).
-
fetch_output(task, output_dir=None, overwrite=False, changed_only=True, **extra_args)
Proxy to Engine.fetch_output() (which see).
-
find_task_by_id(task_id)
Proxy to Engine.find_task_by_id() (which see).
-
free(task, **extra_args)
Proxy to Engine.free() (which see).
-
get_backend(name)
Proxy to Engine.get_backend() (which see).
-
get_resources()
Proxy to Engine.get_resources() (which see).
-
iter_tasks()
Proxy to Engine.iter_tasks() (which see).
-
kill(task, **extra_args)
Proxy to Engine.kill() (which see).
-
peek(task, what='stdout', offset=0, size=None, **extra_args)
Proxy to Engine.peek() (which see).
-
progress()
Proxy to Engine.progress().
If the background thread is already running, this is a no-op, as progressing tasks is already taken care of by the background thread. Otherwise, just forward the call to the wrapped engine.
-
remove(task)
Proxy to Engine.remove() (which see).
-
select_resource(match)
Proxy to Engine.select_resource() (which see).
-
start(interval)
Start triggering the main loop at the given interval.
Parameters: interval (gc3libs.quantity.Duration) – Time span between successive calls of _perform().
-
stats(only=None)
Proxy to Engine.stats() (which see).
-
stop(wait=False)
Stop background execution of the main loop. Call start() to resume running.
Parameters: wait (bool) – When True, wait until all pending actions on the background thread have been completed.
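The start/stop pair can be sketched with a thread that sleeps on a `threading.Event`, so that stopping interrupts the wait immediately. This is a hypothetical illustration: `PeriodicRunner` is not a GC3Pie class, and the interval is a plain number of seconds here rather than a `gc3libs.quantity.Duration`.

```python
import threading


class PeriodicRunner:
    """Hypothetical sketch: call `perform` every `interval` seconds on a thread."""

    def __init__(self, perform):
        self._perform = perform
        self._stop = threading.Event()
        self._thread = None

    def start(self, interval):
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._loop, args=(interval,), daemon=True)
        self._thread.start()

    def _loop(self, interval):
        while not self._stop.is_set():
            self._perform()
            # wait() returns early as soon as stop() sets the event
            self._stop.wait(interval)

    def stop(self, wait=False):
        self._stop.set()
        if wait and self._thread is not None:
            self._thread.join()  # block until the current iteration ends
```

Calling `start()` again after `stop()` spawns a fresh thread, matching the "call start() to resume running" behavior described above.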
-
submit(task, resubmit=False, targets=None, **extra_args)
Proxy to Engine.submit() (which see).
-
trigger_after_progress(func, *args, **kwargs)
Call a function after running Engine.progress() in the main loop. Exceptions raised during the call will be logged at WARNING level but otherwise ignored.
The function call will be triggered only once, at the next run of the main loop; it will not be fired repeatedly at every re-run of the main loop.
Any supplemental positional or keyword arguments will be passed unchanged to the trigger function.
-
trigger_before_progress(func, *args, **kwargs)
Call a function before running Engine.progress() in the main loop. Exceptions raised during the call will be logged at WARNING level but otherwise ignored.
The function call will be triggered only once, at the next run of the main loop; it will not be fired repeatedly at every re-run of the main loop.
Any supplemental positional or keyword arguments will be passed unchanged to the trigger function.
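The one-shot semantics of the two trigger methods can be sketched as two hook lists that are emptied each time they fire. `TriggeredLoop` and `run_once` are hypothetical names used only for this illustration; GC3Pie's own bookkeeping may differ.

```python
import logging

log = logging.getLogger(__name__)


class TriggeredLoop:
    """Hypothetical sketch of one-shot hooks around a progress() step."""

    def __init__(self, progress):
        self._progress = progress
        self._before = []
        self._after = []

    def trigger_before_progress(self, func, *args, **kwargs):
        self._before.append((func, args, kwargs))

    def trigger_after_progress(self, func, *args, **kwargs):
        self._after.append((func, args, kwargs))

    @staticmethod
    def _fire_once(hooks):
        pending = hooks[:]
        hooks.clear()  # each hook runs at most once
        for func, args, kwargs in pending:
            try:
                func(*args, **kwargs)
            except Exception as err:
                # log at WARNING level, otherwise ignore
                log.warning("trigger %r failed: %s", func, err)

    def run_once(self):
        """One iteration of the main loop."""
        self._fire_once(self._before)
        self._progress()
        self._fire_once(self._after)
```

On a second `run_once()`, only `progress` runs again, because the hooks were consumed by the first iteration.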
-
update_job_state(*tasks, **extra_args)
Proxy to Engine.update_job_state() (which see).
-
-
class gc3libs.core.Core(cfg, matchmaker=<gc3libs.core.MatchMaker object>, resource_errors_are_fatal=None)
Core operations: submit, update state, retrieve (a snapshot of) output, cancel job.
Core operations are blocking, i.e., they return only after the operation has successfully completed, or an error has been detected.
Operations are always performed by a Core object. Core implements an overlay Grid on the resources specified in the configuration file.
Initialization of a Core instance also initializes all resources in the passed Configuration instance. By default, GC3Pie’s Core objects will ignore errors in initializing resources, and only raise an exception if no resources can be initialized. This can be changed either by passing the optional argument resource_errors_are_fatal=True, or by setting the environment variable GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL to yes or 1.
-
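The resource-initialization policy described above for Core can be sketched as follows. `init_resources` and the `factories` mapping (resource name to a zero-argument constructor) are hypothetical stand-ins for Core's internal setup; only the environment variable name and the accepted values `yes` and `1` come from the text.

```python
import os

ENV_VAR = "GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL"


def init_resources(factories, errors_are_fatal=None):
    """Hypothetical sketch: build each resource, skipping failures by default."""
    if errors_are_fatal is None:
        # explicit argument wins; otherwise consult the environment
        errors_are_fatal = os.environ.get(ENV_VAR, "") in ("yes", "1")
    resources = {}
    for name, factory in factories.items():
        try:
            resources[name] = factory()
        except Exception:
            if errors_are_fatal:
                raise
            # non-fatal mode: ignore this resource and keep going
    if not resources:
        raise RuntimeError("no resources could be initialized")
    return resources
```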
add(task)
This method exists only to allow Core and Engine objects to be used interchangeably. It is effectively a no-op, as adding a task makes no sense in the synchronous/blocking semantics implemented by Core.
-
close()
Explicitly invoke the destructor on managed objects, e.g., LRMS instances.
-
fetch_output(app, download_dir=None, overwrite=False, changed_only=True, **extra_args)
Retrieve output into local directory app.output_dir.
If the task is not expected to produce any output (i.e., app.would_output == False), then the only effect of this is to advance the state of TERMINATING tasks to TERMINATED.
Optional argument download_dir overrides the download location.
The download directory is created if it does not exist. If it already exists, and the optional argument overwrite is False (default), it is renamed with a .NUMBER suffix and a new empty one is created in its place. Otherwise, if overwrite is True, files are downloaded over the ones already present; in this case, the changed_only argument controls which files are overwritten:
- if changed_only is True (default), then only files for which the source has a different size or has been modified more recently than the destination are copied;
- if changed_only is False, then all files in source will be copied into destination, unconditionally.
Source files that do not exist at destination will be copied, independently of the overwrite and changed_only settings.
If the task is in TERMINATING state, the state is changed to TERMINATED, attribute output_dir is set to the absolute path to the directory where files were downloaded, and the terminated transition method is called on the app object.
Task output cannot be retrieved when app.execution is in one of the states NEW or SUBMITTED; an OutputNotAvailableError exception is raised in these cases.
Raise: gc3libs.exceptions.OutputNotAvailableError if no output can be fetched from the remote job (e.g., the Application/Task object is in NEW or SUBMITTED state, indicating the remote job has not started running).
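The directory handling and the per-file copy rule can be sketched on local paths. Both `prepare_download_dir` and `should_copy` are hypothetical helpers written only to illustrate the rules above; the actual transfer in GC3Pie goes through the backend, not through these functions.

```python
import os


def prepare_download_dir(path, overwrite=False):
    """Hypothetical sketch: move an existing `path` aside to `path.N`."""
    if os.path.exists(path) and not overwrite:
        n = 1
        while os.path.exists(f"{path}.{n}"):
            n += 1
        os.rename(path, f"{path}.{n}")
    os.makedirs(path, exist_ok=True)  # create it if missing
    return os.path.abspath(path)


def should_copy(src, dst, changed_only=True):
    """Decide whether `src` is copied over `dst` when overwrite=True."""
    if not os.path.exists(dst):
        return True  # files missing at destination are always copied
    if not changed_only:
        return True  # copy everything unconditionally
    s, d = os.stat(src), os.stat(dst)
    # copy only if the size differs or the source is newer
    return s.st_size != d.st_size or s.st_mtime > d.st_mtime
```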
-
free(app, **extra_args)
Free up any remote resources used for the execution of app. In particular, this should delete any remote directories and files.
It is an error to call this method if app.execution.state is anything other than TERMINATED: an InvalidOperation exception will be raised in this case.
Raise: gc3libs.exceptions.InvalidOperation if app.execution.state differs from Run.State.TERMINATED.
-
get_resources(**extra_args)
Return list of resources configured into this Core instance.
-
kill(app, **extra_args)
Terminate a job.
Terminating a job in RUNNING, SUBMITTED, or STOPPED state entails canceling the job with the remote execution system; terminating a job in the NEW or TERMINATED state is a no-op.
-
peek(app, what='stdout', offset=0, size=None, **extra_args)
Download size bytes (at offset bytes from the start) from the remote job's standard output or error stream, and write them into a local file. Return a file-like object from which the downloaded contents can be read.
If size is None (default), then fetch all available contents of the remote stream from offset to the end.
The only allowed values for the what argument are the strings 'stdout' and 'stderr', indicating that the relevant section of the job's standard output or standard error, respectively, should be downloaded.
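The offset/size semantics can be illustrated locally. In this hypothetical sketch, `streams` is a dict of binary file-like objects standing in for the remote job's output streams, and `peek_stream` is not a GC3Pie function.

```python
import tempfile


def peek_stream(streams, what="stdout", offset=0, size=None):
    """Hypothetical sketch: copy part of a stream into a local temp file."""
    if what not in ("stdout", "stderr"):
        raise ValueError("what must be 'stdout' or 'stderr'")
    source = streams[what]
    source.seek(offset)
    # size=None means "everything from offset to the end"
    data = source.read() if size is None else source.read(size)
    local = tempfile.TemporaryFile()  # binary mode by default
    local.write(data)
    local.seek(0)  # rewind so the caller reads from the beginning
    return local
```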
-
remove(task)
This method exists only to allow Core and Engine objects to be used interchangeably. It is effectively a no-op, as removing a task makes no sense in the synchronous/blocking semantics implemented by Core.
-
select_resource(match)
Disable resources that do not satisfy predicate match. Return number of enabled resources.
Argument match can be:
- either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
- or a string: only resources whose name matches it (wildcards * and ? are allowed) are retained.
Note
Calling this method modifies the configured list of resources in-place.
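The two forms of the match argument can be sketched with the standard `fnmatch` module, which implements the `*` and `?` wildcards (and also `[seq]`, which the text does not mention). The `Resource` dataclass with `name` and `enabled` fields is a hypothetical stand-in for GC3Pie's resource objects.

```python
import fnmatch
from dataclasses import dataclass


@dataclass
class Resource:
    """Hypothetical stand-in for a configured resource."""
    name: str
    enabled: bool = True


def select_resource(resources, match):
    """Disable resources that fail `match`; return how many stay enabled."""
    if isinstance(match, str):
        pattern = match

        def predicate(resource):
            # shell-style wildcards: * and ? (fnmatch also accepts [seq])
            return fnmatch.fnmatch(resource.name, pattern)
    else:
        predicate = match  # any callable taking a resource
    for resource in resources:
        if not predicate(resource):
            resource.enabled = False  # modified in place
    return sum(1 for resource in resources if resource.enabled)
```

As in the Note, the resource objects themselves are mutated; a later call can only disable more resources, never re-enable them.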
-
submit(app, resubmit=False, targets=None, **extra_args)
Submit a job running an instance of the given task app.
Upon successful submission, call the submitted method on the app object. If targets are given, submission of the task is attempted to the resources in the order given; the submit method returns after the first successful attempt. If targets is None (default), a brokering procedure is run to determine the best resource among the configured ones.
At the beginning of the submission process, the app.execution state is reset to NEW; if submission is successful, the task will be in SUBMITTED or RUNNING state when this call returns.
Raise: gc3libs.exceptions.InputFileError if an input file does not exist or cannot otherwise be read.
Parameters:
- app (Task) – A GC3Pie Task instance to be submitted.
- resubmit – If True, submit task regardless of its execution state; if False (default), submission is a no-op if task is not in NEW state.
- targets – A list of Resource objects to submit the task to; resources are tried in the order given. If None (default), perform brokering among all the configured resources.
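The "try targets in order, stop at the first success" rule can be sketched independently of any backend. `submit_to_first` and the `submit_one` callback are hypothetical; raising `RuntimeError` when every target fails is an assumption of this sketch, not documented GC3Pie behavior.

```python
def submit_to_first(task, targets, submit_one):
    """Hypothetical sketch: try each target in order, stop at first success."""
    failures = []
    for resource in targets:
        try:
            submit_one(task, resource)
        except Exception as err:
            failures.append((resource, err))  # remember why, try next one
            continue
        return resource  # first successful attempt ends the loop
    raise RuntimeError(f"could not submit {task!r} to any target: {failures!r}")
```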
-
update_job_state(*apps, **extra_args)
Update the state of all applications passed in as arguments.
If keyword argument update_on_error is False (default), then application execution state is not changed in case a backend error happens; it is changed to UNKNOWN otherwise.
Note that if the state of a job changes, Run.state calls the appropriate handler method on the application/task object.
Raise: gc3libs.exceptions.InvalidArgument in case one of the passed Application or Task objects is invalid. This can stop updating the state of other objects in the argument list.
Raise: gc3libs.exceptions.ConfigurationError if the configuration of this Core object is invalid or otherwise inconsistent (e.g., a resource references a non-existing auth section).
-
update_resources(resources=<built-in function all>, **extra_args)
Update the state of a given set of resources.
Each resource object in the returned list will have its updated attribute set to True if the update operation succeeded, or False if it failed.
Optional argument resources should be a subset of the resources configured in this Core instance (the actual Lrms objects, not the resource names). By default, all configured resources are updated.
-
-
class gc3libs.core.Engine(controller, tasks=[], store=None, can_submit=True, can_retrieve=True, max_in_flight=0, max_submitted=0, output_dir=None, scheduler=<gc3libs.core.scheduler object>, retrieve_running=False, retrieve_overwrites=False, retrieve_changed_only=True, forget_terminated=False)
Manage a collection of tasks until a terminal state is reached. Specifically:
- tasks in NEW state are submitted;
- the state of tasks in SUBMITTED, RUNNING or STOPPED state is updated;
- when a task reaches TERMINATING state, its output is downloaded.
The behavior of Engine instances can be further customized by setting the following instance attributes:
- can_submit
- Boolean value: if False, no task will be submitted.
- can_retrieve
- Boolean value: if False, no output will ever be retrieved.
- max_in_flight
- If >0, limit the number of tasks in SUBMITTED or RUNNING state: if the number of tasks in SUBMITTED, RUNNING or STOPPED state is greater than max_in_flight, then no new submissions will be attempted.
- max_submitted
- If >0, limit the number of tasks in SUBMITTED state: if the number of tasks in SUBMITTED state is greater than max_submitted, then no new submissions will be attempted.
- output_dir
- Base directory for job output; if not None, each task’s results will be downloaded in a subdirectory named after the task’s permanent_id.
- scheduler
- A factory function for creating objects that conform to the Scheduler interface to control task submission; see the Scheduler documentation for details. The default value implements a first-come, first-served algorithm: tasks are submitted in the order they have been added to the Engine.
- retrieve_running
- If True, snapshot output from RUNNING jobs at every invocation of progress().
- retrieve_overwrites
- If True, overwrite files in the output directory of any job (as opposed to moving the destination away and downloading a fresh copy). See Core.fetch_output() for details.
- retrieve_changed_only
- If both this and retrieve_overwrites are True, then only changed files are downloaded. See Core.fetch_output() for details.
- forget_terminated
- When True, Engine.remove() is automatically called on tasks when their state turns to TERMINATED.
Warning
For historical reasons, the default for this option is False, but this can (and should!) be changed in future releases.
Any of the above can also be set by passing a keyword argument to the constructor (assume g is a Core instance):
>>> e = Engine(g, can_submit=False)
>>> e.can_submit
False
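The max_in_flight and max_submitted limits can be sketched as a single gate evaluated before each submission. `can_submit_more` is a hypothetical helper taking a state-name to count mapping; this sketch stops submitting once a count *reaches* its limit, which is an assumption, since the text above phrases the check as "greater than".

```python
def can_submit_more(counts, max_in_flight=0, max_submitted=0):
    """Hypothetical check of the max_in_flight / max_submitted limits."""
    submitted = counts.get("SUBMITTED", 0)
    in_flight = submitted + counts.get("RUNNING", 0) + counts.get("STOPPED", 0)
    # A limit of 0 (or less) means "no limit".
    if max_in_flight > 0 and in_flight >= max_in_flight:
        return False
    if max_submitted > 0 and submitted >= max_submitted:
        return False
    return True
```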
-
add(task)
Add task to the list of tasks managed by this Engine. Adding a task that has already been added to this Engine instance results in a no-op.
-
close()
Explicitly call finalize methods on relevant objects, e.g., LRMS instances.
-
counts(only=<class 'gc3libs.Task'>)
Return a dictionary mapping each state name to the count of tasks in that state. In addition, the following keys are defined:
- ok: count of TERMINATED tasks with return code 0
- failed: count of TERMINATED tasks with nonzero return code
- total: total count of managed tasks, whatever their state
If the optional argument only is not None, tasks whose class is not contained in only are ignored.
Parameters: only (class) – Restrict counting to tasks of these classes.
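The extra ok, failed and total keys can be sketched on top of a plain state tally. `FakeTask` (with hypothetical `state` and `returncode` attributes) stands in for GC3Pie tasks, and filtering with `isinstance` is an assumption about how only is applied.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for a managed task."""
    state: str
    returncode: Optional[int] = None


def counts(tasks, only=None):
    """Sketch of the state tally plus the extra ok/failed/total keys."""
    if only is not None:
        tasks = [t for t in tasks if isinstance(t, only)]
    result = Counter(t.state for t in tasks)
    terminated = [t for t in tasks if t.state == "TERMINATED"]
    result["ok"] = sum(1 for t in terminated if t.returncode == 0)
    result["failed"] = sum(1 for t in terminated if t.returncode != 0)
    result["total"] = len(tasks)
    return dict(result)
```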
-
fetch_output(task, output_dir=None, overwrite=False, changed_only=True, **extra_args)
Enqueue task for later output retrieval.
Warning
FIXME
The output_dir, overwrite, and changed_only parameters are currently ignored.
-
find_task_by_id(task_id)
Return the task with the given persistent ID added to this Engine instance. If no task has that ID, raise a KeyError.
-
free(task, **extra_args)
Proxy for Core.free() (which see).
-
get_resources()
Return list of resources configured into the underlying Core instance.
-
init_counts_for(cls)
Initialize counters for tasks of class cls.
All statistics are initially computed starting from the current collection of tasks managed by this Engine instance; they will be kept up-to-date during task addition/removal/progress.
Warning
In a future release, the Engine might forget about task objects in TERMINATED state. Therefore, init_counts_for should be called before any task reaches TERMINATED state, or the counts for TERMINATED, ok, and failed jobs will be incorrectly initialized to 0.
-
iter_tasks(only_cls=None)
Iterate over tasks managed by the Engine.
If argument only_cls is None (default), then iterate over all tasks managed by this Engine. Otherwise, only return tasks which are instances of only_cls or of a subclass of it.
-
kill(task, **extra_args)
Schedule a task for killing on the next progress run.
-
peek(task, what='stdout', offset=0, size=None, **extra_args)
Proxy for Core.peek() (which see).
-
progress()
Update state of all registered tasks and take appropriate action. Specifically:
- tasks in NEW state are submitted;
- the state of tasks in SUBMITTED, RUNNING, STOPPED or UNKNOWN state is updated;
- when a task reaches TERMINATING state, its output is downloaded.
- tasks in TERMINATED status are simply ignored.
The max_in_flight and max_submitted limits (if >0) are taken into account when attempting submission of tasks.
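One pass of the rules listed above can be sketched as three loops over the managed tasks. Everything here is hypothetical: `SketchTask`, `progress_once`, the callbacks, and especially the order of the phases (update, then fetch, then submit), which the text does not specify.

```python
from dataclasses import dataclass

ACTIVE = ("SUBMITTED", "RUNNING", "STOPPED")


@dataclass
class SketchTask:
    """Hypothetical stand-in: a name plus a state string."""
    name: str
    state: str = "NEW"


def progress_once(tasks, submit, update, fetch_output, max_in_flight=0):
    """One hypothetical pass applying the rules listed above."""
    # 1. refresh tasks that may have changed state remotely
    for t in tasks:
        if t.state in ACTIVE + ("UNKNOWN",):
            update(t)
    # 2. download output of tasks that reached TERMINATING
    for t in tasks:
        if t.state == "TERMINATING":
            fetch_output(t)
    # 3. submit NEW tasks while the in-flight limit (if > 0) allows
    in_flight = sum(1 for t in tasks if t.state in ACTIVE)
    for t in tasks:
        if t.state != "NEW":
            continue  # only NEW tasks are candidates; TERMINATED ones are ignored
        if max_in_flight > 0 and in_flight >= max_in_flight:
            break
        submit(t)
        in_flight += 1
```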
-
redo(task, *args, **kwargs)
Reset task’s state to NEW so that it will be re-run.
Any additional arguments will be forwarded to the task’s own .redo() method; this is useful, e.g., to perform partial re-runs of SequentialTaskCollection instances.
-
remove(task)
Remove a task from the list of tasks managed by this Engine.
Removing a task that is not managed (i.e., already removed or never added) is a no-op.
-
resources
Get dict of configured resources.
This mapping object has configured resource names as keys, and the actual gc3libs.backends.LRMS instances as values. Note that only resources whose .enabled attribute evaluates to True will be considered for scheduling.
This is just a reference to the .resources attribute of the underlying Core object; see Core.resources for more information.
-
select_resource(match)
Disable resources that do not satisfy predicate match. Return number of enabled resources.
Argument match can be:
- either a function (or a generic callable) that is passed each Resource object in turn, and should return a boolean indicating whether the resource should be kept (True) or not (False);
- or a string: only resources whose name matches it (wildcards * and ? are allowed) are retained.
Note
Calling this method modifies the configured list of resources in-place.
-
submit(task, resubmit=False, targets=None, **extra_args)
Submit task at the next invocation of progress.
The task state is reset using the task’s own method .redo(), and then the task is added to the collection of managed tasks. Note that the use of redo() implies that only tasks in a terminal state can be resubmitted!
The targets argument is only present for interface compatibility with Core.submit() but is otherwise ignored.
-
update_job_state(*tasks, **extra_args)
Return list of current states of the given tasks. States will only be updated at the next invocation of progress; in particular, no state-change handlers are called as a result of calling this method.
-
class gc3libs.core.MatchMaker
Select and sort resources for attempting submission of a Task.
A match-making algorithm must implement two methods:
- filter: given a task and a list of resources, return the list of resources that the given task could be submitted to.
- rank: given a task and a list of resources, return a list of resources sorted in preference order, i.e., submission of the given task will be attempted to the first returned resource, then the next one, etc.
This class implements the default match-making algorithm in GC3Pie, which operates as follows:
- filter phase: if task has a compatible_resources method (as instances of Application do), retain only those resources where it evaluates to True. Otherwise, return the resources list unchanged.
- rank phase: sort resources according to the task’s rank_resources method, or retain the given order if task does not define such a method.
-
filter(task, resources)
Return the subset of resources to which task could be submitted.
Note that the resulting subset could be empty (no resource can accommodate the task’s requirements).
The default implementation uses the task’s compatible_resources method to retain only the resources that satisfy the task’s requirements. If task does not provide such a method, the resource list is returned unchanged.
-
rank(task, resources)
Sort the list of resources in the preferred order for submitting task.
Unless overridden in a derived class, this calls the task’s rank_resources method to sort the list. If the task does not provide such a method, the resources list is returned unchanged.
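The default filter/rank behavior described for MatchMaker can be sketched with `getattr` fallbacks. `MatchMakerSketch` is a hypothetical class, and this sketch assumes that compatible_resources and rank_resources each take the whole resource list and return a filtered or sorted list; the actual signatures in GC3Pie may differ.

```python
class MatchMakerSketch:
    """Hypothetical re-creation of the default filter/rank phases."""

    def filter(self, task, resources):
        check = getattr(task, "compatible_resources", None)
        if check is None:
            return list(resources)  # no requirements: keep everything
        return check(resources)

    def rank(self, task, resources):
        ranker = getattr(task, "rank_resources", None)
        if ranker is None:
            return list(resources)  # keep the given order
        return ranker(resources)
```

Brokering then amounts to `mm.rank(task, mm.filter(task, resources))`, with submission attempted to each resource in the returned order.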
-
class gc3libs.core.Scheduler(tasks, resources)
Instances of the Scheduler class are used in Engine.progress() to determine which tasks (among those in Run.State.NEW state) are to be submitted.
A Scheduler object must implement both the context protocol and the iterator protocol.
The way a Scheduler instance is actually used within Engine is as follows:
- A Scheduler instance is created, passing it two arguments: a list of tasks in NEW state, and a dictionary of configured resources (keys are resource names, values are actual resource objects).
- When a new submission cycle starts, the __enter__() method is called.
- The Engine iterates by repeatedly calling the next() method to receive tasks to be submitted. The send() and throw() methods are used to notify the scheduler of the outcome of the submission attempt.
- When the submission cycle ends, the __exit__() method is called.
The Scheduler.schedule generator is the heart of the submission process and has basically complete control over it. It is initialized with the list of tasks in NEW state and the list of configured resources. The next() method should yield pairs (task index, resource name), where the task index is the position of the task to be submitted next in the given list and, similarly, the resource name is the name of the resource to which the task should be submitted.
For each pair yielded, submission of that task to the selected resource is attempted; the state of the task object after submission is sent back (via the send() method) to the Scheduler instance; if an exception is raised, that exception is thrown (via the throw() method) into the scheduler object instead. Submission stops when the next() call raises a StopIteration exception.
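The protocol above maps naturally onto a Python generator. The following is a hypothetical first-come, first-served scheduler plus a matching driver loop, not GC3Pie's own code: `FifoScheduler`, `run_cycle` and the `submit` callback are illustrative names, and trying resources in dict insertion order is an arbitrary choice. Note that, with generator semantics, each `send()` or `throw()` call returns the next (task index, resource name) pair.

```python
class FifoScheduler:
    """Hypothetical first-come, first-served scheduler (not GC3Pie's own)."""

    def __init__(self, tasks, resources):
        self.tasks = tasks          # list of tasks in NEW state
        self.resources = resources  # dict: resource name -> resource object
        self.outcomes = []          # (task index, resource name, outcome)
        self._gen = None

    # context protocol: one submission cycle
    def __enter__(self):
        self._gen = self.schedule()
        return self

    def __exit__(self, *exc_info):
        self._gen.close()
        return False  # never swallow exceptions

    # iterator protocol, plus send()/throw() delegated to the generator
    def __iter__(self):
        return self

    def __next__(self):
        return next(self._gen)

    def send(self, state):
        return self._gen.send(state)

    def throw(self, err):
        return self._gen.throw(err)

    def schedule(self):
        names = list(self.resources)  # dict insertion order
        for index in range(len(self.tasks)):
            for name in names:
                try:
                    state = yield (index, name)
                except Exception as err:
                    self.outcomes.append((index, name, repr(err)))
                    continue  # submission failed: try the next resource
                self.outcomes.append((index, name, state))
                break  # submitted: move on to the next task


def run_cycle(tasks, resources, submit):
    """Hypothetical Engine-side driver for one submission cycle."""
    with FifoScheduler(tasks, resources) as sched:
        try:
            index, name = next(sched)
            while True:
                try:
                    state = submit(tasks[index], name)
                except Exception as err:
                    index, name = sched.throw(err)
                else:
                    index, name = sched.send(state)
        except StopIteration:
            pass  # the scheduler has nothing more to submit
    return sched.outcomes
```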