Scala Snippet: Unlawful Effects

| 1 minute | Comments

An unlawful, standalone alternative to cats.effect.Effect from Cats Effect v2. It allows converting (and executing) IO-like values to scala.concurrent.Future, and it also helps with a graceful migration to Cats Effect v3:

import cats.ApplicativeError
import cats.effect.{ Effect, IO }
import cats.implicits._
import simulacrum.typeclass
import scala.concurrent.{ Future, Promise }

/**
  * Type class defining an "unlawful" variant of `cats.effect.Effect`.
  *
  * Its only capability is turning an `F[A]` into a `scala.concurrent.Future[A]`.
  * This allows it to work with plain `Future`, which cannot implement `Effect`.
  * It also allows for a graceful migration to Cats Effect v3.
  *
  * NOTE: if the requirement is `F[_] : Sync : UnlawfulEffect` then this
  * is de facto equivalent to `Effect`, therefore `UnlawfulEffect` shouldn't
  * be used.
  */
@typeclass trait UnlawfulEffect[F[_]] {
  /**
    * Converts `fa` into a `Future` that yields its result.
    *
    * The method is named "unsafe" because implementations are allowed to
    * trigger the evaluation of `fa` as part of this call.
    *
    * @param fa the effectful value to convert
    * @return a `Future` for the outcome of `fa`
    */
  def unsafeToFuture[A](fa: F[A]): Future[A]
}

object UnlawfulEffect extends UnlawfulEffectLowLevelImplicits {
  /**
    * Instance for the standard `Future`, a type that cannot have a lawful
    * `Effect` instance. The conversion is the identity: the given future
    * is handed back untouched.
    */
  implicit val forFuture: UnlawfulEffect[Future] =
    new UnlawfulEffect[Future] {
      def unsafeToFuture[A](fa: Future[A]): Future[A] = fa
    }

  /**
    * Dedicated instance for `cats.effect.IO` that delegates directly to
    * `IO#unsafeToFuture`. It exists as an optimization; the generic
    * [[forAnyEffect]] derivation would otherwise cover `IO` as well.
    */
  implicit val forIO: UnlawfulEffect[IO] =
    new UnlawfulEffect[IO] {
      def unsafeToFuture[A](fa: IO[A]): Future[A] = fa.unsafeToFuture()
    }
}

trait UnlawfulEffectLowLevelImplicits { self: UnlawfulEffect.type =>
  /**
    * Derives an [[UnlawfulEffect]] for any `F[_]` that has an `Effect`
    * instance, see:
    * [[https://typelevel.org/cats-effect/typeclasses/effect.html]]
    *
    * Defined in this parent trait so that the instances declared directly
    * in the `UnlawfulEffect` object take precedence during implicit
    * resolution.
    */
  implicit def forAnyEffect[F[_]](implicit F: Effect[F]): UnlawfulEffect[F] =
    new UnlawfulEffect[F] {
      def unsafeToFuture[A](fa: F[A]): Future[A] = {
        val promise = Promise[A]()
        val execution = F.runAsync(fa) { outcome =>
          // The promise is completed eagerly, as an unsuspended side effect,
          // instead of being wrapped in IO. That is normally a bad idea, but
          // this callback is only ever invoked by the run triggered below,
          // so it is acceptable in this specific context.
          outcome match {
            case Right(value) => promise.success(value)
            case Left(error)  => promise.failure(error)
          }
          IO.unit
        }
        // Kick off the evaluation of `fa`; its result arrives via the promise.
        execution.unsafeRunSync()
        promise.future
      }
    }
}

We can then interact with more impure APIs that aren’t IO-driven:

import akka.stream.scaladsl.Flow

/**
  * Extension methods for Akka Streams' `Flow`, allowing stages to be
  * described with any `F[_]` that has an `UnlawfulEffect` instance.
  *
  * The wrapped flow is declared as a `private val` because a class
  * extending `AnyVal` must have exactly one `val` parameter.
  */
implicit class FlowOps[In, Out, Mat](private val flow: Flow[In, Out, Mat])
  extends AnyVal {

  /**
    * Variant of `Flow.mapAsync` that accepts a function returning `F[Out2]`
    * instead of `Future[Out2]`.
    *
    * Each `F[Out2]` is converted via `UnlawfulEffect[F].unsafeToFuture`
    * and the resulting future is handed to `mapAsync`.
    *
    * @param parallelism passed through, unchanged, to `Flow.mapAsync`
    * @param fa function producing the effect to run for each element
    * @return a flow emitting the results of the executed effects
    */
  def mapEffect[F[_]: UnlawfulEffect, Out2](
    parallelism: Int
  )(fa: Out => F[Out2]): Flow[In, Out2, Mat] = {
    // `UnlawfulEffect[F]` summons the instance required by the context bound;
    // the type class method lives on the instance, not on the companion.
    flow.mapAsync(parallelism)(out => UnlawfulEffect[F].unsafeToFuture(fa(out)))
  }
}

Sample:

import cats.effect.IO

// `mapEffect` takes the parallelism level in its first parameter list,
// followed by the function producing the effect for each element.
Flow[Int].mapEffect(parallelism = 4) { num =>
  IO {
    println(s"Received: $num")
    num.toString
  }
}

This is similar to the usage of TaskLike in Monix.

| Written by