""" Like the map function, but can use a pool of threads.

Really easy to use threads.  eg.  tmap(f, alist)

If you know how to use the map function, you can use threads.
"""

__author__ = "Rene Dudfield"
__version__ = "0.2.0"
__license__ = 'Python license'

import sys
import threading
import traceback

# The Queue module was renamed to queue in Python 3.
try:
    from queue import Queue
    from queue import Empty
except ImportError:
    from Queue import Queue
    from Queue import Empty

Thread = threading.Thread

STOP = object()
FINISH = object()

DONE_ONE = object()
DONE_TWO = object()

# a default worker queue.
_wq = None

# if we are using threads or not.  This is the number of workers.
_use_workers = 0

# Set this to the maximum for the amount of Cores/CPUs
#    Note, that the tests early out.
#    So it should only test the best number of workers +2
MAX_WORKERS_TO_TEST = 64


def init(number_of_workers=0):
    """ Does a little test to see if threading is worth it.
          Sets up the global worker queue that tmap() uses; it has zero
          workers when threading isn't worth it.

        Calling init() is not required, but is generally better to do.

        number_of_workers - if non-zero, use this many workers and skip the
                            benchmark; otherwise benchmark_workers() decides.
    """
    global _wq, _use_workers

    if number_of_workers:
        _use_workers = number_of_workers
    else:
        _use_workers = benchmark_workers()

    # if it is best to use zero workers, then use that.
    # NOTE(review): a queue left by an earlier init() call is replaced
    # without being stopped; call quit() first to shut its workers down.
    _wq = WorkerQueue(_use_workers)


def quit():
    """ cleans up everything.

    Stops the global worker queue set up by init() (queued calls finish
    first) and forgets it, so tmap() goes back to making its own queues.
    """
    global _wq, _use_workers
    if _wq is not None:
        # The worker threads reference the queue (their target is its
        # threadloop), so merely dropping our reference never stops them.
        _wq.stop()
    _wq = None
    _use_workers = 0


def benchmark_workers(a_bench_func=None, the_data=None):
    """ does a little test to see if workers are at all faster.
        Returns the number of workers which works best.
        Takes a little bit of time to run, so you should only really call
          it once.
        You can pass in benchmark data, and functions if you want.
        a_bench_func - f(data)
        the_data - data to work on.

        pygame is only imported when the default function or data is used.
    """
    import time

    #TODO: try and make this scale better with slower/faster cpus.
    #  first find some variables so that using 0 workers takes about 1.0 seconds.
    #  then go from there.

    # note, this will only work with pygame 1.8rc3+
    # replace the doit() and the_data with something that releases the GIL

    if a_bench_func:
        doit = a_bench_func
    else:
        import pygame
        import pygame.transform

        def doit(x):
            return pygame.transform.scale(x, (544, 576))

    if the_data:
        thedata = the_data
    else:
        import pygame
        thedata = [pygame.Surface((155, 155), 0, 32) for _ in range(10)]

    best = float("inf")
    best_number = 0

    for num_workers in range(0, MAX_WORKERS_TO_TEST):
        wq = WorkerQueue(num_workers)
        try:
            t1 = time.time()
            for _ in range(20):
                print("active count:%s" % threading.active_count())
                tmap(doit, thedata, worker_queue=wq)
            t2 = time.time()
        finally:
            # always shut this round's workers down, even if doit raised.
            wq.stop()

        total_time = t2 - t1
        print("total time num_workers:%s: time:%s:" % (num_workers, total_time))

        if total_time < best:
            best_number = num_workers
            best = total_time

        if num_workers - best_number > 1:
            # We tried to add more, but it didn't like it.
            #   so we stop with testing at this number.
            break

    return best_number


class WorkerQueue(object):
    """ A pool of daemon worker threads that run calls taken off a queue.

    do() queues a call, wait() blocks until every queued call has run, and
    stop() shuts the workers down.
    """

    def __init__(self, num_workers=20):
        self._stopped = False
        self.queue = Queue()
        self.pool = []
        self._setup_workers(num_workers)

    def _setup_workers(self, num_workers):
        """ Sets up the worker threads
              NOTE: undefined behaviour if you call this again.
        """
        self.pool = []

        for _ in range(num_workers):
            self.pool.append(Thread(target=self.threadloop))

        for a_thread in self.pool:
            # daemon threads don't keep the interpreter alive at exit.
            a_thread.daemon = True
            a_thread.start()

    def do(self, f, args, kwArgs):
        """ puts a function on a queue for running later.

        A worker thread will call f(*args, **kwArgs).
        """
        self.queue.put((f, args, kwArgs))

    def stop(self):
        """ Stops the WorkerQueue, waits for all of the threads to finish up.

        STOP goes on the end of the (FIFO) queue, so calls queued before
        stop() still run.  Calling stop() more than once is harmless.
        """
        if self._stopped:
            return
        self._stopped = True
        self.queue.put(STOP)
        for thread in self.pool:
            thread.join()

    def threadloop(self, finish=False):
        """ Loops until all of the tasks are finished.

        Runs queued calls until it takes STOP off the queue; STOP is put
        back so the other workers see it too.  An exception raised by a call
        is printed and the loop carries on, so one bad task cannot shrink
        the pool and leave wait() blocked on tasks nobody will run.

        finish - unused; kept for backwards compatibility.
        """
        while True:
            task = self.queue.get()
            if task is STOP:
                self.queue.put(STOP)
                self.queue.task_done()
                break
            try:
                func, args, kwargs = task
                func(*args, **kwargs)
            except Exception:
                traceback.print_exc()
            finally:
                # mark the task done even if it failed, so wait() returns.
                self.queue.task_done()

    def wait(self):
        """ waits until all tasks are complete. """
        self.queue.join()

    def __del__(self):
        # Running workers hold self.threadloop as their target, so in
        # practice this is only reached once they have exited (or if there
        # were none); stop() is idempotent, so a repeat call is harmless.
        self.stop()


