| tags:rust k:v store distributed cormorant categories:Rust

Boilerplate Beginning - Piccolo Part 1

Rustc version: rustc 1.4.0-nightly (7d78f2d33 2015-09-01)

Four months ago, I decided to build a toy distributed key:value store in Rust. As so often happens, real life got in the way and I tabled the project for a while.

But no longer! I've resurrected this idea and have started hacking out some code. My plan is largely the same with one major difference: I've decided to use Mioco instead of Mio directly. Mioco is a thin wrapper over Mio that provides coroutine support.

Mio and Mioco

In Mio, you register “interests” on the event loop, and are later notified when that interest is “ready”.

Say you want to read data off of a stream. You register an interest that says “Hi Mio! Please tell me when this stream has data”, then you carry on with your business. At some future point, Mio will notify you and say “Hey, that connection you asked about? It has some data now”.

Once notified, you can read/write data, etc. Mio is “just” an event loop, it is up to you to manage state. E.g. you'll likely have a per-connection struct which acts as metadata and a state machine for each connection.

Mioco removes some of that cognitive burden. Mioco allows you to spawn stackful coroutines (user-space “green threads” with their own stack, all cooperatively scheduled on a single kernel thread). Mioco then registers these coroutines into Mio's event loop.

As the developer, you write plain o'l blocking code. Under the covers, the operations are actually asynchronous. When your coroutine needs to block, such as waiting for data on a socket, mioco halts execution of that coroutine and resumes a different one. And because each coroutine maintains its own stack, it implicitly acts as a state machine for execution of that connection/work unit/whatever.

This is the first time I've worked with coroutines, so if anyone has tips for best practice / performance, please share them!

Let's dive in and start looking at the first batch of commits.

Commits #cb3f57d .. #ea35ae5

Piccolo is essentially a glorified, distributed echo server at the moment. It can accept command-line arguments, load and parse a config file, “cluster” with other nodes and spam each other with some data before disconnecting. Future installments should be more interesting as we build more complex functionality.

I'll highlight a few of the interesting bits.


Static configuration is stored in config.toml in the root directory, or specified via a commandline argument (--config). toml-rs supports the Decodable trait, so I can deserialize the config straight into a struct.

Given this config toml:

hosts = ["", ""]
bind_host = ""
bind_port = 19919

We can parse straight into the struct, due to the magic of #[derive(RustcDecodable)]:

