エフェクトスタックを変換する

エフェクトスタックとは?

ここには言葉の誤用がある。「スタック」という名前はモナドトランスフォーマーについて話すときに使われる「モナドスタック」から来ている。しかしながら Eff では、エフェクトはスタックではなく、エフェクトのツリーとしてモデル化されている,

例えば4つのエフェクトT1, T2, T3, T4 の型レベル表現は次のように表される。

Fx.fx4[T1, T2, T3, T4]

// または

FxAppend[
  Fx1[T1],
  Fx3[T2, T3, T4]
]

なので、型レベルでエフェクトを操作するときは毎回、エフェクトのツリーを操作する。例えば、 エフェクト T3 を解釈すると次のツリーが残される。

FxAppend[
  Fx1[T1],
  Fx2[T2, T4]
]

このコードは次のことを示している。

// 今はまだ次の implicit の召喚はコンパイラーをクラッシュさせる
 val member_ : Member.Aux[T3, FxAppend[Fx1[T1], Fx3[T2, T3, T4]], FxAppend[Fx1[T1], Fx2[T2, T4]]] =
  implicitly[Member.Aux[T3, FxAppend[Fx1[T1], Fx3[T2, T3, T4]], FxAppend[Fx1[T1], Fx2[T2, T4]]]]

残念ながらコンパイラがこの型を取り扱うのは少しむずかしいので、メンバー値を取得するには、implicit 変換を使って「手動」でやるか、あるいは Aux の部分をなくしてメンバーインスタンスを召喚すればできる。

import org.atnos.eff._

// implicit を明示的に定義する必要がある
val member_ : Member.Aux[T3, FxAppend[Fx1[T1], Fx3[T2, T3, T4]], FxAppend[Fx1[T1], Fx2[T2, T4]]] =
  Member.MemberAppendR(Member.Member3M)

// これでも動く
val member: Member[T3, FxAppend[Fx1[T1], Fx3[T2, T3, T4]]] =
  implicitly[Member[T3, FxAppend[Fx1[T1], Fx3[T2, T3, T4]]]]

より重要なことは、上記の難しさがあるにも関わらず、コンパイラーは与えられたエフェクトの解釈の結果として生じる右側の型を把握できる。このため、次のコードはコンパイルできる。

import org.atnos.eff._

def runT3[R, U, A](e: Eff[R, A])(implicit m: Member.Aux[T3, R, U]): Eff[U, A] = ???
def runT2[R, U, A](e: Eff[R, A])(implicit m: Member.Aux[T2, R, U]): Eff[U, A] = ???
def runT1[R, U, A](e: Eff[R, A])(implicit m: Member.Aux[T1, R, U]): Eff[U, A] = ???
def runT4[R, U, A](e: Eff[R, A])(implicit m: Member.Aux[T4, R, U]): Eff[U, A] = ???

type S = FxAppend[Fx1[T1], Fx3[T2, T3, T4]]

runT1(runT4(runT2(runT3(Eff.send[T3, S, Int](???)))))

エフェクトを他のエフェクトに変換する

エフェクトの変換の典型的なユースケースは、Reader[S, *] エフェクトを持つスタックを Reader[B, *] エフェクトを持つスタックに変換することだ。ここで SB に「含まれる」(Big から Small へのマッピングが存在する)。例を示す。

import org.atnos.eff._, all._
import org.atnos.eff.syntax.all._
import cats._
import cats.data._

case class Conf(host: String, port: Int)

type ReaderPort[A] = Reader[Int, A]
type ReaderHost[A] = Reader[String, A]
type ReaderConf[A] = Reader[Conf, A]

type S1 = Fx.fx2[ReaderHost, Option]
type S2 = Fx.fx2[ReaderPort, Option]
type SS = Fx.fx2[ReaderConf, Option]

val readHost: Eff[S1, String] = for {
  c <- ask[S1, String]
  h <- OptionEffect.some[S1, String]("hello")
} yield h

val readPort: Eff[S2, String] = for {
  c <- ask[S2, Int]
  h <- OptionEffect.some[S2, String]("world")
} yield h

val fromHost = new (ReaderHost ~> ReaderConf) {
  def apply[X](r: ReaderHost[X]) = Reader((c: Conf) => r.run(c.host))
}