class FuncResult(object):
    """ Used for wrapping up a function call so that the results are stored
    inside the instances result attribute.

    result stays None and exception holds the raised exception if the call
    fails.
    """

    def __init__(self, f, callback=None, errback=None):
        """ f - is the function we that we call
            callback(result) - this is called when the function(f) returns
            errback(exception) - this is called when the function(f) raises
                                   an exception (or callback does).
        """
        self.f = f
        self.exception = None
        self.result = None
        self.callback = callback
        self.errback = errback

    def __call__(self, *args, **kwargs):
        # we try to call the function here.  If it fails we store the exception.
        try:
            self.result = self.f(*args, **kwargs)
            if self.callback:
                self.callback(self.result)
        except Exception as e:
            self.exception = e
            if self.errback:
                self.errback(self.exception)


def tmap(f, seq_args, num_workers=20, worker_queue=None, wait=True,
         stop_on_error=True):
    """ like map, but uses a thread pool to execute.

    num_workers - the number of worker threads that will be used.  If a
                  worker_queue is passed in, or init() has set up a global
                  one, then the num_workers arg is ignored.
    worker_queue - you can optionally pass in an existing WorkerQueue.
    wait - True means that the results are returned (as a list) when
           everything is finished.  False means that we return the
           [worker_queue, results] right away instead, where results is a
           list of FuncResult instances.  A queue created by tmap itself is
           then the caller's to stop.
    stop_on_error - only used when wait is True.  If True (the default) the
           first exception raised by a call, in input order, is re-raised,
           like map does.  If False, failed calls give None in the result.
    """
    # With no workers, plain map is fastest and already raises the first
    # error, which is exactly the wait/stop_on_error behaviour.
    use_map_when_idle = wait and stop_on_error

    owns_queue = False
    if worker_queue:
        wq = worker_queue
    elif _wq:
        # see if we have a global queue to work with.
        wq = _wq
    elif num_workers == 0 and use_map_when_idle:
        return list(map(f, seq_args))
    else:
        wq = WorkerQueue(num_workers)
        owns_queue = True

    if not wq.pool and use_map_when_idle:
        return list(map(f, seq_args))

    #TODO: divide the data (seq_args) into even chunks and
    #       then pass each thread a map(f, equal_part(seq_args))
    #      That way there should be less locking, and overhead.

    results = []
    for sa in seq_args:
        fr = FuncResult(f)
        results.append(fr)
        if wq.pool:
            wq.do(fr, [sa], {})
        else:
            # No worker threads to hand the call to: run it right here.
            fr(sa)

    if not wait:
        return [wq, results]

    try:
        wq.wait()
        if wq.queue.qsize():
            raise Exception("buggy threadmap")
    finally:
        # if we created a worker queue, we need to stop it.
        if owns_queue:
            wq.stop()

    if owns_queue and wq.queue.qsize():
        # stop() leaves its STOP marker on the queue; anything else is a bug.
        leftover = wq.queue.get()
        if leftover is not STOP:
            raise Exception("buggy threadmap")

    if stop_on_error:
        for fr in results:
            if fr.exception is not None:
                raise fr.exception

    return [fr.result for fr in results]


if __name__ == "__main__":
    # testing the func result: it keeps the return value...
    fr = FuncResult(lambda x: x + 1)
    fr(2)
    print(fr.result == 3)

    # ... or the exception the call raised.
    def raises(unused):
        raise ValueError("asdf")
    fr = FuncResult(raises)
    fr(1)
    print(isinstance(fr.exception, ValueError))

    init()
    r = tmap(lambda x: x + 1, range(1000))
    print(len(r))
    print("_use_workers :%s:" % _use_workers)

    # simple example.
    r = tmap(lambda x: x, range(1000))
    r2 = list(range(1000))
    print(r == r2)

    # wait=False hands back the queue and the FuncResults right away.
    wq, results = tmap(lambda x: x, range(1000), num_workers=5, wait=False)
    wq.wait()
    r = [x.result for x in results]
    print(r == r2)

    # how to call with different functions.
    def f(x):
        return x + 1

    def f2(x):
        return x + 2

    wq = WorkerQueue()
    fr = FuncResult(f)
    fr2 = FuncResult(f2)
    wq.do(fr, [1], {})
    wq.do(fr2, [1], {})
    # stop() lets the queued calls finish before the workers exit.
    wq.stop()
    print(fr.result == 2)
    print(fr2.result == 3)

    quit()