Top

threadly module

threadly a simple threadpool and scheduler for python.

"""
threadly a simple threadpool and scheduler for python.
"""

import threading
import Queue
from Queue import Empty as EmptyException
import logging
import time



class Scheduler(object):
  """
  Main Scheduler Object.
  """

  def __init__(self, poolsize):
    """
    Construct an Scheduler instance with the set thread pool size.
    
    `poolsize` positive integer for the number of threads you want in this pool .
    """

    self.__log = logging.getLogger("root.threadly")
    self.__clock = Clock()
    self.__key_lock = threading.Condition()
    self.__poolsize = poolsize
    self.__running = True
    self.__in_shutdown = False
    self.__main_queue = Queue.Queue()
    self.__delayed_tasks = SortedLockingList()
    self.__in_delay = False
    self.__threads = list()
    self.__delay_lock = threading.Condition()
    self.__keys = dict()
    for i in xrange(self.__poolsize):
      tmp_thread = threading.Thread(target=self.__thread_pool)
      tmp_thread.name = "Executor-Pool-Thread-%d"%(i)
      tmp_thread.daemon = True
      tmp_thread.start()
      self.__threads.append(tmp_thread)

  def get_poolsize(self):
    """
    Returns the number of threads used in this Pool.
    """
    return len(self.__threads)

  def get_queue_size(self):
    """
    Returns the number of items currently awaiting Execution.
    """
    return self.__main_queue.qsize()

  def execute(self, task, args=(), kwargs={}):
    """
    Execute a given task as soon as possible.
    
    `task` is a callable to be called on the Scheduler.

    `args` are the arguments to pass to the callable when called.

    `kwargs` are the keyword args to be passed to the callable when called.
    """
    self.schedule(task, args=args, kwargs=kwargs)

  def schedule_with_future(self, task, delay=0, key=None, args=(), kwargs={}):
    """
    Returns a `ListenableFuture` for this task.  Once the task is completed the future will also be completed. 
    This works pretty much exactly like `schedule` except you can not make a task recurring.
    
    `task` is a callable to be called on the Scheduler.

    `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

    `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.

    `args` are the arguments to pass to the callable when called.

    `kwargs` are the keyword args to be passed to the callable when called.
    """
    job=(task, args, kwargs)
    future = ListenableFuture()
    self.schedule(futureJob, delay=delay, key=key, args=(future, job))
    return future

  def schedule(self, task, delay=0, recurring=False, key=None, args=(), kwargs={}):
    """
    This schedules a task to be executed.  It can be delayed, and set to a key.  It can also be marked
    as recurring. 
    
    `task` is a callable to be called on the Scheduler.

    `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

    `recurring` set this to True if this should be a recurring.  You should be careful that delay is > 0 when setting this to True

    `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.

    `args` are the arguments to pass to the callable when called.

    `kwargs` are the keyword args to be passed to the callable when called.
    """
    if delay > 0:
      s_task = int(self.__clock.accurate_time() * 1000) + delay
      send = False
      if delay/1000.0 <= self.__get_next_wait_time():
        send = True
      self.__delayed_tasks.add((s_task, task, delay, recurring, key, args, kwargs))
      if send:
        self.__main_queue.put((self.__empty, (), {}))
    else:
      if key != None:
        self.__key_lock.acquire()
        if key not in self.__keys:
          tmp = KeyRunner()
          self.__keys[key] = tmp
        self.__key_lock.release()
        run_key = self.__keys[key]
        run_key.add((task, args, kwargs))
        run_key.lock.acquire()
        if not run_key.in_queue and run_key.size() > 0:

          run_key.in_queue = True
          self.__main_queue.put((run_key.run_all, (), {}))
        run_key.lock.release()
      else:
        self.__main_queue.put((task, args, kwargs))

  def remove(self, task):
    """
    Remove a scheduled task from the queue.  This is a best effort remove, the task could still possibly run.  This is most useful to cancel recurring tasks.
    If there is more then one task with this callable scheduled only the first one is removed.

    `task` callable task to remove from the scheduled tasks list.
    """
    count = 0
    found = False
    for tasks in self.__delayed_tasks.safeIterator():
      if tasks[1] == task:
        found = True
        break
      else:
        count+=1
    if found:
      self.__delayed_tasks.pop(count)
      return True
    return False

  def shutdown(self):
    """
    Shuts down the threadpool.  Any task currently on the queue will be ran, but all Scheduled tasks 
    will removed and no more tasks can be added.
    """
    self.__running = False
    self.__delayed_tasks.clear()
    self.execute(self.__internal_shutdown)

  def shutdown_now(self):
    """
    Shuts down the threadpool.  Any task currently being executed will still complete, 
    but the queue will be emptied out.
    """
    self.__running = False
    self.__delayed_tasks.clear()
    while not self.__main_queue.empty():
      self.__main_queue.get_nowait()
    self.__internal_shutdown()

  def __internal_shutdown(self):
    self.__running = False
    for tmp_thread in self.__threads:
      while tmp_thread != None and tmp_thread.isAlive() and threading != None and tmp_thread != threading.current_thread():
        self.__main_queue.put((self.__empty, (), {}))

  def __empty(self):
    pass

  def __get_next_wait_time(self):
    tmp = self.__delayed_tasks.peek()
    if tmp == None or self.__delayed_tasks.size() == 0:
      return 2**32
    else:
      task = tmp[0] - int(self.__clock.accurate_time()*1000)
      return (task/1000.0)-.0005

  def __check_delay_queue(self):
    dl = self.__delayed_tasks.lock()
    if dl:
      try:
        to = self.__get_next_wait_time()
        while to <= 0:
          run_task = self.__delayed_tasks.pop(0)
          self.schedule(run_task[1], key=run_task[4], args=run_task[5], kwargs=run_task[6])
          #run_task[3] is recurring, if so we add again as a scheduled event
          if run_task[3] == True and not self.__in_shutdown:
            self.schedule(run_task[1], run_task[2], run_task[3], run_task[4], run_task[5], run_task[6])
          to = self.__get_next_wait_time()
      finally:
        self.__delayed_tasks.unlock()
    return dl


  def __thread_pool(self):
    while self.__running:
      try:
        runner = None
        to = self.__get_next_wait_time()
        if to <= 0 and self.__check_delay_queue():
          to = self.__get_next_wait_time()
        if to <= 0:
          to = 5
        if runner == None:
          runner = self.__main_queue.get(True, to)
        if runner != None:
          runner[0](*runner[1], **runner[2])
      except IndexError as exp:
        pass
      except EmptyException as exp:
        pass
      except Exception as exp:
        self.__log.error("Exception while Executing: %s, %s"%(runner, exp))


