November 27, 2019

Synchronizing File Writes

Assume we have a simple task: write events to a file, one event after another. Events can be written from multiple threads.

public interface EventLog extends AutoCloseable {
    public final class Event{
        public final UUID id;
        public final ZonedDateTime created;
        public final String message;

        public Event(UUID id, ZonedDateTime created, String message) {
            this.id = id;
            this.created = created;
            this.message = message;
        }
    }

    void recordEvent(Event event);

    // Null logger doing nothing, to have a baseline
    EventLog NULL_LOGGER = new NullLogger();

    class NullLogger implements EventLog {
        @Override
        public void recordEvent(Event event) {
            // do nothing;
        }

        @Override
        public void close() throws Exception {
        }
    }
}
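
All implementations below hold a file resource, so the interface extends AutoCloseable and is meant to be used with try-with-resources. A minimal usage sketch (the file name and message are made up; LockedWriter is the first implementation shown further down):

// Usage sketch only: open a logger, record an event, close it again
try (EventLog log = new LockedWriter("events.log")) {
    log.recordEvent(new EventLog.Event(
            UUID.randomUUID(), ZonedDateTime.now(ZoneOffset.UTC), "something happened"));
} catch (Exception e) {
    e.printStackTrace();
}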

In this blog post, I write a burst of events from different threads. Every few milliseconds, a few hundred events are written. Then I record the latency of processing these events. I’m using the HdrHistogram library to do that.

I did my test runs on a 16-core DigitalOcean cloud machine with OpenJDK 13. Do not trust any of these numbers: you need to benchmark your specific scenario on your own hardware, and you might want to use a benchmark framework like JMH. Anyway, for this small blog post, I’m rolling with my quick and dirty measurements of warming up for 2 minutes and then measuring for 1 minute.

while(isRunning){
    // Random event burst once every few milliseconds
    try {
        Thread.sleep(1+rnd.nextInt(5));
    } catch (InterruptedException e) {
        isRunning = false;
        Thread.currentThread().interrupt();
    }
    // Record the time used to write a few hundred events
    var start = System.nanoTime();
    eventBurst(rnd);
    var time = System.nanoTime()-start;
    histogram.recordValue(time);
}
// Simulate writing a burst of events
private void eventBurst(Random rnd) {
    for (int i = 0; i < 200; i++) {
        byte[] data = new byte[96];
        rnd.nextBytes(data);
        var dataBase64 = Base64.getEncoder().encodeToString(data);
        var time = System.currentTimeMillis();
        var eventNo = time + "." + i;
        var timestamp = ZonedDateTime.now(ZoneOffset.UTC);
        logger.recordEvent(new EventLog.Event(UUID.randomUUID(), timestamp,
                "EventNo: " + eventNo + " Data:" + dataBase64 ));
        logger.recordEvent(new EventLog.Event(UUID.randomUUID(), timestamp,
                "EventNo: " + eventNo + " Data: Small " + rnd.nextLong()));
        logger.recordEvent(new EventLog.Event(UUID.randomUUID(),
                timestamp, "EventNo: " + eventNo + " Data: medium  " + dataBase64.substring(0, 48)));
    }
}
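
The latency numbers shown throughout the post are the footer of HdrHistogram’s percentile output. A minimal sketch of how such a histogram could be created and printed (the scaling of nanoseconds to milliseconds is my assumption, not taken from the original code):

// org.HdrHistogram.Histogram, from the HdrHistogram library
// Records latencies with 3 significant digits of precision
var histogram = new Histogram(3);
// ... the benchmark loop above calls histogram.recordValue(time) ...
// Print the percentile distribution, scaling nanosecond values to milliseconds
histogram.outputPercentileDistribution(System.out, 1_000_000.0);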

Simple Synchronized Writes

Let’s start with a straightforward implementation: we write the event to a file, and to coordinate between the threads we use a plain lock with the synchronized keyword.

Locking Approach
Figure 1. Locking Approach

LockedWriter.java

public class LockedWriter implements EventLog {
    private final Writer stream;

    public LockedWriter(String file) throws Exception {
        this.stream = Files.newBufferedWriter(Paths.get(file), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    @Override
    public synchronized void recordEvent(Event event) {
        try {
            var date = DateTimeFormatter.ISO_DATE_TIME.format(event.created);
            this.stream.append(date);
            this.stream.append(' ');
            this.stream.append(event.id.toString());
            this.stream.append(' ');
            this.stream.append(event.message);
            this.stream.append('\n');
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // ...
}

Here are my latency numbers and latency graph:

LockedWriter Latency:
#[Mean    =       19.843, StdDeviation   =        8.568]
#[Max     =      241.566, Total count    =        41896]
#[Buckets =           18, SubBuckets     =         2048]
LockedWriter Latency
Figure 2. LockedWriter Latency

Synchronize Only the Stream Writes

Let’s try to do better. The simple implementation converted the event into bytes while holding the lock. Instead of holding the lock for that long, we convert the message first and only lock the stream for the final write:

Locking Stream Approach
Figure 3. Locking Stream Approach
LockedStream.java
public class LockedStream implements EventLog {
    // The final output stream, writes are synchronized on it.
    private final OutputStream stream;
    // A thread local buffer to assemble the messages. Avoiding allocating a buffer for every invocation.
    private final ThreadLocal<Buffer> buffer = ThreadLocal.withInitial(Buffer::new);

    static class Buffer {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        OutputStreamWriter writer = new OutputStreamWriter(bytes);
    }

    public LockedStream(String file) throws Exception {
        this.stream = new BufferedOutputStream(new FileOutputStream(file));
    }

    @Override
    public void recordEvent(Event event) {
        Buffer buffer = this.buffer.get();
        var bytes = buffer.bytes;
        var writer = buffer.writer;
        try {
            addToBuffer(event, writer);
            synchronized (this.stream){
                bytes.writeTo(this.stream);
            }
            bytes.reset();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void addToBuffer(Event event, Writer writer) throws IOException {
        var date = DateTimeFormatter.ISO_DATE_TIME.format(event.created);
        writer.append(date);
        writer.append(' ');
        writer.append(event.id.toString());
        writer.append(' ');
        writer.append(event.message);
        writer.append("\n");
        // Flush to ensure the bytes are written to the buffer
        writer.flush();
    }

    //...
}

This change is a good improvement in the tail latency, probably because we do not get as much contention on the lock.

LockedStream Latency:
#[Mean    =       17.440, StdDeviation   =        3.846]
#[Max     =       62.194, Total count    =        46795]
#[Buckets =           16, SubBuckets     =         2048]
LockedStream Latency
Figure 4. LockedStream Latency

Lock-Free Writes

So far we locked around the writes. We can actually remove the locks entirely. Instead, we reserve a write position by increasing an atomic counter and then write to that position.

Atomic Position Approach
Figure 5. Atomic Position Approach
AtomicPositionWrites.java
public class AtomicPositionWrites implements EventLog {
    private final FileChannel file;
    // A thread local buffer to assemble the messages. Avoiding allocating a buffer for every invocation.
    private final ThreadLocal<Buffer> buffer = ThreadLocal.withInitial(Buffer::new);
    // Position where the next write is placed.
    private final AtomicLong writePos = new AtomicLong();

    public AtomicPositionWrites(String file) throws IOException {
        this.file = FileChannel.open(Paths.get(file), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    static class Buffer {
        ByteArrayBuffer bytes = new ByteArrayBuffer();
        OutputStreamWriter writer = new OutputStreamWriter(bytes);
    }

    @Override
    public void recordEvent(Event event) {
        var buffer = this.buffer.get();

        try {
            var byteBuf = buffer.bytes;
            addToBuffer(event, buffer.writer);

            var pos = writePos.getAndAdd(byteBuf.size());
            file.write(byteBuf.toByteBuffer(), pos);
            byteBuf.reset();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //...
}
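
The ByteArrayBuffer used here is not a JDK class and its implementation is not shown in the post. As a rough idea of the shape it needs (an assumption on my part, not the original class), it can be a ByteArrayOutputStream that hands out its content as a ByteBuffer without copying:

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Assumed shape of the ByteArrayBuffer helper (a sketch, not the original class).
// The OutputStreamWriter writes into it like a regular ByteArrayOutputStream;
// toByteBuffer() wraps the internal array so the FileChannel can write it without an extra copy.
public class ByteArrayBuffer extends ByteArrayOutputStream {
    public ByteBuffer toByteBuffer() {
        // buf and count are the protected fields of ByteArrayOutputStream
        return ByteBuffer.wrap(buf, 0, count);
    }
}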

Indeed the worst-case latency got better. However, the average latency is slightly higher.

AtomicPositionWrites Latency:
#[Mean    =       18.896, StdDeviation   =        2.133]
#[Max     =       36.209, Total count    =        43633]
#[Buckets =           16, SubBuckets     =         2048]
AtomicPositionWrites Latency without LockedWriter
Figure 6. AtomicPositionWrites Latency

Buffering the Writes

With the switch to the atomic position writes, we also dropped the buffering from the BufferedOutputStream. The implementation now does a write for every little bit of data. That creates tons of syscalls and might add to the latency. Let’s try to add the buffering back:

Buffered Approach
Figure 7. Buffered Approach
BufferedWrites.java
public class BufferedWrites implements EventLog {
    // How much data is buffered before it's written to disk
    private final static int BlockSize = 8*1024;
    private final FileChannel channel;
    // Keeps a list of buffers created by each thread, to flush pending data on close.
    // Synchronized access!
    private final ArrayList<Buffer> buffers = new ArrayList<>();

    // A thread local buffer to assemble the messages. Avoiding allocating a buffer for every invocation.
    private final ThreadLocal<Buffer> buffer = ThreadLocal.withInitial(()->{
        var b = new Buffer();
        synchronized (buffers){
            buffers.add(b);
        }
        return b;
    });
    // Position where the next write is placed.
    private final AtomicLong writePos = new AtomicLong();


    public BufferedWrites(String file) throws IOException {
        this.channel = FileChannel.open(Paths.get(file), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    static class Buffer{
        ByteArrayBuffer bytes = new ByteArrayBuffer();
        OutputStreamWriter writer = new OutputStreamWriter(bytes);
    }

    @Override
    public void recordEvent(Event event) {
        var bf = this.buffer.get();

        try {
            var byteBuf = bf.bytes;
            addToBuffer(event, bf.writer);

            if(byteBuf.size() > BlockSize){
                write(byteBuf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        // On close flush the buffers first
        synchronized (buffers){
            for (Buffer buff : buffers) {
                write(buff.bytes);
            }
        }
        this.channel.close();
    }

    private void write(ByteArrayBuffer byteBuf) throws IOException {
        var pos = writePos.getAndAdd(byteBuf.size());
        channel.write(byteBuf.toByteBuffer(), pos);
        byteBuf.reset();
    }
    // ...
}

Buffering reduces the median latency. However, it creates higher latency outliers, because when a write does happen, more data is written at once.

BufferedWrites Latency:
#[Mean    =       11.851, StdDeviation   =        4.852]
#[Max     =       56.820, Total count    =        64255]
#[Buckets =           16, SubBuckets     =         2048]
BufferedWrites Latency
Figure 8. BufferedWrites Latency

Background Writes

We might avoid the latency spikes of the bulk write by doing it in the background. When a buffer is full, we queue it. A background thread takes filled buffers off the queue and writes them out. That way the application threads do not wait for the writes.

Background Writes Approach
Figure 9. Background Writes Approach

BufferedWritesOffThread.java

public class BufferedWritesOffThread implements EventLog {
    // How much data is buffered before it's written to disk
    private final static int BlockSize = 8*1024;
    // Background thread writing to disk
    private final WriterThread writer;
    // Filled buffers ready to be written by the WriterThread
    private final ConcurrentLinkedQueue<Buffer> complete = new ConcurrentLinkedQueue<>();
    // After a buffer is written it is cleared and can be reused, to avoid allocating tons of buffers
    private final ConcurrentLinkedQueue<Buffer> free = new ConcurrentLinkedQueue<>();
    // Buffers which have some data in them and are not yet full. The close method will write these buffers out
    private final ConcurrentHashMap<Thread, Buffer> dirty = new ConcurrentHashMap<>();


    public BufferedWritesOffThread(String file) throws IOException {
        var channel = FileChannel.open(Paths.get(file), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        this.writer = new WriterThread(channel);
        this.writer.start();
    }

    static class Buffer{
        ByteArrayBuffer bytes = new ByteArrayBuffer();
        OutputStreamWriter writer = new OutputStreamWriter(bytes);
    }

    class WriterThread extends Thread{
        private final FileChannel channel;
        private volatile boolean isRunning = true;

        WriterThread(FileChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            try{
                // Keep draining until asked to stop and the queue is empty,
                // so buffers queued by close() are not lost
                while(isRunning || !complete.isEmpty()){
                    var buffer = complete.poll();
                    if(buffer == null){
                        Thread.sleep(1);
                    } else{
                        var bytes = buffer.bytes;
                        try {
                            channel.write(bytes.toByteBuffer());
                            bytes.reset();
                            free.offer(buffer);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (InterruptedException e){
                // Completed
            }
        }

        public void close() throws IOException{
            this.isRunning = false;
            try {
                this.join();
                this.channel.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void recordEvent(Event event) {
        var bf = dirty.computeIfAbsent(Thread.currentThread(), (t)->newBuffer());

        try {
            var byteBuf = bf.bytes;
            addToBuffer(event, bf.writer);

            if(byteBuf.size() > BlockSize){
                complete.offer(bf);
                this.dirty.put(Thread.currentThread(), newBuffer());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        for (Buffer buff : dirty.values()) {
            complete.offer(buff);
        }
        this.writer.close();
    }

    private Buffer newBuffer() {
        var buffer = free.poll();
        if(buffer == null){
            buffer = new Buffer();
        }
        return buffer;
    }
    // ...
}

This indeed reduces the latency spikes and brings the worst case back close to the direct writes. It still seems slightly higher, but to verify an actual difference we would need a better benchmark.

BufferedWritesOffThread Latency:

#[Mean    =       11.322, StdDeviation   =        4.152]
#[Max     =       39.682, Total count    =        66660]
#[Buckets =           16, SubBuckets     =         2048]
BufferedWritesOffThread Latency
Figure 10. BufferedWritesOffThread Latency

Final Results

Implementation             Mean ms   Max ms    Std. Deviation   Total count
LockedWriter               19.843    241.566   8.568            41896
LockedStream               17.440    62.194    3.846            46795
AtomicPositionWrites       18.896    36.209    2.133            43633
BufferedWrites             11.851    56.820    4.85             64255
BufferedWritesOffThread    11.322    39.682    4.152            66660

Final Result
Figure 11. Final Result
Final Result Without Locked Writer
Figure 12. Final Result Without Locked Writer

Conclusion

With a tiny bit of thought and by reducing the lock contention, you can improve the tail latency massively.

Of course, you could drive my toy examples further. We could reduce allocations, try using DirectBuffers to avoid copying data, etc. Anyway, at that point, you might also want to check out high-performance libraries like OpenHFT or Agrona to see if they have a helpful class for your case.

Tags: Java