val fromPort = new (ReaderPort ~> ReaderConf) {
  def apply[X](r: ReaderPort[X]) = Reader((c: Conf) => r.run(c.port))
}

val action: Eff[SS, String] = for {
  s1 <- readHost.transform(fromHost)
  s2 <- readPort.transform(fromPort)
} yield s1 + " " + s2

action.runReader(Conf("www.me.com", 8080)).runOption.run

> Some(hello world)

ReaderState には 特別版の transform が存在する。

エフェクトを複数のエフェクトに変換する

Eff でよくやることは、エフェクト(例えば Web サービス DSL)を複数の他のエフェクト (TimedFutureEvalEither など)に変換することだ。

たとえばこんなスタックがあるとする。

type S = Fx.fx3[Authenticated, TimedFuture, Either[AuthError, *]]

そして認証アクションを TimedFutureEither に翻訳するインタープリターを書きたいとする。

import org.atnos.eff._
import org.atnos.eff.syntax.eff._
import org.atnos.eff.future._
import org.atnos.eff.interpret._
import scala.concurrent.Future

// valid なトークンに対してアクセス権をリストにする
case class AccessRights(rights: List[String])

// 認証エラー
case class AuthError(message: String)

// ユーザー認証の DSL
sealed trait Authenticated[A]
case class Authenticate(token: String) extends Authenticated[AccessRights]
type _authenticate[U] = Authenticated |= U

type AuthErroEither[A] = Either[AuthError, A]
type _error[U] = AuthErroEither |= U

/**
 * implicit パラメーターの順序は型推論のために本当に重要!
 * 下を見てほしい
 */
def runAuth[R, U, A](e: Eff[R, A])(implicit
  authenticated: Member.Aux[Authenticated, R, U],
  future:        _future[U],
  either:        _error[U]): Eff[U, A] =

   translate(e)(new Translate[Authenticated, U] {
     def apply[X](ax: Authenticated[X]): Eff[U, X] =
       ax match {
         case Authenticate(token) =>
           // スタック U の TimedFuture エフェクトを送る
           fromFuture(authenticateImpl(token)).
           // スタック U の Either 値を送る
           collapse
       }
    })

// トークンを認証するためにサービスを呼び出す
def authenticateImpl(token: String): Future[Either[AuthError, AccessRights]] =
  Future.successful[Either[AuthError, AccessRights]] { Left(AuthError("token invalid!")) }

def authenticate[S :_authenticate](token: String) = Authenticate(token).send

type S1 = Fx.fx3[Authenticated, Either[AuthError, *], TimedFuture]
type R1 = Fx.fx2[Either[AuthError, *], TimedFuture]

val result: Eff[R1, AccessRights] = runAuth(authenticate[S1]("faketoken"))

上記の send 呼出はスタック UTimedFuture 値を送る必要がある。これが可能なのは implicit 引数 future によって示されているように TimedFutureU 内のエフェクトだからだ。

さらに、authenticateEither[AuthError, *] 値を返す。これをU に「つぶす」こともできる。なぜなら Either[AuthError, *]either によって示されているように U 内のエフェクトだからだ。

次のようなもっと直接的な型シグネチャーを使わないことが不思議かもしれない。

def runAuth2[R, U :_future :_error, A](e: Eff[R, A])(
  implicit authenticated: Member.Aux[Authenticated, R, U]): Eff[U, A]

その理由は scalac がこの糖衣構文を次のように変換するからだ。

def runAuth2[R, U, A](e: Eff[R, A])(
  implicit future:        _future[U],
           either:        _error[U],
           authenticated: Member.Aux[Authenticated, R, U]): Eff[U, A] =

そして authenticated は implicit パラメーターのリストの最後にあるので、型推論を導くのには使えないのだ。

エフェクトを「局所的に」解釈する

データベースのクエリーを実行するメソッドがあるとしよう。

import org.atnos.eff._
import org.atnos.eff.all._
import cats.data._

trait Db[A]
type _writerString[R] = Writer[String, *] |= R

def runDb[R, U, A](queries: Eff[R, A])(
  implicit db:     Member.Aux[Db, R, U],
           eval:   _eval[U],
           writer: _writerString[U]): Eff[U, A] = ???

