RFC: my iterthreader module



I have this iterthreader module that I've been working on for a while
now. It is similar to itertools.imap, but it runs the function calls in
a pool of worker threads (two by default) and uses Queues to move the
data around. A better name for it would probably be ithreadmap, but
anyway...

The short explanation is: if you have a loop like

for item in biglist:
    print "The value for %s is %s" % (item, slowfunc(item))

or

for item, val in ((item, slowfunc(item)) for item in biglist):
    print "The value for %s is %s" % (item, val)

you can simply rewrite it as

for item, val in iterthreader.Threader(slowfunc, biglist):
    print "The value for %s is %s" % (item, val)
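For comparison, and assuming Python 3.2 or later, the standard library's concurrent.futures module covers the same use case. This is a sketch, not part of iterthreader; slowfunc and biglist are placeholders. One behavioural difference worth knowing: Executor.map yields results in input order, whereas Threader yields (input, output) pairs in whatever order the worker threads finish.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slowfunc(item):
    # Hypothetical stand-in for a slow call that blocks (e.g. on the network).
    time.sleep(0.1)
    return item * 2


biglist = [1, 2, 3, 4, 5]
results = []

with ThreadPoolExecutor(max_workers=2) as pool:
    # Executor.map() returns results in input order, so zip() pairs each
    # item with its own result.
    for item, val in zip(biglist, pool.map(slowfunc, biglist)):
        print("The value for %s is %s" % (item, val))
        results.append((item, val))
```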

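and it will hopefully run faster. Note that the (item, val) pairs come
back in the order the worker threads finish, not necessarily in the
order of biglist. The usual GIL caveats still apply, of course: this
only pays off when slowfunc spends most of its time blocked (on I/O,
say) or in C code that releases the GIL. You can also subclass it,
overriding get_input and handle_input, but I almost always just call it
as above.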
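To make the GIL caveat concrete, here is a small Python 3 sketch (not part of iterthreader; blocking_call and run_in_threads are hypothetical names). time.sleep() releases the GIL while it waits, as most blocking I/O calls do, so four threads sleeping at once finish in roughly the time of one sleep. A pure-Python CPU-bound function, by contrast, holds the GIL while it runs, so in CPython threads give it little or no speedup.

```python
import threading
import time


def blocking_call(seconds):
    # time.sleep() releases the GIL while waiting, as most blocking I/O does.
    time.sleep(seconds)


def run_in_threads(n, seconds):
    """Start n threads that each block for `seconds`; return wall-clock time."""
    threads = [threading.Thread(target=blocking_call, args=(seconds,))
               for _ in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


elapsed = run_in_threads(4, 0.25)
# The sleeps overlap, so this is close to 0.25 s rather than 4 * 0.25 = 1 s.
print("4 sleeps of 0.25 s took %.2f s" % elapsed)
```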

So, can anyone find any obvious problems with it? I've been meaning to
re-post it [1] to the Python Cookbook, but I'd like to hear what others
think first. I'm not aware of any other module that makes this
particular use of threading this simple.

[1] I _think_ I posted it before, but that may have just been in a
comment.

import threading
import Queue

class Threader:
    def __init__(self, func=None, data=None, numthreads=2):
        if not numthreads > 0:
            raise AssertionError("numthreads should be greater than 0")

        # "is not None" so that an empty data list is accepted instead of
        # falling through to the NotImplementedError stub below.
        if func is not None:
            self.handle_input = func
        if data is not None:
            self.get_input = lambda: data

        self._numthreads = numthreads
        self.threads = []
        self.run()

    def __iter__(self):
        return self

    def next(self):
        # Each DQ entry is (still_running, input, output); cleanup()
        # posts (False, None, None) once every worker has exited.
        still_running, input, output = self.DQ.get()
        if not still_running:
            raise StopIteration
        return input, output

    def get_input(self):
        raise NotImplementedError(
            "You must implement get_input as a function that returns an iterable")

    def handle_input(self, input):
        raise NotImplementedError(
            "You must implement handle_input as a function that returns anything")

    def _handle_input(self):
        # Worker loop: pull (work_todo, input) from Q until a
        # (False, None) stop signal arrives.
        while 1:
            work_todo, input = self.Q.get()
            if not work_todo:
                break
            self.DQ.put((True, input, self.handle_input(input)))

    def cleanup(self):
        """Wait for all threads to stop and tell the main iter to stop."""
        for t in self.threads:
            t.join()
        self.DQ.put((False, None, None))

    def run(self):
        self.Q = Queue.Queue()    # work queue: (work_todo, input)
        self.DQ = Queue.Queue()   # result queue: (still_running, input, output)
        for x in range(self._numthreads):
            t = threading.Thread(target=self._handle_input)
            t.start()
            self.threads.append(t)

        try:
            for x in self.get_input():
                self.Q.put((True, x))
        except NotImplementedError, e:
            print e
        # One stop signal per worker thread.
        for x in range(self._numthreads):
            self.Q.put((False, None))

        threading.Thread(target=self.cleanup).start()
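Since the post asks for obvious problems, here is a hedged Python 3 sketch of the same queue-and-sentinel design that addresses three points in the code above: run() consumes the whole input iterable inside the constructor before iteration can begin (a problem for huge or infinite iterators), an exception in handle_input kills that worker thread and its item's result never arrives, and the Queue module and next() method are Python 2 only. The class name ThreadMap and the maxsize parameter are my own hypothetical choices, not part of iterthreader, and the subclassing hooks are left out for brevity.

```python
import queue
import threading

_DONE = object()  # sentinel: tells a worker (or the consumer) to stop


class ThreadMap:
    """Hypothetical Python 3 variant of Threader (sketch only)."""

    def __init__(self, func, data, numthreads=2, maxsize=100):
        if numthreads < 1:
            raise ValueError("numthreads should be greater than 0")
        self._func = func
        self._numthreads = numthreads
        # Bounded input queue: the feeder blocks instead of reading the
        # whole iterable up front.
        self._inq = queue.Queue(maxsize=maxsize)
        self._outq = queue.Queue()
        self._workers = [threading.Thread(target=self._work, daemon=True)
                         for _ in range(numthreads)]
        for t in self._workers:
            t.start()
        # Feed input from a background thread so the constructor returns
        # immediately.
        threading.Thread(target=self._feed, args=(data,), daemon=True).start()

    def _feed(self, data):
        try:
            for item in data:
                self._inq.put(item)
        finally:
            # One stop signal per worker, even if iterating data failed.
            for _ in range(self._numthreads):
                self._inq.put(_DONE)
            for t in self._workers:
                t.join()
            # Every worker has exited, so no more results can arrive.
            self._outq.put(_DONE)

    def _work(self):
        while True:
            item = self._inq.get()
            if item is _DONE:
                break
            try:
                result = self._func(item)
            except Exception as exc:
                # Keep the worker alive; hand the failure to the consumer.
                self._outq.put((item, exc, False))
            else:
                self._outq.put((item, result, True))

    def __iter__(self):
        return self

    def __next__(self):
        entry = self._outq.get()
        if entry is _DONE:
            raise StopIteration
        item, value, ok = entry
        if not ok:
            raise value  # re-raise the worker's exception in the caller
        return item, value


def square(x):
    return x * x


# Results arrive in completion order, so sort them before looking at them.
pairs = sorted(ThreadMap(square, range(10), numthreads=3))
print(pairs)
```

Because results come back in completion order, as with Threader, the usage example sorts them. A failing call surfaces as an exception from the loop in the consuming thread rather than only as a traceback printed by a dying worker.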


--
- Justin