class SortedLockingList:
  """
  This is a sortedList implementation for multiThreads.  One main goal is to make adds as cheap as possible.
  """
  def __init__(self):
    self.slist = list()
    self.uslist = list()
    self.__lock = threading.Condition()

  def clear(self):
    """
    clears out the list
    """
    self.__lock.acquire()
    self.slist = list()
    self.uslist = list()
    self.__lock.release()


  def lock(self):
    """
    Returns `True` if you get the lock `False` if you dont.

    Non-Blocking lock request, returns True if you get the lock false if you dont.  This is the main
    lock for the list once acquired you must release before any other thread can access the list.
    """
    return self.__lock.acquire(False)

  def unlock(self):
    """
    Releases the lists lock.
    """
    self.__lock.release()

  def size(self):
    """
    Returns and `int` of the current size of the list.
    """
    return len(self.slist) + len(self.uslist)

  def peek(self):
    """
    Returns the first entry in the list, this does not remove the entry from the list.
    """
    self.__lock.acquire()
    self.__combine()
    if len(self.slist) == 0:
      tmp = None
    else:
      tmp = self.slist[0]
    self.__lock.release()
    return tmp

  def pop(self, i=0):
    """
    Returns either the first entry from the list or the spesified entry.
    """
    self.__lock.acquire()
    self.__combine()
    tmp = self.slist.pop(i)
    self.__lock.release()
    return tmp
  
  def add(self, item):
    """
    Adds an entry to the list.

    `item` entry to add to the list.
    """
    self.uslist.append(item)
  
  def __combine(self):
    try:
      self.__lock.acquire()
      while len(self.uslist) > 0:
        item = self.uslist.pop(0)
        c = len(self.slist)
        if c == 0:
          self.slist.append(item)
        elif item < self.slist[0]:
          self.slist.insert(0, item)
        elif c == 1 or item > self.slist[c-1]:
          self.slist.append(item)
        else:
          l = self.slist
          lmax = len(l)-1
          ch = c/2
          while True:
            if item < l[ch]:
              if ch == 0:
                print "ERROR:"
                return
              else:
                lmax = ch-1
                ch = ch/2
            elif item > l[ch]:
              if ch >= lmax:
                self.slist.insert(ch+1, item)
                break
              else:
                diff = lmax-ch
                ch = ch+((diff/2)+1)
            else:
              l.insert(ch, item)
              break
    finally:
      self.__lock.release()

  def remove(self, item):
    """
    Removes an item from the list.
  
    `item` item to remove from the list.
    """
    try:
      self.__lock.acquire()
      self.__combine()
      self.slist.remove(item)
    except:
      pass
    finally:
      self.__lock.release()

  def safeIterator(self):
    """
    This is a non-Blocking safe iterator for the list.  It is essentially just a copy of the sorted lists
    entries at the time it was called. 
    """
    try:
      self.__lock.acquire()
      self.__combine()
    finally:
      self.__lock.release()
    local = list(self.slist)
    for i in local:
      yield i


class Executor(Scheduler):
  """
  A class for backwards compatibility, Scheduler should be used instead now.
  """
  pass


class KeyRunner(object):
  """
  A class to wrap keys objects for the executer.
  Items can be added to the KeyRunner while its running.  
  This is used to keep all tasks for a given key in one thread.
  """
  def __init__(self):
    self.__run = list()
    self.lock = threading.Condition()
    self.in_queue = False

  def size(self):
    return len(self.__run)

  def add(self, task):
    """
    Add a task to this runner set.

    `task` adds callable task to the current keyrunner set.
    """
    self.lock.acquire()
    self.__run.append(task)
    self.lock.release()

  def run_next(self):
    """
    Run the next item for this key.
    """
    self.lock.acquire()
    runner = self.__run.pop(0)
    self.lock.release()
    runner[0](*runner[1], **runner[2])

  def run_all(self):
    """
    Run all items in this keyRunner.
    """
    while len(self.__run) > 0:
      self.run_next()
      if len(self.__run) == 0:
        self.lock.acquire()
        if len(self.__run) == 0:
          self.in_queue = False
          self.lock.release()
          break
        self.lock.release()

def futureJob(future, job):
  """
  This is a simple helper function used to wrap a task on the Scheduler in a future.  Once
  the job runs the future will complete. 
  
  `future` The future that will be completed once the job finishes.
  `job` The job to run before completing the future.
  """
  try:
    job[0](*job[1], **job[2])
    future.setter(True)
  except Exception as e:
    print "Error running futureJob:", e
    future.setter(False)


class ListenableFuture():
  """
  This class i used to make a Future that can have listeners and callbacks added to it.
  Once setter(object) is called all listeners/callbacks are also called.  Callbacks will 
  be given the set object, and .get() will return said object.
  """
  def __init__(self):
    self.lock = threading.Condition()
    self.settable = None
    self.listeners = list()
    self.callables = list()
  
  def addListener(self, listener, args=(), kwargs={}):
    """
    Add a listener function to this ListenableFuture.  Once set is called on this future
    all listeners will be ran.  Arguments for the listener can be given if needed.
    
    `listener` a callable that will be called when the future is completed
    `args` tuple arguments that will be passed to the listener when called.
    `kwargs` dict keyword arguments to be passed to the passed listener when called.
    """
    if self.settable == None:
      self.listeners.append((listener, args, kwargs))
    else:
      listener(*args, **kwargs)

  def addCallable(self, cable, args=(), kwargs={}):
    """
    Add a callable function to this ListenableFuture.  Once set is called on this future
    all callables will be ran.  This works the same as the listener except the set object is passed 
    as the first argument when the callable is called. Arguments for the listener can be given if needed.
    
    `cable` a callable that will be called when the future is completed, it must have at least 1 argument.
    `args` tuple arguments that will be passed to the listener when called.
    `kwargs` dict keyword arguments to be passed to the passed listener when called.
    """
    if self.settable is None:
      self.callables.append((cable, args, kwargs))
    else:
      cable(self.settable, *args, **kwargs)

  def get(self, timeout=2**32):
    """
    This is a blocking call that will return the set object once it is set.
    
    `timeout` The max amount of time to wait for get (in seconds).  If this is reached a null is returned
    `returns` the set object.  This can technically be anything so know what your listening for.
    """
    if self.settable is not None:
      return self.settable
    start = time.time()
    try:
      self.lock.acquire()
      while self.settable is None and time.time() - start < timeout:
        self.lock.wait(timeout - (time.time() - start))
      return self.settable
    finally:
      self.lock.release()

  def setter(self, obj):
    """
    This is used to complete this future. Whatever thread sets this will be used to call all 
    listeners and callables for this future.
    
    `obj` The object you want to set on this future (usually use just True if you dont care)
    """
    if self.settable is None:
      self.settable = obj
      self.lock.acquire()
      self.lock.notify_all()
      self.lock.release()
      while len(self.listeners) > 0:
        i = self.listeners.pop(0)
        try:
          i[0](*i[1], **i[2])
        except Exception as e:
          print "Exception calling listener", i[0], e
      while len(self.callables) > 0:
        i = self.callables.pop(0)
        try:
          i[0](self.settable, *i[1], **i[2])
        except Exception as e:
          print "Exception calling listener", i[0], e
    else:
      raise Exception("Already Set!")
      

class Singleton(object):
  """A Simple inheritable singleton"""

  __single = None

  def __new__(cls, *args, **kwargs):
    if cls != type(cls.__single):
      cls.__single = object.__new__(cls, *args, **kwargs)
    return cls.__single


