Effect Runtime


This snippet defines an EffectRuntime that is used to build IO effects. It is meant as a replacement for ContextShift (Cats Effect 2.x), with an integrated Logger and Scheduler (and therefore access to Timer too), plus utilities for monitoring.

The UnsafeLogger / SafeLogger and UnsafeMonitoring interfaces are left as an exercise for the reader.
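For orientation only, here is a minimal sketch of what those interfaces might look like. Everything in it is an assumption rather than the author's design: the method names (debug, error, incrementCounter) and the SafeLogger.fromUnsafe helper are hypothetical, chosen so that the calls used later in this post (logger.debug(message) and logger.error(message, cause)) would type-check against Cats Effect 2.x.

```scala
import cats.effect.Sync

// Hypothetical: a side-effecting logger, for use outside of F[_]
trait UnsafeLogger {
  def debug(message: String): Unit
  def error(message: String, cause: Throwable): Unit
}

// Hypothetical: a logger whose side effects are suspended in F[_]
trait SafeLogger[F[_]] {
  def debug(message: String): F[Unit]
  def error(message: String, cause: Throwable): F[Unit]
}

object SafeLogger {
  // Wraps an UnsafeLogger, suspending every call via Sync[F].delay
  def fromUnsafe[F[_]](underlying: UnsafeLogger)(implicit F: Sync[F]): SafeLogger[F] =
    new SafeLogger[F] {
      def debug(message: String): F[Unit] =
        F.delay(underlying.debug(message))
      def error(message: String, cause: Throwable): F[Unit] =
        F.delay(underlying.error(message, cause))
    }
}

// Hypothetical: a side-effecting hook for metrics
trait UnsafeMonitoring {
  def incrementCounter(name: String): Unit
}
```

With this shape, building a value such as SafeLogger.fromUnsafe[IO](underlying).debug("hello") logs nothing by itself; the message is written only when the resulting effect is run.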

import cats.effect._
import cats.implicits._
import monix.execution._
// ...

/**
  * Slice of [[EffectRuntime]], to be used only when a dependency on
  * [[UnsafeLogger]] is needed.
  */
trait UnsafeRuntimeLogger {
  def unsafe: UnsafeLoggerRef

  trait UnsafeLoggerRef {
    def logger: UnsafeLogger
  }
}

/**
  * Slice of [[EffectRuntime]], to be used only when a dependency 
  * on `ExecutionContext` / `monix.execution.Scheduler` is needed.
  */
trait UnsafeRuntimeScheduler {
  def unsafe: UnsafeSchedulerRef

  trait UnsafeSchedulerRef {
    def scheduler: Scheduler
  }
}

/**
  * Slice of [[EffectRuntime]], to be used only when a reference
  * to [[UnsafeMonitoring]] is needed.
  */
trait UnsafeRuntimeMonitoring {
  def unsafe: UnsafeMonitoringRef

  trait UnsafeMonitoringRef {
    def monitoring: UnsafeMonitoring
  }
}

/**
  * Slice of [[EffectRuntime]], to be used only in an "unsafe" context
  * (where side effects are not suspended in `F[_]`).
  */
trait UnsafeRuntime
  extends UnsafeRuntimeLogger
  with UnsafeRuntimeScheduler
  with UnsafeRuntimeMonitoring {

  def unsafe: Unsafe

  trait Unsafe extends UnsafeLoggerRef 
    with UnsafeSchedulerRef 
    with UnsafeMonitoringRef
}

/**
  * Slice of [[EffectRuntime]], to be used only when a reference
  * to [[SafeLogger]] is needed.
  */
trait EffectRuntimeLogger[F[_]] extends UnsafeRuntimeLogger {
  protected implicit def F: Sync[F]
  def logger: SafeLogger[F]
}

/**
  * Our evolved `cats.effect.ContextShift` that has everything 
  * we need in it to build IO effects.
  */
abstract class EffectRuntime[F[_]]
  extends ContextShift[F]
  with EffectRuntimeLogger[F]
  with UnsafeRuntime { self =>

  protected implicit def F: Async[F]

  def logger: SafeLogger[F]
  def scheduler: F[Scheduler]
  def blocker: Blocker
  def timer(implicit F: Concurrent[F]): Timer[F]

  def deferAction[A](f: Scheduler => F[A]): F[A] =
    self.scheduler.flatMap(f)
}
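To illustrate why the runtime is split into slices, here is a minimal sketch of a helper that asks only for the scheduler slice. The trait is repeated so that the block stands alone; SliceExample, unsafeDouble and the runtime instance are hypothetical names, and Monix's Scheduler.global is used purely for demonstration.

```scala
import monix.execution.Scheduler
import scala.concurrent.Future

// Repeated from the listing above so that this sketch is self-contained
trait UnsafeRuntimeScheduler {
  def unsafe: UnsafeSchedulerRef

  trait UnsafeSchedulerRef {
    def scheduler: Scheduler
  }
}

object SliceExample {
  // Hypothetical helper: it depends only on the scheduler slice,
  // so it has no access to the logger or the monitoring utilities
  def unsafeDouble(x: Int)(implicit r: UnsafeRuntimeScheduler): Future[Int] = {
    // Scheduler extends ExecutionContext, so Future.apply can use it
    implicit val ec: Scheduler = r.unsafe.scheduler
    Future(x * 2)
  }

  // Hypothetical minimal instance, backed by Monix's global scheduler
  implicit val runtime: UnsafeRuntimeScheduler =
    new UnsafeRuntimeScheduler {
      val unsafe: UnsafeSchedulerRef = new UnsafeSchedulerRef {
        val scheduler: Scheduler = Scheduler.global
      }
    }
}
```

Since a full runtime extends every slice, it could be passed wherever an UnsafeRuntimeScheduler is expected, while the narrower signature documents what the helper actually needs.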

Sample for wrapping a Future-based API:

import scala.concurrent._
import scala.concurrent.duration._

def unsafeGetRequest(req: Request)(
  implicit ec: ExecutionContext
): Future[Response] = {
  ???
}

def getRequest[F[_]: Concurrent](req: Request)(
  implicit r: EffectRuntime[F]
): F[Response] =
  r.deferAction { implicit ec =>
    for {
      _ <- r.logger.debug(s"Triggering request: $req")
      resp <- Async
        .fromFuture(Sync[F].delay(unsafeGetRequest(req)))
        .handleErrorWith { err =>
          r.logger.error("Unexpected error, retrying", err) >>
          r.timer.sleep(1.second) >>
          getRequest(req)
        }
    } yield {
      resp
    }
  }

WARN: the retry logic in this sample isn't a good one: it retries forever, on any error, with a fixed 1-second delay. For a better implementation, see Retry Failing Tasks with Cats and Scala.
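As a rough illustration of what a more careful retry could look like, here is a minimal sketch that caps the number of retries and doubles the delay after each failure. It is not necessarily what the linked article does; RetryExample, retryWithBackoff and its parameters are assumed names, and the code targets Cats Effect 2.x (Timer).

```scala
import cats.effect.{Sync, Timer}
import cats.syntax.all._
import scala.concurrent.duration._

object RetryExample {
  /** Retries `fa` on failure, at most `maxRetries` times, sleeping
    * between attempts and doubling the delay after each failure.
    * Once the retries are exhausted, the last error is re-raised.
    */
  def retryWithBackoff[F[_], A](
    fa: F[A],
    initialDelay: FiniteDuration,
    maxRetries: Int
  )(implicit F: Sync[F], timer: Timer[F]): F[A] =
    fa.handleErrorWith { error =>
      if (maxRetries > 0)
        timer.sleep(initialDelay) >>
          retryWithBackoff(fa, initialDelay * 2, maxRetries - 1)
      else
        F.raiseError(error)
    }
}
```

A production version would probably also filter which errors are worth retrying, since this sketch still retries on any Throwable.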
