Eff
値のデフォルトの解釈はモナド的である。つまり、エフェクトのある値は順番に評価されていく。これは
FutureEffect
を含む値のリストをトラバース処理すると明らかになる。
import org.atnos.eff._, all._, future._, syntax.all._
import cats.Eval
import cats.data.Writer
import cats.syntax.traverse._
import cats.instances.list._
import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import org.atnos.eff.syntax.future._
type WriterString[A] = Writer[String, A]
type _writerString[R] = WriterString |= R
type S = Fx.fx3[Eval, TimedFuture, WriterString]
implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext
def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] =
for {
i1 <- delay(i)
i2 <- futureDelay(i1)
_ <- tell(i2.toString)
} yield i2
val action: Eff[S, List[Int]] =
List(1000, 500, 50).traverse(execute[S])
Await.result(action.runEval.runWriterLog.runSequential, 2.seconds)
> List(1000, 500, 50)
また一方で、 Eff
にアプリカティブ評価を使うことで、これらの計算を同時に走らせることもできる。
val action: Eff[S, List[Int]] =
List(1000, 500, 50).traverseA(execute[S])
Await.result(Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(scheduler, global), 2.seconds)
> List(1000, 500, 50)
ここでは traverseA
(
traverse
の代わり)を使っている。アプリカティブなトラバーサルを実行し、Future
を同時に処理して、速いアクションから先に完了させていくためだ。
アプリカティブなエフェクトの他の強みとして、複数の異なるリクエストをインターセプトし、ひとつのリクエストとしてバッチ処理できる。例えば、
import org.atnos.eff._, all._, syntax.all._
import cats.implicits._
// データベースからユーザーを取得するエフェクト。
// 呼出は単独でもバッチ処理でもよい
case class User(i: Int)
sealed trait UserDsl[+A]
case class GetUser(i: Int) extends UserDsl[User]
case class GetUsers(is: List[Int]) extends UserDsl[List[User]]
type _userDsl[R] = UserDsl /= R
def getUser[R :_userDsl](i: Int): Eff[R, User] =
send[UserDsl, R, User](GetUser(i))
この DSL 用のインタープリターを作ってみよう。
// 実際には Web サービスを呼び出す
def getWebUser(i: Int): User = User(i)
def getWebUsers(is: List[Int]): List[User] = is.map(i => User(i))
// このインタープリターは Web サービスを呼び出し、実行結果のトレースを返す単純なものだ
def runDsl[A](eff: Eff[Fx1[UserDsl], A]): (A, Vector[String]) = {
@tailrec
def go(e: Eff[Fx1[UserDsl], A], trace: Vector[String]): (A, Vector[String]) =
e match {
case Pure(a,_) => (a, trace)
case Impure(UnionTagged(GetUser(i), _), c, _) => go(c(getWebUser(i)), trace :+ "getWebUser")
case Impure(UnionTagged(GetUsers(is), _), c, _) => go(c(getWebUsers(is)), trace :+ "getWebUsers")
case ap @ ImpureAp(_, _, _) => go(ap.toMonadic, trace)
case Impure(_, _, _) => sys.error("this should not happen with just one effect")
}
go(eff, Vector())
}
2つの呼出を1つにまとめる
Batchable
インスタンスを提供することで、
UserDsl
プログラムを最適化することもできる。
implicit def BatchableUserDsl: Batchable[UserDsl] = new Batchable[UserDsl] {
type Z = List[User]
type E = User
def distribute(z: List[User]) = z
def batch[X, Y](tx: UserDsl[X], ty: UserDsl[Y]): Option[UserDsl[Z]] = Option {
(tx, ty) match {
case (GetUser(i), GetUser(j)) => GetUsers(List(i, j))
case (GetUser(i), GetUsers(is)) => GetUsers(i :: is)
case (GetUsers(is), GetUser(i)) => GetUsers(is :+ i)
case (GetUsers(is), GetUsers(js)) => GetUsers(is ++ js)
}
}
}
さあ、 User
DSL
をアプリカティブ呼出と一緒に使った、最適化できるプログラムを作ってみよう。
def program[R :_userDsl]: Eff[R, List[User]] =
Eff.traverseA(List(1, 2, 3))(i => getUser(i))
その最適化バージョンは、
def optimised[R :_userDsl]: Eff[R, List[User]] =
program.batch
プログラムの最適化バージョンとそうでないバージョンの実行は、同じ結果を生み出さねばならない。
show(runDsl(program[Fx1[UserDsl]]), runDsl(optimised[Fx1[UserDsl]]))
original: User(1), User(2), User(3)
trace: getWebUser, getWebUser, getWebUser
optimised: User(1), User(2), User(3)
trace: getWebUsers