class Clock(Singleton):
  """
  A Simple clock class to allow for retrieval of time from multiple threads to be more efficient.
  This class is a singleton so anyone using it will be using the same instance.
  The clock updates every 100ms so calls to get time often can be more per formant if they don't need to be exact.
  """
  def __init__(self):
    self.__current = int(time.time()*1000)
    self.__run = False
    self.__thread = None
    self.__get_time = time.time
    self.__sleep = time.sleep
    self.__start_clock_thread()

  def __del__(self):
    self.__stop_clock_thread()

  def __update_clock(self):
    while self.__run:
      self.accurate_time()
      self.__sleep(.1)
      
  def accurate_time_millis(self):
    """
    Get the actual current time. This should be called as little as often, and only 
    when exact times are needed.
    
    `returns` an Integer of the current time in millis.
    """
    self.__current = self.__get_time()
    return int(self.__current*1000)

  def accurate_time(self):
    """
    Get the actual current time. This should be called as little as 
    often, and only when exact times are needed.
    
    `returns` a float with whole numbers being seconds. Pretty much identical to time.time()
    """
    self.__current = self.__get_time()
    return self.__current

  def last_known_time_millis(self):
    """
    Gets the last ran time in milliseconds. This is accurate to 100ms.
    
    `returns` an integer representing the last known time in millis.
    """
    return int(self.__current*1000)

  def last_known_time(self):
    """
    Gets the last ran time seconds.milliseconds. This is accurate to 100ms.
    
    `returns` a float that represents the last known time with seconds as the whole numbers.
    """
    return self.__current

  def __start_clock_thread(self):
    if self.__thread == None or not self.__thread.is_alive():
      self.__run = True
      self.__thread = threading.Thread(target=self.__update_clock)
      self.__thread.name = "Clock Thread"
      self.__thread.daemon = True
      self.__thread.start()

  def __stop_clock_thread(self):
    self.__run = False

Functions

def futureJob(

future, job)

This is a simple helper function used to wrap a task on the Scheduler in a future. Once the job runs the future will complete.

future The future that will be completed once the job finishes. job The job to run before completing the future.

def futureJob(future, job):
  """
  This is a simple helper function used to wrap a task on the Scheduler in a future.  Once
  the job runs the future will complete. 
  
  `future` The future that will be completed once the job finishes.
  `job` The job to run before completing the future.
  """
  try:
    job[0](*job[1], **job[2])
    future.setter(True)
  except Exception as e:
    print "Error running futureJob:", e
    future.setter(False)

Classes

class Clock

A Simple clock class to allow for retrieval of time from multiple threads to be more efficient. This class is a singleton so anyone using it will be using the same instance. The clock updates every 100ms so calls to get time often can be more per formant if they don't need to be exact.

class Clock(Singleton):
  """
  A Simple clock class to allow for retrieval of time from multiple threads to be more efficient.
  This class is a singleton so anyone using it will be using the same instance.
  The clock updates every 100ms so calls to get time often can be more per formant if they don't need to be exact.
  """
  def __init__(self):
    self.__current = int(time.time()*1000)
    self.__run = False
    self.__thread = None
    self.__get_time = time.time
    self.__sleep = time.sleep
    self.__start_clock_thread()

  def __del__(self):
    self.__stop_clock_thread()

  def __update_clock(self):
    while self.__run:
      self.accurate_time()
      self.__sleep(.1)
      
  def accurate_time_millis(self):
    """
    Get the actual current time. This should be called as little as often, and only 
    when exact times are needed.
    
    `returns` an Integer of the current time in millis.
    """
    self.__current = self.__get_time()
    return int(self.__current*1000)

  def accurate_time(self):
    """
    Get the actual current time. This should be called as little as 
    often, and only when exact times are needed.
    
    `returns` a float with whole numbers being seconds. Pretty much identical to time.time()
    """
    self.__current = self.__get_time()
    return self.__current

  def last_known_time_millis(self):
    """
    Gets the last ran time in milliseconds. This is accurate to 100ms.
    
    `returns` an integer representing the last known time in millis.
    """
    return int(self.__current*1000)

  def last_known_time(self):
    """
    Gets the last ran time seconds.milliseconds. This is accurate to 100ms.
    
    `returns` a float that represents the last known time with seconds as the whole numbers.
    """
    return self.__current

  def __start_clock_thread(self):
    if self.__thread == None or not self.__thread.is_alive():
      self.__run = True
      self.__thread = threading.Thread(target=self.__update_clock)
      self.__thread.name = "Clock Thread"
      self.__thread.daemon = True
      self.__thread.start()

  def __stop_clock_thread(self):
    self.__run = False

Ancestors (in MRO)

Methods

def __init__(

self)

def __init__(self):
  self.__current = int(time.time()*1000)
  self.__run = False
  self.__thread = None
  self.__get_time = time.time
  self.__sleep = time.sleep
  self.__start_clock_thread()

def accurate_time(

self)

Get the actual current time. This should be called as little as often, and only when exact times are needed.

returns a float with whole numbers being seconds. Pretty much identical to time.time()

def accurate_time(self):
  """
  Get the actual current time. This should be called as little as 
  often, and only when exact times are needed.
  
  `returns` a float with whole numbers being seconds. Pretty much identical to time.time()
  """
  self.__current = self.__get_time()
  return self.__current

def accurate_time_millis(

self)

Get the actual current time. This should be called as little as often, and only when exact times are needed.

returns an Integer of the current time in millis.

def accurate_time_millis(self):
  """
  Get the actual current time. This should be called as little as often, and only 
  when exact times are needed.
  
  `returns` an Integer of the current time in millis.
  """
  self.__current = self.__get_time()
  return int(self.__current*1000)

def last_known_time(

self)

Gets the last ran time seconds.milliseconds. This is accurate to 100ms.

returns a float that represents the last known time with seconds as the whole numbers.

def last_known_time(self):
  """
  Gets the last ran time seconds.milliseconds. This is accurate to 100ms.
  
  `returns` a float that represents the last known time with seconds as the whole numbers.
  """
  return self.__current

def last_known_time_millis(

self)

Gets the last ran time in milliseconds. This is accurate to 100ms.

returns an integer representing the last known time in millis.

def last_known_time_millis(self):
  """
  Gets the last ran time in milliseconds. This is accurate to 100ms.
  
  `returns` an integer representing the last known time in millis.
  """
  return int(self.__current*1000)

class Executor

A class for backwards compatibility, Scheduler should be used instead now.

class Executor(Scheduler):
  """
  A class for backwards compatibility, Scheduler should be used instead now.
  """
  pass

Ancestors (in MRO)

Methods

def __init__(

self, poolsize)

Inheritance: Scheduler.__init__

Construct an Scheduler instance with the set thread pool size.

poolsize positive integer for the number of threads you want in this pool .