データベースクエリー(Db エフェクト)は Eval エフェクト内部の runDb メソッドによって実行されている。そして何を実行しているかをログ出力するために WriterString エフェクトを使う。

けれども、このコンポーネントのクライアントのいくつかは、実装の詳細であるログには関心がなく WriterString エフェクトを持ちたくないとする。

そんなときはこの追加メソッドを提供したいだろう。

def executeOnDb[R, U, A](queries: Eff[R, A])(
  implicit db:   Member.Aux[Db, R, U],
           eval: _eval[U]): Eff[U, A] = ???

どうすれば runDb を使って executeOnDb を実装できるだろうか?

import org.atnos.eff.all._
import org.atnos.eff.syntax.all._

def executeOnDb[R, U, A](queries: Eff[R, A])(
  implicit db:   Member.Aux[Db, R, U],
           eval: _eval[U]): Eff[U, A] = {

  type S = Fx.prepend[WriterString, R]
  runDb(queries.into[S]).runWriterNoLog[String]

}

prepend メソッドを使って WriterString エフェクトを含んだ局所的 local なスタックを作る。 それから Db エフェクトを実行し、ログを捨てて、最終的に Eff[U, A] を返す。

スタックをマージする

与えられたエフェクトスタック用にエフェクトを作ることができる。 例えば Hadoop クラスタとやりとりするために、あるいは、AWS S3 にデータを保存・読出するために。

import org.atnos.eff._, all._
import cats.data._
import cats.Eval

object HadoopStack {

  case class HadoopConf(mappers: Int)

  type HadoopReader[A] = Reader[HadoopConf, A]
  type WriterString[A] = Writer[String, A]
  type Hadoop = Fx.fx3[HadoopReader, WriterString, Eval]

  def readFile(path: String): Eff[Hadoop, String] =
    for {
      c <- ask[Hadoop, HadoopConf]
      _ <- tell[Hadoop, String]("Reading from "+path)
    } yield c.mappers.toString

  def runHadoopReader[R, U, A](conf: HadoopConf)(e: Eff[R, A])(implicit r: Member.Aux[HadoopReader, R, U]): Eff[U, A] =
    ReaderEffect.runReader(conf)(e)

}

object S3Stack {

  case class S3Conf(bucket: String)

  type S3Reader[A] = Reader[S3Conf, A]
  type WriterString[A] = Writer[String, A]

  type S3 = Fx.fx3[S3Reader, WriterString, Eval]

  def writeFile(key: String, content: String): Eff[S3, Unit] =
    for {
      c <- ask[S3, S3Conf]
      _ <- tell[S3, String]("Writing to bucket "+c.bucket+": "+content)
    } yield ()

  def runS3Reader[R, U, A](conf: S3Conf)(e: Eff[R, A])(implicit r: Member.Aux[S3Reader, R, U]): Eff[U, A] =
    ReaderEffect.runReader(conf)(e)
}

では、S3 と Hadoop の両方を使いたいときはどうするのだろう? 上記の定義から分かるようにこれら2つのスタックにはいくつか共通のスタックがあるので、その結果として生じる、扱いたいスタックは次のようになる。

import org.atnos.eff._
import cats.Eval
import HadoopStack._
import S3Stack.{WriterString=>_,_}

type HadoopS3 = Fx.fx4[S3Reader, HadoopReader, WriterString, Eval]

そして、この共通のスタックにそれぞれのスタックからエフェクトを注入するために、into メソッドが使える。

import S3Stack._
import HadoopStack._
// これは `into` と runXXX シンタックスをインポートする
import org.atnos.eff.syntax.all._

val action = for {
  // Hadoop からファイルを読み取る
  s <- readFile("/tmp/data").into[HadoopS3]

  // S3 にファイルを書き込む
  _ <- writeFile("key", s)  .into[HadoopS3]
} yield ()

// そして合成されたアクションを実行する
action.runReader(S3Conf("bucket")).runReader(HadoopConf(10)).runWriter.runEval.run

> ((),List(Reading from /tmp/data, Writing to bucket bucket: 10))

このアプローチの完全な実例は shared/src/test/org/atnos/example/StacksSpec で見つけられる。