Worker Pools with Circuits

For some time now I’ve been wanting to greatly improve the concurrency support in circuits and implement worker pools (using threads and/or processes). A few weeks ago I finally did that. At first I thought it would be hard, until it hit me:

circuits has all the tools necessary

It turns out implementing “worker pools” in circuits is quite simple. All you need to do is construct a component that will start a number of other components as workers (either in thread mode or process mode).

This component (which I call a Pool) is responsible for accepting the tasks sent to it and distributing them among the workers it manages.

So here’s the code (available since circuits-1.4):

from time import time
from uuid import uuid4 as uuid
from random import seed, choice

from circuits.core.workers import Task, Worker
from circuits.core import handler, BaseComponent

seed(time())

class Pool(BaseComponent):

    channel = "pool"

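    def __init__(self, min=5, max=10, process=False, channel=channel):
        super(Pool, self).__init__(channel=channel)

        # NOTE: max is not used yet; the pool always starts min workers.
        self._workers = []

        for i in range(min):
            # Each worker listens on its own unique channel.
            self._workers.append(Worker(process=process, channel=str(uuid())))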

    @handler("task")
    def _on_task(self, f, *args, **kwargs):
        # Average number of queued events across all workers.
        workers = float(len(self._workers))
        total = sum(float(len(worker)) for worker in self._workers)
        avg = total / workers

        # Prefer the first worker whose queue is shorter than the average.
        for worker in self._workers:
            if len(worker) < avg:
                return worker.push(Task(f, *args, **kwargs), target=worker)

        # No worker is below the average (all equally loaded): pick one at random.
        worker = choice(self._workers)
        return worker.push(Task(f, *args, **kwargs), target=worker)

That’s it!
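Stripped of the circuits machinery, the dispatch rule in `_on_task` comes down to a few lines. Here is a minimal sketch in plain Python (assuming Python 3); `pick_worker` is a hypothetical helper, not part of circuits, that receives the queue lengths that `len(worker)` would report and returns the index of the worker to use.

```python
import random


def pick_worker(queue_lengths, rng=random):
    """Hypothetical helper: index of the worker that gets the next task.

    Same policy as Pool._on_task: the first worker whose queue is shorter
    than the average wins; if none is below the average, pick at random.
    """
    if not queue_lengths:
        raise ValueError("pool has no workers")
    avg = sum(queue_lengths) / float(len(queue_lengths))
    for index, length in enumerate(queue_lengths):
        if length < avg:
            return index
    # All queues are the same length, so nobody is below the average.
    return rng.randrange(len(queue_lengths))


# Average is 2.0 and worker 1 has only 1 queued task, so it is chosen.
print(pick_worker([3, 1, 2]))  # prints 1
```

Because the comparison is a strict `<`, no worker is below the average when all queues are the same length, and that is exactly the case the random fallback handles.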

And here’s the code for a Worker:

from circuits.core import handler, BaseComponent, Event

class Task(Event):
    """Task Event"""

class Worker(BaseComponent):

    channel = "worker"

    def __init__(self, process=False, channel=channel):
        super(Worker, self).__init__(channel=channel)

        if process:
            self.start(link=self, process=True)

        self.start()

    @handler("task")
    def _on_task(self, f, *args, **kwargs):
        return f(*args, **kwargs)
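The Worker’s job is small: hold a queue of pending tasks and call `f(*args, **kwargs)` for each one. To make that concrete outside circuits, here is a rough thread-mode analogue written against Python 3’s standard library. `ThreadWorker` is a hypothetical class, not circuits’ `Worker`, and the `concurrent.futures.Future` it returns only stands in for whatever `push()` hands back in circuits.

```python
import queue
import threading
from concurrent.futures import Future


class ThreadWorker(object):
    """Hypothetical thread-mode worker (a sketch, not circuits' Worker).

    Owns a task queue plus one daemon thread that pops each task and calls
    f(*args, **kwargs), much like Worker._on_task does.
    """

    def __init__(self):
        self._tasks = queue.Queue()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True  # do not keep the interpreter alive
        self._thread.start()

    def __len__(self):
        # Tasks still waiting to run (approximate while the thread is busy).
        return self._tasks.qsize()

    def push(self, f, *args, **kwargs):
        # Queue the task and return a Future immediately (asynchronous).
        future = Future()
        self._tasks.put((future, f, args, kwargs))
        return future

    def _run(self):
        while True:
            future, f, args, kwargs = self._tasks.get()
            # Skip tasks whose Future was cancelled while still queued.
            if not future.set_running_or_notify_cancel():
                continue
            try:
                future.set_result(f(*args, **kwargs))
            except Exception as error:
                future.set_exception(error)


worker = ThreadWorker()
print(worker.push(sum, [1, 2, 3]).result(timeout=5))  # prints 6
```

Giving each worker its own queue is what makes a per-worker `len()` meaningful, and that length is the only signal the Pool’s dispatch rule needs.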

Oh, and here’s a simple example of how you would use this:

from circuits import Task, Pool

def f():
    x = 0
    i = 0
    while i < 1000000:
        x += 1
        i += 1
    return x

p = Pool()
p.start()

# Asynchronous Task
x = p.push(Task(f))

# After a while...

print(x)

p.stop()
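For comparison only, Python 3’s `concurrent.futures` module offers the same fire-now, collect-later pattern. This sketch is not how circuits works internally. It assumes Python 3, uses `ThreadPoolExecutor` where the example above uses `Pool()`, and has `future.result()` play the part of reading `x` “after a while”. The `n` parameter on `f` is added here for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def f(n=1000000):
    # Same busy loop as the example above; n is a hypothetical parameter.
    x = 0
    for _ in range(n):
        x += 1
    return x


# max_workers plays the role of Pool(min=5). ProcessPoolExecutor is the
# process-mode counterpart; with the "spawn" start method it needs an
# `if __name__ == "__main__":` guard around this code.
with ThreadPoolExecutor(max_workers=5) as pool:
    future = pool.submit(f)  # returns immediately, like p.push(Task(f))
    # ... do other work here ...
    print(future.result())   # blocks until f() finishes; prints 1000000
```

Leaving the `with` block waits for outstanding tasks and then shuts the executor down, roughly where the circuits example calls `p.stop()`.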