def __init__(self, poolsize):
  """
  Construct an Scheduler instance with the set thread pool size.
  
  `poolsize` positive integer for the number of threads you want in this pool .
  """
  self.__log = logging.getLogger("root.threadly")
  self.__clock = Clock()
  self.__key_lock = threading.Condition()
  self.__poolsize = poolsize
  self.__running = True
  self.__in_shutdown = False
  self.__main_queue = Queue.Queue()
  self.__delayed_tasks = SortedLockingList()
  self.__in_delay = False
  self.__threads = list()
  self.__delay_lock = threading.Condition()
  self.__keys = dict()
  for i in xrange(self.__poolsize):
    tmp_thread = threading.Thread(target=self.__thread_pool)
    tmp_thread.name = "Executor-Pool-Thread-%d"%(i)
    tmp_thread.daemon = True
    tmp_thread.start()
    self.__threads.append(tmp_thread)

def execute(

self, task, args=(), kwargs={})

Inheritance: Scheduler.execute

Execute a given task as soon as possible.

task is a callable to be called on the Scheduler.

args are the arguments to pass to the callable when called.

kwargs are the keyword args to be passed to the callable when called.

def execute(self, task, args=(), kwargs={}):
  """
  Execute a given task as soon as possible.
  
  `task` is a callable to be called on the Scheduler.
  `args` are the arguments to pass to the callable when called.
  `kwargs` are the keyword args to be passed to the callable when called.
  """
  self.schedule(task, args=args, kwargs=kwargs)

def get_poolsize(

self)

Inheritance: Scheduler.get_poolsize

Returns the number of threads used in this Pool.

def get_poolsize(self):
  """
  Returns the number of threads used in this Pool.
  """
  return len(self.__threads)

def get_queue_size(

self)

Inheritance: Scheduler.get_queue_size

Returns the number of items currently awaiting Execution.

def get_queue_size(self):
  """
  Returns the number of items currently awaiting Execution.
  """
  return self.__main_queue.qsize()

def remove(

self, task)

Inheritance: Scheduler.remove

Remove a scheduled task from the queue. This is a best effort remove, the task could still possibly run. This is most useful to cancel recurring tasks. If there is more then one task with this callable scheduled only the first one is removed.

task callable task to remove from the scheduled tasks list.

def remove(self, task):
  """
  Remove a scheduled task from the queue.  This is a best effort remove, the task could still possibly run.  This is most useful to cancel recurring tasks.
  If there is more then one task with this callable scheduled only the first one is removed.
  `task` callable task to remove from the scheduled tasks list.
  """
  count = 0
  found = False
  for tasks in self.__delayed_tasks.safeIterator():
    if tasks[1] == task:
      found = True
      break
    else:
      count+=1
  if found:
    self.__delayed_tasks.pop(count)
    return True
  return False

def schedule(

self, task, delay=0, recurring=False, key=None, args=(), kwargs={})

Inheritance: Scheduler.schedule

This schedules a task to be executed. It can be delayed, and set to a key. It can also be marked as recurring.

task is a callable to be called on the Scheduler.

delay this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

recurring set this to True if this should be a recurring. You should be careful that delay is > 0 when setting this to True

key this is any python object to use as a key. All tasks using this key will be ran in a single threaded manor.

args are the arguments to pass to the callable when called.

kwargs are the keyword args to be passed to the callable when called.

def schedule(self, task, delay=0, recurring=False, key=None, args=(), kwargs={}):
  """
  This schedules a task to be executed.  It can be delayed, and set to a key.  It can also be marked
  as recurring. 
  
  `task` is a callable to be called on the Scheduler.
  `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.
  `recurring` set this to True if this should be a recurring.  You should be careful that delay is > 0 when setting this to True
  `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.
  `args` are the arguments to pass to the callable when called.
  `kwargs` are the keyword args to be passed to the callable when called.
  """
  if delay > 0:
    s_task = int(self.__clock.accurate_time() * 1000) + delay
    send = False
    if delay/1000.0 <= self.__get_next_wait_time():
      send = True
    self.__delayed_tasks.add((s_task, task, delay, recurring, key, args, kwargs))
    if send:
      self.__main_queue.put((self.__empty, (), {}))
  else:
    if key != None:
      self.__key_lock.acquire()
      if key not in self.__keys:
        tmp = KeyRunner()
        self.__keys[key] = tmp
      self.__key_lock.release()
      run_key = self.__keys[key]
      run_key.add((task, args, kwargs))
      run_key.lock.acquire()
      if not run_key.in_queue and run_key.size() > 0:
        run_key.in_queue = True
        self.__main_queue.put((run_key.run_all, (), {}))
      run_key.lock.release()
    else:
      self.__main_queue.put((task, args, kwargs))

def schedule_with_future(

self, task, delay=0, key=None, args=(), kwargs={})

Inheritance: Scheduler.schedule_with_future

Returns a ListenableFuture for this task. Once the task is completed the future will also be completed. This works pretty much exactly like schedule except you can not make a task recurring.

task is a callable to be called on the Scheduler.

delay this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

key this is any python object to use as a key. All tasks using this key will be ran in a single threaded manor.

args are the arguments to pass to the callable when called.

kwargs are the keyword args to be passed to the callable when called.

def schedule_with_future(self, task, delay=0, key=None, args=(), kwargs={}):
  """
  Returns a `ListenableFuture` for this task.  Once the task is completed the future will also be completed. 
  This works pretty much exactly like `schedule` except you can not make a task recurring.
  
  `task` is a callable to be called on the Scheduler.
  `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.
  `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.
  `args` are the arguments to pass to the callable when called.
  `kwargs` are the keyword args to be passed to the callable when called.
  """
  job=(task, args, kwargs)
  future = ListenableFuture()
  self.schedule(futureJob, delay=delay, key=key, args=(future, job))
  return future

def shutdown(

self)

Inheritance: Scheduler.shutdown

Shuts down the threadpool. Any task currently on the queue will be ran, but all Scheduled tasks will removed and no more tasks can be added.

def shutdown(self):
  """
  Shuts down the threadpool.  Any task currently on the queue will be ran, but all Scheduled tasks 
  will removed and no more tasks can be added.
  """
  self.__running = False
  self.__delayed_tasks.clear()
  self.execute(self.__internal_shutdown)

def shutdown_now(

self)

Inheritance: Scheduler.shutdown_now

Shuts down the threadpool. Any task currently being executed will still complete, but the queue will be emptied out.

def shutdown_now(self):
  """
  Shuts down the threadpool.  Any task currently being executed will still complete, 
  but the queue will be emptied out.
  """
  self.__running = False
  self.__delayed_tasks.clear()
  while not self.__main_queue.empty():
    self.__main_queue.get_nowait()
  self.__internal_shutdown()

class KeyRunner

A class to wrap keys objects for the executer. Items can be added to the KeyRunner while its running.
This is used to keep all tasks for a given key in one thread.

