Integrating Twisted Into Your Application: An Exercise in Refactoring

Twisted

OMG TWISTED IS AWESOME!!

SERIOUSLY YOU SHOULD TOTALLY USE IT

A True Story

OK, not really...but it could have been true! I am going to tell a story that is a simplification of many true stories. Like all stories, it has a hero who must battle his way through obstacles, and it has a happy ending. Promise!

Our Hero

Our hero is Joe K. Programmer. JKP is responsible for a server, and a small program to communicate with it. He designed the protocol himself, called it "JNHP" (Joe's Non-distributed Hash Protocol), and is very proud of it. His protocol goes something like this:

Client->Server: "STORE name value\r\n"
Server->Client: "W00T!\r\n"
Client->Server: "STORE name value2\r\n"
Server->Client: "RARGH!\r\n"
Client->Server: "EXISTS name\r\n"
Server->Client: "AWESOME!\r\n"
Client->Server: "EXISTS name1\r\n"
Server->Client: "RARGH!\r\n"
Client->Server: "GET name\r\n"
Server->Client: "value\r\n"
Client->Server: "GET name\r\n"
Server->Client: "value\r\n"
Client->Server: "GET name1\r\n"
Server->Client: "\r\n"
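
Read as a set of rules, the exchange above says: a name can be stored only once, EXISTS reports whether a name is known, and GET answers with an empty line for unknown names. A minimal sketch of those rules as a pure function follows. The name jnhp_respond and the ValueError for unknown commands are assumptions made for illustration, and the sketch is written for Python 3 rather than taken from Joe's server.

```python
# Hypothetical helper: the JNHP rules from the transcript, with no networking.
def jnhp_respond(store, line):
    """Apply one JNHP request line to `store` and return the reply line."""
    args = line.split()
    command, params = args[0], args[1:]
    if command == "STORE":
        name, value = params
        if name in store:
            return "RARGH!\r\n"          # names are write-once
        store[name] = value
        return "W00T!\r\n"
    if command == "EXISTS":
        (name,) = params
        return "AWESOME!\r\n" if name in store else "RARGH!\r\n"
    if command == "GET":
        (name,) = params
        return store.get(name, "") + "\r\n"   # unknown names give an empty line
    # Assumption: the transcript does not say what an unknown command does.
    raise ValueError("unknown JNHP command: %r" % command)


store = {}
requests = [
    "STORE name value\r\n",
    "STORE name value2\r\n",
    "EXISTS name\r\n",
    "EXISTS name1\r\n",
    "GET name\r\n",
    "GET name1\r\n",
]
replies = [jnhp_respond(store, request) for request in requests]
```

Because requests are split on whitespace, names and values cannot contain spaces in this sketch.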

So JKP looks around the standard library. JKP finds the SocketServer module. "Oh, man!" thinks JKP to himself, "this rocks! I only have to write a few easy functions and everything just works? Man, no wonder they told me Python was easy to use!"

import SocketServer

myDict = {}

# StreamRequestHandler (unlike BaseRequestHandler) provides rfile and wfile
class JNHP(SocketServer.StreamRequestHandler):
    def handle(self):
        for line in self.rfile:
            args = line.split()
            # "STORE name value" is dispatched to handle_STORE(name, value)
            getattr(self, 'handle_'+args[0])(*args[1:])
    def handle_STORE(self, name, value):
        if name in myDict:
            self.wfile.write('RARGH!\r\n')
        else:
            myDict[name] = value
            self.wfile.write('W00T!\r\n')
    def handle_EXISTS(self, name):
        if name not in myDict:
            self.wfile.write('RARGH!\r\n')
        else:
            self.wfile.write('AWESOME!\r\n')
    def handle_GET(self, name):
        # unknown names produce an empty line
        self.wfile.write(myDict.get(name,''))
        self.wfile.write('\r\n')

def main():
    server = SocketServer.TCPServer(('', 8111), JNHP)
    server.serve_forever()

if __name__ == '__main__':
    main()

Now, our friend Joe, he is not stupid. He has heard there is something called Twisted. He looks at it for five minutes, and decides he will definitely use it...on his next project, when he has time and when he needs all these features. But for now, SocketServer is just fine, isn't it?

Well, not really.

For example, Joe's users continually complain that while one client is connected, no other client can connect. Of course, Joe only tested with the one client, but some people have the audacity to expect more than one user for a server. After Joe is convinced that he will actually have to implement this feature, he gives in. He reads a little bit about SocketServer and comes up with the best idea EVER.

Refusal of the Call

def main():
    server = SocketServer.ForkingTCPServer(('', 8111), JNHP)
    server.serve_forever()

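Now, there's only this one small tiny itsy bitsy problem. See, everything works fine...until a client disconnects. And, then, well, you see...each connection was served by its own forked child process, with its own private copy of myDict, and everything stored there vanishes along with the child.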

def main():
    server = SocketServer.ThreadingTCPServer(('', 8111), JNHP)
    server.serve_forever()

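Which works great...until...two clients do STOREs of the same name at the same time. The protocol is supposed to guarantee that a successful STORE actually stores the value. Unfortunately, there's a race condition: both threads can pass the "name in myDict" check before either of them writes, so both clients are told "W00T!" and one value is silently overwritten.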
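
For reference, the culprit is the check-then-set in handle_STORE. Here is a minimal sketch of how a lock would close that window in a threaded server. The names store_lock and store_once are made up for this illustration, and the code is plain Python 3 with no sockets involved.

```python
import threading

store = {}
store_lock = threading.Lock()   # hypothetical name; guards every access to `store`

def store_once(name, value):
    """Return the JNHP reply for a STORE, with the check and the set done atomically."""
    with store_lock:
        if name in store:
            return "RARGH!\r\n"
        store[name] = value
        return "W00T!\r\n"

replies = []

def client(value):
    # Every simulated client tries to claim the same name.
    replies.append(store_once("name", value))

threads = [threading.Thread(target=client, args=("value%d" % i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Exactly one of the eight clients gets "W00T!". The price is that every handler touching the dictionary has to remember to take the lock.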

Supernatural Aid

Poor ol' Joe is sad. Nothing works. Perhaps he should use Twisted anyway? He quickly reads up on the finger tutorial, and sees it's not that hard.

As the "all-knowing observer", let me give you the five-minute summary of Twisted programming. Brace yourself: we're going to fly through the finger tutorial at the speed of light. Keep your arms and heads inside, and watch out for the Doppler effect.

Event-based

Twisted is an event-based framework. One writes event handlers, and instead of actively pursuing the events, takes a Zen-like approach of lying back and letting the events come. In a network program, usually the most interesting event is dataReceived.

High-level

Here's how to write an ECHO server in Twisted:

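from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        # send back whatever arrived
        self.transport.write(data)
factory = protocol.Factory()
factory.protocol = Echo
reactor.listenTCP(1024, factory)
reactor.run()  # start the event loop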

Here's how to write a simplified finger server in Twisted:

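from twisted.internet import protocol, reactor
from twisted.protocols import basic

# LineReceiver (not plain Protocol) is what delivers lineReceived events
class Finger(basic.LineReceiver):
    def lineReceived(self, line):
        if line == 'moshez':
            self.transport.write('Giving a talk, silly!')
        else:
            self.transport.write("At moshez's talk, probably.")
factory = protocol.Factory()
factory.protocol = Finger
reactor.listenTCP(1024, factory)
reactor.run()  # start the event loop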

With Good Abstractions

Specifically, Deferred:

from twisted.internet import defer

d = defer.Deferred()
def _(o):
    print o
d.addCallback(_)
d.callback('hello world') # will print "hello world"

d = defer.Deferred()
def _(o):
    return o+1
def p(o):
    print "it is", o
d.addCallback(_)
d.addCallback(p)
d.callback(4) # will print "it is 5"

d = defer.Deferred()
def divide(o):
    return 1/o
def catch(o):
    return "oh, no, it didn't work"
def awesome(o):
    return "it is %s" % o
def pr(o):
    print o
d.addCallback(divide)
d.addCallbacks(callback=awesome, errback=catch)
d.addCallback(pr)
d.callback(0) # will print "oh, no, it didn't work"
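
What the three snippets above have in common is a chain: each result is handed to the next callback, and an exception diverts the chain to the next errback. The toy class below is only a sketch of that idea in plain Python 3. ToyDeferred is a made-up name, not Twisted's implementation: the real Deferred wraps errors in Failure objects, can be fired before handlers are added, and does not return the final result from callback().

```python
class ToyDeferred:
    """A deliberately tiny model of a callback/errback chain (not Twisted's Deferred)."""

    def __init__(self):
        self._pairs = []   # (callback, errback) pairs, in the order they were added

    def addCallbacks(self, callback, errback=None):
        self._pairs.append((callback, errback))
        return self

    def addCallback(self, callback):
        return self.addCallbacks(callback, None)

    def addErrback(self, errback):
        return self.addCallbacks(None, errback)

    def callback(self, result):
        failed = False
        for on_success, on_failure in self._pairs:
            handler = on_failure if failed else on_success
            if handler is None:
                continue              # nothing registered for this state: pass it along
            try:
                result = handler(result)
                failed = False        # a handler that returns normally means success
            except Exception as exc:
                result = exc
                failed = True         # a handler that raises switches to the errback side
        return result                 # returned only so the sketch is easy to inspect


def build_chain():
    d = ToyDeferred()
    d.addCallback(lambda o: 1 / o)
    d.addCallbacks(lambda o: "it is %s" % o,
                   lambda err: "oh, no, it didn't work")
    return d

recovered = build_chain().callback(0)   # division fails, the errback takes over
succeeded = build_chain().callback(4)   # division works, the callback formats it
```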

The Crossing of the First Threshold

from twisted.internet import protocol, reactor
from twisted.protocols import basic

myDict = {}

class JNHP(basic.LineReceiver):
    # lets the old handle_* methods keep calling self.wfile.write()
    wfile = property(lambda self: self.transport)
    def lineReceived(self, line):
        args = line.split()
        getattr(self, 'handle_'+args[0])(*args[1:])
    # handle_* methods are the same

def main():
    factory = protocol.Factory()
    factory.protocol = JNHP
    reactor.listenTCP(8111, factory)
    reactor.run()

Yes, Joe is blown away. It was certainly easy enough...wasn't it?

The Belly of the Whale

Unbeknownst to Joe, the JNHP server has really taken off in the world, and millions of people are using it. Joe was smart enough to release it under an Open Source license, so he is getting patches back -- naturally, patches against JNHP V1 (with the old threaded SocketServer implementation -- this was the version Joe released before realizing it doesn't hold up under pressure.)

It turns out that some users needed to run a program to check that each STORE operation is valid, and to reject it otherwise. A whole bunch of programs have been written -- querying signatures against databases, making sure mp3s posted are not copyrighted, etc. etc.

The problem? Many of those programs take a long time to complete. When Joe tries integrating the patch directly into the new handle_STORE method, it turns out to hang all of Twisted: while one check is running, the reactor cannot serve anyone else. Users across the world are annoyed at the beta version, which hangs often.

The Road of Trials

Luckily, Joe hears about Twisted's process handling API. Based on reactor.spawnProcess, he writes the following model:

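from twisted.internet import defer, protocol, reactor
from twisted.protocols import basic
from twisted.python import failure, log

class DetermineAllowed(protocol.ProcessProtocol):
    def __init__(self):
        # created up front so callers can read .d right after spawnProcess
        self.d = defer.Deferred()
    def connectionMade(self):
        # LineReceiver is used only to split the program's output into lines
        self.lineParser = basic.LineReceiver()
        # assumes the program ends lines with '\n'; the default is '\r\n'
        self.lineParser.delimiter = '\n'
        self.lineParser.makeConnection(None)
        self.lineParser.lineReceived = self.lineReceived
    def outReceived(self, data):
        self.lineParser.dataReceived(data)
    def lineReceived(self, line):
        # fire once, on the first "JNHP:<number>" line; ignore other output
        if self.d is not None and line.startswith('JNHP:'):
            d, self.d = self.d, None
            d.callback(int(line.split(':', 1)[1]))
    def processEnded(self, statusObject):
        # the program exited without ever printing a verdict
        if self.d is not None:
            d, self.d = self.d, None
            d.errback(failure.Failure(ValueError("No JNHP data")))

class JNHP(basic.LineReceiver):
    # ...
    def handle_STORE(self, name, value):
        if name in myDict:
            self.wfile.write('RARGH!\r\n')
            return  # already stored, no need to run the check
        proto = DetermineAllowed()
        reactor.spawnProcess(proto, 'jnhp-determine',
                             ['jnhp-determine', value])
        d = proto.d
        # log.err returns None, so after a failure the callback below
        # sees a false value and rejects the STORE
        d.addErrback(log.err)
        def _(val):
            if val:
                myDict[name] = value
                self.wfile.write('W00T!\r\n')
            else:
                self.wfile.write('RARGH!\r\n')
        d.addCallback(_)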
Temptation from the True Path

Joe happily releases this version, complete with the patch for running an external program and all. However, all is not well in the JNHP user community: many V1-JNHP users have, apparently, been working on a complete modular framework to verify JNHP STORE requests. The problem? The interface is blocking, and...you can guess the rest. Joe considers just going back to ThreadingTCPServer and using mutexes. There are dark times ahead!

Luckily, Joe has learned about Twisted's thread support, and is now trying to see if it can be used instead:

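from twisted.internet import threads
from twisted.python import log

class JNHP(basic.LineReceiver):
    # ...
    def handle_STORE(self, name, value):
        if name in myDict:
            self.wfile.write('RARGH!\r\n')
            return  # already stored, no need to run the check
        # determineAllowed is the users' blocking function; it runs in a
        # worker thread and its result comes back through the Deferred
        d = threads.deferToThread(determineAllowed, value)
        d.addErrback(log.err)
        def _(val):
            if val:
                myDict[name] = value
                self.wfile.write('W00T!\r\n')
            else:
                self.wfile.write('RARGH!\r\n')
        d.addCallback(_)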

Atonement with the Father

Now, Joe attempts to unify all these variants into one big happy API:

class JNHP(basic.LineReceiver):
    # ...
    def handle_STORE(self, name, value):
        # ...
        # maybeDeferred wraps a plain return value (or a raised exception)
        # in a Deferred, and passes an existing Deferred through unchanged
        d = defer.maybeDeferred(self.factory.determineAllowed, value)
        # ...

class ProcessFactory(protocol.Factory):
    protocol = JNHP
    def __init__(self, progname):
        self.progname = progname
    def determineAllowed(self, value):
        proto = DetermineAllowed()
        reactor.spawnProcess(proto, self.progname,
                             [self.progname, value])
        return proto.d

class FunctionFactory(protocol.Factory):
    protocol = JNHP
    def __init__(self, function):
        self.function = function
    def determineAllowed(self, value):
        return self.function(value)