August 27, 2022

Redis Clone: Improved IO Control

In the previous post we left off with unsuccessful attempts to improve the IO using only virtual threads and the classic blocking IO classes.

This time we are using the NIO network API to get more fine-grained control over the IO pattern. As a recap: we try to avoid blocking while flushing each individual response. This way we take advantage of pipelined requests: we get a bunch of requests from the client, answer all of them, and amortize the flush over multiple responses.
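
To make the pipelining concrete, here is a minimal client sketch. It is my own illustration, not part of the server code: it sends 16 GET commands back to back, flushes once, and only then reads the 16 replies. The batch size and key names are made up for the example.

Pipelining client sketch:
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

import static java.nio.charset.StandardCharsets.UTF_8;

public class PipeliningClientSketch {
    public static void main(String[] args) throws IOException {
        try (var socket = new Socket("localhost", 16379)) {
            var out = new BufferedOutputStream(socket.getOutputStream());
            var in = new BufferedReader(new InputStreamReader(socket.getInputStream(), UTF_8));
            // Send all 16 requests back to back, without reading any reply in between
            for (int i = 0; i < 16; i++) {
                var key = "key-" + i;
                // RESP: an array of 2 bulk strings: GET <key>
                var command = "*2\r\n$3\r\nGET\r\n$" + key.length() + "\r\n" + key + "\r\n";
                out.write(command.getBytes(UTF_8));
            }
            out.flush(); // a single flush, amortized over all 16 requests
            // Only now read the 16 replies
            for (int i = 0; i < 16; i++) {
                var header = in.readLine(); // "$-1" (missing key) or "$<length>"
                var value = "$-1".equals(header) ? "(nil)" : in.readLine();
                System.out.println(value);
            }
        }
    }
}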

More IO Control
Figure 1. More IO Control

Unfortunately, the NIO API is not great. It felt clunky 20 years ago (yes, it was introduced with Java 1.4 in 2002), and it still feels clunky today. There are improved APIs like the AsynchronousSocketChannel added later, but they didn’t quite fit my approach. Generally, I recommend using a library like Netty, Grizzly, or others. I actually ended up using Netty’s ByteBuf library, just to have a better buffer abstraction than Java’s ByteBuffer.
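
As a small taste of the difference, here is a minimal sketch (my own, not from the server code) of ByteBuf’s separate reader and writer indices, which avoid ByteBuffer’s flip()/compact() dance:

ByteBuf indices sketch:
import io.netty.buffer.Unpooled;

import static java.nio.charset.StandardCharsets.UTF_8;

public class ByteBufSketch {
    public static void main(String[] args) {
        var buf = Unpooled.buffer(16);
        buf.writeBytes("PING\r\n".getBytes(UTF_8)); // advances only the writerIndex
        var first = (char) buf.readByte();          // advances only the readerIndex
        System.out.println(first);                  // P
        System.out.println(buf.readableBytes());    // 5 bytes still unread
        buf.writeBytes("PONG\r\n".getBytes(UTF_8)); // keep writing while reading, no flip() needed
        System.out.println(buf.readableBytes());    // 11
    }
}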

The non-blocking NIO API is intended for multiplexing multiple socket operations on a single thread. That usually ends in callback- or event-driven code. However, I wanted to keep the classic blocking code style and let the virtual threads do the work. The NIO API certainly wasn’t designed for that =). Anyway, onwards with the code.

The code now uses the ServerSocketChannel API instead of the ServerSocket API. We start with the quite regular accepting of connections, scheduling each connection onto a virtual thread.

Accept Connections:
var scheduler = Executors.newVirtualThreadPerTaskExecutor();
var socket = ServerSocketChannel.open();
socket.bind(new InetSocketAddress("0.0.0.0", 16379));

System.out.println("App is listening on 0.0.0.0:16379");
var clone = new RedisClone();
while (true) {
    var client = socket.accept();
    scheduler.execute(() -> {
        try (client) {
            clone.handleConnection(client);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}

The main operation loop stays more or less the same, using classic blocking-style Java code and relying on the virtual threads for the multiplexing onto actual kernel threads. But we’ll use our special Writer and Reader classes to do the IO operations.

public void handleConnection(SocketChannel socket) throws Exception {
    var args = new ArrayList<String>();
    // Configure the channel to be non-blocking: our Writer and Reader classes will control the blocking
    socket.configureBlocking(false);
    // Replace the JDK IO streams with our Reader and Writer
    var writer = new Writer(socket);
    var reader = new Reader(socket, writer);

    while (true) {
        args.clear();
        var line = reader.readLine();
        if (line == null)
            break;

        // Existing code from before

        var reply = executeCommand(args);
        if (reply == null) {
            writer.write("$-1\r\n");
        } else {
            writer.write("$" + reply.length() + "\r\n" + reply + "\r\n");
        }
    }
}

The Writer writes the data into a buffer. Once enough data has accumulated in the buffer, we write it to the socket without blocking. Since the answers are small, we already flush after a kilobyte in this example. Plus, there is a method to flush explicitly.

Writer:
class Writer {
    final SocketChannel socket;
    final ByteBuf writeBuffer = Unpooled.buffer(4 * 1024);

    public Writer(SocketChannel socket) throws IOException {
        this.socket = socket;
        assert !socket.isBlocking();
    }

    public void write(String toWrite) throws Exception {
        var bytes = toWrite.getBytes(UTF_8);
        ensureAvailableWriteSpace(writeBuffer);
        writeBuffer.writeBytes(bytes);

        final var AUTO_FLUSH_LIMIT = 1024;
        if (AUTO_FLUSH_LIMIT < writeBuffer.readableBytes()) {
            // A bit confusing at this use site: we read the buffer's content into the channel, aka write to the channel.
            // On a non-blocking channel this may write fewer bytes than requested.
            writeBuffer.readBytes(socket, writeBuffer.readableBytes());
            // Proper back-pressure handling would wait for the channel to become writable again.
            // For this example we ignore such concerns and just let the writeBuffer grow indefinitely.
        }
    }

    public void flush() throws IOException {
        if (writeBuffer.readableBytes() > 0) {
            writeBuffer.readBytes(socket, writeBuffer.readableBytes());
        }
    }

    // Netty ByteBufs are not circular buffers: writes always go to the end and may grow the buffer.
    // I assume the underlying reason is to make it more efficient to interact with Java NIO.
    // So, if we're running out of writable space, discard the bytes already read and
    // copy the not-yet-read bytes to the start of the buffer, making space to write more at the end.
    static int ensureAvailableWriteSpace(ByteBuf buf) {
        final var MIN_WRITE_SPACE = 1024;

        if (buf.writableBytes() < MIN_WRITE_SPACE) {
            buf.discardReadBytes();
        }
        return Math.max(MIN_WRITE_SPACE, buf.writableBytes());
    }
}
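
To see that compaction in isolation, here is a tiny standalone sketch (my own, with made-up sizes) of what discardReadBytes does:

discardReadBytes sketch:
import io.netty.buffer.Unpooled;

public class DiscardReadBytesSketch {
    public static void main(String[] args) {
        var buf = Unpooled.buffer(8);
        buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6}); // writerIndex = 6
        buf.readBytes(new byte[4]);                   // readerIndex = 4
        System.out.println(buf.writableBytes());      // 2: the read bytes still occupy the front
        buf.discardReadBytes();                       // copies bytes 5 and 6 to the start
        System.out.println(buf.writableBytes());      // 6: the space of the 4 read bytes is reclaimed
    }
}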

The Reader is a bit more elaborate. We check if we find newline characters in our read buffer and return the completed lines. Otherwise, we read from the socket into our read buffer. If no data is available, we flush any pending answers and then wait until more data arrives. Here we use the NIO Selector purely to wait for more data, not as a multiplexer as intended.

Reader:
class Reader {
    final SocketChannel socket;
    final Writer writer;
    final ByteBuf readBuffer = Unpooled.buffer(8 * 1024);
    private final Selector selector;


    public Reader(SocketChannel socket, Writer writer) throws IOException {
        this.socket = socket;
        this.writer = writer;
        this.selector = Selector.open();
        socket.register(selector, SelectionKey.OP_READ, this);
    }

    public String readLine() throws Exception {
        var eof = false;
        while (!eof) {
            var readIndex = readBuffer.readerIndex();
            var toIndex = readBuffer.readableBytes();
            // Find the next line in the read content
            var foundNewLine = readBuffer.indexOf(readIndex, readIndex + toIndex, (byte) '\n');
            if (foundNewLine >= 0) {
                var length = foundNewLine - readIndex;
                // Lines are terminated with \r\n: drop the trailing \r from the returned string
                String line = readBuffer.toString(readIndex, length - 1, UTF_8);
                // Move the reader index past the \n
                readBuffer.readerIndex(readIndex + length + 1);
                return line;
            } else {
                // Otherwise, read from the socket
                int readSize = ensureAvailableWriteSpace(readBuffer);
                // A bit confusing at this use site: we write the socket's content into the buffer, aka read from the channel
                var read = readBuffer.writeBytes(socket, readSize);
                if (read < 0) {
                    eof = true;
                } else if (read == 0) {
                    // If we read nothing, ensure we have flushed our previous responses
                    writer.flush();
                    // And then wait until the socket becomes readable again
                    selector.select(key -> {
                        if (!key.isReadable()) {
                            throw new AssertionError("Expect to be readable again");
                        }
                    });
                }
            }
        }
        return null;
    }

}

Performance Numbers

What is the performance after all this work? Throughput improved by up to 15% over our original naive solution. Plus, the p99 latency nearly halved, from 8 ms to about 4.8 ms.

Improved performance:
============================================================================================================================
Type         Ops/sec     Hits/sec   Misses/sec    Avg. Latency     p50 Latency     p99 Latency   p99.9 Latency       KB/sec
----------------------------------------------------------------------------------------------------------------------------
Sets       382280.79          ---          ---         1.82333         1.59900         4.79900        45.31100    113448.09
Gets      3822767.78   1548404.66   2274363.12         1.82050         1.59900         4.79900        45.05500    540549.78
Waits           0.00          ---          ---             ---             ---             ---             ---          ---
Totals    4205048.56   1548404.66   2274363.12         1.82076         1.59900         4.79900        45.05500    653997.87

Profiling Again

I also profiled this new solution, as before.

As a side effect, this solution also improved the memory situation: we have less GC pressure than before and less memory churn. However, a lot of boxed integers are allocated, grrr. These boxed integers seemed to come from Java’s virtual thread implementation (but see the Twitter comments at the end). Again, virtual threads are cheap and give you more understandable code, but they are not free.

GC Summary
Figure 2. GC Summary
Memory Usage
Figure 3. Memory Usage

Also, if we look at the wall-clock profiling, nearly all the time is now spent waiting in 'select' for the socket to become readable. The reading, lookup, writing, and flushing are a tiny sliver time-wise by now. Great, that was the goal.

CPU Wall Time
Figure 4. CPU Wall Time

Next Steps

The next planned step is to reduce the memory churn further.

Stay tuned ;)

Full Code

package info.gamlor.redis;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

import static info.gamlor.redis.Writer.ensureAvailableWriteSpace;
import static java.nio.charset.StandardCharsets.UTF_8;

public class RedisMain {

    public static void main(String[] args) throws Exception {
        var scheduler = Executors.newVirtualThreadPerTaskExecutor();
        var socket = ServerSocketChannel.open();
        socket.bind(new InetSocketAddress("0.0.0.0", 16379));

        System.out.println("App is listening on 0.0.0.0:16379");
        var clone = new RedisClone();
        while (true) {
            var client = socket.accept();
            scheduler.execute(() -> {
                try (client) {
                    clone.handleConnection(client);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

class RedisClone {
    private final ConcurrentHashMap<String, String> state = new ConcurrentHashMap<>();

    public void handleConnection(SocketChannel socket) throws Exception {
        var args = new ArrayList<String>();
        // Configure the channel to be non-blocking: our Writer and Reader classes will control the blocking
        socket.configureBlocking(false);
        // Replace the JDK IO streams with our Reader and Writer
        var writer = new Writer(socket);
        var reader = new Reader(socket, writer);

        while (true) {
            args.clear();
            var line = reader.readLine();
            if (line == null)
                break;

            if (line.charAt(0) != '*')
                throw new RuntimeException("Cannot understand arg batch: " + line);

            var argsv = Integer.parseInt(line.substring(1));
            for (int i = 0; i < argsv; i++) {
                line = reader.readLine();
                if (line == null || line.charAt(0) != '$')
                    throw new RuntimeException("Cannot understand arg length: " + line);
                var argLen = Integer.parseInt(line.substring(1));
                line = reader.readLine();
                if (line == null || line.length() != argLen)
                    throw new RuntimeException("Wrong arg length expected " + argLen + " got: " + line);

                args.add(line);
            }

            var reply = executeCommand(args);
            if (reply == null) {
                writer.write("$-1\r\n");
            } else {
                writer.write("$" + reply.length() + "\r\n" + reply + "\r\n");
            }
        }
    }

    String executeCommand(List<String> args) {
        switch (args.get(0)) {
            case "GET":
                return state.get(args.get(1));
            case "SET":
                state.put(args.get(1), args.get(2));
                return null;
            default:
                throw new IllegalArgumentException("Unknown command: " + args.get(0));
        }
    }
}

class Writer {
    final SocketChannel socket;
    final ByteBuf writeBuffer = Unpooled.buffer(4 * 1024);

    public Writer(SocketChannel socket) throws IOException {
        this.socket = socket;
        assert !socket.isBlocking();
    }

    public void write(String toWrite) throws Exception {
        var bytes = toWrite.getBytes(UTF_8);
        ensureAvailableWriteSpace(writeBuffer);
        writeBuffer.writeBytes(bytes);

        final var AUTO_FLUSH_LIMIT = 1024;
        if (AUTO_FLUSH_LIMIT < writeBuffer.readableBytes()) {
            // A bit confusing at this use site: we read the buffer's content into the socket, aka write to the socket.
            // On a non-blocking channel this may write fewer bytes than requested.
            writeBuffer.readBytes(socket, writeBuffer.readableBytes());
            // Proper back-pressure handling would wait for the channel to become writable again.
            // For this example we ignore such concerns and just let the writeBuffer grow indefinitely.
        }
    }

    public void flush() throws IOException {
        if (writeBuffer.readableBytes() > 0) {
            writeBuffer.readBytes(socket, writeBuffer.readableBytes());
        }
    }

    // Netty ByteBufs are not circular buffers: writes always go to the end and may grow the buffer.
    // I assume the underlying reason is to make it more efficient to interact with Java NIO.
    // So, if we're running out of writable space, discard the bytes already read and
    // copy the not-yet-read bytes to the start of the buffer, making space to write more at the end.
    static int ensureAvailableWriteSpace(ByteBuf buf) {
        final var MIN_WRITE_SPACE = 1024;

        if (buf.writableBytes() < MIN_WRITE_SPACE) {
            buf.discardReadBytes();
        }
        return Math.max(MIN_WRITE_SPACE, buf.writableBytes());
    }
}

class Reader {
    final SocketChannel socket;
    final Writer writer;
    final ByteBuf readBuffer = Unpooled.buffer(8 * 1024);
    private final Selector selector;


    public Reader(SocketChannel socket, Writer writer) throws IOException {
        this.socket = socket;
        this.writer = writer;
        this.selector = Selector.open();
        socket.register(selector, SelectionKey.OP_READ, this);
    }

    public String readLine() throws Exception {
        var eof = false;
        while (!eof) {
            var readIndex = readBuffer.readerIndex();
            var toIndex = readBuffer.readableBytes();
            // Find the next line in the read content
            var foundNewLine = readBuffer.indexOf(readIndex, readIndex + toIndex, (byte) '\n');
            if (foundNewLine >= 0) {
                var length = foundNewLine - readIndex;
                // Lines are terminated with \r\n: drop the trailing \r from the returned string
                String line = readBuffer.toString(readIndex, length - 1, UTF_8);
                // Move the reader index past the \n
                readBuffer.readerIndex(readIndex + length + 1);
                return line;
            } else {
                // Otherwise, read from the socket
                int readSize = ensureAvailableWriteSpace(readBuffer);
                // A bit confusing at this use site: we write the socket's content into the buffer, aka read from the channel
                var read = readBuffer.writeBytes(socket, readSize);
                if (read < 0) {
                    eof = true;
                } else if (read == 0) {
                    // If we read nothing, ensure we have flushed our previous responses
                    writer.flush();
                    // And then wait until the socket becomes readable again
                    selector.select(key -> {
                        if (!key.isReadable()) {
                            throw new AssertionError("Expect to be readable again");
                        }
                    });
                }
            }
        }
        return null;
    }

}

Twitter Comments

Ron Pressler, who’s working on Java’s virtual threads, had a few good comments on Twitter.

Summary:

  • The Integers are not allocated by the virtual threads. They come from something else ;)

  • The Selector API isn’t intended to be consumed on virtual threads. Use something like Netty on regular threads if you want a nicer API.

Tags: Performance Java Development Redis-Clone