Deferred Processing To Queues

As of the 0.9.2 release, Lamson has preliminary support for deferring a mail message to a queue so that another process can handle it in a separate handler. This support is still rough, but it is useful and not too difficult to configure. As the feature gets more use it should improve and, hopefully, turn into a generic “defer to queue” system in Lamson.

What is meant by “defer to queue” is simply that you take messages your state function receives and you dump them into a maildir queue. You then have another separate process read from this queue and do the real work. Potentially you could have many processes deal with this work, and they could even be on multiple computers.
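Because the queue is just a maildir, you can get a feel for the idea with nothing but Python’s standard mailbox module. The sketch below is not lamson.queue.Queue; push and pop are hypothetical helpers that mimic the two operations a deferred queue needs: one side adds messages to the maildir, and another side takes them out and does the work.

```python
import mailbox
import os
import tempfile
from email.message import EmailMessage


def push(queue_dir, message):
    """Add a message to the maildir at queue_dir and return its key."""
    box = mailbox.Maildir(queue_dir, create=True)
    return box.add(message)


def pop(queue_dir):
    """Remove and return one queued (key, message) pair, or (None, None).

    Maildir imposes no strict ordering, so this takes whichever key comes first.
    """
    box = mailbox.Maildir(queue_dir, create=True)
    for key in box.keys():
        message = box.get_message(key)
        box.remove(key)
        return key, message
    return None, None


if __name__ == "__main__":
    queue_dir = os.path.join(tempfile.mkdtemp(), "posts")

    msg = EmailMessage()
    msg["From"] = "sender@example.com"
    msg["To"] = "test.blog@osb.example.com"
    msg.set_content("A new blog post.")

    key = push(queue_dir, msg)  # what the state function does
    print(os.listdir(os.path.join(queue_dir, "new")))  # one file, named by key
    key, queued = pop(queue_dir)  # what the other process does
    print(queued["From"])  # sender@example.com
```

In the real setup the two halves run in different processes (possibly on different machines sharing the directory); here they run back to back only to show the round trip.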

A More Concrete Example

Imagine that you have a blog posting system and you want to update a big “front page index” that shows recent posts by your users. However, you don’t want to generate this index on every blog post users make, since that could involve expensive computation and hold up other threads that need to deal with more urgent email.

The solution is to do the minimum of quick processing in your POSTING state function, and then use lamson.queue.Queue to queue up messages meant for “front page indexing”. Here’s how that code might go:

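from email.utils import parseaddr

from lamson import queue, view
from lamson.routing import route
from config.settings import relay  # the Relay set up by config/boot.py
from app.model import post  # your blog's post model (assumed location)

@route("(post_name)@osb\\.(host)")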
def POSTING(message, post_name=None, host=None):
    # do the regular posting to blog thing
    name, address = parseaddr(message['from'])
    post.post(post_name, address, message)
    msg = view.respond('page_ready.msg', locals())
    relay.deliver(msg)

    # drop the message off into the 'posts' queue for later
    index_q = queue.Queue("run/posts")
    index_q.push(message)

    return POSTING

You can see that you just drop it into the queue with push(message) and it’s done. What you don’t see is how another process then picks it up and actually does something with this email.

Configuring A config/queue.py

In Lamson you control how your software boots, which lets you configure extra services as you need them. By default the lamson gen command just outputs basic config/boot.py and config/testing.py files so you can get working, and these will work for most development purposes.

In this tutorial you will write a new boot configuration and tell Lamson how to use it. Start by copying the original boot file:

$ cp config/boot.py config/queue.py

Next you want to edit this file so that instead of running an SMTPReceiver it will use a QueueReceiver configured to pull out of the run/posts queue you are using in your POSTING handler.

...
# where to listen for incoming messages
# (QueueReceiver comes from lamson.server: from lamson.server import QueueReceiver)
settings.receiver = QueueReceiver(settings.queue_config['queue'],
                                  settings.queue_config['sleep'])

settings.database = configure_database(settings.database_config, also_create=False)

Router.defaults(**settings.router_defaults)
# NOTE: this is using a different handlers variable in settings
Router.load(settings.queue_handlers)
Router.RELOAD=True
...

I’ve left out the code above and below the ... markers since it is the same in both files, apart from importing QueueReceiver. Notice that you have a QueueReceiver now, and that you are telling the Router to load and run the handlers listed in settings.queue_handlers.

You now add these two lines to your config/settings.py:

...
# this is for when you run the config.queue boot
queue_config = {'queue': 'run/posts', 'sleep': 10}

queue_handlers = ['app.handlers.index']

The queue_config variable is read by config/queue.py to set up the QueueReceiver, and queue_handlers is fed to the Router as described above.
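The 'queue' and 'sleep' entries map onto what a queue receiver needs: a maildir to poll and how long to wait when it is empty (the log output further down shows “Sleeping for 10 seconds...”). The loop below is a simplified sketch of that poll-and-dispatch idea using Python’s standard mailbox module; it is not Lamson’s QueueReceiver code, and run_queue_receiver and max_idle_rounds are hypothetical names.

```python
import mailbox
import os
import tempfile
import time
from email.message import EmailMessage


def run_queue_receiver(queue_dir, handler, sleep=10, max_idle_rounds=None):
    """Poll a maildir queue and pass each message to handler(message).

    Sleeps `sleep` seconds whenever the queue is empty. With the default
    max_idle_rounds=None it loops forever, like a long-running receiver;
    a number makes it stop after that many consecutive empty polls.
    """
    box = mailbox.Maildir(queue_dir, create=True)
    idle_rounds = 0
    while max_idle_rounds is None or idle_rounds < max_idle_rounds:
        keys = box.keys()
        if not keys:
            idle_rounds += 1
            time.sleep(sleep)
            continue
        idle_rounds = 0
        for key in keys:
            message = box.get_message(key)
            # Removing before handling means a crash in handler() loses the
            # message; removing afterwards would leave it queued for a retry.
            box.remove(key)
            handler(message)


if __name__ == "__main__":
    queue_dir = os.path.join(tempfile.mkdtemp(), "posts")
    box = mailbox.Maildir(queue_dir, create=True)
    for i in range(3):
        msg = EmailMessage()
        msg["From"] = "sender-%d@example.com" % i
        msg.set_content("post %d" % i)
        box.add(msg)

    seen = []
    run_queue_receiver(queue_dir, lambda m: seen.append(m["From"]),
                       sleep=0, max_idle_rounds=1)
    print(sorted(seen))
```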

Writing The Index Handler

Now write a new handler in app/handlers/index.py so that the config.queue boot setup will load it and run it whenever a message lands in the run/posts queue. Here’s the code:

import logging

from lamson.routing import route, stateless

@route("(post_name)@osb\\.(host)")
@stateless
def START(message, post_name=None, host=None):
    logging.debug("Got message from %s", message['from'])

This simple demonstration will just log what messages it receives so you can make sure it is working.

There are two points to notice about this handler. First, it is marked stateless because it runs independently of the regular Lamson server, and you don’t want its parallel operations to interfere with your normal server’s state operations. Second, its route uses a post_name capture, so you need a post_name entry in router_defaults in config/settings.py.
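Why does post_name need a default? A route pattern like "(post_name)@osb\\.(host)" only becomes a useful regular expression once each named capture is given a pattern. The snippet below is a rough illustration of that kind of substitution in plain Python; expand_route and the regexes in ROUTER_DEFAULTS are hypothetical, not Lamson’s router implementation or its shipped defaults.

```python
import re

# Hypothetical values in the shape of router_defaults in config/settings.py.
ROUTER_DEFAULTS = {
    "post_name": r"[a-zA-Z0-9][a-zA-Z0-9.]+",
    "host": r"test\.com",
}


def expand_route(pattern, defaults):
    """Replace each "(name)" in a route pattern with a named regex group.

    Illustrative only: names without a default stay as literal groups.
    """
    for name, regex in defaults.items():
        pattern = pattern.replace("(%s)" % name, "(?P<%s>%s)" % (name, regex))
    return re.compile("^" + pattern + "$")


route_re = expand_route(r"(post_name)@osb\.(host)", ROUTER_DEFAULTS)
match = route_re.match("test.blog.1244080328@osb.test.com")
print(match.groupdict())
# {'post_name': 'test.blog.1244080328', 'host': 'test.com'}
```

Without a post_name entry, "(post_name)" stays a group that only matches the literal text post_name, so real addresses would not route to the handler.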

Once you have all this slightly complicated setup done you are ready to test it.

Also note that the examples in the source releases have code that does a deferred queue similar to this. Go look there for more code to steal.

Running Your Queue Receiver

Run your logger and lamson server like normal:

$ lamson log
$ lamson start

Next, run your unit tests to send some mail through the handler, then check your logs to make sure it works:

$ nosetests
................
----------------------------------------------------------------------
Ran 16 tests in 1.346s

OK

Your logs should look normal, but now you should see some files in the run/posts/new directory:

$ ls run/posts/new/
1244080328.M408474P3147Q4.mycomputer.local

Those are the messages your POSTING handler received, pushed into your run/posts maildir queue.
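The file name in the listing above matches the maildir unique-name layout that Python’s mailbox.Maildir uses: Unix seconds, then M plus microseconds, P plus the writing process ID, Q plus a per-process delivery counter, then the hostname. The helper below is a hypothetical sketch for pulling those parts apart, which can help when you want to know when, and by which process, a queued message was written.

```python
import re

# seconds.M<microseconds>P<pid>Q<counter>.hostname, optionally followed by
# a ":2,<flags>" suffix once a message moves to cur/.
MAILDIR_NAME = re.compile(
    r"^(?P<seconds>\d+)\.M(?P<micros>\d+)P(?P<pid>\d+)Q(?P<count>\d+)\.(?P<host>[^:]+)"
)


def parse_maildir_name(name):
    """Split a queued file name into its parts, or return None."""
    match = MAILDIR_NAME.match(name)
    if match is None:
        return None
    parts = match.groupdict()
    for field in ("seconds", "micros", "pid", "count"):
        parts[field] = int(parts[field])
    return parts


print(parse_maildir_name("1244080328.M408474P3147Q4.mycomputer.local"))
# {'seconds': 1244080328, 'micros': 408474, 'pid': 3147, 'count': 4, 'host': 'mycomputer.local'}
```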

Finally, you’ll want to run your queue receiver:

$ lamson start -boot config.queue -pid run/queue.pid

If you’re running the code given above then you should see this in the logs/lamson.log file:

...
DEBUG:root:Sleeping for 10 seconds...
DEBUG:root:Pulled message with key: '1244080328.M408474P3147Q4.zed-shaws-macbook.local' off
DEBUG:root:Message received from Peer: 'run/posts', From: 'sender-1244080328.22@sender.com', to To ['test.blog.1244080328@osb.test.com'].
DEBUG:root:Got message from sender-1244080328.22@sender.com
DEBUG:root:Message to test.blog.1244080328@osb.test.com was handled by app.handlers.index.START

This means your queue receiver is running. In theory you could run as many of these as you want, as long as their handlers are stateless.
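Running several receivers against one queue raises the question of two processes grabbing the same message. One common way to avoid that with a maildir is to claim a file by renaming it out of new/ before working on it. The sketch below shows that swapped-in technique; it is not necessarily how Lamson’s QueueReceiver pops messages, and claim, claim_next, and the per-worker directories are hypothetical.

```python
import os
import tempfile


def claim(queue_dir, name, worker_dir):
    """Move queue_dir/new/<name> into worker_dir; return the new path or None.

    On POSIX, os.rename within one filesystem is atomic, so if several
    receivers race for the same file only one rename succeeds. The losers
    get FileNotFoundError because the file is already gone.
    """
    src = os.path.join(queue_dir, "new", name)
    dst = os.path.join(worker_dir, name)
    try:
        os.rename(src, dst)
    except FileNotFoundError:
        return None
    return dst


def claim_next(queue_dir, worker_dir):
    """Claim the first still-available message, or return None if empty."""
    for name in sorted(os.listdir(os.path.join(queue_dir, "new"))):
        path = claim(queue_dir, name, worker_dir)
        if path is not None:
            return path
    return None


if __name__ == "__main__":
    base = tempfile.mkdtemp()
    queue_dir = os.path.join(base, "posts")
    os.makedirs(os.path.join(queue_dir, "new"))
    workers = [os.path.join(base, "worker-%d" % i) for i in range(2)]
    for path in workers:
        os.makedirs(path)
    for i in range(3):
        with open(os.path.join(queue_dir, "new", "msg-%d" % i), "w") as f:
            f.write("Subject: post %d\n\nbody\n" % i)

    # Two workers take turns; each file ends up in exactly one worker dir.
    turn = 0
    while claim_next(queue_dir, workers[turn % 2]) is not None:
        turn += 1
    print(sorted(os.listdir(workers[0])), sorted(os.listdir(workers[1])))
```

The rename trick only holds when the queue and worker directories live on the same filesystem.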

When you’re done you can stop the whole setup with the following command:

$ lamson stop -ALL run
Stopping processes with the following PID files:
['run/log.pid', 'run/queue.pid', 'run/smtp.pid']
Attempting to stop lamson at pid 3092
Attempting to stop lamson at pid 3157
Attempting to stop lamson at pid 3096

Further Advanced Usage

It is debatable whether this configuration is very usable, but it works and will improve as the project continues. To give you some ideas of what you can do with it:

  1. Defer activity to other machines or processes.
  2. Receive messages from other mail systems that know maildir.
  3. Deliver messages to other maildir aware systems.
  4. Process messages from a web application, and possibly even generic work.

It might also be possible to make your state functions transition to the queue handler states by having the function return the module.FUNCTION that should be next. Take care with this, though, because it means your end user’s actions are effectively blocked for that event until the next run of the queue receiver.

Call For Suggestions

Feel free to offer suggestions for improving this setup (or, even better, code).