class KeyRunner(object):
  """
  A class to wrap keys objects for the executer.
  Items can be added to the KeyRunner while its running.  
  This is used to keep all tasks for a given key in one thread.
  """
  def __init__(self):
    self.__run = list()
    self.lock = threading.Condition()
    self.in_queue = False

  def size(self):
    return len(self.__run)

  def add(self, task):
    """
    Add a task to this runner set.

    `task` adds callable task to the current keyrunner set.
    """
    self.lock.acquire()
    self.__run.append(task)
    self.lock.release()

  def run_next(self):
    """
    Run the next item for this key.
    """
    self.lock.acquire()
    runner = self.__run.pop(0)
    self.lock.release()
    runner[0](*runner[1], **runner[2])

  def run_all(self):
    """
    Run all items in this keyRunner.
    """
    while len(self.__run) > 0:
      self.run_next()
      if len(self.__run) == 0:
        self.lock.acquire()
        if len(self.__run) == 0:
          self.in_queue = False
          self.lock.release()
          break
        self.lock.release()

Ancestors (in MRO)

Instance variables

var in_queue

var lock

Methods

def __init__(

self)

def __init__(self):
  self.__run = list()
  self.lock = threading.Condition()
  self.in_queue = False

def add(

self, task)

Add a task to this runner set.

task adds callable task to the current keyrunner set.

def add(self, task):
  """
  Add a task to this runner set.
  `task` adds callable task to the current keyrunner set.
  """
  self.lock.acquire()
  self.__run.append(task)
  self.lock.release()

def run_all(

self)

Run all items in this keyRunner.

def run_all(self):
  """
  Run all items in this keyRunner.
  """
  while len(self.__run) > 0:
    self.run_next()
    if len(self.__run) == 0:
      self.lock.acquire()
      if len(self.__run) == 0:
        self.in_queue = False
        self.lock.release()
        break
      self.lock.release()

def run_next(

self)

Run the next item for this key.

def run_next(self):
  """
  Run the next item for this key.
  """
  self.lock.acquire()
  runner = self.__run.pop(0)
  self.lock.release()
  runner[0](*runner[1], **runner[2])

def size(

self)

def size(self):
  return len(self.__run)

class ListenableFuture

This class i used to make a Future that can have listeners and callbacks added to it. Once setter(object) is called all listeners/callbacks are also called. Callbacks will be given the set object, and .get() will return said object.

class ListenableFuture():
  """
  This class i used to make a Future that can have listeners and callbacks added to it.
  Once setter(object) is called all listeners/callbacks are also called.  Callbacks will 
  be given the set object, and .get() will return said object.
  """
  def __init__(self):
    self.lock = threading.Condition()
    self.settable = None
    self.listeners = list()
    self.callables = list()
  
  def addListener(self, listener, args=(), kwargs={}):
    """
    Add a listener function to this ListenableFuture.  Once set is called on this future
    all listeners will be ran.  Arguments for the listener can be given if needed.
    
    `listener` a callable that will be called when the future is completed
    `args` tuple arguments that will be passed to the listener when called.
    `kwargs` dict keyword arguments to be passed to the passed listener when called.
    """
    if self.settable == None:
      self.listeners.append((listener, args, kwargs))
    else:
      listener(*args, **kwargs)

  def addCallable(self, cable, args=(), kwargs={}):
    """
    Add a callable function to this ListenableFuture.  Once set is called on this future
    all callables will be ran.  This works the same as the listener except the set object is passed 
    as the first argument when the callable is called. Arguments for the listener can be given if needed.
    
    `cable` a callable that will be called when the future is completed, it must have at least 1 argument.
    `args` tuple arguments that will be passed to the listener when called.
    `kwargs` dict keyword arguments to be passed to the passed listener when called.
    """
    if self.settable is None:
      self.callables.append((cable, args, kwargs))
    else:
      cable(self.settable, *args, **kwargs)

  def get(self, timeout=2**32):
    """
    This is a blocking call that will return the set object once it is set.
    
    `timeout` The max amount of time to wait for get (in seconds).  If this is reached a null is returned
    `returns` the set object.  This can technically be anything so know what your listening for.
    """
    if self.settable is not None:
      return self.settable
    start = time.time()
    try:
      self.lock.acquire()
      while self.settable is None and time.time() - start < timeout:
        self.lock.wait(timeout - (time.time() - start))
      return self.settable
    finally:
      self.lock.release()

  def setter(self, obj):
    """
    This is used to complete this future. Whatever thread sets this will be used to call all 
    listeners and callables for this future.
    
    `obj` The object you want to set on this future (usually use just True if you dont care)
    """
    if self.settable is None:
      self.settable = obj
      self.lock.acquire()
      self.lock.notify_all()
      self.lock.release()
      while len(self.listeners) > 0:
        i = self.listeners.pop(0)
        try:
          i[0](*i[1], **i[2])
        except Exception as e:
          print "Exception calling listener", i[0], e
      while len(self.callables) > 0:
        i = self.callables.pop(0)
        try:
          i[0](self.settable, *i[1], **i[2])
        except Exception as e:
          print "Exception calling listener", i[0], e
    else:
      raise Exception("Already Set!")

Ancestors (in MRO)

Instance variables

var callables

var listeners

var lock

var settable

Methods

def __init__(

self)

def __init__(self):
  self.lock = threading.Condition()
  self.settable = None
  self.listeners = list()
  self.callables = list()

def addCallable(

self, cable, args=(), kwargs={})

Add a callable function to this ListenableFuture. Once set is called on this future all callables will be ran. This works the same as the listener except the set object is passed as the first argument when the callable is called. Arguments for the listener can be given if needed.

cable a callable that will be called when the future is completed, it must have at least 1 argument. args tuple arguments that will be passed to the listener when called. kwargs dict keyword arguments to be passed to the passed listener when called.

def addCallable(self, cable, args=(), kwargs={}):
  """
  Add a callable function to this ListenableFuture.  Once set is called on this future
  all callables will be ran.  This works the same as the listener except the set object is passed 
  as the first argument when the callable is called. Arguments for the listener can be given if needed.
  
  `cable` a callable that will be called when the future is completed, it must have at least 1 argument.
  `args` tuple arguments that will be passed to the listener when called.
  `kwargs` dict keyword arguments to be passed to the passed listener when called.
  """
  if self.settable is None:
    self.callables.append((cable, args, kwargs))
  else:
    cable(self.settable, *args, **kwargs)

def addListener(

self, listener, args=(), kwargs={})

Add a listener function to this ListenableFuture. Once set is called on this future all listeners will be ran. Arguments for the listener can be given if needed.

listener a callable that will be called when the future is completed args tuple arguments that will be passed to the listener when called. kwargs dict keyword arguments to be passed to the passed listener when called.

def addListener(self, listener, args=(), kwargs={}):
  """
  Add a listener function to this ListenableFuture.  Once set is called on this future
  all listeners will be ran.  Arguments for the listener can be given if needed.
  
  `listener` a callable that will be called when the future is completed
  `args` tuple arguments that will be passed to the listener when called.
  `kwargs` dict keyword arguments to be passed to the passed listener when called.
  """
  if self.settable == None:
    self.listeners.append((listener, args, kwargs))
  else:
    listener(*args, **kwargs)

def get(

self, timeout=4294967296)

