Package ClusterShell :: Module Task :: Class Task

Class Task


Task to execute. May be bound to a specific thread.

To create a task in a new thread:
    task = Task()

To create or get the instance of the task associated with the
thread object thr (threading.Thread):
    task = Task(thread=thr)

Add command to execute locally in task with:
    task.shell("/bin/hostname")

Add command to execute in a distant node in task with:
    task.shell("/bin/hostname", nodes="tiger[1-20]")

Run task in its associated thread (this blocks only if the calling
thread is the associated thread):
    task.resume()
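
The calls above can be combined into one helper, sketched below. The name run_and_collect is hypothetical and not part of ClusterShell; the helper accepts any Task-like object and relies only on shell(), resume() and iter_buffers() as documented on this page.

```python
def run_and_collect(task, command, nodes):
    """Schedule `command` on `nodes`, run the task, and gather output.

    `task` is expected to behave like a ClusterShell Task. This helper
    (a hypothetical name, not a library function) only calls shell(),
    resume() and iter_buffers().
    """
    # Queue the command for distant execution on the given nodes.
    task.shell(command, nodes=nodes)
    # Run the task; this blocks only when called from the task's thread.
    task.resume()
    # iter_buffers() yields (buffer, keys) tuples, one per distinct output.
    return [(list(keys), buf) for buf, keys in task.iter_buffers()]
```

With ClusterShell installed, this could presumably be called as run_and_collect(task_self(), "/bin/hostname", "tiger[1-20]"), where task_self() is imported from ClusterShell.Task.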

Nested Classes
  _SyncMsgHandler
Special task control port event handler.
  tasksyncmethod
Class encapsulating a function that checks whether the calling task is running or is the current task, so that it can be used as a decorator that makes the wrapped task method thread-safe.
  _SuspendCondition
Special class to manage task suspend condition.
Instance Methods
 
__init__(self, thread=None)
Initialize a Task, creating a new thread if needed.
source code
 
_is_task_self(self)
Private method used by the library to check if the task is task_self(), without creating any task_self() instance.
source code
 
_handle_exception(self) source code
 
_thread_start(self)
Task-managed thread entry point
source code
 
_run(self, timeout)
Run task (always called from its self thread).
source code
 
default(self, default_key, def_val=None)
Return per-task value for key from the "default" dictionary.
source code
 
set_default(self, default_key, value)
Set task value for specified key in the dictionary "default".
source code
 
info(self, info_key, def_val=None)
Return per-task information.
source code
 
set_info(*args, **kwargs)
Set task value for a specific key information.
source code
 
shell(self, command, **kwargs)
Schedule a shell command for local or distant execution.
source code
 
copy(self, source, dest, nodes, **kwargs)
Copy local file to distant nodes.
source code
 
_add_port(*args, **kwargs)
Add an EnginePort instance to Engine (private method).
source code
 
_remove_port(*args, **kwargs)
Remove a port from Engine (private method).
source code
 
port(self, handler=None, autoclose=False)
Create a new task port.
source code
 
timer(*args, **kwargs)
Create task's timer.
source code
 
schedule(*args, **kwargs)
Schedule a worker for execution.
source code
 
_resume_thread(self)
Resume called from another thread.
source code
 
_resume(self) source code
 
resume(self, timeout=0)
Resume task.
source code
 
_suspend_wait(*args, **kwargs) source code
 
suspend(self)
Suspend task execution.
source code
 
_abort(*args, **kwargs) source code
 
abort(self, kill=False)
Abort a task.
source code
 
_terminate(self, kill)
Abort completion subroutine.
source code
 
join(self)
Suspend execution of the calling thread until the target task terminates, unless the target task has already terminated.
source code
 
running(self)
Return True if the task is running.
source code
 
_reset(self)
Reset buffers and retcodes management variables.
source code
 
_msg_add(self, source, msg)
Add a worker message associated with a source.
source code
 
_errmsg_add(self, source, msg)
Add a worker error message associated with a source.
source code
 
_rc_set(self, source, rc, override=True)
Add a worker return code associated with a source.
source code
 
_timeout_add(self, source)
Add a worker timeout associated with a source.
source code
 
_msg_by_source(self, source)
Get a message by its source (worker, key).
source code
 
_errmsg_by_source(self, source)
Get an error message by its source (worker, key).
source code
 
_call_tree_matcher(self, tree_match_func, match_keys=None, worker=None)
Call identified tree matcher (items, walk) method with options.
source code
 
_rc_by_source(self, source)
Get a return code by its source (worker, key).
source code
 
_rc_iter_by_key(self, key)
Return an iterator over return codes for the given key.
source code
 
_rc_iter_by_worker(self, worker, match_keys=None)
Return an iterator over return codes and keys list for a specific worker and optional matching keys.
source code
 
_krc_iter_by_worker(self, worker)
Return an iterator over key, rc for a specific worker.
source code
 
_num_timeout_by_worker(self, worker)
Return the number of timed out "keys" for a specific worker.
source code
 
_iter_keys_timeout_by_worker(self, worker)
Iterate over timed out keys (ie. nodes) for a specific worker.
source code
 
key_buffer(self, key)
Get buffer for a specific key.
source code
 
node_buffer(self, key)
Get buffer for a specific key.
source code
 
key_error(self, key)
Get error buffer for a specific key.
source code
 
node_error(self, key)
Get error buffer for a specific key.
source code
 
key_retcode(self, key)
Return return code for a specific key.
source code
 
node_retcode(self, key)
Return return code for a specific key.
source code
 
max_retcode(self)
Get max return code encountered during last run.
source code
 
iter_buffers(self, match_keys=None)
Iterate over buffers, returns a tuple (buffer, keys).
source code
 
iter_errors(self, match_keys=None)
Iterate over error buffers, returns a tuple (buffer, keys).
source code
 
iter_retcodes(self, match_keys=None)
Iterate over return codes, returns a tuple (rc, keys).
source code
 
num_timeout(self)
Return the number of timed out "keys" (ie. nodes).
source code
 
iter_keys_timeout(self)
Iterate over timed out keys (ie. nodes).
source code

Inherited from object: __delattr__, __format__, __getattribute__, __hash__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__

Class Methods
 
wait(cls, from_thread)
Class method that blocks the calling thread until all tasks have finished (from a ClusterShell point of view, for instance, when their task.resume() returns).
source code
Static Methods
a new object with type S, a subtype of T
__new__(cls, thread=None)
For a task bound to a specific thread, this class acts like a "thread singleton", so a new-style class is used and new objects are only instantiated if needed.
source code
Class Variables
  _std_default = {'engine': 'auto', 'port_qlimit': 32, 'stderr':...
  _std_info = {'command_timeout': 0, 'connect_timeout': 10, 'deb...
  _tasks = {}
  _taskid_max = 0
  _task_lock = threading.Lock()
Properties

Inherited from object: __class__

Method Details

__new__(cls, thread=None)
Static Method

source code 

For a task bound to a specific thread, this class acts like a "thread singleton", so a new-style class is used and new objects are only instantiated if needed.

Returns: a new object with type S, a subtype of T
Overrides: object.__new__
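
The "thread singleton" idea can be illustrated with a short standalone sketch. The class ThreadBound and its attributes are hypothetical and this is not ClusterShell's actual implementation; in particular, it does not create a new thread when none is given. It only shows how overriding __new__ can hand back an existing instance for a given thread object.

```python
import threading


class ThreadBound(object):
    """Hypothetical class: at most one instance per thread object."""

    _instances = {}            # maps thread object -> instance
    _lock = threading.Lock()   # protects _instances

    def __new__(cls, thread=None):
        if thread is None:
            # No thread given: always build a fresh object.
            return object.__new__(cls)
        with cls._lock:
            # Reuse the instance already bound to this thread, if any.
            if thread not in cls._instances:
                cls._instances[thread] = object.__new__(cls)
            return cls._instances[thread]

    def __init__(self, thread=None):
        # __init__ runs on every call, so initialize only the first time.
        if not getattr(self, "_initialized", False):
            self.thread = thread
            self._initialized = True
```

Calling ThreadBound(thread=thr) twice with the same thr returns the same object, which mirrors the behaviour described for Task(thread=thr).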

__init__(self, thread=None)
(Constructor)

source code 

Initialize a Task, creating a new thread if needed.

Overrides: object.__init__

set_default(self, default_key, value)

source code 

Set task value for specified key in the dictionary "default".
Users may store their own task-specific key, value pairs
using this method and retrieve them with default().

Threading considerations:
  Unlike set_info(), set_default() immediately updates the
  underlying dictionary in a thread-safe manner, whether or not
  it is called from the task's thread. This method doesn't
  wake up the engine when called.

set_info(*args, **kwargs)

source code 

Set task value for a specific key information. Key, value
pairs can be passed to the engine and/or workers.
Users may store their own task-specific info key, value pairs
using this method and retrieve them with info().

Threading considerations:
  Unlike set_default(), the underlying info dictionary is only
  modified from the task's thread. Calling set_info() from
  another thread therefore queues the request, through the task
  dispatch port, to be applied later (at run time). When the
  request is received, it wakes up the engine if the task is
  running, and the info dictionary is then updated.

Decorators:
  • @tasksyncmethod()
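
The deferral described above can be sketched with the standard library alone. Everything in this example is hypothetical (owner_thread_only, InfoStore, drain); it uses a plain queue and an explicit drain() call in place of the task dispatch port and engine wake-up, so it illustrates the idea rather than the library's decorator.

```python
import functools
import queue
import threading


def owner_thread_only(method):
    """Run `method` directly in the owner thread, otherwise queue it."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if threading.current_thread() is self.owner:
            return method(self, *args, **kwargs)
        # Called from another thread: defer until the owner drains.
        self.pending.put((method, args, kwargs))
    return wrapper


class InfoStore(object):
    """Hypothetical stand-in for a task holding an info dictionary."""

    def __init__(self):
        self.owner = threading.current_thread()
        self.pending = queue.Queue()
        self.info = {}

    @owner_thread_only
    def set_info(self, key, value):
        self.info[key] = value

    def drain(self):
        """Apply queued requests; meant to be called by the owner thread."""
        while not self.pending.empty():
            method, args, kwargs = self.pending.get()
            method(self, *args, **kwargs)
```

A call made from the owner thread updates the dictionary at once, while a call made from any other thread only takes effect after the owner calls drain().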

shell(self, command, **kwargs)

source code 

Schedule a shell command for local or distant execution.

Local usage:
    task.shell(command [, key=key] [, handler=handler]
          [, timeout=secs] [, autoclose=enable_autoclose]
          [, stderr=enable_stderr])

Distant usage:
    task.shell(command, nodes=nodeset [, handler=handler]
          [, timeout=secs] [, autoclose=enable_autoclose]
          [, stderr=enable_stderr])

_add_port(*args, **kwargs)

source code 

Add an EnginePort instance to Engine (private method).

Decorators:
  • @tasksyncmethod()

_remove_port(*args, **kwargs)

source code 

Remove a port from Engine (private method).

Decorators:
  • @tasksyncmethod()

port(self, handler=None, autoclose=False)

source code 

Create a new task port. A task port is an abstraction object to
deliver messages reliably between tasks.

Basic rules:
    A task can send messages to another task port (thread safe).
    A task can receive messages from an acquired port either by
    setting up a notification mechanism or using a polling
    mechanism that may block the task waiting for a message
    sent on the port.
    A port can be acquired by one task only.

If handler is set to a valid EventHandler object, the port is
a send-once port, ie. a message sent to this port generates an
ev_msg event notification issued to the port's task. If handler
is not set, the task can only receive messages on the port by
calling port.msg_recv().

timer(*args, **kwargs)

source code 

Create task's timer.

Decorators:
  • @tasksyncmethod()

schedule(*args, **kwargs)

source code 

Schedule a worker for execution. Only useful for manually instantiated workers.

Decorators:
  • @tasksyncmethod()

resume(self, timeout=0)

source code 

Resume task. If the task is task_self(), workers are executed in the calling thread, so this method blocks until the workers have finished. This is always the case for a single-threaded application (ie. one that creates no Task() instance other than task_self()). Otherwise, the current thread doesn't block; in that case, you may want to call task_wait() to wait for completion.

_suspend_wait(*args, **kwargs)

source code 
Decorators:
  • @tasksyncmethod()

suspend(self)

source code 

Suspend task execution. This method may be called from another task (thread-safe). The function returns False if the task cannot be suspended (eg. it's not running), or returns True if the task has been successfully suspended. To resume a suspended task, use task.resume().

_abort(*args, **kwargs)

source code 
Decorators:
  • @tasksyncmethod()

abort(self, kill=False)

source code 

Abort a task. Aborting a task removes (and stops when needed) all workers. If optional parameter kill is True, the task object is unbound from the current thread, so calling task_self() creates a new Task object.

_iter_keys_timeout_by_worker(self, worker)

source code 

Iterate over timed out keys (ie. nodes) for a specific worker.

key_buffer(self, key)

source code 

Get buffer for a specific key. When the key is associated with multiple workers, the resulting buffer contains the content of all these workers, which may overlap.

node_buffer(self, key)

source code 

Get buffer for a specific key. When the key is associated with multiple workers, the resulting buffer contains the content of all these workers, which may overlap.

key_error(self, key)

source code 

Get error buffer for a specific key. When the key is associated with multiple workers, the resulting buffer contains the content of all these workers, which may overlap.

node_error(self, key)

source code 

Get error buffer for a specific key. When the key is associated with multiple workers, the resulting buffer contains the content of all these workers, which may overlap.

key_retcode(self, key)

source code 

Return return code for a specific key. When the key is associated with multiple workers, return the max return code from these workers.

node_retcode(self, key)

source code 

Return return code for a specific key. When the key is associated with multiple workers, return the max return code from these workers.

max_retcode(self)

source code 

Get max return code encountered during last run.

How retcodes work:
  If the process exits normally, the return code is its exit
  status. If the process is terminated by a signal, the return
  code is 128 + signal number.
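
The convention above can be illustrated with a small standalone helper. The function name shell_style_retcode is hypothetical, and whether ClusterShell derives its return codes this way internally is an assumption; the sketch only relies on the fact that Python's subprocess module reports termination by signal N as -N.

```python
import signal


def shell_style_retcode(returncode):
    """Map a subprocess-style return code to the 128 + signal convention.

    Python's subprocess module reports a process killed by signal N as
    -N, and a normal exit as its non-negative exit status.
    """
    if returncode < 0:
        # Terminated by a signal: 128 + signal number.
        return 128 + (-returncode)
    # Normal exit: the return code is the exit status itself.
    return returncode


# A process terminated by SIGTERM maps to 128 + SIGTERM.
term_rc = shell_style_retcode(-signal.SIGTERM)
```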

iter_buffers(self, match_keys=None)

source code 

Iterate over buffers, returns a tuple (buffer, keys). For remote
workers (Ssh), keys are lists of nodes. In that case, you should use
NodeSet.fromlist(keys) to get a NodeSet instance (which is more
convenient and efficient).

Optional parameter match_keys adds filtering on these keys.

Usage example:

    for buffer, nodelist in task.iter_buffers():
        print(NodeSet.fromlist(nodelist))
        print(buffer)

iter_errors(self, match_keys=None)

source code 

Iterate over error buffers, returns a tuple (buffer, keys).

See iter_buffers().

iter_retcodes(self, match_keys=None)

source code 

Iterate over return codes, returns a tuple (rc, keys).

Optional parameter match_keys adds filtering on these keys.

How retcodes work:
  If the process exits normally, the return code is its exit
  status. If the process is terminated by a signal, the return
  code is 128 + signal number.
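
As a usage sketch, the (rc, keys) tuples can be reduced to a summary of failures. The helper name failed_keys is hypothetical; it accepts any iterable shaped like the output documented for iter_retcodes(), so it can be tried without a running task.

```python
def failed_keys(retcode_pairs):
    """Return {rc: sorted keys} for every non-zero return code.

    `retcode_pairs` is any iterable of (rc, keys) tuples, such as the
    one documented for task.iter_retcodes().
    """
    failures = {}
    for rc, keys in retcode_pairs:
        if rc != 0:
            # Group keys (ie. nodes) sharing the same return code.
            failures.setdefault(rc, []).extend(keys)
    return {rc: sorted(keys) for rc, keys in failures.items()}
```

After task.resume() has returned, failed_keys(task.iter_retcodes()) would give the keys grouped by failing return code, assuming the iterator behaves as described above.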

num_timeout(self)

source code 

Return the number of timed out "keys" (ie. nodes).

iter_keys_timeout(self)

source code 

Iterate over timed out keys (ie. nodes).

wait(cls, from_thread)
Class Method

source code 

Class method that blocks the calling thread until all tasks have finished (from a ClusterShell point of view, for instance, when their task.resume() returns). It doesn't necessarily mean that the associated threads have finished.


Class Variable Details

_std_default

Value:
{'engine': 'auto',
 'port_qlimit': 32,
 'stderr': False,
 'stderr_msgtree': True,
 'stdout_msgtree': True}

_std_info

Value:
{'command_timeout': 0,
 'connect_timeout': 10,
 'debug': False,
 'fanout': 64,
 'print_debug': <function _task_print_debug at 0x13b2c80>}