Async File IO with Akka and Java 7
Akka provides tons of nice facilities for dealing with concurrent and asynchronous operations. However, at the edges, where you have to deal with the non-Akka world, things often get rougher. For example, traditional Java file access is synchronous. That can be annoying when you read large files, especially if those files are on another machine.
Java 7 brings a nice new API for asynchronous file operations: the AsynchronousFileChannel API. It lets us read and write files asynchronously with little effort. I’ve written a small wrapper which integrates it a little better with Akka.
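To show what any wrapper has to integrate with, here is a minimal sketch of the raw Java 7 API called from Scala, with no wrapper involved. AsynchronousFileChannel.read returns immediately and later reports the outcome to a CompletionHandler on one of the channel's pooled threads. The RawCallbackRead object, the readPrefix helper and its Either-based callback are illustrative names, not part of the wrapper.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Path, StandardOpenOption}

object RawCallbackRead {
  // Starts reading up to `size` bytes from the beginning of the file and returns
  // at once; `onDone` is called later from one of the channel's threads.
  def readPrefix(path: Path, size: Int)(onDone: Either[Throwable, Array[Byte]] => Unit): Unit = {
    val ch = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    val buffer = ByteBuffer.allocate(size)
    ch.read[AnyRef](buffer, 0L, null, new CompletionHandler[Integer, AnyRef] {
      def completed(bytesRead: Integer, attachment: AnyRef): Unit = {
        // After flip() the limit equals the bytes read (0 for an empty file, where bytesRead is -1)
        buffer.flip()
        val bytes = new Array[Byte](buffer.remaining)
        buffer.get(bytes)
        ch.close()
        onDone(Right(bytes))
      }
      def failed(exc: Throwable, attachment: AnyRef): Unit = {
        ch.close()
        onDone(Left(exc))
      }
    })
  }
}
```

Nothing here composes: every follow-up step has to be nested inside the callback, which is exactly the gap a future-based wrapper closes.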
Reading a File
First, the operations need an ExecutionContext, like other Akka constructs. The execution context is used to dispatch the results of asynchronous requests, so we need to declare one. Actors already have an execution context which you can use for this. After that you can open files with the FileIO.open method. All operations return their result as an Akka future, so you can use all the tools from the Akka world:
The API wraps results in Akka ByteStrings, which are immutable byte arrays. This means you can easily send the raw data around and don’t have to worry about any modifications.
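The wrapper's own code isn't reproduced here, so the following is only a sketch of the same idea using plain JDK classes and the standard scala.concurrent.Future (not the Akka 2.0 futures and ByteStrings the wrapper returns). A Promise bridges each CompletionHandler into a Future, the implicit ExecutionContext runs the continuations, every chunk is copied into an immutable Vector[Byte], and the channel is closed once the whole read has finished. FutureFileRead, readChunk and readAll are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Path, StandardOpenOption}
import scala.concurrent.{ExecutionContext, Future, Promise}

object FutureFileRead {
  // Reads one chunk at `position`; completes with None at end of file.
  // The handler runs on the channel's threads and only completes the Promise.
  private def readChunk(ch: AsynchronousFileChannel, position: Long, chunkSize: Int): Future[Option[Vector[Byte]]] = {
    val promise = Promise[Option[Vector[Byte]]]()
    val buffer = ByteBuffer.allocate(chunkSize)
    ch.read[AnyRef](buffer, position, null, new CompletionHandler[Integer, AnyRef] {
      def completed(n: Integer, att: AnyRef): Unit =
        if (n.intValue < 0) promise.success(None)
        else {
          buffer.flip()
          val bytes = new Array[Byte](buffer.remaining)
          buffer.get(bytes)
          // Immutable copy: later changes to any buffer cannot leak to the caller
          promise.success(Some(bytes.toVector))
        }
      def failed(exc: Throwable, att: AnyRef): Unit = promise.failure(exc)
    })
    promise.future
  }

  // Reads the whole file; flatMap and andThen run on the implicit ExecutionContext.
  def readAll(path: Path, chunkSize: Int = 4096)(implicit ec: ExecutionContext): Future[Vector[Byte]] = {
    require(chunkSize > 0, "chunkSize must be positive")
    val ch = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    def loop(pos: Long, acc: Vector[Byte]): Future[Vector[Byte]] =
      readChunk(ch, pos, chunkSize).flatMap {
        case None        => Future.successful(acc)
        case Some(chunk) => loop(pos + chunk.length, acc ++ chunk)
      }
    // Close only after the last read finished, whether it succeeded or failed
    loop(0L, Vector.empty).andThen { case _ => ch.close() }
  }
}
```

With scala.concurrent.ExecutionContext.Implicits.global in scope, FutureFileRead.readAll(path) yields the whole file as a Future[Vector[Byte]]. Using andThen for the close keeps it ordered after the read chain instead of racing a separate callback.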
Using Iteratees
Now when you read a file asynchronously, the data usually arrives in chunks. That makes it harder to parse. One way to deal with this is Iteratees: FileIO instances can use Akka iteratees to process the file input. There are two methods for this. The ‘readAll’ method reads the file until the Iteratee is done or the file ends. This is useful when you want to parse the whole file into a certain structure. The ‘readSegments’ method reads until the iteratee is done, collects that result and starts over, parsing the rest of the file. This is handy when you need to parse a file with a repeated structure, for example parsing each line.
Parsing everything with an Iteratee:
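A minimal sketch of the idea, assuming a hand-rolled Iteratee type rather than Akka's: an iteratee is either Done with a result (plus any unconsumed bytes) or Cont, waiting for the next chunk (Some(bytes)) or end of file (None). The hypothetical readAll driver feeds chunks until the iteratee is done or the input ends; an in-memory Iterator of chunks stands in for the asynchronous reads. The stats iteratee parses the whole input, counting bytes and newlines, so chunk boundaries never matter.

```scala
import scala.annotation.tailrec

object IterateeReadAll {
  // Hypothetical iteratee model: finished with a result, or waiting for input.
  // Input is Some(chunk) for data and None for end of file.
  sealed trait Iteratee[+A]
  final case class Done[A](result: A, rest: Vector[Byte]) extends Iteratee[A]
  final case class Cont[A](step: Option[Vector[Byte]] => Iteratee[A]) extends Iteratee[A]

  // Parses the whole input: counts bytes and newlines until end of file.
  def stats(bytes: Long = 0L, lines: Long = 0L): Iteratee[(Long, Long)] =
    Cont[(Long, Long)] {
      case Some(chunk) => stats(bytes + chunk.length, lines + chunk.count(_ == '\n'.toByte))
      case None        => Done((bytes, lines), Vector.empty)
    }

  // Hypothetical readAll driver: feed chunks until the iteratee is Done or input ends.
  @tailrec
  def readAll[A](chunks: Iterator[Vector[Byte]], it: Iteratee[A]): Option[A] = it match {
    case Done(result, _)              => Some(result)
    case Cont(step) if chunks.hasNext => readAll(chunks, step(Some(chunks.next())))
    case Cont(step) =>
      step(None) match {
        case Done(result, _) => Some(result)
        case Cont(_)         => None // the iteratee could not finish at end of file
      }
  }
}
```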
Parsing segments with an Iteratee:
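A sketch of segment parsing with a minimal hand-rolled Iteratee type (again not Akka's). The line iteratee finishes at the first newline and hands back the leftover bytes; the hypothetical readSegments driver keeps each result and starts a fresh iteratee on those leftovers until the input runs out. The names and the per-line UTF-8 decoding are assumptions for illustration, and an in-memory Iterator stands in for the file chunks.

```scala
import scala.annotation.tailrec

object IterateeSegments {
  // Hypothetical iteratee model: Done carries the result plus unconsumed bytes;
  // Cont waits for Some(chunk) or None (end of file).
  sealed trait Iteratee[+A]
  final case class Done[A](result: A, rest: Vector[Byte]) extends Iteratee[A]
  final case class Cont[A](step: Option[Vector[Byte]] => Iteratee[A]) extends Iteratee[A]

  // One segment = one line. Finishes at the first '\n' and returns the rest.
  def line(acc: Vector[Byte] = Vector.empty): Iteratee[String] = {
    val newline = acc.indexOf('\n'.toByte)
    if (newline >= 0) Done(new String(acc.take(newline).toArray, "UTF-8"), acc.drop(newline + 1))
    else Cont[String] {
      case Some(chunk) => line(acc ++ chunk)
      case None        => Done(new String(acc.toArray, "UTF-8"), Vector.empty)
    }
  }

  // Hypothetical readSegments driver: collect each Done result, restart on the leftovers.
  def readSegments[A](chunks: Iterator[Vector[Byte]], start: Vector[Byte] => Iteratee[A]): Vector[A] = {
    @tailrec
    def loop(it: Iteratee[A], hasInput: Boolean, results: Vector[A]): Vector[A] = it match {
      case Done(result, rest)           => loop(start(rest), rest.nonEmpty, results :+ result)
      case Cont(step) if chunks.hasNext => loop(step(Some(chunks.next())), true, results)
      case Cont(step) if hasInput =>
        step(None) match {
          case Done(result, _) => results :+ result
          case Cont(_)         => results
        }
      case Cont(_) => results // no pending bytes: nothing left to parse
    }
    loop(start(Vector.empty), false, Vector.empty)
  }
}
```

Note that the chunk boundaries ("one\ntw", "o\nthr", "ee") deliberately cut through lines; the iteratee simply keeps accumulating until a segment is complete.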
Reading Text
For text files you can also use the openText method. It returns a TextIO instance, which has text reading utilities like reading lines, splitting by some delimiter, etc. The default encoding for these operations is UTF-8:
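One detail any chunked UTF-8 reader has to handle is that a multi-byte character can be split across two chunks. The sketch below uses the JDK's incremental CharsetDecoder, which leaves an incomplete trailing byte sequence in the input buffer; those bytes are carried over to the next chunk. Utf8ChunkDecoder is a hypothetical helper, not the TextIO implementation; line or delimiter splitting would run on the decoded text.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CodingErrorAction, StandardCharsets}

// Decodes UTF-8 chunk by chunk. Pass endOfInput = true exactly once, for the last chunk.
final class Utf8ChunkDecoder {
  private val decoder = StandardCharsets.UTF_8.newDecoder()
    .onMalformedInput(CodingErrorAction.REPLACE)
    .onUnmappableCharacter(CodingErrorAction.REPLACE)
  // Bytes of a character whose remaining bytes have not arrived yet
  private var leftover: Array[Byte] = Array.emptyByteArray

  def decode(chunk: Array[Byte], endOfInput: Boolean = false): String = {
    val in = ByteBuffer.wrap(leftover ++ chunk)
    // UTF-8 never yields more chars than input bytes, so this capacity is enough
    val out = CharBuffer.allocate(in.remaining + 1)
    decoder.decode(in, out, endOfInput)
    if (endOfInput) decoder.flush(out)
    // Whatever the decoder did not consume is an incomplete trailing sequence
    leftover = new Array[Byte](in.remaining)
    in.get(leftover)
    out.flip()
    out.toString
  }
}
```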
Writing to Files
In order to write to a file you need to open it with write access. After that you can use the write methods. They take immutable ByteStrings and write them to the file. Additional overloads also accept plain Java arrays and Java ByteBuffers. Those are mutable, so be very careful with them: if you mutate them during an asynchronous operation, dragons will appear:
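A sketch of a safe asynchronous write with the plain Java 7 API, assuming nothing about the wrapper's internals. The file is opened with WRITE and CREATE, the caller's array is copied before the write starts (so mutating it afterwards cannot reach the pending operation), and the write is repeated until the buffer is drained, because a single write call may write fewer bytes than requested. AsyncWrite, openForWrite and writeAll are illustrative names.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Path, StandardOpenOption}
import scala.concurrent.{Future, Promise}

object AsyncWrite {
  // Opening with write access; without WRITE the channel would be read-only.
  def openForWrite(path: Path): AsynchronousFileChannel =
    AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE)

  // Writes all bytes starting at `position`; completes with the number of bytes written.
  def writeAll(ch: AsynchronousFileChannel, data: Array[Byte], position: Long): Future[Long] = {
    // Defensive copy: the caller may reuse or mutate `data` right after this call
    val buffer = ByteBuffer.wrap(data.clone())
    val promise = Promise[Long]()
    def writeFrom(pos: Long): Unit =
      ch.write[AnyRef](buffer, pos, null, new CompletionHandler[Integer, AnyRef] {
        def completed(written: Integer, att: AnyRef): Unit =
          // A write may be partial; continue until the buffer is drained
          if (buffer.hasRemaining) writeFrom(pos + written.intValue)
          else promise.success(pos + written.intValue - position)
        def failed(exc: Throwable, att: AnyRef): Unit = promise.failure(exc)
      })
    writeFrom(position)
    promise.future
  }
}
```

The copy costs one allocation per write, which is the same trade-off an immutable ByteString makes up front.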
Closing the Channel Automatically
So far we’ve always closed the file with an explicit call. But when should you close the file? Simply doing it in a finally clause doesn’t work, since the operation is still running asynchronously in the background. A good option is to close the file when the last operation has finished.
For this there are the ‘withFile’ and ‘withTextFile’ methods. They accept a closure which uses the given file and returns a future. When that future is completed, the file is closed. This means you can do complex file reading operations in that closure, return the result as a future and not worry about closing the file:
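The same close-on-completion idea as a sketch in plain Scala, assuming the standard scala.concurrent.Future instead of Akka's. The hypothetical withFile opens the channel, passes it to the closure, and attaches close() to the returned future with andThen, so it runs only after the work inside the closure has completed or failed. It also closes the file if the closure throws before returning a future.

```scala
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.{OpenOption, Path}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object WithFile {
  // Loan pattern for asynchronous work: the channel lives exactly as long as the Future.
  def withFile[A](path: Path, options: OpenOption*)(body: AsynchronousFileChannel => Future[A])
                 (implicit ec: ExecutionContext): Future[A] = {
    val ch = AsynchronousFileChannel.open(path, options: _*)
    val result =
      try body(ch)
      catch { case NonFatal(e) => Future.failed(e) } // body threw before returning a Future
    // Unlike a finally block, this waits for the asynchronous work to finish
    result.andThen { case _ => ch.close() }
  }
}
```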
IO Actors
For fault tolerance you try to move ‘dangerous’ operations into separate actors, so that you can supervise them. The IOActor is intended for that. It performs the asynchronous operations for you. If any operation goes wrong, it fails with an IOException and leaves the decision about what to do to its supervisor. Here’s an example of an actor which reads pictures:
By default the IOActor closes the file after 5 seconds of inactivity. You can set that timeout when creating it, or deactivate it completely and close the file either by stopping the actor or by sending it a ‘Close’ message.
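The inactivity timeout can be sketched without actors: every operation re-arms a timer, and the resource is closed if the timer fires first. IdleCloser is a hypothetical class built on ScheduledExecutorService, not the IOActor's actual mechanism; its close() method plays the role of the ‘Close’ message.

```scala
import java.io.Closeable
import java.util.concurrent.{Executors, ScheduledFuture, ThreadFactory, TimeUnit}

// Closes `resource` after `timeoutMillis` without activity.
final class IdleCloser(resource: Closeable, timeoutMillis: Long) {
  // Daemon thread, so a pending timer never keeps the JVM alive
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r, "idle-closer"); t.setDaemon(true); t }
  })
  private var pending: Option[ScheduledFuture[_]] = None
  private var closed = false

  // Call on every operation: cancels the pending close and re-arms the timer.
  def touch(): Unit = synchronized {
    if (!closed) {
      pending.foreach(_.cancel(false))
      pending = Some(scheduler.schedule(new Runnable { def run(): Unit = close() },
        timeoutMillis, TimeUnit.MILLISECONDS))
    }
  }

  // Explicit close, comparable to stopping the actor or sending it 'Close'.
  def close(): Unit = synchronized {
    if (!closed) {
      closed = true
      pending.foreach(_.cancel(false))
      resource.close()
      scheduler.shutdown()
    }
  }

  def isClosed: Boolean = synchronized(closed)
}
```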
Chunked Reads with the IO Actor
When you read large files, you don’t want to read everything at once. You can either issue multiple read requests manually or use the ReadInChunks message. The actor then responds with multiple ReadInChunksResponse answers, sending such a ReadChunk as soon as it has filled its internal buffers. When everything is done, a last ReadChunk is sent with an EOF message. This is similar to Akka’s network API. So our picture reading actor would look something like this:
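A sketch of the chunked-read protocol without Akka, assuming replies are delivered through a callback (inside an actor this would be a message to the requester). The message classes are hypothetical shapes modelled on the description above, not the library's definitions; ReadFailed in particular is an addition for the sketch. Each chunk is read only after the previous one completed, and an EOF message ends the stream.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}

object ChunkedReader {
  // Hypothetical message shapes
  sealed trait ChunkMsg
  final case class ReadChunk(bytes: Vector[Byte]) extends ChunkMsg
  case object EOF extends ChunkMsg
  final case class ReadFailed(error: Throwable) extends ChunkMsg // added for the sketch

  // Reads the whole channel chunk by chunk, reports every chunk to `reply`, then EOF.
  def readInChunks(ch: AsynchronousFileChannel, chunkSize: Int)(reply: ChunkMsg => Unit): Unit = {
    require(chunkSize > 0, "chunkSize must be positive")
    def readFrom(pos: Long): Unit = {
      val buffer = ByteBuffer.allocate(chunkSize) // one buffer per read operation
      ch.read[AnyRef](buffer, pos, null, new CompletionHandler[Integer, AnyRef] {
        def completed(n: Integer, att: AnyRef): Unit =
          if (n.intValue < 0) reply(EOF) // end of file reached
          else {
            buffer.flip()
            val bytes = new Array[Byte](buffer.remaining)
            buffer.get(bytes)
            reply(ReadChunk(bytes.toVector))
            readFrom(pos + n.intValue) // next read starts only after this one completed
          }
        def failed(exc: Throwable, att: AnyRef): Unit = reply(ReadFailed(exc))
      })
    }
    readFrom(0L)
  }
}
```

Because the receiver only ever holds one chunk at a time, memory use stays bounded by the chunk size rather than the file size.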
Buffer Management
Well, async IO operations need to allocate a Java ByteBuffer to transfer the data. Currently, buffer management is as simple as possible: a buffer is allocated for every operation you start. So every time you call a read or write method, a buffer is allocated for that call. Of course, when you write a very large file with one call, the same buffer is used for the whole operation.
This is good enough for now =). In general: I haven’t done any decent performance testing yet.
Getting This Stuff
First: this stuff is not yet written in stone and the API may change. Anyway: the code is on GitHub. I’ve also pushed SNAPSHOT builds to a GitHub-based Maven repository.
Repository for Maven: https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots
GroupID: info.gamlor.akkaasync
ArtifactID: akka-io_2.9.1
Version: 1.0-SNAPSHOT
For example in SBT:
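A minimal sbt sketch using only the coordinates listed above. The resolver label gamlor-mvn is an arbitrary name; % (not %%) is used because the artifact ID already carries the _2.9.1 Scala suffix.

```scala
// build.sbt: blank lines between settings keep older sbt versions happy
resolvers += "gamlor-mvn" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots"

libraryDependencies += "info.gamlor.akkaasync" % "akka-io_2.9.1" % "1.0-SNAPSHOT"
```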
That’s It
Improvements will certainly follow. I also have a thin wrapper for the Async Http Client in the works. That’s the topic for a follow-up post =)
Hey,
you’re relying on ordering in the execution of the onX callbacks. This is a fallacy (why would we not be able to execute callbacks in parallel?)
To get ordering with a side effect, use “andThen” for each callback you want to order after each other:
future andThen sideeffect andThen lastSideeffect
Hmm, I need to check that and think about it again. The intention is that I don’t rely on the ordering of the onX callbacks. But I could be wrong.
// do stuff with the future
readResultFuture.onSuccess({ case bytes:ByteString=> println(bytes.utf8String) })
// Close when we’re done reading
readResultFuture.onComplete(_=>file.close())
Definitely seems like it’s relying on the first to be executed prior to the second?
Should be:
readResultFuture.onSuccess({ case bytes:ByteString=> println(bytes.utf8String) }).andThen( _ => file.close() )
No, not really. When the future is completed, all bytes have already been read and copied to a byte string. So when it completes, it shouldn’t matter whether you first process the byte string and then close the file, do it in reverse, or do both in parallel.
But yes, I guess it can be confusing and make it harder to debug. I will change the examples tomorrow.
And of course, when multiple requests are started you should close it when all requests have completed.
Actually, the onSuccess should also be an andThen. This is fully described in my book, “Composable Futures with Akka 2.0” http://slinnbooks.com/books/futures/
Mike. No.
%s/iteraree/iteratee/g
Oh =(. But Iteraree sounds cooler, like Ferrari 😉
Excellent writing! Thanks!