Sorry if this is a little longer, but I cannot really ask for help without explaining some things first.
I am building a library that helps with using collections in a Functional Reactive Programming context. The idea is that having i.e. an Observable[Seq[A]]
is really inperformant. Let’s say that you do the following:
val stream: Observable[Seq[Int]] = ???
val sorted = stream.map(_.map(_ * 2))
In this case, the whole collection would be mapped anew every time onNext
was called on the Observable.
Instead my library is supposed to provide the following functionality:
val input: VBuffer[Double, Subject] = VBuffer[Double](PublishSubject)
val output: RSeq[Double, Observable] = input.map(n => n * 2)
val result: Observable[RSeqEvent[Double]] = output.stream
result.foreach(println)
input += 1 // => Append(2)
input += 2 // => Append(4)
input.prepend(0) // => Prepend(0)
input.insert(2, 1.5) // => Insertion(2, 3)
input ++= List(10, 11, 12) // => AppendAll(List(20, 22, 24))
input.patch(4, List(5, 6, 7), 0) // => Patch(3, List(10, 12, 14), 0)
Since I do not want to depend on a specific streaming lib, I have a typeclass called Sink
which abstracts over streaming types.
trait Sink[-F[_]] {
def onNext[A](sink: F[A])(value: A): Unit
def onError[A](sink: F[A])(error: Throwable): Unit
}
I know that returning Unit
is not functionally pure and I would normally return something else, like an IO[Ack]
maybe, or a generic effect type F[_] : Effect
, but since I do not want to re-implement all of monix for this example, Unit
will do.
To make things easier to understand, I modeled things after scalas stdlib collections. (also because they already are pretty awesome)
This is my current structure
trait RIterableOps[+A, +G[_], +CC[_, _[_]], +C] {
type E
}
trait RIterable[+A, +G[_]] extends RIterableOps[A, G, RIterable, RIterable[A, G]] {
def stream: G[E]
}
sealed trait RSeqEvent[+A]
case class Append[+A](elem: A) extends RSeqEvent[A]
trait RSeq[+A, +G[_]] extends RIterable[A, G] {
type E <: RSeqEvent[A]
}
abstract class VBuffer[A, G[_] : Sink] extends RSeq[A, G] {
def streamEvent(event: RSeqEvent[A])(implicit G: Sink[G]): Unit = G.onNext(stream)(event)
override def append(elem: A): this.type = {
streamEvent(Append(elem))
this
}
}
The type A
is the content of the collection and G[_]
is the streaming type, i.e. a monix Observable
. E
is the type of the events that can occur on that collection, i.e. Append
or Insert
. I made E
a type-member to not overcomplicate the type parameters. Two already is wayy too much.
This worked out nicely so far, but when Implementing the VBuffer
, I am getting a compile error.
type mismatch;
found : event.type (with underlying type Playground.this.RSeqEvent[A])
required: VBuffer.this.E
Here is a scastie that reproduces the error. https://scastie.scala-lang.org/lYeSJEnVQnOD3BIaLMRd1g
I am pretty inexperienced with using type variances, so I don’t really know where to start describing this.
The basic idea is that VBuffer
provides a way to input events into the stream, using an interface that is similar to scala.collection.Buffer
. How could I work around this type error?
Or should I revamp my design entirely?