Async File IO with Akka and Java 7
Posted by gamlerhart in java, software-development on March 23, 2012
Akka provides tons of nice facilities for dealing with concurrent and asynchronous operations. However, things get rougher at the edges, where you have to deal with the non-Akka world. For example, traditional Java file access is synchronous. That can be annoying when you read large files, especially if those files are on another machine.
Java 7 brings a nice new API for asynchronous file operations: the AsynchronousFileChannel API. It allows us to read and write data asynchronously with little effort. I’ve written a small wrapper which integrates it a little better with Akka.
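To give a rough idea of what such a wrapper has to do, here is a minimal sketch that turns the callback-based read of AsynchronousFileChannel into a future. It is not the code of the wrapper described in this post. The names AsyncReadSketch and readBytes are made up for illustration, and the sketch assumes the standard scala.concurrent futures (Scala 2.10 or later) instead of the akka.dispatch futures used in the rest of the post.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ AsynchronousFileChannel, CompletionHandler }
import scala.concurrent.{ Future, Promise }

object AsyncReadSketch {
  // Hypothetical helper: reads up to `amount` bytes starting at `position`.
  // A single read may deliver fewer bytes than requested, and an empty
  // array if `position` is at or beyond the end of the file.
  def readBytes(channel: AsynchronousFileChannel, position: Long, amount: Int): Future[Array[Byte]] = {
    val promise = Promise[Array[Byte]]()
    val buffer = ByteBuffer.allocate(amount)
    // The buffer is passed along as the attachment, so the handler gets it back.
    channel.read[ByteBuffer](buffer, position, buffer, new CompletionHandler[Integer, ByteBuffer] {
      override def completed(bytesRead: Integer, filled: ByteBuffer): Unit = {
        filled.flip() // switch the buffer from writing to reading
        val bytes = new Array[Byte](filled.remaining())
        filled.get(bytes)
        promise.success(bytes)
      }
      override def failed(error: Throwable, filled: ByteBuffer): Unit = {
        promise.failure(error)
      }
    })
    promise.future
  }
}
```

The completion handler runs on a thread of the channel's own thread pool, which is one reason why a wrapper like this wants an execution context for dispatching results.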
Reading a File
First, the operations need an ExecutionContext, like other Akka constructs. The execution context is used to dispatch the results of asynchronous requests, so we need to declare one. Actors already have an execution context which you can use for this. After that you can open files with the FileIO.open method. All operations return their result as an Akka future, so you can use all the tools from the Akka world:
    import akka.dispatch.{ ExecutionContext, Promise }
    import akka.util.ByteString
    import info.gamlor.io.FileIO

    // A plain execution context
    implicit val dispatcher = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)
    // or, within an actor, use this instead:
    // implicit val dispatcher = context.dispatcher

    val file = FileIO.open("myFile.data")
    // read 200 bytes from the beginning of the file
    val readResultFuture = file.read(0, 200)

    // do stuff with the future
    readResultFuture.onSuccess({
      case bytes: ByteString => {
        println(bytes.utf8String)
      }
    }).andThen {
      case _ => file.close() // Close when we're done reading
    }
The API wraps results in Akka ByteStrings, which are immutable byte arrays. This means you can easily send the raw data around and don’t have to worry about any modifications.
Using Iteratees
Now, when you read a file asynchronously, the data usually arrives in chunks. That makes it harder to parse. One way to deal with this is Iteratees. The FileIO instances can use Akka iteratees to process the file input. There are two methods for this. The ‘readAll’ method reads the file until the iteratee is done or the file ends. This is useful when you want to parse the file as a whole into a certain structure. The ‘readSegments’ method reads until the iteratee is done, collects that result and starts over, parsing the rest of the file. This is handy when you need to parse a file with a repeated structure, for example to parse each line.
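To see why chunked data is awkward to parse, consider splitting a file into lines by hand. A line can start in one chunk and end in the next, so the parser has to carry the unfinished rest over to the next chunk. The following sketch shows that bookkeeping, which an iteratee handles for you. ChunkedLines, feed and splitAll are hypothetical names, not part of Akka or of the wrapper. To keep it short, the sketch works on strings, whereas real code would work on bytes, because a multi-byte character can be split across chunks as well.

```scala
object ChunkedLines {
  // Takes the unparsed rest of earlier chunks plus a new chunk and returns
  // the complete lines found in them together with the new unparsed rest.
  def feed(leftover: String, chunk: String): (Vector[String], String) = {
    // The limit -1 keeps a trailing empty string, so that a chunk ending
    // in "\n" leaves an empty rest instead of losing track of the newline.
    val parts = (leftover + chunk).split("\n", -1)
    (parts.init.toVector, parts.last)
  }

  // Feeds all chunks in order and collects the lines,
  // similar in spirit to parsing repeated segments.
  def splitAll(chunks: Seq[String]): (Vector[String], String) =
    chunks.foldLeft((Vector.empty[String], "")) {
      case ((done, leftover), chunk) =>
        val (newLines, rest) = feed(leftover, chunk)
        (done ++ newLines, rest)
    }
}
```

For example, ChunkedLines.splitAll(List("1:first\n2:sec", "ond\n3:th", "ird\n")) yields the three lines "1:first", "2:second" and "3:third" with an empty rest, although no chunk boundary falls on a line end.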
Parsing everything with an Iteratee:
    case class Page(header: String, body: String, footer: String)

    val parser = for {
      headerLine <- IO.takeUntil(ByteString("\n"))
      body <- IO.takeUntil(ByteString("[End-Body]"))
      footer <- IO.takeUntil(ByteString("[End-Footer]"))
    } yield Page(headerLine.utf8String, body.utf8String, footer.utf8String)

    // We can use an iteratee as parser.
    // The parse result will be in the future.
    // Overloads are available to read from certain positions.
    val readResultFuture = file.readAll(parser)

    readResultFuture.onSuccess {
      case Page(header, body, footer) => {
        println(header)
        println(body)
        println(footer)
      }
    }

Parsing segments with an Iteratee:
    case class LineItem(number: Int, content: String)

    val parser = for {
      numberOfItem <- IO.takeUntil(ByteString(":"))
      lineContent <- IO.takeUntil(ByteString("\n"))
    } yield LineItem(Integer.parseInt(numberOfItem.utf8String), lineContent.utf8String)

    // We can use an iteratee as parser.
    // This keeps parsing until the file ends or the max amount is reached.
    // Every time the iteratee is done parsing, its result is added
    // to the overall result.
    // Overloads are available to read from certain positions.
    val readResultFuture = file.readSegments(parser)

    readResultFuture.onSuccess {
      case items: Seq[LineItem] => {
        items.foreach({ i =>
          println(i.number)
          println(i.content)
        })
      }
    }

Reading Text
For text files you can also use the openText method. It returns a TextIO instance, which has text reading utilities such as reading lines, splitting by some delimiter, etc. The default encoding for these operations is UTF-8:
    val textFile = FileIO.openText("lines.txt")

    val allLinesFuture = textFile.readAllLines()

    allLinesFuture.onSuccess({
      case lines: Seq[String] => {
        println(lines)
      }
    }).andThen {
      case _ => textFile.close() // close the text file we opened above
    }

Writing to Files
In order to write to a file you need to open it with sufficient permissions. After that you can use the write methods. The write method takes an immutable ByteString and writes it to the file. Additional overloads also accept plain Java arrays and Java ByteBuffers. Those are mutable, so be very careful with them. If you mutate them during an asynchronous operation, dragons will appear:
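    import java.nio.file.{ Paths, StandardOpenOption }

    val file = FileIO.open(Paths.get("myFile.data"),
      StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ)

    // write at position 0 and close the file once the write has completed
    file.write(ByteString("data data"), 0).onComplete {
      _ => file.close()
    }

Closing the Channel Automatically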
So far we’ve always closed the file with an explicit call. But when do you close the file? Simply doing it in a finally clause doesn’t work, since the operation is running asynchronously in the background. A good option is to close the file when the last operation has finished.
For this there are the ‘withFile’ and ‘withTextFile’ methods. Each accepts a closure which uses the given file and returns a future. When that future is completed, the file is closed. This means you can do complex file reading operations in that closure, return the result as a future and don’t have to worry about closing the file.
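The idea behind such a method can be sketched with the plain Java 7 channel and the standard scala.concurrent futures (Scala 2.10 or later). This is only an illustration of the pattern under those assumptions, not the wrapper's implementation: WithChannelSketch and withChannel are made-up names, and the real ‘withFile’ may have a different signature.

```scala
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.{ OpenOption, Path }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal

object WithChannelSketch {
  // Hypothetical helper: opens the channel, hands it to `use` and closes it
  // when the future returned by `use` completes, successfully or not.
  def withChannel[A](path: Path, options: OpenOption*)(use: AsynchronousFileChannel => Future[A])(
      implicit ec: ExecutionContext): Future[A] = {
    val channel = AsynchronousFileChannel.open(path, options: _*)
    val result =
      try use(channel)
      catch {
        // The closure itself may throw before it returns a future.
        case NonFatal(e) => Future.failed[A](e)
      }
    // andThen runs the side effect and passes the original result on.
    result.andThen { case _ => channel.close() }
  }
}
```

The closing happens in a callback on the future instead of in a finally clause, so it runs after the asynchronous work has finished and not when the calling method returns.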
IO Actors
For fault tolerance you try to move ‘dangerous’ operations into separate actors, so that you can supervise them. The IOActor is intended for that. It does the asynchronous operations for you. If any operation goes wrong, it fails with an IOException and leaves the decision about what to do to its supervisor. Here’s an example of an actor which reads pictures:
    import akka.actor.{ Actor, OneForOneStrategy }
    import akka.actor.SupervisorStrategy.{ Escalate, Stop }
    import akka.util.{ ByteString, Duration }
    import java.io.IOException
    import java.nio.file.Path
    import java.util.concurrent.TimeUnit

    class PictureIO extends Actor {

      import info.gamlor.io.IOActors._

      override val supervisorStrategy = OneForOneStrategy(5, Duration(60, TimeUnit.SECONDS)) {
        case ex: IOException => {
          println("Couldn't read file. Giving up on this file " + ex)
          Stop
        }
        case ex: Exception => Escalate
      }

      protected def receive = {
        case path: Path => {
          // will be created with the context,
          // so this actor supervises this file reading actor
          val fileReadingActor = IOActors.createForFile(path)
          // The actor will respond with a ReadResponse
          // which will contain the read data
          fileReadingActor ! Read(0, Int.MaxValue)
        }
        case ReadResponse(data, _, _) => {
          processTheBytesOfThisPicture(data)
        }
      }

      def processTheBytesOfThisPicture(data: ByteString) {
        // do something
      }
    }
By default the IOActor closes the file after 5 seconds of inactivity. You can set that timeout when creating the actor, or deactivate it completely and close the file by either stopping the actor or sending it a ‘Close’ message.
Chunked Reads with the IO Actor
When you read large files, you don’t want to read everything at once. You can either issue multiple read requests manually or use the ReadInChunks message. The actor then responds with multiple ReadInChunksResponse messages. It sends such a response as soon as it has filled its internal buffers, with the data wrapped in an IO.Chunk. When everything is done, a last response carrying IO.EOF is sent. This is similar to Akka’s network API. So our picture reading actor would look something like this:
    protected def receive = {
      case path: Path => {
        // will be created with the context,
        // so this actor supervises this file reading actor
        val fileReadingActor = IOActors.createForFile(path)
        // The actor will respond with ReadInChunksResponse messages,
        // each of which contains a part of the read data
        fileReadingActor ! ReadInChunks(0, Int.MaxValue, path)
      }
      case ReadInChunksResponse(data, _) => {
        data match {
          case IO.Chunk(bytes) => processPartOfPicture(bytes)
          case IO.EOF => finishPicture()
        }
      }
    }

    def processPartOfPicture(string: ByteString) {
      // do stuff
    }

    def finishPicture() {
      // done
    }

Buffer-Management
Async IO operations need to allocate a Java ByteBuffer to transfer the data. Currently this buffer management is as simple as possible: it allocates a buffer for every operation you start. So every time you call a read or write method, a buffer is allocated for it. Of course, when you write a very large file with one call, the same buffer is used for that whole operation.
This is good enough for now =). In general: I haven’t done any decent performance testing yet.
Getting This Stuff
First: this stuff is not yet set in stone and the API may change. Anyway, the code is on GitHub. I’ve also pushed snapshots to a GitHub-based Maven repository.
Repository for Maven: https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots
GroupID: info.gamlor.akkaasync
ArtifactID: akka-io_2.9.1
Version: 1.0-SNAPSHOT
For example in SBT:
    resolvers += "Gamlor-Repo" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots"

    libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"

    libraryDependencies += "info.gamlor.akkaasync" %% "akka-io" % "1.0-SNAPSHOT"

That’s It
Improvements will certainly follow. I also have a thin wrapper for the Async Http Client in the works. That’s the topic for a follow-up post =)
언어를 잘 못 하는 것으로 어떻게 블로그-포스트 써요? How to Write Posts Without a Clue of the Language?
Posted by gamlerhart in 42, 한국어 배우기 on March 17, 2012
내가 한국어 아직 많이 모르지만 아직 한국어로 블로그-포스트 쓰고 있어요. 전 아직 초급이에요. 그래서 어떻게 블로그-포스트 써요? 여기 모른 언어로 블로그-포스트 쓰는 조언이 있어요.
- 당신은 잘못을 하는 것 무섭지 마세요! 언어를 배우기 안 쉬워요. 그래서 시작에서 잘못이 많아요. 근데 더 의국 언어를 하면 더 빨리 배워요.
- 구글 번역은 조심히 용하세요. 구글 번역은 좋은 사전이에요. 단어를 번역하는 결과 몇개 주조 있어요. 항상 결과를 클릭해야 돼서 더 결과 보여요. 그래서 아직 결정해야 돼요.
- 근데, 구글 번역으로 다 문장이 번역하지마! 문장을 번역 못 해요.
- 구글 이미지 용하세요. 내가 단어를 찾았면 구글 이미지를 용해서 사진 보여봐요. 괜찮은 사진이 이면 괜찮은 단어 있어요.
- 문장 예는 도움이에요. 좋은 예 진짜 도울 수 있어요.
- 단어나 문장이 확실하지면 구글 검색해서 더 예 찾아요.
- 포기 하지 마세요! =)
이것 다예요. 더 얘기하기 없어요 =).
I still don’t know much Korean, and yet I write blog posts in it. I’m still just a beginner. So how do I write these blog posts? Here is my advice on how to write a blog post in a language you don’t know well.
- You should not be afraid of mistakes! Learning languages isn’t easy. So at the beginning there will be a lot of mistakes. But when you use the language more you will learn faster =).
- Use Google Translate with care. Google Translate is quite a good dictionary. Often there are multiple translations for a word. Always click on the result to see more translations. In the end you still have to decide which translation to pick.
- BUT: Do not translate sentences with Google Translate. Google Translate cannot really translate sentences.
- Use Google Image Search (or others). When I’ve found a word, I often run an image search for it. When the pictures look right, I know that I have the right word.
- Example sentences are helpful. Good example sentences can be a real help.
- When I’m not sure about a word or sentence I google for it to find examples.
- Don’t give up.
That’s it. Nothing else to tell.