This is a blocking call that will return the set object once it is set.

timeout The max amount of time to wait for get (in seconds). If this is reached a null is returned returns the set object. This can technically be anything so know what your listening for.

def get(self, timeout=2**32):
  """
  This is a blocking call that will return the set object once it is set.
  
  `timeout` The max amount of time to wait for get (in seconds).  If this is reached a null is returned
  `returns` the set object.  This can technically be anything so know what your listening for.
  """
  if self.settable is not None:
    return self.settable
  start = time.time()
  try:
    self.lock.acquire()
    while self.settable is None and time.time() - start < timeout:
      self.lock.wait(timeout - (time.time() - start))
    return self.settable
  finally:
    self.lock.release()

def setter(

self, obj)

This is used to complete this future. Whatever thread sets this will be used to call all listeners and callables for this future.

obj The object you want to set on this future (usually use just True if you dont care)

def setter(self, obj):
  """
  This is used to complete this future. Whatever thread sets this will be used to call all 
  listeners and callables for this future.
  
  `obj` The object you want to set on this future (usually use just True if you dont care)
  """
  if self.settable is None:
    self.settable = obj
    self.lock.acquire()
    self.lock.notify_all()
    self.lock.release()
    while len(self.listeners) > 0:
      i = self.listeners.pop(0)
      try:
        i[0](*i[1], **i[2])
      except Exception as e:
        print "Exception calling listener", i[0], e
    while len(self.callables) > 0:
      i = self.callables.pop(0)
      try:
        i[0](self.settable, *i[1], **i[2])
      except Exception as e:
        print "Exception calling listener", i[0], e
  else:
    raise Exception("Already Set!")

class Scheduler

Main Scheduler Object.

class Scheduler(object):
  """
  Main Scheduler Object.
  """

  def __init__(self, poolsize):
    """
    Construct an Scheduler instance with the set thread pool size.
    
    `poolsize` positive integer for the number of threads you want in this pool .
    """

    self.__log = logging.getLogger("root.threadly")
    self.__clock = Clock()
    self.__key_lock = threading.Condition()
    self.__poolsize = poolsize
    self.__running = True
    self.__in_shutdown = False
    self.__main_queue = Queue.Queue()
    self.__delayed_tasks = SortedLockingList()
    self.__in_delay = False
    self.__threads = list()
    self.__delay_lock = threading.Condition()
    self.__keys = dict()
    for i in xrange(self.__poolsize):
      tmp_thread = threading.Thread(target=self.__thread_pool)
      tmp_thread.name = "Executor-Pool-Thread-%d"%(i)
      tmp_thread.daemon = True
      tmp_thread.start()
      self.__threads.append(tmp_thread)

  def get_poolsize(self):
    """
    Returns the number of threads used in this Pool.
    """
    return len(self.__threads)

  def get_queue_size(self):
    """
    Returns the number of items currently awaiting Execution.
    """
    return self.__main_queue.qsize()

  def execute(self, task, args=(), kwargs={}):
    """
    Execute a given task as soon as possible.
    
    `task` is a callable to be called on the Scheduler.

    `args` are the arguments to pass to the callable when called.

    `kwargs` are the keyword args to be passed to the callable when called.
    """
    self.schedule(task, args=args, kwargs=kwargs)

  def schedule_with_future(self, task, delay=0, key=None, args=(), kwargs={}):
    """
    Returns a `ListenableFuture` for this task.  Once the task is completed the future will also be completed. 
    This works pretty much exactly like `schedule` except you can not make a task recurring.
    
    `task` is a callable to be called on the Scheduler.

    `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

    `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.

    `args` are the arguments to pass to the callable when called.

    `kwargs` are the keyword args to be passed to the callable when called.
    """
    job=(task, args, kwargs)
    future = ListenableFuture()
    self.schedule(futureJob, delay=delay, key=key, args=(future, job))
    return future

  def schedule(self, task, delay=0, recurring=False, key=None, args=(), kwargs={}):
    """
    This schedules a task to be executed.  It can be delayed, and set to a key.  It can also be marked
    as recurring. 
    
    `task` is a callable to be called on the Scheduler.

    `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

    `recurring` set this to True if this should be a recurring.  You should be careful that delay is > 0 when setting this to True

    `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.

    `args` are the arguments to pass to the callable when called.

    `kwargs` are the keyword args to be passed to the callable when called.
    """
    if delay > 0:
      s_task = int(self.__clock.accurate_time() * 1000) + delay
      send = False
      if delay/1000.0 <= self.__get_next_wait_time():
        send = True
      self.__delayed_tasks.add((s_task, task, delay, recurring, key, args, kwargs))
      if send:
        self.__main_queue.put((self.__empty, (), {}))
    else:
      if key != None:
        self.__key_lock.acquire()
        if key not in self.__keys:
          tmp = KeyRunner()
          self.__keys[key] = tmp
        self.__key_lock.release()
        run_key = self.__keys[key]
        run_key.add((task, args, kwargs))
        run_key.lock.acquire()
        if not run_key.in_queue and run_key.size() > 0:

          run_key.in_queue = True
          self.__main_queue.put((run_key.run_all, (), {}))
        run_key.lock.release()
      else:
        self.__main_queue.put((task, args, kwargs))

  def remove(self, task):
    """
    Remove a scheduled task from the queue.  This is a best effort remove, the task could still possibly run.  This is most useful to cancel recurring tasks.
    If there is more then one task with this callable scheduled only the first one is removed.

    `task` callable task to remove from the scheduled tasks list.
    """
    count = 0
    found = False
    for tasks in self.__delayed_tasks.safeIterator():
      if tasks[1] == task:
        found = True
        break
      else:
        count+=1
    if found:
      self.__delayed_tasks.pop(count)
      return True
    return False

  def shutdown(self):
    """
    Shuts down the threadpool.  Any task currently on the queue will be ran, but all Scheduled tasks 
    will removed and no more tasks can be added.
    """
    self.__running = False
    self.__delayed_tasks.clear()
    self.execute(self.__internal_shutdown)

  def shutdown_now(self):
    """
    Shuts down the threadpool.  Any task currently being executed will still complete, 
    but the queue will be emptied out.
    """
    self.__running = False
    self.__delayed_tasks.clear()
    while not self.__main_queue.empty():
      self.__main_queue.get_nowait()
    self.__internal_shutdown()

  def __internal_shutdown(self):
    self.__running = False
    for tmp_thread in self.__threads:
      while tmp_thread != None and tmp_thread.isAlive() and threading != None and tmp_thread != threading.current_thread():
        self.__main_queue.put((self.__empty, (), {}))

  def __empty(self):
    pass

  def __get_next_wait_time(self):
    tmp = self.__delayed_tasks.peek()
    if tmp == None or self.__delayed_tasks.size() == 0:
      return 2**32
    else:
      task = tmp[0] - int(self.__clock.accurate_time()*1000)
      return (task/1000.0)-.0005

  def __check_delay_queue(self):
    dl = self.__delayed_tasks.lock()
    if dl:
      try:
        to = self.__get_next_wait_time()
        while to <= 0:
          run_task = self.__delayed_tasks.pop(0)
          self.schedule(run_task[1], key=run_task[4], args=run_task[5], kwargs=run_task[6])
          #run_task[3] is recurring, if so we add again as a scheduled event
          if run_task[3] == True and not self.__in_shutdown:
            self.schedule(run_task[1], run_task[2], run_task[3], run_task[4], run_task[5], run_task[6])
          to = self.__get_next_wait_time()
      finally:
        self.__delayed_tasks.unlock()
    return dl


  def __thread_pool(self):
    while self.__running:
      try:
        runner = None
        to = self.__get_next_wait_time()
        if to <= 0 and self.__check_delay_queue():
          to = self.__get_next_wait_time()
        if to <= 0:
          to = 5
        if runner == None:
          runner = self.__main_queue.get(True, to)
        if runner != None:
          runner[0](*runner[1], **runner[2])
      except IndexError as exp:
        pass
      except EmptyException as exp:
        pass
      except Exception as exp:
        self.__log.error("Exception while Executing: %s, %s"%(runner, exp))

