
cogen.core.schedulers

Scheduling framework.

The scheduler handles timeouts, runs the operations and does very basic management of coroutines. Most of the heavy logic is in each operation class; see cogen.core.events and cogen.core.sockets. Most of those operations work with attributes that are set in the scheduler.

cogen is multi-state. All the state related to coroutines and network is in the scheduler and its associated proactor. That means you could run several cogen schedulers in the same process/thread/whatever.

There is just one thing that uses global objects - the threadlocal-like local object in the coroutines module. It was actually added for the wsgiserver factory, which monkey patches the threadlocal module in order to make pylons run correctly (pylons relies heavily on threadlocals).

class cogen.core.schedulers.Scheduler(proactor=<class 'cogen.core.proactors.ctypes_iocp_impl.CTYPES_IOCPProactor'>, default_priority=0, default_timeout=None, proactor_resolution=0.01, proactor_greedy=True, ops_greedy=False, proactor_multiplex_first=None, proactor_default_size=None)

Bases: object

Basic deque-based scheduler with timeout support and primitive prioritisation parameters.

Usage:

mysched = Scheduler(proactor=DefaultProactor, 
        default_priority=priority.LAST, default_timeout=None)
  • proactor: a proactor class to use
  • default_priority: a default priority option for operations that do not set it. See cogen.core.util.priority.
  • default_timeout: a default timedelta or number of seconds to wait for the operation; -1 means no timeout.
add(coro, args=(), kwargs={}, first=True)
Add a coroutine to the scheduler. You can pass arguments (args, kwargs) to initialise the coroutine with.
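
To illustrate what a deque-based scheduler with a first flag can look like, here is a minimal, self-contained sketch. It does not use cogen: ToyScheduler, worker and the round-robin run loop are hypothetical names, and the assumption that first=True places the new coroutine ahead of those already queued is a guess at the intent of the parameter, not a statement about cogen's implementation.

```python
from collections import deque


class ToyScheduler:
    """Hypothetical stand-in for illustration; not cogen's Scheduler."""

    def __init__(self):
        self.active = deque()

    def add(self, coro_func, args=(), kwargs=None, first=True):
        # Instantiate the generator-based coroutine with its arguments.
        coro = coro_func(*args, **(kwargs or {}))
        if first:
            # Assumption: "first" means run before already queued coroutines.
            self.active.appendleft(coro)
        else:
            self.active.append(coro)
        return coro

    def run(self):
        # Round-robin until every coroutine has finished.
        trace = []
        while self.active:
            coro = self.active.popleft()
            try:
                trace.append(next(coro))
            except StopIteration:
                continue  # finished, do not requeue
            self.active.append(coro)
        return trace


def worker(name, steps):
    for i in range(steps):
        yield (name, i)


sched = ToyScheduler()
sched.add(worker, args=("a", 2), first=False)
sched.add(worker, args=("b", 1), first=True)
trace = sched.run()
print(trace)
```

Here "b" is added last but runs first because it was added with first=True.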
handle_timeouts()

Handle timeouts. Raise OperationTimeout in the coroutine associated with each timed-out operation (if the coroutine is still alive and the operation hasn’t actually completed successfully) or, if the operation has a weak_timeout flag, update the timeout point and add it back to the heapq.

weak_timeout notes:

  • weak_timeout means a last_update attribute is updated with a timestamp of the last activity in the operation - for example, an operation may receive new data and not complete (not enough data, etc)
  • if there was activity since the last time we checked this timeout, we push it back onto the heapq with a new timeout value at which we’ll check it again

Also, we call cleanup on the op, and we raise the timeout only if cleanup returns true. Checking finalized isn’t enough to tell whether the op has completed, since finalized is set when the operation gets back into the coro, and the op might still be in the Scheduler.active queue when we get to this timeout. This is certainly a problem magnet. TODO: fix_finalized
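
The timeout handling described above can be sketched with the standard heapq module. This is a simplified model, not cogen's code: ToyOp, push, the delta and completed attributes, and the explicit now argument are all hypothetical, and the rule used for weak timeouts (reschedule at last_update + delta when that point is still in the future) is an assumption about how "activity since the last check" might be decided.

```python
import heapq
import itertools


class ToyOp:
    """Hypothetical operation; only the attributes needed for the sketch."""

    def __init__(self, name, delta, weak_timeout=False):
        self.name = name
        self.delta = delta              # timeout length in seconds
        self.weak_timeout = weak_timeout
        self.last_update = 0.0          # timestamp of the last activity
        self.completed = False

    def cleanup(self):
        # True means the op was still pending, so the timeout should be raised.
        return not self.completed


_tiebreak = itertools.count()


def push(heap, deadline, op):
    # The counter keeps heap entries comparable when deadlines are equal.
    heapq.heappush(heap, (deadline, next(_tiebreak), op))


def handle_timeouts(heap, now):
    """Return the names of ops whose timeout would be raised at time `now`."""
    timed_out = []
    while heap and heap[0][0] <= now:
        _, _, op = heapq.heappop(heap)
        if op.weak_timeout and op.last_update + op.delta > now:
            # Recent activity: push the timeout point back instead of raising.
            push(heap, op.last_update + op.delta, op)
        elif op.cleanup():
            timed_out.append(op.name)
    return timed_out


heap = []
strict = ToyOp("strict", 5.0)
weak = ToyOp("weak", 5.0, weak_timeout=True)
done = ToyOp("done", 5.0)
weak.last_update = 3.0   # some data arrived at t=3
done.completed = True    # finished before its deadline
for op in (strict, weak, done):
    push(heap, 5.0, op)

first_pass = handle_timeouts(heap, now=6.0)
second_pass = handle_timeouts(heap, now=9.0)
print(first_pass, second_pass)
```

In the first pass only the strict operation times out; the weak one is rescheduled for t=8 and times out in the second pass because no further activity was recorded.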

iter_run()

The actual processing for the main loop is here.

Running the main loop as a generator (where an iteration is a full sched, proactor and timers/timeouts run) is useful for interleaving the main loop with other applications that have a blocking main loop and require cogen to run in the same thread.
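
The interleaving idea can be sketched without cogen. In the toy below, toy_iter_run stands in for a generator-style main loop that yields once per full pass, and host_main_loop stands in for another application's loop that advances it one step per tick. Both names, and the value yielded per pass, are hypothetical and chosen only for illustration.

```python
def toy_iter_run(tasks):
    """Hypothetical stand-in for a generator main loop: one yield per pass."""
    while tasks:
        for task in list(tasks):
            try:
                next(task)
            except StopIteration:
                tasks.remove(task)
        # Hand control back to the caller after a full pass.
        yield len(tasks)


def host_main_loop(scheduler_iter, max_ticks):
    """Another application's loop that steps the scheduler once per tick."""
    log = []
    for tick in range(max_ticks):
        log.append(("host", tick))            # the host's own work
        remaining = next(scheduler_iter, None)
        if remaining is None:                 # scheduler loop has exited
            break
        log.append(("sched", remaining))
    return log


def countdown(n):
    while n:
        yield n
        n -= 1


tasks = [countdown(1), countdown(2)]
log = host_main_loop(toy_iter_run(tasks), max_ticks=10)
print(log)
```

With the real scheduler, the host loop would presumably call next() on the iterator returned by iter_run() in the same way, at whatever point in its own cycle suits it.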

next_timer_delta()
Returns the time value that the proactor will wait on.
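
One plausible way to compute such a wait value is sketched below. This is an assumption about the general technique, not a description of cogen's method: the function takes hypothetical active, timeouts and now arguments, where timeouts is a heap of absolute deadlines, and returns 0 when coroutines are runnable, the time until the nearest deadline otherwise, and None when there is nothing to wait for.

```python
import heapq


def next_timer_delta(active, timeouts, now):
    """Sketch: how long a poller may block before the scheduler needs control."""
    if active:
        # Runnable coroutines are waiting, so the poller should not block.
        return 0
    if timeouts:
        # Block until the nearest deadline, never for a negative time.
        return max(0.0, timeouts[0] - now)
    # Nothing runnable and no timeouts: no time limit (assumed meaning of None).
    return None


timeouts = []
heapq.heappush(timeouts, 12.5)
heapq.heappush(timeouts, 10.0)

busy = next_timer_delta(["coro"], timeouts, now=9.0)
idle = next_timer_delta([], timeouts, now=9.0)
overdue = next_timer_delta([], timeouts, now=11.0)
nothing = next_timer_delta([], [], now=9.0)
print(busy, idle, overdue, nothing)
```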
process_op(op, coro)
Process a (op, coro) pair and return another pair. Handles exceptions.
run()
This is the main loop. This loop will exit when there are no more coroutines to run or stop has been called.
stop()