1 Introduction

1.1 What are Iteratees?

"An iteratee is a stream processor: it takes a number of elements from an input stream, performs a calculation with them, and returns the value of the calculation. After completion, the elements used by the iteratee should be removed from the stream." (John W. Lato, one of the authors of the Haskell iteratee package)

Play2 uses iteratees for reactive programming: IO is handled without blocking threads while they wait for data. The source of the data controls the execution, and the consumer of the data can pass back control signals such as continue, done, or error.

1.2 Examples

import play.api.libs.iteratee._
val fruits = Enumerator("apple", "banana", "orange")
val vegetables = Enumerator("salad", "brocoli")

val iter = Iteratee.fold[String, String]("")((acc, e) => acc + "." + e)

val res1 = fruits >>> vegetables |>> iter
Iteratee.flatten(res1).run.value
// Redeemed(.apple.banana.orange.salad.brocoli)

val res2 = (fruits >>> vegetables &> Enumeratee.take(4)) |>> iter
Iteratee.flatten(res2).run.value
// Redeemed(.apple.banana.orange.salad)

val res3 = fruits >>> vegetables |>> (Enumeratee.take(4) &>> iter)
Iteratee.flatten(res3).run.value
// Redeemed(.apple.banana.orange.salad)

val res4 = (fruits &> Enumeratee.map(_.length) 
  |>> Iteratee.fold("")((a, i) => a + " " + i))
Iteratee.flatten(res4).run.value
// Redeemed( 5 6 6)

Some definitions:

Enumerator
enumerates the data until the computation yields a result. Enumerators can be combined: enum1 andThen enum2 feeds enum1 and then enum2 (>>> is an alias for andThen). Think of it as a source.
Iteratee
takes input data sequentially and returns either a continuation that takes more input or a final result. Iteratees can be composed with for expressions, map, or flatMap. Think of it as a sink.
Enumeratee
can play the role of an Enumerator or an Iteratee, depending on what it is composed with. Think of it as a pipe.

The computation is defined using the various composition and combination properties of enumerators, enumeratees, and iteratees, and is finally performed by the Iteratee.run method. Because the computation is performed in the context of run, there is a single point at which to control the release of resources.

2 From foldLeft to Iteratees

Iteratees are foldLeft on steroids.

2.1 Anatomy of a foldLeft

def foldLeft[A, B](z: B)(f: (B, A) => B): B 

For instance:

val ints = Vector(1, 2, 3)
ints.foldLeft(0)((acc, i) => acc + i)
//               (acc, i) =>
//               (0  , 1) => 0   + 1
//               (1  , 2) => 1   + 2
//               (3  , 3) => 3   + 3

This computes the sum, 6.
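
The trace in the comments above can be reproduced in code. The following is a minimal sketch using the standard library foldLeft; the object name FoldTrace and the method trace are made up for this illustration. It records each (acc, i) pair next to the running sum.

```scala
object FoldTrace {
  // Hypothetical helper: returns the final sum together with every
  // (accumulator, element) pair that the fold function was called with.
  def trace(ints: Seq[Int]): (Int, Vector[(Int, Int)]) =
    ints.foldLeft((0, Vector.empty[(Int, Int)])) { case ((acc, seen), i) =>
      (acc + i, seen :+ ((acc, i)))
    }
}

// FoldTrace.trace(Vector(1, 2, 3))
// returns (6,Vector((0,1), (1,2), (3,3)))
```

The accumulator type (a pair) differs from the element type (Int), which is the role of the separate A and B type parameters in the signature.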

With fold left, we can:

2.2 foldLeft implementation

One possible foldLeft implementation:

def foldLeft[A, B](seq: Seq[A], z: B)(f: (B, A) => B): B = {
  def loop(v: Seq[A], acc: B): B = v match {
    case Seq() => acc
    case Seq(x, xs@_*) => loop(xs, f(acc, x))
  }
  loop(seq, z)
}

FoldLeft.foldLeft(Vector(1, 2, 3), "*")((acc, i) => acc + "->" + i)
// returns *->1->2->3

Noteworthy elements:

Downside: we have to process the whole list, which is not always desirable. For instance, what if we need to concatenate only the first 10 elements?
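
To make the downside concrete, here is a small sketch that counts how often the fold function is called. The object name, the run method, and the calls counter are assumptions added for the demonstration; foldLeft is the implementation shown above, repeated so the snippet stands alone.

```scala
object FoldVisitsEverything {
  // Same recursive fold as above.
  def foldLeft[A, B](seq: Seq[A], z: B)(f: (B, A) => B): B = {
    def loop(v: Seq[A], acc: B): B = v match {
      case Seq() => acc
      case Seq(x, xs @ _*) => loop(xs, f(acc, x))
    }
    loop(seq, z)
  }

  // Keeps only the first three elements, yet f is still invoked for all of them.
  def run(): (Vector[Int], Int) = {
    var calls = 0 // hypothetical counter, only here to observe the traversal
    val firstThree = foldLeft((1 to 20).toVector, Vector.empty[Int]) { (acc, i) =>
      calls += 1
      if (acc.size < 3) acc :+ i else acc // result is fixed after 3 elements
    }
    (firstThree, calls)
  }
}

// FoldVisitsEverything.run()
// returns (Vector(1, 2, 3),20)
```

The result is known after three elements, but the fold has no way to say so and still walks the remaining seventeen.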

2.3 Terminating Early with Cont or Done

We wrap the accumulator into a Step trait to indicate whether the fold has to continue or stop.

sealed trait Step[+B]
case class Cont[+B](value: B) extends Step[B]
case class Done[+B](value: B) extends Step[B]

def foldLeft[A, B](seq: Seq[A], z: B)(f: (B, A) => Step[B]): B

Benefits:

Implementation:

def foldLeft[A, B](seq: Seq[A], z: B)(f: (B, A) => Step[B]): B = {
  def loop(v: Seq[A], acc: B): B = v match {
    case Seq() => acc
    case Seq(x, xs@_*) => f(acc, x) match {
      case Done(b) => b
      case Cont(b) => loop(xs, b)
    }
  }
  loop(seq, z)
}

Concatenate the first 10 elements:

case class State(acc: String, count: Int)
val data = Vector(1 to 20: _*)    
foldLeft(data, State("*", 10)){ case (State(acc, count), i) =>
  if (count == 0) Done(State(acc, 0))
  else Cont(State(acc + "." + i, count - 1))
}
// returns State(*.1.2.3.4.5.6.7.8.9.10,0)
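
As a second illustration of early termination, the sketch below sums elements until the next one would push the total past a limit. The names EarlyExit, sumUntil, and visited are invented for this example; Step, Cont, Done, and foldLeft are the definitions from this section, repeated so the snippet is self-contained.

```scala
object EarlyExit {
  sealed trait Step[+B]
  case class Cont[+B](value: B) extends Step[B]
  case class Done[+B](value: B) extends Step[B]

  def foldLeft[A, B](seq: Seq[A], z: B)(f: (B, A) => Step[B]): B = {
    def loop(v: Seq[A], acc: B): B = v match {
      case Seq() => acc
      case Seq(x, xs @ _*) => f(acc, x) match {
        case Done(b) => b
        case Cont(b) => loop(xs, b)
      }
    }
    loop(seq, z)
  }

  // Hypothetical helper: returns (sum, number of elements visited).
  def sumUntil(limit: Int, data: Seq[Int]): (Int, Int) = {
    var visited = 0 // counts calls to the fold function
    val sum = foldLeft(data, 0) { (acc, i) =>
      visited += 1
      if (acc + i > limit) Done(acc) else Cont(acc + i)
    }
    (sum, visited)
  }
}

// EarlyExit.sumUntil(10, 1 to 20)
// returns (10,5)
```

Only five of the twenty elements are visited: the fold stops at the first element that would exceed the limit.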

We can now terminate as soon as we have the result, but it is cumbersome to define a state and pass it around. To enable composition, it is better to refactor so that the state is internalized in the f function…

2.4 Internal State

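Instead of passing the state back to foldLeft, we can keep the state hidden in the Step object. Now the function f takes an input a: A and returns a Step: either Done with the final value, or Cont with the function to apply to the next input.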

sealed trait Step[A, +B]
case class Cont[A, +B](next: A => Step[A, B]) extends Step[A, B]
case class Done[A, +B](value: B) extends Step[A, B]

def foldLeft[A, B](seq: Seq[A])(f: A => Step[A, B]): B = {
  def loop(v: Seq[A], k: A => Step[A, B]): B = v match {
    case Seq() => sys.error("not enough input to get a result")
    case Seq(x, xs@_*) => k(x) match {
      case Done(b) => b
      case Cont(k) => loop(xs, k)
    }
  }
  loop(seq, f)
}

Notes:

The implementation of f is tricky. It has to carry the state when input is fed to it.

val f: (Int) => Step[Int, String] = {
  case class State(acc: String, count: Int)
  def step(s: State, i: Int): Step[Int, String] = {
    if (s.count == 0) Done(s.acc)
    else Cont(step(State(s.acc + "." + i, s.count -1), _))
  }
  step(State("*", 10), _)  
}

val data = Vector(1 to 20: _*)
foldLeft(data)(f)
// returns *.1.2.3.4.5.6.7.8.9.10
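
The state-carrying pattern in f can be factored out once. The sketch below is one possible way to do that, not part of the original code: takeFold is a hypothetical helper that keeps the remaining count and the accumulator in its own parameters, so callers only supply an ordinary fold function. Step, Cont, Done, and foldLeft are repeated from above.

```scala
object InternalState {
  sealed trait Step[A, +B]
  case class Cont[A, +B](next: A => Step[A, B]) extends Step[A, B]
  case class Done[A, +B](value: B) extends Step[A, B]

  def foldLeft[A, B](seq: Seq[A])(f: A => Step[A, B]): B = {
    def loop(v: Seq[A], k: A => Step[A, B]): B = v match {
      case Seq() => sys.error("not enough input to get a result")
      case Seq(x, xs @ _*) => k(x) match {
        case Done(b) => b
        case Cont(k) => loop(xs, k)
      }
    }
    loop(seq, f)
  }

  // Hypothetical helper: fold at most n inputs with f, then finish.
  // Like the hand-written f above, it needs one extra input to notice
  // that the count has reached zero.
  def takeFold[A, B](n: Int, z: B)(f: (B, A) => B): A => Step[A, B] =
    a => if (n == 0) Done(z) else Cont(takeFold[A, B](n - 1, f(z, a))(f))

  val join10: Int => Step[Int, String] =
    takeFold[Int, String](10, "*")((acc, i) => acc + "." + i)

  def demo: String = foldLeft((1 to 20).toVector)(join10)
}

// InternalState.demo
// returns *.1.2.3.4.5.6.7.8.9.10
```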

Note that if we don’t have enough elements, we get a sys.error. Wouldn’t it be nice to get the Cont object back instead, so that we can feed more input to it?

2.5 More than One Input

The input values are wrapped inside an Input class.

sealed trait Input[+A]
case class Element[+A](value: A) extends Input[A]
case object EOF extends Input[Nothing]

sealed trait Step[A, +B]
case class Cont[A, +B](next: Input[A] => Step[A, B]) extends Step[A, B]
case class Done[A, +B](value: B) extends Step[A, B]
case class Error[A](msg: String) extends Step[A, Nothing]

The implementation is nearly identical to the previous step, but to allow processing more than one input we no longer return B. Instead we return a Step: Done if the computation has finished, or Cont, holding the function that accepts further input, if it has not:

def foldLeft[A, B](seq: Seq[A])(f: Input[A] => Step[A, B]): Step[A, B] = {
  def loop(v: Seq[A], k: Input[A] => Step[A, B]): Step[A, B] = v match {
    case Seq() => Cont(k)
    case Seq(x, xs@_*) => k(Element(x)) match {
      case Done(b) => Done(b)
      case Cont(k) => loop(xs, k)
      case Error(msg) => Error(msg) // propagate a failure instead of hitting a MatchError
    }
  }
  loop(seq, f)
}

val f: (Input[Int]) => Step[Int, String] = {
  case class State(acc: String, count: Int)
  def step(s: State, e: Input[Int]): Step[Int, String] = e match {
    case EOF => Done(s.acc)
    case Element(i) =>
      if (s.count == 0) Done(s.acc)
      else Cont(step(State(s.acc + "." + i, s.count -1), _))
  }
  step(State("*", 10), _)  
}

val data1 = Vector(1, 2, 3, 4, 5)
val data2 = Vector(6, 7, 8, 9, 10, 11)
val tmp = foldLeft(data1)(f)
val res = foldLeft(data2)(tmp match { case Cont(k) => k })
(tmp, res)
// returns (Cont(<function1>),Done(*.1.2.3.4.5.6.7.8.9.10))

Returning tmp as Cont(k) lets us extract k and then feed data2 to it.

Iteratees are really the Step[A, +B] type. foldLeft, modified slightly to take an Iteratee as its parameter, becomes an implementation of the Enumerator trait.
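
To illustrate that correspondence, here is a deliberately simplified, synchronous sketch. It is not Play's implementation, which works with promises. The Enumerator trait, enumerate, run, and fold below are assumptions written for this example, reusing the Input and Step types from section 2.5.

```scala
object MiniIteratee {
  sealed trait Input[+A]
  case class Element[+A](value: A) extends Input[A]
  case object EOF extends Input[Nothing]

  sealed trait Step[A, +B]
  case class Cont[A, +B](next: Input[A] => Step[A, B]) extends Step[A, B]
  case class Done[A, +B](value: B) extends Step[A, B]
  case class Error[A](msg: String) extends Step[A, Nothing]

  // Hypothetical stand-in for an enumerator: a source that pushes its
  // elements into an iteratee (a Step) and returns the resulting Step.
  trait Enumerator[A] { self =>
    def apply[B](it: Step[A, B]): Step[A, B]
    def andThen(other: Enumerator[A]): Enumerator[A] = new Enumerator[A] {
      def apply[B](it: Step[A, B]): Step[A, B] = other(self(it))
    }
  }

  // foldLeft from section 2.5, taking the iteratee as its parameter.
  def enumerate[A](seq: Seq[A]): Enumerator[A] = new Enumerator[A] {
    def apply[B](it: Step[A, B]): Step[A, B] = {
      def loop(v: Seq[A], s: Step[A, B]): Step[A, B] = (v, s) match {
        case (Seq(x, xs @ _*), Cont(k)) => loop(xs, k(Element(x)))
        case _ => s // source exhausted, or the iteratee is Done or Error
      }
      loop(seq, it)
    }
  }

  // Signal end of input and extract the result, if there is one.
  def run[A, B](it: Step[A, B]): Either[String, B] = it match {
    case Done(b) => Right(b)
    case Error(msg) => Left(msg)
    case Cont(k) => k(EOF) match {
      case Done(b) => Right(b)
      case Error(msg) => Left(msg)
      case Cont(_) => Left("iteratee did not finish on EOF")
    }
  }

  // An iteratee that folds every element until EOF.
  def fold[A, B](z: B)(f: (B, A) => B): Step[A, B] = Cont[A, B] {
    case Element(a) => fold[A, B](f(z, a))(f)
    case EOF => Done(z)
  }

  def demo: Either[String, String] = {
    val join = fold[String, String]("")((acc, e) => acc + "." + e)
    val source = enumerate(Vector("apple", "banana")) andThen enumerate(Vector("salad"))
    run(source(join))
  }
}

// MiniIteratee.demo
// returns Right(.apple.banana.salad)
```

In this sketch, andThen works because the first source hands back the iteratee in its Cont state, which the second source then continues to feed; run is the single place where EOF is sent.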

3 Iteratees in Play20

3.1 Implementation

Input
framework/src/play/src/main/scala/play/api/libs/iteratee/Iteratee.scala#L159
Iteratee
framework/src/play/src/main/scala/play/api/libs/iteratee/Iteratee.scala#L179
Enumerator
framework/src/play/src/main/scala/play/api/libs/iteratee/Iteratee.scala#L363
Enumeratee
framework/src/play/src/main/scala/play/api/libs/iteratee/Iteratee.scala#L417

3.2 Concatenate 10 elements

import play.api.libs.iteratee._
val join10 = {
  def step(e: Input[Int], count: Int, acc: String): Iteratee[Int, String] = {
    if (count == 0) Done(acc, e)
    else e match {
      case Input.EOF => Done(acc, Input.EOF)
      case Input.Empty => Cont(step(_, count, acc))
      case Input.El(i) => Cont(step(_, count - 1, acc + "." + i))
    }
  }
  Cont(step(_: Input[Int], 10, "*"))
}
val data = Enumerator(1 to 20: _*)
Iteratee.flatten(data |>> join10).run.value
// Redeemed(*.1.2.3.4.5.6.7.8.9.10)

Or

import play.api.libs.iteratee._
val join = Iteratee.fold[Int, String]("*")((acc, s) => acc + "." + s)
val data = Enumerator(1 to 20: _*)
Iteratee.flatten(Enumerator.eof |>> Iteratee.flatten(data &> Enumeratee.take(10) |>> join)).run.value
// Redeemed(*.1.2.3.4.5.6.7.8.9.10)

3.3 Operator Cheat Sheet

Enumerator:

enumerator  through   enumeratee // returns an enumerator
enumerator    &>      enumeratee // same as through
enumerator  andThen   enumerator // returns an enumerator
enumerator    >>>     enumerator // same as andThen    
enumerator   apply    iteratee   // returns Promise[Iteratee[E, A]]
enumerator    |>>     iteratee   // same as apply
enumerator interleave enumerator     
enumerator    >-      enumerator // same as interleave     
enumerator    map     f: (E)=>U  // returns Enumerator[U]
enumerator  mapInput  f          // returns Enumerator[U]

Enumeratee:

enumeratee  apply    iteratee[To, A]  // returns Iteratee[From, Iteratee[To, A]]
enumeratee  applyOn  iteratee     // same as apply
enumeratee    &>     iteratee     // same as apply and applyOn
enumeratee transform iteratee     // returns Iteratee[From, A]
enumeratee    &>>    iteratee     // alias for transform
enumeratee  compose  enumeratee   // returns Enumeratee[From, To2]
enumeratee    ><>    enumeratee   // alias for compose

3.4 Writing an Enumeratee and an Iteratee

enumTap is an enumeratee that logs the input provided by the enumerator. iterTap logs the state changes of the iteratee for a given input. See the code on GitHub.

val data = Enumerator(1, 2, 3)
Iteratee.flatten(
  data &> enumTap("test") 
  |>> iterTap("fold") ?<< Iteratee.fold(0)((sum, (e)) => sum + e)
).run.value
// Redeemed(6)

3.5 map and flatMap examples

See the example code that parses a PNG file.

PNG.read("http://localhost:9000/assets/images/favicon.png")

4 Further Reading

4.1 Scala

4.2 Haskell