#[derive(RustcDecodable, Debug)]
pub struct Config  {
    pub discovery: DiscoveryConfig

#[derive(RustcDecodable, Debug)]
pub struct DiscoveryConfig  {
    pub hosts: Vec<String>,
    pub bind_host: String,
    pub bind_port: u16

pub fn parse(path: String) -> Config {

    // ... bunch of code to load the file, parse the toml ... //

    // Once we have a Toml Table, we can call ::decode and
    // serialize straight into Config
    let config = Value::Table(toml.unwrap());
    match toml::decode(config) {
        Some(t) => t,
        None => panic!("Error while deserializing config")


Once the config has been loaded, the main method wraps the config in an Arc<RWLock<>> (so multiple threads can access, and sometimes modify, the config). It then starts the networking stack by invoking NetworkHandler::start()

This method attempts to bind and listen on the host:port defined in your config. If the port is taken, it retries successively higher ports until a free one is found. This just makes it easier to spin up local clusters for testing:

let sock = try!(TcpSocket::v4());

// Try to bind the requested host:port, but if the port is taken,
// keep incrementing until we find one
loop {
    match sock.bind(&addr) {
        Ok(_) => break,
        Err(_) => {
            error!("Port [{}] taken, trying higher", addr.port());
            addr = build_address(bind_host, addr.port() + 1);
let listener = try!(sock.listen(1024));

// If we had to choose a new port, obtain a write-lock and update the config
if addr.port() != bind_port {
    let mut c = config.write().unwrap();
    c.discovery.bind_port = addr.port();

Once we are listening for new connections, we perform two actions: start the “Discovery” service and begin accepting connections. Let's talk about accepting connections first, since it is simpler.

Accepting Connections

When our listening socket receives a new connection, it unblocks the coroutine and returns a TcpStream. Rather than dealing with the connection in this coroutine, we spawn a new one and go back to blocking on accept(). This lets us offload the management of individual connections to other coroutines:

// Accept connections forever
loop {
    let conn = try!(listener.accept());

    // If we get one, spawn a new coroutine and go back to
    // listening for more connections
    mioco.spawn(move |mioco| {
        connection(mioco, conn)

The contents of connection() is straight forward. We wrap the stream with Mioco, create a buffer, then start trying to read data off the socket. The recv() method will block until data is available for reading, at which point we echo it straight back and then go back to recv'ing.

Discovery Service

The Discovery service is a little more interesting. The job of Discovery is to reach out to other nodes and attempt to establish a node-to-node connection. It loads the list of “other nodes” from our config, filters out our own address and then spawns a coroutine per-host

    let reader = config.read().unwrap();
    for host in reader.discovery.hosts.iter().filter(|&x| x != &*bind_host) {
        let addr: SocketAddr = FromStr::from_str(&*host).unwrap_or_else(|err|panic!("{}: [{}]", err, host));

        mioco.spawn(move |mioco| {
            discovery(mioco, addr)

What happens inside of discover() is similar to accepting connections, but with a few more complications. We first create a socket, then instruct it to connect to an address. Both of these operations are non-blocking and return immediately.

This is confusing at first, since the socket will “connect” and return a TcpStream… but that stream is (usually) not writeable yet. The reason is simple: establishing a TCP connection between hosts takes time (three way handshake, etc). If the mio waited for the stream to finish connecting, the entire event loop would stall on that operation.

Instead, mio creates the stream and returns it immediately while the TCP negotiation happens in the background. To determine if/when the connection is complete, you must:

  1. Check if complete == true. If it is, you won the lottery and the stream connected immediately
  2. If complete == false, ask mioco to block the coroutine until the socket is writeable
  3. When the coroutine unblocks, ask mio if there are any errors on the socket
  4. If no errors, yay, you connected!
  5. If there are errors, the connection was likely refused and you should try something else
// Mio returns a stream and a completion flag.  If `complete` is
// true, the socket connected immediately and we can start writing.
// It usually returns false
let socket = TcpSocket::v4().unwrap();
let (stream, mut complete) = socket.connect(&addr).unwrap();
let stream = mioco.wrap(stream);

// If the socket didn't connect immediately, select_write() will
// block this coroutine until it is writeable, then we check for errors
if !complete {
    complete = match stream.with_raw(|s| s.take_socket_error()) {
        Ok(_) => true,
        Err(e) => {
            debug!("err: {:?}", e);

This is a bit clunky, but the author of mioco is working on some syntactic sugar to make it more friendly.

Once we have actually connected to a remote host, we need to talk to it. We'll spawn a new coroutine to handle it, one coroutine per remote host.

However, unlike the listener socket from before, the currently running coroutine does not need to manage multiple connections; it is dedicated to the single remote host. So rather than spawning a coroutine and looping back to the top, we will spawn a coro and then block on a mailbox.

Mailboxes are mioco's wrapper around mio notifications, and basically act as channels that can block/unblock coroutines. For now, the mailbox will simply alert that the coroutine has failed with a boolean, signaling that the discovery coroutine should start trying to reconnect.

// ... spawn a coroutine to handle talking to the remote node
if complete {

    // We will use the mailboxes to notify our discovery coro if
    // the remote peer fails at some point in the future, so we can
    // can try to reconnect
    let (mail_send, mail_recv) = mioco::mailbox::<bool>();

    mioco.spawn(move |mioco| {
        remote_node_handler(mioco, stream, mail_send)

    // And then block this coroutine waiting for a failure notification
    let mut mail_recv = mioco.wrap(mail_recv);
    let failed = mail_recv.recv();
    debug!("Remote connection failed, sleep and try to reconnect...");

It's not clear to me if spawning then parking a coroutine is the best idea. I could have instead called a plain ol’ function which returns when failure is detected. It would probably be cheaper (function call vs coroutine context switch) and and probably reduce the burden on mio by registering fewer interests. If anyone has opinions, I'd be happy to hear.

Finally, the work done in remote_node_handler() is very simple. We just write some data, wait a few seconds, then drop the connection.


So, what does it look like when two nodes are talking to each other?

INFO:piccolo::network_handler: Binding
INFO:piccolo::network_handler: Server bound on V4(
DEBUG:piccolo::network_handler: Connecting to external node []...
DEBUG:piccolo::network_handler: Event { id: EventSourceId(0), rw: Both }
DEBUG:piccolo::network_handler: err: Error { repr: Os { code: 61, message: "Connection refused" } }
DEBUG:piccolo::network_handler: Accepting connection from []
DEBUG:piccolo::network_handler: Read [4] bytes from []
DEBUG:piccolo::network_handler: Writing [4] bytes to []
DEBUG:piccolo::network_handler: Event { id: EventSourceId(2), rw: Write }
DEBUG:piccolo::network_handler: Wrote [4] bytes to remote connection
DEBUG:piccolo::network_handler: Shutting down remote connection...
DEBUG:piccolo::network_handler: Remote connection failed, sleep and try to reconnect...
DEBUG:piccolo::network_handler: Accepting connection from []
DEBUG:piccolo::network_handler: Read [4] bytes from []
DEBUG:piccolo::network_handler: Writing [4] bytes to []
DEBUG:piccolo::network_handler: Event { id: EventSourceId(4), rw: Write }
DEBUG:piccolo::network_handler: Wrote [4] bytes to remote connection
DEBUG:piccolo::network_handler: Read [0] bytes from []
DEBUG:piccolo::network_handler: HUP from []

The node basically starts listening, attempts to connect to the remote socket repeatedly until the other node comes online, then spams messages at each other until closed.


That's all for now. Lots of boilerplate to get out of the way, hopefully future installments will be more interesting as I get into more nuanced features/problems. If you have any opinions about content (more / less code, more / less high-level design, etc) please let me know!

comments powered by Disqus