Writing Redis in Python with asyncio 2: Shared State

Tue 20 September 2016 by James Saryerwinnie

I've been writing redis in python using asyncio. I started this project because I wanted to learn more about asyncio and thought that porting an existing project to asyncio would give me an excellent opportunity to learn about this new library.

Part 1 of this series covered how to implement a basic request/response for redis using asyncio. It covered how to use protocols, how they hook into asyncio, and how you could parse and serialize requests and responses. Since part 1 was first published over a year ago (I know...), a few things have happened:

  1. Python 3.5 added async and await keywords which changed the recommended way for working with coroutines.
  2. I had the opportunity to speak about this topic at EuroPython 2016.

The slides for my talk are available on Speaker Deck.

You can also check out the talk here.

If you've read Part 1 of this series, the EuroPython talk covered several additional topics:

  • PUBLISH/SUBSCRIBE
  • BLPOP/BRPOP (blocking queues)

Covering BLPOP/BRPOP also required a quick detour into async and await.

In the next few posts, I want to discuss these topics in more detail and cover some additional topics that I had to omit from my talk for lack of time.

For the remainder of this post, we're going to look at how to implement the PUBLISH and SUBSCRIBE commands in redis using asyncio.

Publish/Subscribe

I'm assuming you're familiar with redis, but here's a quick reminder of the PUBSUB feature in redis, and what we're shooting for in this post:

And here's a video of this in action:

In the video, you see two clients SUBSCRIBE to a channel. Those clients will then block until another client comes along and issues a PUBLISH command to that channel. You can see that when the bottom client issues a PUBLISH command, the two top clients subscribed to the channel receive the published message. The redis docs on pubsub have a more detailed overview of this feature.
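For reference, subscribers don't receive a bare string. Redis pushes each published message as a three-element RESP array (`message`, the channel name, the payload) and answers SUBSCRIBE with a similar array whose last element is an integer. The `to_resp` function below is a hypothetical, minimal encoder (not the `serializer` module from part 1) that only handles bytes, str, int, and lists; it is just meant to show the bytes that end up on the wire for those two replies.

```python
def to_resp(value):
    """Minimal RESP encoder (hypothetical helper) for arrays of
    bulk strings and integers."""
    if isinstance(value, int):
        # Integers are sent as ":<n>\r\n".
        return b':' + str(value).encode() + b'\r\n'
    if isinstance(value, str):
        value = value.encode()
    if isinstance(value, bytes):
        # Bulk strings are "$<length>\r\n<data>\r\n".
        return b'$' + str(len(value)).encode() + b'\r\n' + value + b'\r\n'
    if isinstance(value, list):
        # Arrays are "*<count>\r\n" followed by each encoded element.
        return (b'*' + str(len(value)).encode() + b'\r\n'
                + b''.join(to_resp(item) for item in value))
    raise TypeError('cannot encode {!r}'.format(value))


# Reply to "SUBSCRIBE news" (the client is now in 1 channel).
print(to_resp(['subscribe', 'news', 1]))
# Pushed to every subscriber of "news" after "PUBLISH news hello".
print(to_resp(['message', 'news', 'hello']))
```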

Let's look at how to do this in python using asyncio.

Sharing State

Whenever we receive a PUBLISH command from a client, we need to send the message being published to every previously subscribed client. Transports need to be able to talk to other transports. More generally, we need a way to share state across transports.

As a newcomer to the async world, this was one of the hardest things for me to figure out. How are you supposed to share state across connections?

What I found helpful was to first write code that assumed shared state and then figure out how to plumb it all together later. For this PUBSUB feature, let's create a PubSub class that allows transports to subscribe and publish to channels:

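# `serializer` is the wire-format module from part 1.
class PubSub:
    def __init__(self):
        # Maps channel name -> list of subscribed transports.
        self._channels = {}

    def subscribe(self, channel, transport):
        self._channels.setdefault(channel, []).append(transport)
        # Redis replies with the client's subscription count;
        # this simple version always reports 1.
        return ['subscribe', channel, 1]

    def publish(self, channel, message):
        transports = self._channels.get(channel, [])
        message = serializer.serialize_to_wire(
            ['message', channel, message])
        for transport in transports:
            transport.write(message)
        # PUBLISH returns the number of clients that got the message.
        return len(transports)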

In the class above, we maintain a mapping of channel names (which are strings) to transports. Every time a client wants to subscribe to a channel, we add its transport to the list of transports associated with that channel. Whenever a client publishes a message, we iterate through every transport subscribed to that channel and write the message to it.
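Because `PubSub` only ever calls `transport.write()`, it can be exercised without any networking at all. The sketch below is an illustration rather than code from the project: `FakeTransport` is a hypothetical stand-in that records what was written, and `serialize_to_wire` is a hypothetical placeholder for part 1's serializer that simply `repr`s its argument so the example stays self-contained.

```python
class FakeTransport:
    """Hypothetical stand-in for an asyncio transport: records writes."""
    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


def serialize_to_wire(value):
    # Placeholder for part 1's serializer; real code would emit RESP.
    return repr(value).encode()


class PubSub:
    def __init__(self):
        self._channels = {}

    def subscribe(self, channel, transport):
        self._channels.setdefault(channel, []).append(transport)
        return ['subscribe', channel, 1]

    def publish(self, channel, message):
        transports = self._channels.get(channel, [])
        payload = serialize_to_wire(['message', channel, message])
        for transport in transports:
            transport.write(payload)
        return len(transports)


pubsub = PubSub()
alice, bob, carol = FakeTransport(), FakeTransport(), FakeTransport()
pubsub.subscribe(b'news', alice)
pubsub.subscribe(b'news', bob)
pubsub.subscribe(b'sports', carol)

receivers = pubsub.publish(b'news', b'hello')
print(receivers)        # 2: only the "news" subscribers
print(carol.written)    # []: carol subscribed to a different channel
```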

The way we'd use this class is in our RedisServerProtocol class where we'll assume we have an instance of this PubSub class available as the self._pubsub instance variable:

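# In the RedisServerProtocol class:
# (`parser` and `serializer` are the modules from part 1.)
import asyncio


class RedisServerProtocol(asyncio.Protocol):
    def __init__(self, pubsub):
        self._pubsub = pubsub
        self.transport = None

    def connection_made(self, transport):
        # asyncio passes in the transport for this connection.
        self.transport = transport

    def data_received(self, data):
        parsed = parser.parse_wire_protocol(data)
        # [COMMAND, arg1, arg2]
        command = parsed[0].lower()
        if command == b'subscribe':
            response = self._pubsub.subscribe(parsed[1], self.transport)
        elif command == b'publish':
            response = self._pubsub.publish(parsed[1], parsed[2])
        else:
            # Other commands are handled as in part 1 (omitted here).
            return
        self.transport.write(serializer.serialize_to_wire(response))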

For this code to work, there must be exactly one instance of the PubSub class, shared across all incoming connections. We need a way to make sure that whenever we create a protocol instance, we also inject a shared reference to that PubSub instance.
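To make the failure mode concrete: if each connection built its own `PubSub`, the subscriber and the publisher would be looking at different channel tables, so the publish would reach nobody. The sketch below uses a pared-down, hypothetical `PubSub` that skips serialization, with a plain list standing in for a transport.

```python
class PubSub:
    """Pared-down PubSub for illustration: no wire serialization."""
    def __init__(self):
        self._channels = {}

    def subscribe(self, channel, transport):
        self._channels.setdefault(channel, []).append(transport)

    def publish(self, channel, message):
        transports = self._channels.get(channel, [])
        for transport in transports:
            transport.append(message)  # a list stands in for a transport
        return len(transports)


inbox = []

# Wrong: subscriber and publisher each get their own instance.
subscriber_side, publisher_side = PubSub(), PubSub()
subscriber_side.subscribe('news', inbox)
print(publisher_side.publish('news', 'hello'), inbox)  # 0 []

# Right: both connections share one instance.
shared = PubSub()
shared.subscribe('news', inbox)
print(shared.publish('news', 'hello'), inbox)  # 1 ['hello']
```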

Let's refresh our memories first. In part 1 of this series, we talked about protocols and transports. One of the main takeaways from that post is that every time a client connects to our server, there is a protocol instance and a transport instance associated with that connection. It looks like this:

A protocol factory is used to create a protocol instance which is associated with a single connection. This factory is just a callable that returns an instance of a protocol. Here's how a protocol factory is used in the asyncio code base, asyncio/selector_events.py:

def _accept_connection2(
        self, protocol_factory, conn, extra, server=None):
    protocol = None
    transport = None
    try:
        # Called with no arguments; returns e.g. a RedisServerProtocol.
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = _SelectorSocketTransport(self, conn, protocol,
                                             waiter, extra, server)
        # ...
    except Exception as exc:
        # ...
        pass
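To see that asyncio really does call the factory once per accepted connection, here is a small runnable sketch (it needs Python 3.7+ for `asyncio.run`). `EchoProtocol` and `counting_factory` are hypothetical names used only for illustration, and the server listens on port 0 so the OS picks a free port.

```python
import asyncio


class EchoProtocol(asyncio.Protocol):
    """Hypothetical protocol that echoes bytes back to the client."""
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)


calls = []  # every protocol instance the factory has created


def counting_factory():
    # asyncio calls this with no arguments for each accepted connection.
    protocol = EchoProtocol()
    calls.append(protocol)
    return protocol


async def demo():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(counting_factory, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    for _ in range(2):
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        writer.write(b'ping')
        await writer.drain()
        await reader.readexactly(4)  # the echo proves the protocol exists
        writer.close()
        await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return len(calls)


print(asyncio.run(demo()))  # 2
```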

Because asyncio calls a protocol factory with no arguments, we need some other way to bind our PubSub instance to this factory. We could use functools.partial (which is actually what's used in part 1), but I've found that having a distinct class for this has made things easier:

class ProtocolFactory:
    def __init__(self, protocol_cls, *args, **kwargs):
        self._protocol_cls = protocol_cls
        self._args = args
        self._kwargs = kwargs

    def __call__(self):
        # asyncio calls protocol factories with no arguments,
        # once for every new connection.
        return self._protocol_cls(*self._args, **self._kwargs)
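A quick way to convince yourself that every connection gets its own protocol but the same `PubSub` is to call the factory directly, the way asyncio does. In this sketch, `DummyProtocol` is a hypothetical stand-in for `RedisServerProtocol` and a plain `object()` stands in for the shared `PubSub`; the `functools.partial` version is included for comparison, since it behaves the same way.

```python
import functools


class ProtocolFactory:
    def __init__(self, protocol_cls, *args, **kwargs):
        self._protocol_cls = protocol_cls
        self._args = args
        self._kwargs = kwargs

    def __call__(self):
        # asyncio calls protocol factories with no arguments.
        return self._protocol_cls(*self._args, **self._kwargs)


class DummyProtocol:
    """Hypothetical stand-in for RedisServerProtocol."""
    def __init__(self, pubsub):
        self._pubsub = pubsub


shared = object()  # stands in for the single PubSub instance
factory = ProtocolFactory(DummyProtocol, shared)
first, second = factory(), factory()

# A fresh protocol per call, all holding the same shared object.
print(first is second)                    # False
print(first._pubsub is second._pubsub)    # True

# functools.partial gives the same behaviour.
partial_factory = functools.partial(DummyProtocol, shared)
print(partial_factory()._pubsub is shared)  # True
```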

Now, instead of passing the RedisServerProtocol class to the loop.create_server call, we can pass an instance of the protocol factory class we just created. Here's how everything looks once it's wired together:

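import asyncio


def main(hostname='localhost', port=6379):  # 6379 is redis's default port
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # A single PubSub instance, shared by every protocol the factory builds.
    factory = ProtocolFactory(
        RedisServerProtocol, PubSub()
    )
    coro = loop.create_server(factory, hostname, port)
    server = loop.run_until_complete(coro)
    print("Listening on port {}".format(port))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("Ctrl-C received, shutting down.")
    finally:
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
        print("Server shutdown.")
    return 0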

And that's all you need to get a basic PUBSUB implementation up and running using asyncio.
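To see all the pieces working end to end, here is a self-contained sketch you can run locally (Python 3.7+). It is not the project's code: `parse_command` and `to_resp` are hypothetical minimal replacements for part 1's parser and serializer, they assume each command arrives in a single `data_received` call (as the code above does), and `functools.partial` stands in for `ProtocolFactory` to keep it short. Two clients subscribe to `news`, a third publishes, and both subscribers receive the message.

```python
import asyncio
import functools


def to_resp(value):
    # Minimal RESP encoder for ints, bytes and lists (hypothetical helper).
    if isinstance(value, int):
        return b':%d\r\n' % value
    if isinstance(value, bytes):
        return b'$%d\r\n%s\r\n' % (len(value), value)
    return b'*%d\r\n' % len(value) + b''.join(to_resp(v) for v in value)


def parse_command(data):
    # Minimal RESP array parser (hypothetical helper). Assumes one complete
    # command per call and no "\r\n" inside arguments.
    lines = data.split(b'\r\n')
    count = int(lines[0][1:])                        # "*<count>"
    return [lines[2 + 2 * i] for i in range(count)]  # skip "$<len>" lines


class PubSub:
    def __init__(self):
        self._channels = {}

    def subscribe(self, channel, transport):
        self._channels.setdefault(channel, []).append(transport)
        return [b'subscribe', channel, 1]

    def publish(self, channel, message):
        transports = self._channels.get(channel, [])
        payload = to_resp([b'message', channel, message])
        for transport in transports:
            transport.write(payload)
        return len(transports)


class RedisServerProtocol(asyncio.Protocol):
    def __init__(self, pubsub):
        self._pubsub = pubsub

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        parsed = parse_command(data)
        command = parsed[0].lower()
        if command == b'subscribe':
            response = self._pubsub.subscribe(parsed[1], self.transport)
        elif command == b'publish':
            response = self._pubsub.publish(parsed[1], parsed[2])
        else:
            self.transport.write(b'-ERR unknown command\r\n')
            return
        self.transport.write(to_resp(response))


async def main():
    loop = asyncio.get_running_loop()
    # One PubSub shared by every protocol instance the factory creates.
    factory = functools.partial(RedisServerProtocol, PubSub())
    server = await loop.create_server(factory, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    clients = []

    async def connect(*command):
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        clients.append(writer)
        writer.write(to_resp(list(command)))
        await writer.drain()
        return reader

    subscribers = []
    for _ in range(2):
        reader = await connect(b'SUBSCRIBE', b'news')
        await reader.readuntil(b':1\r\n')  # wait for the subscribe reply
        subscribers.append(reader)

    publisher = await connect(b'PUBLISH', b'news', b'hello')
    receivers = await publisher.readline()  # integer reply to PUBLISH
    expected = to_resp([b'message', b'news', b'hello'])
    received = [await r.readexactly(len(expected)) for r in subscribers]

    for writer in clients:
        writer.close()
        await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return receivers, received, expected


receivers, received, expected = asyncio.run(main())
print(receivers)                              # b':2\r\n'
print(all(r == expected for r in received))   # True
```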

Wrapping Up

To summarize what we've done:

  • Created a new PubSub class that lets a transport subscribe to a channel name and lets clients publish a message to a channel.
  • Updated the RedisServerProtocol class to accept a reference to this object in its __init__.
  • Updated RedisServerProtocol.data_received to use this _pubsub instance whenever we receive a PUBLISH or SUBSCRIBE command.
  • Created a protocol factory that passes the same shared PubSub object to every protocol instance it creates.

In the next post, we'll look at how you can implement BLPOP/BRPOP with asyncio.

One last thing. I'm in the process of getting this code on GitHub. I'll update this post with a link once the repo is available, or you can follow me on Twitter, where I'll also post a link.

