Async File IO with Akka and Java 7

Akka provides tons of nice facilities for dealing with concurrent and asynchronous operations. However, at the edges, where you deal with the non-Akka world, things often get rougher. For example, traditional Java file access is synchronous. That can be annoying when you read large files, especially if those files are on another machine.

Java 7 brings a nice new API for asynchronous file operations: the AsynchronousFileChannel API. It lets us read and write files asynchronously with little effort. I’ve written a small wrapper around it which integrates a little better with Akka.
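To illustrate what such a wrapper has to do, here is a minimal sketch, not the library's actual code, that turns AsynchronousFileChannel's callback-based `read` into a future. It assumes Scala 2.10 or later and uses the standard `scala.concurrent` futures instead of the Akka 2.0 futures used in the rest of this post; the names `AsyncRead` and `read` are made up for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Files, StandardOpenOption}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of the akka-io library described in this post.
object AsyncRead {
  // Reads up to `length` bytes starting at `position`. The returned Future
  // completes with the bytes actually read (fewer near the end of the file,
  // none at or past it) or fails with the exception reported by the channel.
  def read(channel: AsynchronousFileChannel, position: Long, length: Int): Future[Array[Byte]] = {
    val buffer = ByteBuffer.allocate(length)
    val promise = Promise[Array[Byte]]()
    channel.read[AnyRef](buffer, position, null, new CompletionHandler[Integer, AnyRef] {
      // Called on a thread of the channel's thread pool once the read is done.
      def completed(bytesRead: Integer, attachment: AnyRef): Unit = {
        buffer.flip() // switch the buffer from filling to draining
        val bytes = new Array[Byte](buffer.remaining())
        buffer.get(bytes)
        promise.success(bytes)
      }
      def failed(exc: Throwable, attachment: AnyRef): Unit = promise.failure(exc)
    })
    promise.future
  }

  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("async-read", ".txt")
    Files.write(path, "hello async world".getBytes("UTF-8"))
    val channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    try {
      // Blocking only to keep the demo short; real code would compose the Future.
      val bytes = Await.result(read(channel, 0, 5), 5.seconds)
      println(new String(bytes, "UTF-8")) // prints "hello"
    } finally channel.close()
  }
}
```

The promise is the bridge: the completion handler runs on the channel's thread pool and simply completes it, so callers only ever see an ordinary future.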

Really, don't wait while the guy is reading

Reading a File

First, the operations need an ExecutionContext, like other Akka constructs. The execution context is used to dispatch the results of asynchronous requests, so we need to declare one. Actors already have an execution context which you can use for this. After that you can open files with the FileIO.open method. All operations return their results as Akka futures, so you can use all the tools from the Akka world:

import akka.dispatch.ExecutionContext
import akka.util.ByteString
import info.gamlor.io.FileIO

// A plain execution context
implicit val dispatcher = ExecutionContext.fromExecutorService(yourExecutorServiceGoesHere)

// or, within an actor, use the actor's dispatcher instead:
// implicit val dispatcher = context.dispatcher

val file = FileIO.open("myFile.data")
// read 200 bytes from the beginning of the file
val readResultFuture = file.read(0, 200)

// do stuff with the future
readResultFuture.onSuccess({
  case bytes: ByteString => {
    println(bytes.utf8String)
  }
}).andThen { case _ => file.close() } // close when we're done reading

The API wraps results in Akka ByteStrings, which are immutable byte arrays. This means you can easily send the raw data around and don’t have to worry about any modifications.

Using Iteratees

When you read a file asynchronously, the data usually arrives in chunks. That makes parsing harder. One way to deal with this is iteratees: FileIO instances can use Akka iteratees to process the file input. There are two methods for this. The ‘readAll’ method reads the file until the iteratee is done or the file ends. This is useful when you want to parse the whole file into a certain structure. The ‘readSegments’ method reads until the iteratee is done, collects that result and starts parsing the rest of the file over again. This is handy when you need to parse a file with a repeated structure, for example parsing each line.
Parsing everything with an Iteratee:

import akka.actor.IO
import akka.util.ByteString

case class Page(header: String, body: String, footer: String)

val parser = for {
  headerLine <- IO.takeUntil(ByteString("\n"))
  body <- IO.takeUntil(ByteString("[End-Body]"))
  footer <- IO.takeUntil(ByteString("[End-Footer]"))
} yield Page(headerLine.utf8String, body.utf8String, footer.utf8String)

// We can use an iteratee as a parser.
// The parse result will be in the future.
// There are overloads available to read from certain positions.
val readResultFuture = file.readAll(parser)

readResultFuture.onSuccess {
  case Page(header, body, footer) => {
    println(header)
    println(body)
    println(footer)
  }
}

Parsing segments with an Iteratee:
case class LineItem(number: Int, content: String)

val parser = for {
  numberOfItem <- IO.takeUntil(ByteString(":"))
  lineContent <- IO.takeUntil(ByteString("\n"))
} yield LineItem(Integer.parseInt(numberOfItem.utf8String), lineContent.utf8String)

// We can use an iteratee as a parser.
// This keeps parsing until the file ends or the maximum amount is reached.
// Every time the iteratee is done parsing, its result is added
// to the overall result.
// There are overloads available to read from certain positions.
val readResultFuture = file.readSegments(parser)

readResultFuture.onSuccess {
  case items: Seq[LineItem] => {
    items.foreach { i =>
      println(i.number)
      println(i.content)
    }
  }
}
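Why is a stateful parser needed at all? Chunk boundaries fall wherever a read happens to end, often in the middle of a record. The following plain-Scala sketch (no Akka; the class `LineItemParser` is hypothetical, and it assumes the chunks have already been decoded to text) parses the same `number:content` line format as the segment example by buffering the incomplete tail of each chunk until the rest arrives. This is roughly the bookkeeping the iteratee-based methods take care of for you.

```scala
// Hypothetical sketch, independent of Akka and of the akka-io library.
final case class LineItem(number: Int, content: String)

final class LineItemParser {
  // Text received so far that does not yet form a complete record.
  private val pending = new java.lang.StringBuilder

  // Feeds the next chunk and returns every record it completed, in order.
  // A record is "number:content" terminated by '\n'. No error handling:
  // a record without ':' or with a non-numeric number throws.
  def feed(chunk: String): List[LineItem] = {
    pending.append(chunk)
    val items = List.newBuilder[LineItem]
    var newline = pending.indexOf("\n")
    while (newline >= 0) {
      val record = pending.substring(0, newline)
      pending.delete(0, newline + 1) // drop the record and its '\n'
      val colon = record.indexOf(':')
      items += LineItem(record.substring(0, colon).toInt, record.substring(colon + 1))
      newline = pending.indexOf("\n")
    }
    items.result()
  }
}

object LineItemParserDemo {
  def main(args: Array[String]): Unit = {
    val parser = new LineItemParser
    // Record 1 is split across chunks 1 and 2, record 3 across chunks 2 and 3.
    println(parser.feed("1:fir"))            // List()
    println(parser.feed("st\n2:second\n3:")) // List(LineItem(1,first), LineItem(2,second))
    println(parser.feed("third\n"))          // List(LineItem(3,third))
  }
}
```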

Reading Text

For text files you can also use the openText method. It returns a TextIO instance, which has text-reading utilities such as reading lines or splitting by a delimiter. These operations use UTF-8 by default:

val textFile = FileIO.openText("lines.txt")

val allLinesFuture = textFile.readAllLines()

allLinesFuture.onSuccess({
  case lines: Seq[String] => {
    println(lines)
  }
}).andThen { case _ => textFile.close() } // close the text file, not some other handle
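Text has an extra wrinkle when it is read in chunks: a multi-byte UTF-8 character can be split across two chunks. Below is a minimal sketch of one way to handle this with a plain `java.nio.charset.CharsetDecoder`. It is not necessarily how TextIO does it, and `Utf8ChunkDecoder` is a hypothetical name. It keeps the bytes it could not decode yet and prepends them to the next chunk.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{CodingErrorAction, StandardCharsets}

// Hypothetical helper, not part of the akka-io library: decodes UTF-8 text that
// arrives in arbitrary byte chunks, carrying incomplete characters over.
final class Utf8ChunkDecoder {
  private val decoder = StandardCharsets.UTF_8.newDecoder()
    .onMalformedInput(CodingErrorAction.REPORT)
    .onUnmappableCharacter(CodingErrorAction.REPORT)
  // Bytes from earlier chunks that do not form a complete character yet.
  private var leftover: ByteBuffer = ByteBuffer.allocate(0)

  // Decodes `chunk`; pass endOfInput = true with the last chunk only,
  // after that this decoder must not be used again.
  def decode(chunk: Array[Byte], endOfInput: Boolean): String = {
    val in = ByteBuffer.allocate(leftover.remaining() + chunk.length)
    in.put(leftover)
    in.put(chunk)
    in.flip()
    // UTF-8 never yields more chars than bytes, so this buffer is big enough.
    val out = CharBuffer.allocate(in.remaining())
    val result = decoder.decode(in, out, endOfInput)
    if (result.isError) result.throwException() // e.g. truncated char at the very end
    if (endOfInput) decoder.flush(out)
    leftover = in.slice() // whatever the decoder could not consume yet
    out.flip()
    out.toString
  }
}

object Utf8ChunkDecoderDemo {
  def main(args: Array[String]): Unit = {
    val bytes = "caf\u00e9!".getBytes(StandardCharsets.UTF_8)
    val decoder = new Utf8ChunkDecoder
    // Split inside the two-byte 'é', as a chunked read easily might.
    println(decoder.decode(bytes.take(4), endOfInput = false)) // caf
    println(decoder.decode(bytes.drop(4), endOfInput = true))  // é!
  }
}
```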

Writing to Files

In order to write to a file, you need to open it with the right permissions. After that you can use the write methods. The write method takes immutable ByteStrings and writes them to the file. Additional overloads also accept plain Java arrays and Java ByteBuffers. Those are mutable, so be very careful with them: if you mutate them during an asynchronous operation, dragons will appear:

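import java.nio.file.{ Paths, StandardOpenOption }
import akka.util.ByteString

val file = FileIO.open(Paths.get("myFile.data"),
  StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ)

// write the bytes at position 0, then close the file
file.write(ByteString("data data"), 0).onComplete {
  _ => file.close()
}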
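The danger with mutable arrays comes from the fact that the data is written out while your code keeps running. Here is a minimal sketch of a defensive write, assuming a plain AsynchronousFileChannel and Scala 2.10+ `scala.concurrent` futures; `AsyncWrite` is a hypothetical name, not the library's API. It copies the caller's array before starting and keeps writing until the whole buffer is out, since a single asynchronous write may write fewer bytes than requested.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Files, StandardOpenOption}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not the akka-io library's write method.
object AsyncWrite {
  // Writes all of `data` starting at file offset `position` and completes with
  // the number of bytes written. The array is copied first, so the caller may
  // mutate it while the write is still in flight.
  def write(channel: AsynchronousFileChannel, data: Array[Byte], position: Long): Future[Int] = {
    val buffer = ByteBuffer.wrap(data.clone())
    val promise = Promise[Int]()
    def writeFrom(offset: Long): Unit =
      channel.write[AnyRef](buffer, offset, null, new CompletionHandler[Integer, AnyRef] {
        def completed(written: Integer, attachment: AnyRef): Unit =
          if (buffer.hasRemaining) writeFrom(offset + written.intValue) // partial write: continue
          else promise.success(buffer.capacity)
        def failed(exc: Throwable, attachment: AnyRef): Unit = promise.failure(exc)
      })
    writeFrom(position)
    promise.future
  }

  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("async-write", ".data")
    val channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE)
    val data = "data data".getBytes("UTF-8")
    val done = write(channel, data, 0)
    java.util.Arrays.fill(data, 'x'.toByte) // harmless: the write uses its own copy
    Await.result(done, 5.seconds)
    channel.close()
    println(new String(Files.readAllBytes(path), "UTF-8")) // prints "data data"
  }
}
```

Copying costs one extra allocation per write; that is the same trade-off as accepting only immutable ByteStrings.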

Closing the Channel Automatically

So far we’ve always closed the file with an explicit call. But when should you close the file? Simply doing it in a finally clause doesn’t work, because the operation is still running asynchronously in the background. A good option is to close the file when the last operation has finished.
For this there are the ‘withFile’ and ‘withTextFile’ methods. They accept a closure which uses the given file and returns a future. When that future is completed, the file is closed. This means you can do complex file reading operations in that closure, return the result as a future and not worry about closing the file.
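The idea behind ‘withFile’ is essentially the loan pattern applied to futures. Here is a minimal sketch of that pattern, assuming Scala 2.10+ `scala.concurrent` futures and any `java.io.Closeable` resource; `WithResource.withResource` is a hypothetical name and not the library's actual signature.

```scala
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical sketch of the idea behind 'withFile'; not the library's code.
object WithResource {
  // Opens a resource, passes it to `body` and closes it once the Future that
  // `body` returns has completed, successfully or not.
  def withResource[R <: Closeable, T](open: => R)(body: R => Future[T])
                                     (implicit ec: ExecutionContext): Future[T] = {
    val resource = open
    val result =
      try body(resource)
      catch { case NonFatal(e) => Future.failed(e) } // body threw before returning a Future
    // andThen runs the side effect first and only then completes the returned
    // Future, so callers never observe the result while the file is still open.
    result.andThen { case _ => resource.close() }
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val closed = new AtomicBoolean(false)
    // A stand-in for an open file that only records whether it was closed.
    val fakeFile = new Closeable { def close(): Unit = closed.set(true) }
    val answer = withResource(fakeFile)(_ => Future(21 * 2))
    println(Await.result(answer, 5.seconds)) // prints 42
    println(closed.get)                       // prints true
  }
}
```

If the body throws before it returns a future, the sketch still closes the resource and hands back a failed future instead of leaking the file.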

IO Actors

For fault tolerance you usually move ‘dangerous’ operations into separate actors, so that you can supervise them. The IOActor is intended for that. It does the asynchronous operations for you. If an operation goes wrong, the actor fails with an IOException and leaves the decision what to do to its supervisor. Here’s an example of an actor which reads pictures:

import akka.actor.{ Actor, OneForOneStrategy }
import akka.actor.SupervisorStrategy.{ Escalate, Stop }
import akka.util.{ ByteString, Duration }
import java.io.IOException
import java.nio.file.Path
import java.util.concurrent.TimeUnit
import info.gamlor.io.IOActors
import info.gamlor.io.IOActors._

class PictureIO extends Actor {

  override val supervisorStrategy = OneForOneStrategy(5, Duration(60, TimeUnit.SECONDS)) {
    case ex: IOException => {
      println("Couldn't read file. Giving up on this file " + ex)
      Stop
    }
    case ex: Exception => Escalate
  }

  protected def receive = {
    case path: Path => {
      // will be created with the context,
      // so this actor supervises this file reading actor
      val fileReadingActor = IOActors.createForFile(path)
      // The actor will respond with a ReadResponse
      // which will contain the read data
      fileReadingActor ! Read(0, Int.MaxValue)
    }
    case ReadResponse(data, _, _) => {
      processTheBytesOfThisPicture(data)
    }
  }

  def processTheBytesOfThisPicture(data: ByteString) {
    // do something
  }
}

By default the IOActor closes the file after 5 seconds of inactivity. You can set that timeout when creating it, or deactivate it completely and close the file yourself, either by stopping the actor or by sending it a ‘Close’ message.

Chunked Reads with the IO Actor

When you read large files, you don’t want to read everything at once. You can either issue multiple read requests manually, or use the ReadInChunks message. The actor then responds with multiple ReadInChunksResponse messages. It sends such a response as soon as it has filled its internal buffer. When everything has been read, a last response carrying IO.EOF is sent. This is similar to Akka’s network API. So our picture-reading actor would look something like this:

protected def receive = {
  case path: Path => {
    // will be created with the context,
    // so this actor supervises this file reading actor
    val fileReadingActor = IOActors.createForFile(path)
    // The actor will respond with several ReadInChunksResponse messages,
    // each carrying either a chunk of data or IO.EOF
    fileReadingActor ! ReadInChunks(0, Int.MaxValue, path)
  }
  case ReadInChunksResponse(data, _) => {
    data match {
      case IO.Chunk(bytes) => processPartOfPicture(bytes)
      case IO.EOF => finishPicture()
    }
  }
}

def processPartOfPicture(bytes: ByteString) {
  // do stuff
}

def finishPicture() {
  // done
}
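Under the hood, a chunked read is a chain of reads where each completion triggers the next one until the channel reports end-of-file (a result of -1). Here is a minimal sketch with plain AsynchronousFileChannel callbacks instead of actors; the `Chunk`/`EndOfFile` types and `readInChunks` are hypothetical stand-ins for the ReadInChunksResponse messages, not the library's implementation.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Files, StandardOpenOption}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical event types mirroring IO.Chunk / IO.EOF; not Akka's classes.
sealed trait ChunkEvent
final case class Chunk(bytes: Array[Byte]) extends ChunkEvent
case object EndOfFile extends ChunkEvent

object ChunkedRead {
  // Reads the whole file in pieces of at most `chunkSize` bytes. `onEvent` gets
  // one Chunk per piece, then EndOfFile; errors go to `onError`. The next read
  // is issued only after the previous one completed, so events arrive in order.
  def readInChunks(channel: AsynchronousFileChannel, chunkSize: Int)
                  (onEvent: ChunkEvent => Unit, onError: Throwable => Unit): Unit = {
    def readFrom(position: Long): Unit = {
      val buffer = ByteBuffer.allocate(chunkSize) // a fresh buffer for every read
      channel.read[AnyRef](buffer, position, null, new CompletionHandler[Integer, AnyRef] {
        def completed(bytesRead: Integer, attachment: AnyRef): Unit =
          if (bytesRead.intValue < 0) onEvent(EndOfFile) // -1 signals end of file
          else {
            buffer.flip()
            val bytes = new Array[Byte](buffer.remaining())
            buffer.get(bytes)
            onEvent(Chunk(bytes))
            readFrom(position + bytesRead.intValue)
          }
        def failed(exc: Throwable, attachment: AnyRef): Unit = onError(exc)
      })
    }
    readFrom(0L)
  }

  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("chunks", ".bin")
    Files.write(path, "0123456789".getBytes("US-ASCII"))
    val channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    val total = new AtomicInteger(0)
    val done = new CountDownLatch(1)
    readInChunks(channel, 4)({
      case Chunk(bytes) => total.addAndGet(bytes.length)
      case EndOfFile    => done.countDown()
    }, error => { error.printStackTrace(); done.countDown() })
    done.await() // demo only: wait until EndOfFile (or an error) arrives
    channel.close()
    println(total.get) // prints 10
  }
}
```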

Buffer-Management

Async IO operations need to allocate a Java ByteBuffer to transfer the data. Currently the buffer management is as simple as possible: it allocates a buffer for every operation you start. So every call to a read or write method allocates a new buffer, and a single call that writes a very large file uses that one buffer for the whole operation.
This is good enough for now =). In general, I haven’t done any decent performance testing yet.

Getting This Stuff

First: this stuff is not yet set in stone and the API may change. Anyway, the code is on GitHub. I’ve also pushed SNAPSHOT builds to a GitHub-based Maven repository.

Repository for Maven: https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots
GroupID: info.gamlor.akkaasync
ArtifactID: akka-io_2.9.1
Version: 1.0-SNAPSHOT

For example in SBT:

resolvers += "Gamlor-Repo" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"
libraryDependencies += "info.gamlor.akkaasync" %% "akka-io" % "1.0-SNAPSHOT"

That’s It

Improvements will certainly follow. I also have a thin wrapper for the Async Http Client in the works. That’s the topic for a follow-up post =)


How to Write Posts Without a Clue of the Language?

I still don’t know much Korean, and yet I write blog posts in it. I’m still just a beginner. So how do I write these blog posts? Here are a few tips on how to write a blog post in a language you don’t know well.

  • You should not be afraid of mistakes! Learning languages isn’t easy, so at the beginning there will be a lot of mistakes. But the more you use the language, the faster you will learn =).
  • Use Google Translate with care. Google Translate is quite a good dictionary. Often there are multiple translations for a word, so always click on the result to see more of them. In the end you still have to decide which translation to pick.
  • BUT: Do not translate whole sentences with Google Translate. It cannot really translate sentences.
  • Use Google Image Search (or similar). When I’ve found a word, I often search for images of it. If the pictures look right, I know I’ve got the right word.
  • Example sentences are helpful. Good example sentences can be a real help.
  • When I’m not sure about a word or sentence I google for it to find examples.
  • Don’t give up.
Copy and paste also helps

That’s it. Nothing else to tell.

Ijon Tichy: Raumpilot 2nd Series

OK, once again a small update on a series I’ve already reviewed. Sorry for that; I promise to write about new series. Anyway, Ijon Tichy is a German Sci-Fi comedy series, which is a very rare thing.

Fighting Sofas is tiring

Ijon Tichy thrives on its unique visual style. Everything is made of everyday items, mostly furniture; aliens, for example, are lamps, sofas or something like that. This style is just awesome. It deliberately looks like a cheap 80s Sci-Fi series. The people who make this series really have a lot of imagination. I personally don’t know any other series with this kind of style.

The stories are crazy and fun. Ijon Tichy, the hero, does all kinds of crazy shit like traveling through time, building strange things, visiting new planets and meeting new aliens.

Ijon Tichy himself and all the people/aliens in this series talk in bad German. It sounds a bit like Yoda. The exception is the ‘Halluzinelle’ hologram, which speaks perfectly normal German. I don’t really like this bad German talking, but I can tolerate it.

The 2nd series has a new story, some new friends for Tichy and new bad guys. The style and the humour are kept as in the first series.

Unfortunately there are no English subtitles around for Ijon Tichy. At least YouTube has two episodes of the old series with English subtitles.

Star-O-Meter: 3.5/5 (Yep, I increased it for the second series)

Finally, the German trailer:
