Help with the aggregate implementation

I’m looking at the implementation of aggregate in TraversableOnce.scala (apparently from scala-library-2.12.8-sources.jar). Here is the code I see:

  def aggregate[B](z: =>B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)

I don’t understand how combop gets called.

Is this implementation assuming that combop is the identity function simply because it has the same type signature?

Calling combop is optional. If you split input into multiple parts and then foldLeft each part, then you need combop to combine the results. The implementation you’ve shown doesn’t split input into multiple parts, so only the sequential reduction operation is needed.

Also, combop is not assumed to be the identity function. It can’t be, since the identity function (IIUC) must have type B => B, not (B, B) => B.
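To make the splitting argument above concrete, here is a small sketch built around a hypothetical `chunkedAggregate` helper (not a library method). It splits a `Seq` into chunks, folds each chunk with `seqop` starting from `z`, and merges the per-chunk results with `combop`. With a single chunk it degenerates to the `foldLeft(z)(seqop)` shown in the question and never calls `combop`; with k chunks it calls `combop` k - 1 times.

```scala
object ChunkedAggregateDemo {
  // Hypothetical helper: fold each chunk sequentially with seqop (starting
  // from z), then merge the per-chunk results left to right with combop.
  def chunkedAggregate[A, B](xs: Seq[A], chunkSize: Int)(z: => B)(
      seqop: (B, A) => B, combop: (B, B) => B): B = {
    require(chunkSize > 0, "chunkSize must be positive")
    val partials = xs.grouped(chunkSize).map(_.foldLeft(z)(seqop)).toList
    // no chunks (empty input) means nothing to combine
    if (partials.isEmpty) z else partials.reduceLeft(combop)
  }

  def main(args: Array[String]): Unit = {
    val words = List("a", "ab", "abc", "abcd", "abcde")
    var combopCalls = 0
    val total = chunkedAggregate(words, 2)(0)(
      (acc, s) => acc + s.length,
      (x, y) => { combopCalls += 1; x + y })
    // three chunks, so combop runs twice
    println(s"total=$total combopCalls=$combopCalls") // total=15 combopCalls=2
  }
}
```

Passing a chunk size of at least `words.length` yields one chunk and zero `combop` calls, which mirrors the sequential 2.12 implementation.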


OIC, so combop is some sort of flatMap of identity, such as union of sets or append of lists?

In light of that, I think I have misnamed my treeAggregate function posted here.

What should I call a function which takes a sequence-like container, effectively transforms it into a sequence of a different type (but lazily, without actually producing a new sequence), and then folds the result starting from an initial zero element? For example, a function which takes a list of strings and sums their lengths, and is called with a plus function and a stringLength function? Or a function which takes a Set of Sets and calculates the union of intersections, given a set-union function and a set-intersection function?

Documentation for the method aggregate states that:

The implementation you’ve shown chooses a single partition, so combop never needs to be invoked.

In general, the following law should hold (though it is not sufficient on its own):

// given
val list = List(a1, a2)
def zero = <something1>
val seqop = <something2>
val combop = <something3>

// all 3 expressions below should evaluate to the same value
list.aggregate(zero)(seqop, combop)
list.foldLeft(zero)(seqop) // serial implementation
// parallel implementation: the two foldLefts can run in separate threads
combop(List(a1).foldLeft(zero)(seqop), List(a2).foldLeft(zero)(seqop))
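A concrete instance of the law, as a sketch: `zero = 0`, a `seqop` that adds string lengths, and `combop = +`. Only the serial and split sides are computed here (the names `serial` and `split` are made up for illustration); on a sequential 2.12 collection the `aggregate` side is just `foldLeft(zero)(seqop)`, as the quoted source shows. The second pair uses `zero = 10`, which is not an identity for `+`, and shows that a bad zero alone is enough to break the law.

```scala
object AggregateLawDemo {
  // Serial side of the law: one foldLeft over the whole list.
  def serial[A, B](list: List[A], zero: B, seqop: (B, A) => B): B =
    list.foldLeft(zero)(seqop)

  // Split side: fold each one-element part separately, then combine.
  def split[A, B](a1: A, a2: A, zero: B, seqop: (B, A) => B, combop: (B, B) => B): B =
    combop(List(a1).foldLeft(zero)(seqop), List(a2).foldLeft(zero)(seqop))

  def main(args: Array[String]): Unit = {
    val seqop = (acc: Int, s: String) => acc + s.length
    val combop = (x: Int, y: Int) => x + y
    // zero = 0 is an identity for +, so both sides agree
    println(serial(List("a", "ab"), 0, seqop))  // 3
    println(split("a", "ab", 0, seqop, combop)) // 3
    // zero = 10 is counted once serially but twice when split
    println(serial(List("a", "ab"), 10, seqop))  // 13
    println(split("a", "ab", 10, seqop, combop)) // 23
  }
}
```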

What’s the signature of your function?

Here is the declaration, with what I admit is the wrong name. I’d also like to generalize it to work on more than just TraversableOnce[A].

def treeAggregate[A, B](objects: TraversableOnce[A], init: B, seqop: A => B, combop: (B, B) => B): B = {...}

Where’s the laziness? You’re returning B immediately, not anything like () => B. I think your function is a classic example of map-reduce :] and could be named as such. If you want some laziness, you can do this:

def mapReduce[A, B](objects: TraversableOnce[A], init: B, seqop: A => B, combop: (B, B) => B): B =
  objects.toIterator.map(seqop).fold(init)(combop) // TraversableOnce has no view, but Iterator.map is lazy: no intermediate collection is built

But I would just combine seqop with combop in a single step:

def mapReduce[A, B](objects: TraversableOnce[A], init: B, seqop: A => B, combop: (B, B) => B): B =
  objects.foldLeft(init)((b, a) => combop(b, seqop(a)))
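For comparison, a self-contained sketch of both variants side by side. It takes `Iterable[A]` instead of `TraversableOnce[A]` (an assumption, so that it compiles on both 2.12 and 2.13), and the name `mapReduceLazy` is only for illustration. The second usage plugs in set union as `combop`, the kind of combining function mentioned earlier in the thread.

```scala
object MapReduceDemo {
  // Lazy variant: Iterator.map builds no intermediate collection; each
  // seqop result is produced on demand while fold consumes it.
  def mapReduceLazy[A, B](objects: Iterable[A], init: B, seqop: A => B, combop: (B, B) => B): B =
    objects.iterator.map(seqop).fold(init)(combop)

  // Single-pass variant: seqop and combop fused inside one foldLeft.
  def mapReduce[A, B](objects: Iterable[A], init: B, seqop: A => B, combop: (B, B) => B): B =
    objects.foldLeft(init)((b, a) => combop(b, seqop(a)))

  def main(args: Array[String]): Unit = {
    val words = List("a", "ab", "abc", "abcd")
    val len = (s: String) => s.length
    val plus = (x: Int, y: Int) => x + y
    println(mapReduce(words, 0, len, plus))     // 10
    println(mapReduceLazy(words, 0, len, plus)) // 10

    // seqop leaves each set unchanged, combop is set union
    val sets = List(Set(1, 2), Set(2, 3), Set(4))
    println(mapReduce(sets, Set.empty[Int], (s: Set[Int]) => s, (x: Set[Int], y: Set[Int]) => x union y))
  }
}
```

Either way `seqop` and `combop` play different roles: `seqop` turns one element into a `B`, and `combop` merges two `B`s.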

When you look at the implementation, it should be apparent why seqop and combop have to be different.

The idea is that it specifically groups the calls into as balanced a tree structure as possible, for example:

((1+2)+(3+4)) + ((5+6)+(7+8))

or
(("a".length + "ab".length)+("abc".length+"abcd".length)) + (("abcde".length+"abcdef".length)+("abcdefg".length+"abcdefgh".length))
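One way to check that grouping is a recursive sketch that splits the input in half at each step. The name `balancedReduce` is hypothetical, and the sketch assumes a non-empty, random-access `IndexedSeq` is available up front. Rendering each `combop` call as a parenthesised `+` makes the resulting tree visible. The stack-based version in this thread instead works in a single pass, without needing random access or the length in advance.

```scala
object TreeGroupingDemo {
  // Hypothetical helper: reduce a non-empty IndexedSeq by recursively
  // splitting it in half, so the combop calls form a balanced binary tree.
  def balancedReduce[B](xs: IndexedSeq[B])(combop: (B, B) => B): B = {
    require(xs.nonEmpty, "balancedReduce needs at least one element")
    if (xs.length == 1) xs.head
    else {
      val (left, right) = xs.splitAt(xs.length / 2)
      combop(balancedReduce(left)(combop), balancedReduce(right)(combop))
    }
  }

  def main(args: Array[String]): Unit = {
    val leaves: IndexedSeq[String] = (1 to 8).map(_.toString)
    // each combop call is shown as "(left+right)"
    println(balancedReduce(leaves)((l, r) => s"($l+$r)")) // (((1+2)+(3+4))+((5+6)+(7+8)))
  }
}
```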

You are right, my description of lazy was misleading. What I should have said is that I want to delay the calls to combop as long as possible, and dereference the return values of seqop as early as possible. The implementation maintains a stack of only about log_2(n) values returned from seqop (n being the length of the input list/set/etc).

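  def treeAggregate[A, B](objects: TraversableOnce[A], init: B, seqop: A => B, combop: (B, B) => B): B = {
    // The stack holds (level, value) pairs, newest first. A value at level k
    // combines 2^(k-1) consecutive seqop results.
    def consumeStack(stack: List[(Int, B)]): List[(Int, B)] = {
      stack match {
        // b2 is older than b1, so it goes on the left to preserve input order
        case (i, b1) :: (j, b2) :: tail if i == j => consumeStack((i + 1, combop(b2, b1)) :: tail)
        case _ => stack
      }
    }

    // push each seqop result as a level-1 leaf, then merge equal levels right away
    val stack = objects.foldLeft(List.empty[(Int, B)]) { (acc: List[(Int, B)], ob: A) =>
      consumeStack((1, seqop(ob)) :: acc)
    }
    // levels strictly increase from top to bottom, so at most about
    // log_2(objects.size) + 1 pairs remain; combine them oldest first,
    // starting from init (an empty input simply yields init)
    stack.reverse.map(_._2).foldLeft(init)(combop)
  }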

aggregate from parallel collections does exactly what you want, with one exception: it doesn’t use combop to combine every element. Instead, it uses seqop to fold small sublists sequentially, and only those partial results are combined using combop. This reduces the overhead of distributing tasks over multiple threads, on the assumption that seqop is significantly faster than combop.
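A rough, stdlib-only sketch of that strategy, using `Future`s in place of the parallel collections module. This is a swapped-in technique, not how parallel collections actually implement aggregate. Each chunk is folded sequentially with `seqop` on the global execution context, and `combop` only merges the per-chunk results. The helper name `futureAggregate`, the chunk size and the 10-second timeout are arbitrary choices for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureAggregateDemo {
  // Hypothetical helper: fold each chunk with seqop in its own Future, then
  // merge the per-chunk results with combop on the calling thread.
  def futureAggregate[A, B](xs: Seq[A], chunkSize: Int)(z: => B)(
      seqop: (B, A) => B, combop: (B, B) => B): B = {
    require(chunkSize > 0, "chunkSize must be positive")
    val chunkFutures = xs.grouped(chunkSize).toList.map { chunk =>
      Future(chunk.foldLeft(z)(seqop)) // cheap sequential work per chunk
    }
    // Future.sequence keeps chunk order, so combop sees results left to right
    val partials = Await.result(Future.sequence(chunkFutures), 10.seconds)
    if (partials.isEmpty) z else partials.reduceLeft(combop)
  }

  def main(args: Array[String]): Unit = {
    val words = (1 to 1000).map(n => "x" * (n % 7)).toList
    val total = futureAggregate(words, 100)(0)((acc, s) => acc + s.length, _ + _)
    println(total == words.map(_.length).sum) // true
  }
}
```

With a chunk size of 100 and 1000 elements, `combop` runs only 9 times, while `seqop` runs once per element.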