Aggregate Component


abstract class Aggregate [T: TypeTag, K: TypeTag, R: TypeTag](implicit TCTag: ClassTag[T]) extends Cell[AggregateConfig[T, K, R]]


Abstract component receives messages of an arbitrary type T and produces an aggregation according to the component configuration. This component is not usable on its own and needs to be extended. The extended aggregating components will bound the type of data T, the grouping Keys type K and a reduction operation result of type R. The relationship of these three types will be specified by overriding the aggregation functions in the component configuration type.

An aggregation is a the combination of a partitioning operation around some keys derived from T plus a reduction function on each partition.

This component config allows to express an arbitrary key type from the data messages of type T by overriding the groupBy function which takes T and returns K

For every K the reduction function aggregateOp will be called where a result R will be returned by combining the previously accumulated Result R

An initial Result value R can be provided by overriding initialValue in this component config.

This component operation draws similarities from SQL group by reduction functions.


receives(T), receives(AggregationRelease), receives(AggregationReset) 
sends(AggregationResult[K, R])


AggregationRelease: when this message is received the Aggregate component will send the current aggregation out.
AggregationReset : when this message is received the Aggregate component will discard current aggregated data T :

    • This is the data which is being aggregated


trait AggregateConfig[T, K, R] extends CellCfg {
  def groupBy: T => K                     // A function used to group elements into partitions by key.

  def initialValue: R                     // Initial value used to aggregate result for elements in partition for each key.

  def aggregateOp: (R, T) => R            // A function used to accumulate result in partition.

  def releaseOnEndData: Boolean = false   // Switch to indicate if the aggregated results should be released when End of Data is received.


An example of usage of this component will probably clarify what looks a more complicate setup than it is in reality.

Consider a series of records where each record represents the position for a particular formula one driver at the end of a particular race. Each record contains three fields:

Record: [driverId, raceId, position]

The data set contains all the formula1 races in the last 20 years and for each race it contains the position of every driver who participated in that race. All id’s are Numbers and are unique within their group (Drivers, Races).

The objective is to aggregate the sum of poles, seconds and thirds each driver had in the last 20 years. For this we will define a DriverStatsAggregateComp extending the Aggregate Component.


From the above we can deduce that the type of the series will be a “Record”, that is the T within the Aggregate component definition above.

We also want to partition by driver Id which is a number and therefore the type K within the aggregator will be Int.

Finally we want to define the accumulating Result type (R within the Aggregate component types) This type will be another Record with 4 fields: driverId, poles, seconds, thirds.

val PolesRecord = Record.define(IntField("driverId"), IntField("poles"), IntField("seconds"), IntField("thirds"))

So the definition of our component will be like:

@CellContract(Receives[Record], Sends[AggregationResult[Int, Record]])
case class DriverStatsAggregate() extends Aggregate[Record, Int, Record]()

All that is left now is to implement the configuration functions to perform the correct partitioning and aggregation.

We want to group by driverId, that means we need a function from Record to Int (T => K). We also want an aggregateOp function (T, R) => R to accumulate the drivers results and finally an initial value for the accumulator (R). The full definition of this new component would be something like this:

object DriverStatsAggregateComp {
  val PolesRecord = Record.define(IntField("driverId"), IntField("poles"), IntField("seconds"), IntField("thirds"))

  object Config extends AggregateConfig[Record, Int, Record] {
    override def releaseOnEndData = true
    override def groupBy: Record => Int = _.findField[Int]("driverId").get
    override def initialValue: Record = PolesRecord.recordWithValues(0, 0, 0, 0)
    override def aggregateOp: (Record, Record) => Record = (acc, raceInfo) => {
      var poles:   Int = acc.findField[Int]("poles").get
      var seconds: Int = acc.findField[Int]("seconds").get
      var thirds:  Int = acc.findField[Int]("thirds").get

      if (raceInfo.findField[Int]("position").get == 1) poles   += 1
      if (raceInfo.findField[Int]("position").get == 2) seconds += 1
      if (raceInfo.findField[Int]("position").get == 3) thirds  += 1

      PolesRecord.recordWithValues(raceInfo.findField[Int]("driverId").get, poles, seconds, thirds)

  @CellContract(Receives[Record], Sends[AggregationResult[Int, Record]])
  case class DriverStatsAggregate() extends Aggregate[Record, Int, Record]()

And the component would be used right away in a system spec:

cell[DriverStatsAggregate] {
  cellConfig := DriverStatsAggregateComp.Config
  route[AggregationResult[Int,Record]] to Next
Table of Contents