Ancestors (in MRO)

Methods

def __init__(

self, poolsize)

Construct an Scheduler instance with the set thread pool size.

poolsize positive integer for the number of threads you want in this pool .

def __init__(self, poolsize):
  """
  Construct an Scheduler instance with the set thread pool size.
  
  `poolsize` positive integer for the number of threads you want in this pool .
  """
  self.__log = logging.getLogger("root.threadly")
  self.__clock = Clock()
  self.__key_lock = threading.Condition()
  self.__poolsize = poolsize
  self.__running = True
  self.__in_shutdown = False
  self.__main_queue = Queue.Queue()
  self.__delayed_tasks = SortedLockingList()
  self.__in_delay = False
  self.__threads = list()
  self.__delay_lock = threading.Condition()
  self.__keys = dict()
  for i in xrange(self.__poolsize):
    tmp_thread = threading.Thread(target=self.__thread_pool)
    tmp_thread.name = "Executor-Pool-Thread-%d"%(i)
    tmp_thread.daemon = True
    tmp_thread.start()
    self.__threads.append(tmp_thread)

def execute(

self, task, args=(), kwargs={})

Execute a given task as soon as possible.

task is a callable to be called on the Scheduler.

args are the arguments to pass to the callable when called.

kwargs are the keyword args to be passed to the callable when called.

def execute(self, task, args=(), kwargs={}):
  """
  Execute a given task as soon as possible.
  
  `task` is a callable to be called on the Scheduler.
  `args` are the arguments to pass to the callable when called.
  `kwargs` are the keyword args to be passed to the callable when called.
  """
  self.schedule(task, args=args, kwargs=kwargs)

def get_poolsize(

self)

Returns the number of threads used in this Pool.

def get_poolsize(self):
  """
  Returns the number of threads used in this Pool.
  """
  return len(self.__threads)

def get_queue_size(

self)

Returns the number of items currently awaiting Execution.

def get_queue_size(self):
  """
  Returns the number of items currently awaiting Execution.
  """
  return self.__main_queue.qsize()

def remove(

self, task)

Remove a scheduled task from the queue. This is a best effort remove, the task could still possibly run. This is most useful to cancel recurring tasks. If there is more then one task with this callable scheduled only the first one is removed.

task callable task to remove from the scheduled tasks list.

def remove(self, task):
  """
  Remove a scheduled task from the queue.  This is a best effort remove, the task could still possibly run.  This is most useful to cancel recurring tasks.
  If there is more then one task with this callable scheduled only the first one is removed.
  `task` callable task to remove from the scheduled tasks list.
  """
  count = 0
  found = False
  for tasks in self.__delayed_tasks.safeIterator():
    if tasks[1] == task:
      found = True
      break
    else:
      count+=1
  if found:
    self.__delayed_tasks.pop(count)
    return True
  return False

def schedule(

self, task, delay=0, recurring=False, key=None, args=(), kwargs={})

This schedules a task to be executed. It can be delayed, and set to a key. It can also be marked as recurring.

task is a callable to be called on the Scheduler.

delay this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

recurring set this to True if this should be a recurring. You should be careful that delay is > 0 when setting this to True

key this is any python object to use as a key. All tasks using this key will be ran in a single threaded manor.

args are the arguments to pass to the callable when called.

kwargs are the keyword args to be passed to the callable when called.

def schedule(self, task, delay=0, recurring=False, key=None, args=(), kwargs={}):
  """
  This schedules a task to be executed.  It can be delayed, and set to a key.  It can also be marked
  as recurring. 
  
  `task` is a callable to be called on the Scheduler.
  `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.
  `recurring` set this to True if this should be a recurring.  You should be careful that delay is > 0 when setting this to True
  `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.
  `args` are the arguments to pass to the callable when called.
  `kwargs` are the keyword args to be passed to the callable when called.
  """
  if delay > 0:
    s_task = int(self.__clock.accurate_time() * 1000) + delay
    send = False
    if delay/1000.0 <= self.__get_next_wait_time():
      send = True
    self.__delayed_tasks.add((s_task, task, delay, recurring, key, args, kwargs))
    if send:
      self.__main_queue.put((self.__empty, (), {}))
  else:
    if key != None:
      self.__key_lock.acquire()
      if key not in self.__keys:
        tmp = KeyRunner()
        self.__keys[key] = tmp
      self.__key_lock.release()
      run_key = self.__keys[key]
      run_key.add((task, args, kwargs))
      run_key.lock.acquire()
      if not run_key.in_queue and run_key.size() > 0:
        run_key.in_queue = True
        self.__main_queue.put((run_key.run_all, (), {}))
      run_key.lock.release()
    else:
      self.__main_queue.put((task, args, kwargs))

def schedule_with_future(

self, task, delay=0, key=None, args=(), kwargs={})

Returns a ListenableFuture for this task. Once the task is completed the future will also be completed. This works pretty much exactly like schedule except you can not make a task recurring.

task is a callable to be called on the Scheduler.

delay this is the time to wait (in milliseconds!!) before scheduler will call the passed task.

key this is any python object to use as a key. All tasks using this key will be ran in a single threaded manor.

args are the arguments to pass to the callable when called.

kwargs are the keyword args to be passed to the callable when called.

def schedule_with_future(self, task, delay=0, key=None, args=(), kwargs={}):
  """
  Returns a `ListenableFuture` for this task.  Once the task is completed the future will also be completed. 
  This works pretty much exactly like `schedule` except you can not make a task recurring.
  
  `task` is a callable to be called on the Scheduler.
  `delay` this is the time to wait (in milliseconds!!) before scheduler will call the passed task.
  `key` this is any python object to use as a key.  All tasks using this key will be ran in a single threaded manor.
  `args` are the arguments to pass to the callable when called.
  `kwargs` are the keyword args to be passed to the callable when called.
  """
  job=(task, args, kwargs)
  future = ListenableFuture()
  self.schedule(futureJob, delay=delay, key=key, args=(future, job))
  return future

