Building a program in functional way using cats and cats-effect

I was trying to teach myself more functional programming using Scala and I’m now looking to build an app using cats and cats-effect.

The program I have in mind is a WebSocket client. It connects to an endpoint, sends an initial message(s). Then it mostly reacts to the messages sent by the server. No user interaction is required.

This line of though lead me to also include fs2, sttp libraries and write the scaffolding below.

  • Is this the right direction as far as writing functional Scala apps?
  • How can I develop the WebSocketApp trait to be able to build an using composable functions? I need to parse the incoming messages, handle then, call the database, prepare response, encode it into JSON, etc. How can I do this leveraging cats and functional paradigm?
  • How can I further extend this program to work with multiple WebSocket endpoints/stream in parallel?
import fs2._
import cats.effect._
import sttp.ws.WebSocketFrame
import sttp.client3._
import sttp.client3.httpclient.fs2.HttpClientFs2Backend
import sttp.capabilities.fs2.Fs2Streams
import sttp.model.Uri

trait WebSocketApp:
  def init: Seq[WebSocketFrame]
  def apply(frame: WebSocketFrame): IO[Seq[WebSocketFrame]]

object Main extends IOApp:
  val app: WebSocketApp = ???
  val uri: Uri = ???

  def wsFramePipe(
      app: WebSocketApp
  ): Pipe[IO, WebSocketFrame.Data[?], WebSocketFrame] = { input =>
    Stream.emits(app.init) ++
      input.evalMap(app.apply).flatMap(Stream.emits)
  }

  def wsStart(app: WebSocketApp): IO[Unit] = HttpClientFs2Backend
    .resource[IO]()
    .use { backend =>
      basicRequest
        .response(asWebSocketStream(Fs2Streams[IO])(wsFramePipe(app)))
        .get(uri)
        .send(backend)
        .void
    }
  override def run(args: List[String]): IO[ExitCode] =
    for {
      _ <- IO.println("hello world")
      _ <- wsStart(app)
    } yield ExitCode.Success

Well, it kind of depends on what you mean with “functional Scala. But, yes, this will be the right approach to start working with the “Programs as Values” paradigm; which is usually what people mean when they say they want to be more “functional”.

So there are two questions here.

  1. How to properly model the WebSocketApp logic.
  2. How to do common computational operations.

For 2 check common libraries of the typelevel ecosystem like circe, doobie, skunk, etc.

For 1 I would say a state machine is a good way to handle this.
You will have a Ref to contain the current State, and a step function that receives the current State and an Input and produces the new State and an Action to execute.
Something like this:

// Properly model these based on your use case.
type State
type Input
type Action = IO[Seq[WebSocketFrame]]

final class MyWebSocketApp private (
  currentState: Ref[IO, State],
  // You may need other stuff here like a json parser, a db connection, etc.
) extendsWebSocketApp:
  override val init: Seq[WebSocketFrame] =
    ???

  override def apply(frame: WebSocketFrame): IO[Seq[WebSocketFrame]] =
    currentState.flatModify(step(input = parseFrame(frame)))

  // Maybe this may return an IO[Input] instead, adjust accordingly.
  private def parseFrame(frame: WebSocketFrame): Input =
    ???

  // Runs a single step of the state machine.
  private def step(input: Input): State => (State, Action) =
    // Usually implemented as a bunch of nested pattern matches on the input and then on the state.
    ???

object MyWebSocketApp:
  def make(
    // External dependencies like a db connection.
  ): IO[MyWebSocketApp] =
    IO.ref(State.initial).map { currentState =>
      new MyWebSocketApp(currentState, ...)
    }
end MyWebSocketApp 

Note that the creation of a MyWebSocketApp is now effectual, and its dependencies may be initialized as a Resource. Meaning that you can’t have a simple val in your Main and rather you need to thread everything together in your run.

I don’t fully recall the web sockets APIs, but IIRC you basically can have a Stream of connections and then you can parEvalMap that to process each connection in parallel.


BTW, check IOApp.Simple to simplify your run method.


Finally, since this is quite a big question with multiple sub-questions. I would advise two things:

  1. Follow the type-tetris approach to split your problem into smaller ones. And then focus on solving each individual bit.
  2. It may be easier to follow up on this as a chat in Discord: Scala
1 Like

Or for questions about the Typelevel stack in specific, there’s a Discord for the Typelevel community, with lots of focused channels for things like cats, cate-effect, fs2, and so on.

2 Likes