Async File IO with Akka and Java 7

Akka provides tons of nice facilities for dealing with concurrent and asynchronous operations. However, things often get rougher at the edges, where you deal with the non-Akka world. For example, traditional Java file access is synchronous. That can be annoying when you read large files, especially if those files are on another machine.

Java 7 brings a nice new API for asynchronous file operations: the AsynchronousFileChannel API. It lets us read and write files asynchronously. I’ve written a small wrapper which integrates it a little better with Akka.

Really, don't wait while the guy is reading

Reading a File

First, the operations need an ExecutionContext, like other Akka constructs. The execution context is used to dispatch the results of asynchronous requests, so you need to declare one. Actors already have an execution context which you can use for this. After that you can open files with the FileIO.open method. All operations return their result as an Akka future, so that you can use all the tools from the Akka world.

The API wraps results in Akka ByteStrings, which are immutable byte arrays. This means you can easily send the raw data around and don’t have to worry about any modifications.
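
To make the idea concrete, here is a minimal sketch of how a callback from Java 7's AsynchronousFileChannel can be turned into a future. It is not the wrapper's actual code: it assumes the standard library's scala.concurrent futures (Scala 2.10 or later) instead of Akka 2.0 futures, it returns a plain byte array instead of a ByteString, and the names AsyncReadSketch and readAsync are made up for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import scala.concurrent.{Future, Promise}

object AsyncReadSketch {
  // Hypothetical helper (not part of the wrapper): reads up to `amount` bytes
  // starting at `position` and completes the future from the Java 7 callback.
  def readAsync(channel: AsynchronousFileChannel, position: Long, amount: Int): Future[Array[Byte]] = {
    val promise = Promise[Array[Byte]]()
    val buffer = ByteBuffer.allocate(amount)
    channel.read[AnyRef](buffer, position, null, new CompletionHandler[Integer, AnyRef] {
      def completed(bytesRead: Integer, attachment: AnyRef): Unit = {
        // At the end of the file bytesRead is -1 and the buffer stays empty.
        buffer.flip()
        val bytes = new Array[Byte](buffer.remaining())
        buffer.get(bytes) // copy out, so the caller never sees the mutable buffer
        promise.success(bytes)
      }
      def failed(error: Throwable, attachment: AnyRef): Unit = promise.failure(error)
    })
    promise.future
  }
}
```

The copy into a fresh array plays the role the immutable ByteString plays in the wrapper: once the future completes, nothing else holds a reference to the data that could change it.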

Using Iteratees

Now when you read a file asynchronously, data is usually transferred in chunks. That makes it harder to parse the data. One way to deal with this is with Iteratees. The FileIO instances can use Akka iteratees to process the file input. There are two methods for this. The ‘readAll’ method reads the file until the Iteratee is done or the file ends. This is useful when you want to parse the file as a whole into a certain structure. The ‘readSegments’ method reads until the iteratee is done, collects that result and starts over parsing the rest of the file. This is handy when you need to parse a file with a repeated structure, for example to parse each line.
Parsing everything with an Iteratee:

case class Page(header: String, body: String, footer: String)


val parser = for {
	  headerLine <- IO.takeUntil(ByteString("\n"))
	  body <- IO.takeUntil(ByteString("[End-Body]"))
	  footer <- IO.takeUntil(ByteString("[End-Footer]"))
} yield Page(headerLine.utf8String, body.utf8String, footer.utf8String)
// We can use an iteratee as parser.
// The parse result will be in the future.
// There are overloads available to read from certain positions.
val readResultFuture = file.readAll(parser)

readResultFuture.onSuccess {
  case Page(header, body, footer) => {
	println(header)
	println(body)
	println(footer)
  }
}

Parsing segments with an Iteratee:
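
A library-free illustration of the work ‘readSegments’ has to do may help here. The sketch below is not the Iteratee API and not the wrapper's code. It assumes the chunks are already decoded strings, and the names SegmentSketch and splitSegments are made up. It feeds the chunks in one by one, emits every complete segment and carries the unfinished rest over into the next chunk.

```scala
object SegmentSketch {
  // Hypothetical helper: splits a stream of chunks into segments that end at
  // `delimiter`, even when a segment is spread over several chunks.
  def splitSegments(chunks: Seq[String], delimiter: String): Vector[String] = {
    val quoted = java.util.regex.Pattern.quote(delimiter)
    val (segments, rest) = chunks.foldLeft((Vector.empty[String], "")) {
      case ((done, pending), chunk) =>
        // Limit -1 keeps a trailing empty part, so `last` is always the unfinished rest.
        val parts = (pending + chunk).split(quoted, -1)
        (done ++ parts.init, parts.last)
    }
    // Whatever is left after the last chunk counts as a final segment.
    if (rest.isEmpty) segments else segments :+ rest
  }
}
```

For example, the chunks "1:co", "ol\n2:bor" and "ing\n3:cool too" come out as the three lines "1:cool", "2:boring" and "3:cool too", although no chunk boundary matches a line boundary.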

Reading Text

For text files you can also use the openText method. It returns a TextIO instance, which has text reading utilities like reading lines, splitting by some delimiter etc. The default encoding for these operations is UTF-8:

val textFile = FileIO.openText("lines.txt")

val allLinesFuture = textFile.readAllLines()

allLinesFuture.onSuccess({
  case lines: Seq[String] => {
	println(lines)
  }
}).andThen{ case _ => textFile.close() }

Writing to Files

In order to write to a file you need to open it with enough permissions. After that you can use the write methods. The write method takes immutable ByteStrings and writes those to the file. Additional overloads also accept plain Java arrays and Java ByteBuffers. Those are mutable, so be very careful with them. If you mutate them during an asynchronous operation, dragons will appear:

val file = FileIO.open(Paths.get("myFile.data"),StandardOpenOption.CREATE,StandardOpenOption.WRITE,StandardOpenOption.READ)

file.write(ByteString("data data"),0).onComplete{
  _=>file.close()
}
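
One way to keep the dragons away when you only have a mutable array is to copy it before the write starts. Here is a minimal sketch of that idea directly on top of AsynchronousFileChannel. It is not how the wrapper does it: it assumes scala.concurrent futures, the names SafeWriteSketch and writeCopy are made up, and a real implementation would also have to loop in case a single write call stores only part of the data.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import scala.concurrent.{Future, Promise}

object SafeWriteSketch {
  // Hypothetical helper: snapshots the caller's array before starting the write,
  // so later changes to `data` cannot leak into the pending operation.
  def writeCopy(channel: AsynchronousFileChannel, data: Array[Byte], position: Long): Future[Int] = {
    val promise = Promise[Int]()
    val snapshot = ByteBuffer.wrap(data.clone())
    channel.write[AnyRef](snapshot, position, null, new CompletionHandler[Integer, AnyRef] {
      // Completes with the number of bytes this single write call has stored.
      def completed(written: Integer, attachment: AnyRef): Unit = promise.success(written.intValue)
      def failed(error: Throwable, attachment: AnyRef): Unit = promise.failure(error)
    })
    promise.future
  }
}
```

The copy costs memory and time, which is the trade-off the immutable ByteString overload avoids in the first place.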

Closing the Channel Automatically

So far we’ve always closed the file with an explicit call. But when do you close the file? Simply doing it in a finally clause doesn’t work, since the operation is running asynchronously in the background. A good option is to close the file when the last operation has finished.
For this there are the ‘withFile’ and ‘withTextFile’ methods. They accept a closure which uses the given file and returns a future. When that future is completed the file will be closed. This means you can do complex file reading operations in that closure, return the result as a future and don’t have to worry about closing the file:

case class LineItem(number: Int, content: String)

val parser = for {
  numberOfItem <- IO.takeUntil(ByteString(":"))
  lineContent <- IO.takeUntil(ByteString("\n"))
} yield LineItem(Integer.parseInt(numberOfItem.utf8String), lineContent.utf8String)

// will close the file when returned future has finished
val onlyCoolLines = FileIO.withFile(Paths.get("aFile.txt")) {
  file =>
	val linesFuture = file.readSegments(parser)
	val coolLinesFuture = linesFuture.map(
	  lines => lines.filter(
		line => line.content.contains("cool")))
	coolLinesFuture
}

// the future holds all the filtered items, not a single one
onlyCoolLines.onSuccess {
  case lines => lines.foreach(item => println(item.content))
}
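
The mechanism behind such a method is small. Here is a minimal sketch of the pattern on a raw AsynchronousFileChannel. It is not the wrapper's implementation: it assumes scala.concurrent futures, and the names WithFileSketch and withChannel are made up.

```scala
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.{OpenOption, Path}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object WithFileSketch {
  // Hypothetical helper: opens the channel, hands it to `body` and closes it
  // once the future returned by `body` has completed, successfully or not.
  def withChannel[A](path: Path, options: OpenOption*)(body: AsynchronousFileChannel => Future[A])(
      implicit ec: ExecutionContext): Future[A] = {
    val channel = AsynchronousFileChannel.open(path, options: _*)
    // If `body` throws before returning a future, the channel still gets closed.
    val result: Future[A] =
      try body(channel)
      catch { case NonFatal(e) => Future.failed(e) }
    // andThen keeps the original result and runs the close as a side effect.
    result.andThen { case _ => channel.close() }
  }
}
```

The future you get back completes only after the close has run, so anything chained onto it can rely on the file being closed.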

IO Actors

For fault tolerance you typically move ‘dangerous’ operations into separate actors, so that you can supervise those. The IOActor is intended for that. It does the asynchronous operations for you. If any operation goes wrong, it will fail with an IOException and leave the decision about what to do to its supervisor. Here’s an example of an actor which reads pictures:

class PictureIO extends Actor {

	import info.gamlor.io.IOActors._


	override val supervisorStrategy = OneForOneStrategy(5, Duration(60, TimeUnit.SECONDS)) {
	  case ex: IOException => {
		println("Couldn't read file. Giving up on this file "+ex)
		Stop
	  }
	  case ex: Exception => Escalate
	}

	protected def receive = {
	  case path: Path => {
		// will be created with the context,
		// so this actor supervises this file reading actor
		val fileReadingActor = IOActors.createForFile(path)
		// The actor will respond with a ReadResponse
		// which will contain the read data
		fileReadingActor ! Read(0, Int.MaxValue)
	  }
	  case ReadResponse(data, _, _) => {
		processTheBytesOfThisPicture(data)
	  }
	}

	def processTheBytesOfThisPicture(data: ByteString) {
	  // do something
	}
}

By default the IOActor will close the file after 5 seconds of inactivity. You can set that timeout when creating it. You can also deactivate it completely and close the file by either stopping the actor or sending a ‘Close’ message.

Chunked Reads with the IO Actor

When you read large files, you don’t want to read everything at once. You can either issue multiple read requests manually or use the ReadInChunks message. Then the actor will respond with multiple ReadInChunksResponse answers. It sends such a response as soon as it has filled its internal buffers. When everything is done, a last response is sent which carries EOF instead of data. This is similar to Akka’s network API. So our picture reading actor would look something like this:

protected def receive = {
  case path: Path => {
	// will be created with the context,
	// so this actor supervises this file reading actor
	val fileReadingActor = IOActors.createForFile(path)
	// The actor will respond with ReadInChunksResponse messages,
	// each carrying either a chunk of the data or EOF
	fileReadingActor ! ReadInChunks(0, Int.MaxValue,path)
  }
  case ReadInChunksResponse(data, _) => {
	data match{
	  case IO.Chunk(bytes) =>processPartOfPicture(bytes)
	  case IO.EOF => finishPicture()  
	}
	
  }
}

def processPartOfPicture(string: ByteString){
  // do stuff
}

def finishPicture(){
  // done
}
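
Outside of actors, the same chunk-by-chunk idea can be sketched directly with AsynchronousFileChannel: start the next read only after the previous chunk has been handed over. This is not the IOActor's implementation. It assumes scala.concurrent futures, and the names ChunkedReadSketch and readChunks are made up.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

object ChunkedReadSketch {
  // Hypothetical helper: passes the file to `onChunk` piece by piece and
  // completes the future with the total number of bytes read at the end of the file.
  def readChunks(channel: AsynchronousFileChannel, chunkSize: Int)(onChunk: Array[Byte] => Unit): Future[Long] = {
    val done = Promise[Long]()
    def readFrom(position: Long): Unit = {
      val buffer = ByteBuffer.allocate(chunkSize) // one fresh buffer per read
      channel.read[AnyRef](buffer, position, null, new CompletionHandler[Integer, AnyRef] {
        def completed(bytesRead: Integer, attachment: AnyRef): Unit =
          if (bytesRead.intValue < 0) done.success(position) // end of file reached
          else {
            buffer.flip()
            val chunk = new Array[Byte](buffer.remaining())
            buffer.get(chunk)
            // The next read starts only after the callback returns, so chunks arrive in order.
            try { onChunk(chunk); readFrom(position + chunk.length) }
            catch { case NonFatal(e) => done.failure(e) }
          }
        def failed(error: Throwable, attachment: AnyRef): Unit = done.failure(error)
      })
    }
    readFrom(0L)
    done.future
  }
}
```

A chunk may be shorter than chunkSize, so the callback should not assume a fixed length.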

Buffer-Management

Async IO operations need to allocate a Java ByteBuffer to transfer the data. Currently this buffer management is as simple as possible: it allocates a buffer for every operation you start. So every time you call a read or write method it will allocate a buffer for that call. Of course, when you write a very large file with one call, it will use the same buffer during that whole operation.
This is good enough for now =). In general: I haven’t done any decent performance testing yet.

Getting This Stuff

First: This stuff is not yet set in stone and the API may change. Anyway: The code is on GitHub. I’ve also pushed snapshots to a GitHub-based Maven repository.

Repository for Maven: https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots
GroupID: info.gamlor.akkaasync
ArtifactID: akka-io_2.9.1
Version: 1.0-SNAPSHOT

For example in SBT:

resolvers += "Gamlor-Repo" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"
libraryDependencies += "info.gamlor.akkaasync"  %% "akka-io" % "1.0-SNAPSHOT"

That’s It

Improvements will certainly follow. I also have a thin wrapper for the Async Http Client in the works. That’s the topic for a follow-up post =)

10 thoughts on “Async File IO with Akka and Java 7”

  1. Hey,
    you’re relying on the ordering of the execution of onX-callbacks. This is a fallacy (why would we not be able to execute callbacks in parallel?)

    To get ordering with a side-effect use “andThen” for each callback you want to order after each other:

    future andThen sideeffect andThen lastSideeffect

  2. gamlerhart Post author

    Hmm, I need to check that and think about it again. The intention is that I don’t rely on the ordering of the onX callbacks. But I could be wrong.

  3. // do stuff with the future
    readResultFuture.onSuccess({ case bytes:ByteString=> println(bytes.utf8String) })

    // Close when we’re done reading
    readResultFuture.onComplete(_=>file.close())

    Definitely seems like it’s relying on the first to be executed prior to the second?

    Should be:

    readResultFuture.onSuccess({ case bytes:ByteString=> println(bytes.utf8String) }).andThen( _ => file.close() )

  4. gamlerhart Post author

    No, not really. When the future is completed, all bytes have already been read and copied to a byte string. So when it completes it shouldn’t matter if you first process the byte string and then close the file, do it in reverse or in parallel.

    But yes, I guess it can be confusing and make it harder to debug. I will change the examples tomorrow.

    And of course, when multiple requests are started you should close it when all requests have completed.

  5. Pingback: Async SQL and Akka | Gamlor