アプリカティブ評価

並列実行

Eff 値のデフォルトの解釈はモナド的(monadic)である。つまり、エフェクトのある値は順番に評価されていく。これは FutureEffect を含む値のリストをトラバース処理すると明らかになる。

import org.atnos.eff._, all._, future._, syntax.all._
import cats.Eval
import cats.data.Writer
import cats.syntax.traverse._
import cats.instances.list._
import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import org.atnos.eff.syntax.future._

type WriterString[A] = Writer[String, A]
type _writerString[R] = WriterString |= R

type S = Fx.fx3[Eval, TimedFuture, WriterString]

implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext

def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] =
  for {
    i1 <- delay(i)
    i2 <- futureDelay(i1)
    _  <- tell(i2.toString)
  } yield i2

val action: Eff[S, List[Int]] =
  List(1000, 500, 50).traverse(execute[S])

Await.result(action.runEval.runWriterLog.runSequential, 2.seconds)

> List(1000, 500, 50)

また一方で、 Effアプリカティブ(applicative)評価を使うことで、これらの計算を同時に走らせることもできる。

val action: Eff[S, List[Int]] =
  List(1000, 500, 50).traverseA(execute[S])

Await.result(Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(scheduler, global), 2.seconds)

> List(1000, 500, 50)

ここでは traverseAtraverse の代わり)を使っている。アプリカティブなトラバーサルを実行し、Future を同時に処理して、速いアクションから先に完了させていくためだ。

一括処理

アプリカティブなエフェクトの他の強みとして、複数の異なるリクエストをインターセプトし、ひとつのリクエストとしてバッチ処理できる。例えば、

import org.atnos.eff._, all._, syntax.all._
import cats.implicits._

// データベースからユーザーを取得するエフェクト。
// 呼出は単独でもバッチ処理でもよい
case class User(i: Int)
sealed trait UserDsl[+A]

case class GetUser(i: Int) extends UserDsl[User]
case class GetUsers(is: List[Int]) extends UserDsl[List[User]]
type _userDsl[R] = UserDsl /= R

def getUser[R :_userDsl](i: Int): Eff[R, User] =
  send[UserDsl, R, User](GetUser(i))

この DSL 用のインタープリターを作ってみよう。

// 実際には Web サービスを呼び出す
def getWebUser(i: Int): User = User(i)
def getWebUsers(is: List[Int]): List[User] = is.map(i => User(i))

// このインタープリターは Web サービスを呼び出し、実行結果のトレースを返す単純なものだ
def runDsl[A](eff: Eff[Fx1[UserDsl], A]): (A, Vector[String]) = {
  @tailrec
  def go(e: Eff[Fx1[UserDsl], A], trace: Vector[String]): (A, Vector[String]) =
    e match {
      case Pure(a,_) => (a, trace)
      case Impure(UnionTagged(GetUser(i), _), c, _)   => go(c(getWebUser(i)), trace :+ "getWebUser")
      case Impure(UnionTagged(GetUsers(is), _), c, _) => go(c(getWebUsers(is)), trace :+ "getWebUsers")
      case ap @ ImpureAp(_, _, _)                     => go(ap.toMonadic, trace)
      case Impure(_, _, _)                            => sys.error("this should not happen with just one effect")
  }
  go(eff, Vector())
}

2つの呼出を1つにまとめる Batchable インスタンスを提供することで、 UserDsl プログラムを最適化することもできる。

implicit def BatchableUserDsl: Batchable[UserDsl] = new Batchable[UserDsl] {
  type Z = List[User]
  type E = User

  def distribute(z: List[User]) = z

  def batch[X, Y](tx: UserDsl[X], ty: UserDsl[Y]): Option[UserDsl[Z]] = Option {
    (tx, ty) match {
      case (GetUser(i),   GetUser(j))   => GetUsers(List(i, j))
      case (GetUser(i),   GetUsers(is)) => GetUsers(i :: is)
      case (GetUsers(is), GetUser(i))   => GetUsers(is :+ i)
      case (GetUsers(is), GetUsers(js)) => GetUsers(is ++ js)
    }
  }
}

さあ、 User DSL をアプリカティブ呼出と一緒に使った、最適化できるプログラムを作ってみよう。

def program[R :_userDsl]: Eff[R, List[User]] =
  Eff.traverseA(List(1, 2, 3))(i => getUser(i))

その最適化バージョンは、

def optimised[R :_userDsl]: Eff[R, List[User]] =
  program.batch

プログラムの最適化バージョンとそうでないバージョンの実行は、同じ結果を生み出さねばならない。

show(runDsl(program[Fx1[UserDsl]]), runDsl(optimised[Fx1[UserDsl]]))
original:  User(1), User(2), User(3)
  trace: getWebUser, getWebUser, getWebUser

optimised: User(1), User(2), User(3)
  trace: getWebUsers