def shutdown(

self)

Shuts down the threadpool. Any task currently on the queue will be ran, but all Scheduled tasks will removed and no more tasks can be added.

def shutdown(self):
  """
  Shuts down the threadpool.  Any task currently on the queue will be ran, but all Scheduled tasks 
  will removed and no more tasks can be added.
  """
  self.__running = False
  self.__delayed_tasks.clear()
  self.execute(self.__internal_shutdown)

def shutdown_now(

self)

Shuts down the threadpool. Any task currently being executed will still complete, but the queue will be emptied out.

def shutdown_now(self):
  """
  Shuts down the threadpool.  Any task currently being executed will still complete, 
  but the queue will be emptied out.
  """
  self.__running = False
  self.__delayed_tasks.clear()
  while not self.__main_queue.empty():
    self.__main_queue.get_nowait()
  self.__internal_shutdown()

class Singleton

A Simple inheritable singleton

class Singleton(object):
  """A Simple inheritable singleton"""

  __single = None

  def __new__(cls, *args, **kwargs):
    if cls != type(cls.__single):
      cls.__single = object.__new__(cls, *args, **kwargs)
    return cls.__single

Ancestors (in MRO)

class SortedLockingList

This is a sortedList implementation for multiThreads. One main goal is to make adds as cheap as possible.

class SortedLockingList:
  """
  This is a sortedList implementation for multiThreads.  One main goal is to make adds as cheap as possible.
  """
  def __init__(self):
    self.slist = list()
    self.uslist = list()
    self.__lock = threading.Condition()

  def clear(self):
    """
    clears out the list
    """
    self.__lock.acquire()
    self.slist = list()
    self.uslist = list()
    self.__lock.release()


  def lock(self):
    """
    Returns `True` if you get the lock `False` if you dont.

    Non-Blocking lock request, returns True if you get the lock false if you dont.  This is the main
    lock for the list once acquired you must release before any other thread can access the list.
    """
    return self.__lock.acquire(False)

  def unlock(self):
    """
    Releases the lists lock.
    """
    self.__lock.release()

  def size(self):
    """
    Returns and `int` of the current size of the list.
    """
    return len(self.slist) + len(self.uslist)

  def peek(self):
    """
    Returns the first entry in the list, this does not remove the entry from the list.
    """
    self.__lock.acquire()
    self.__combine()
    if len(self.slist) == 0:
      tmp = None
    else:
      tmp = self.slist[0]
    self.__lock.release()
    return tmp

  def pop(self, i=0):
    """
    Returns either the first entry from the list or the spesified entry.
    """
    self.__lock.acquire()
    self.__combine()
    tmp = self.slist.pop(i)
    self.__lock.release()
    return tmp
  
  def add(self, item):
    """
    Adds an entry to the list.

    `item` entry to add to the list.
    """
    self.uslist.append(item)
  
  def __combine(self):
    try:
      self.__lock.acquire()
      while len(self.uslist) > 0:
        item = self.uslist.pop(0)
        c = len(self.slist)
        if c == 0:
          self.slist.append(item)
        elif item < self.slist[0]:
          self.slist.insert(0, item)
        elif c == 1 or item > self.slist[c-1]:
          self.slist.append(item)
        else:
          l = self.slist
          lmax = len(l)-1
          ch = c/2
          while True:
            if item < l[ch]:
              if ch == 0:
                print "ERROR:"
                return
              else:
                lmax = ch-1
                ch = ch/2
            elif item > l[ch]:
              if ch >= lmax:
                self.slist.insert(ch+1, item)
                break
              else:
                diff = lmax-ch
                ch = ch+((diff/2)+1)
            else:
              l.insert(ch, item)
              break
    finally:
      self.__lock.release()

  def remove(self, item):
    """
    Removes an item from the list.
  
    `item` item to remove from the list.
    """
    try:
      self.__lock.acquire()
      self.__combine()
      self.slist.remove(item)
    except:
      pass
    finally:
      self.__lock.release()

  def safeIterator(self):
    """
    This is a non-Blocking safe iterator for the list.  It is essentially just a copy of the sorted lists
    entries at the time it was called. 
    """
    try:
      self.__lock.acquire()
      self.__combine()
    finally:
      self.__lock.release()
    local = list(self.slist)
    for i in local:
      yield i

Ancestors (in MRO)

Instance variables

var slist

var uslist

Methods

def __init__(

self)

def __init__(self):
  self.slist = list()
  self.uslist = list()
  self.__lock = threading.Condition()

def add(

self, item)

Adds an entry to the list.

item entry to add to the list.

def add(self, item):
  """
  Adds an entry to the list.
  `item` entry to add to the list.
  """
  self.uslist.append(item)

def clear(

self)

clears out the list

def clear(self):
  """
  clears out the list
  """
  self.__lock.acquire()
  self.slist = list()
  self.uslist = list()
  self.__lock.release()

def lock(

self)

Returns True if you get the lock False if you dont.

Non-Blocking lock request, returns True if you get the lock false if you dont. This is the main lock for the list once acquired you must release before any other thread can access the list.

def lock(self):
  """
  Returns `True` if you get the lock `False` if you dont.
  Non-Blocking lock request, returns True if you get the lock false if you dont.  This is the main
  lock for the list once acquired you must release before any other thread can access the list.
  """
  return self.__lock.acquire(False)

def peek(

self)

Returns the first entry in the list, this does not remove the entry from the list.

def peek(self):
  """
  Returns the first entry in the list, this does not remove the entry from the list.
  """
  self.__lock.acquire()
  self.__combine()
  if len(self.slist) == 0:
    tmp = None
  else:
    tmp = self.slist[0]
  self.__lock.release()
  return tmp

def pop(

self, i=0)

Returns either the first entry from the list or the spesified entry.

def pop(self, i=0):
  """
  Returns either the first entry from the list or the spesified entry.
  """
  self.__lock.acquire()
  self.__combine()
  tmp = self.slist.pop(i)
  self.__lock.release()
  return tmp

def remove(

self, item)

Removes an item from the list.

item item to remove from the list.

def remove(self, item):
  """
  Removes an item from the list.

  `item` item to remove from the list.
  """
  try:
    self.__lock.acquire()
    self.__combine()
    self.slist.remove(item)
  except:
    pass
  finally:
    self.__lock.release()

def safeIterator(

self)

This is a non-Blocking safe iterator for the list. It is essentially just a copy of the sorted lists entries at the time it was called.

def safeIterator(self):
  """
  This is a non-Blocking safe iterator for the list.  It is essentially just a copy of the sorted lists
  entries at the time it was called. 
  """
  try:
    self.__lock.acquire()
    self.__combine()
  finally:
    self.__lock.release()
  local = list(self.slist)
  for i in local:
    yield i

def size(

self)

Returns and int of the current size of the list.

def size(self):
  """
  Returns and `int` of the current size of the list.
  """
  return len(self.slist) + len(self.uslist)

def unlock(

self)

Releases the lists lock.

def unlock(self):
  """
  Releases the lists lock.
  """
  self.__lock.release()