Component that receives Record instances and tags each one with the String returned by a function that is specified in the config and applied to the Record itself. The resulting tags can be used to filter Records when routing them.
trait Config extends CellCfg {
  val tagFn: Record => String // Function returning the tag to apply, computed from the Record itself
}
Usage
Example of usage within a Reactor: records representing client trades are tagged "CLIENT" and records representing broker trades are tagged "BROKER". The tags are then used to direct the records to different destinations:
cell[RecordTagger]("RecordTagger") {
  cellConfig := new RecordTaggerComp.Config {
    // Uses the record's "origin" field as the tag; .get fails if the field is absent
    override val tagFn: Record => String = r => r.findRawField[String]("origin").get
  }
  route[Tagged[Record]] whenTagged "CLIENT" to "EntityMapperCT"
  route[Tagged[Record]] whenTagged "BROKER" to "EntityMapperBT"
}
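The tag-then-route idea can be sketched without the framework. The block below is only an illustration: Record, Tagged, routes, tag and destination are hypothetical stand-ins defined here, not the framework's own types or API, and the "UNKNOWN" fallback is an assumption made to avoid throwing on a missing field.

```scala
object TagRoutingSketch {
  // Hypothetical stand-ins for the framework's Record and Tagged types.
  final case class Record(fields: Map[String, String])
  final case class Tagged[A](tag: String, value: A)

  // Same shape as Config.tagFn: derives the tag from the Record itself.
  // Assumption: fall back to "UNKNOWN" when the "origin" field is absent.
  val tagFn: Record => String = r => r.fields.getOrElse("origin", "UNKNOWN")

  // Illustrative routing table mirroring the whenTagged ... to ... rules.
  val routes: Map[String, String] = Map(
    "CLIENT" -> "EntityMapperCT",
    "BROKER" -> "EntityMapperBT"
  )

  // Wraps a record together with the tag computed by tagFn.
  def tag(r: Record): Tagged[Record] = Tagged(tagFn(r), r)

  // Destination for a tagged record, or None when no rule matches its tag.
  def destination(t: Tagged[Record]): Option[String] = routes.get(t.tag)
}
```

How the real component treats a record whose tag matches no route is not stated in this document; the sketch simply returns None in that case.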
Abstract Accumulator
Definition
abstract class Accumulator[T: TypeTag](implicit TCTag: ClassTag[T]) extends Cell[AccumulatorConfig[T]]
Description
Component which accumulates messages of type T in a bounded buffer. The buffer contents are sent when the buffer size reaches the configured limit or when EndOfData arrives.
Contract
@CellContract(
  Receives[T],
  Sends[Seq[T]]
)
Configuration
trait AccumulatorConfig[T] extends CellCfg {
  def bufferSize: Int // Limit for the accumulating buffer
}
Usage
Example of usage for a concrete BrokerTrade accumulator:
import scala.collection.immutable // needed for immutable.Seq in the contract below

object BrokerTradeAccumulatorComp {
  trait Config extends AccumulatorConfig[BrokerTrade] {
    override def bufferSize: Int = 50 // send a batch once 50 trades are buffered
  }

  @CellContract(
    Receives[BrokerTrade],
    Sends[immutable.Seq[BrokerTrade]]
  )
  case class BrokerTradeAccumulator() extends Accumulator[BrokerTrade]()
}
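The flush rule from the Accumulator description (send when the buffer reaches bufferSize, or when EndOfData arrives) can be modelled in isolation. This is a minimal sketch, not the framework's implementation: BoundedAccumulator, add and endOfData are hypothetical names introduced here, and returning an empty batch on EndOfData with nothing buffered is an assumption.

```scala
object AccumulatorSketch {
  // Hypothetical, framework-free model of the accumulation logic.
  final class BoundedAccumulator[T](bufferSize: Int) {
    require(bufferSize > 0, "bufferSize must be positive")

    private var buffer = Vector.empty[T]

    // Adds one message; returns the full batch once the limit is reached.
    def add(msg: T): Option[Vector[T]] = {
      buffer :+= msg
      if (buffer.size >= bufferSize) Some(drain()) else None
    }

    // Models EndOfData: returns whatever is buffered (possibly empty).
    def endOfData(): Vector[T] = drain()

    // Hands back the current contents and empties the buffer.
    private def drain(): Vector[T] = {
      val batch = buffer
      buffer = Vector.empty[T]
      batch
    }
  }
}
```

With a limit of 2, the second add returns the batch of two messages, and a later endOfData returns only the messages received since that batch.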
Buffer Accumulator
Definition
case class BufferAccumulator() extends Cell[Config] {
Description
Component that accumulates data received as IndexedByteBuffer messages and sends it out when the end of data is reached. The BufferAccumulator Cell MUST be driven by the following sequence of messages: StartData (once), IndexedByteBuffer (zero or more times), EndData (once). The accumulated data is sent when EndData is received. Clients of this component CAN reset the state and discard the accumulated data by sending a further StartData message.
This component fails in the following cases:
A message is received without a prior StartData (that is, before onStartData has been called).
The Op provided in StartData differs from the Op provided in EndData.
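The lifecycle and the two failure cases above can be read as a small state machine. The sketch below is an assumption-laden illustration, not the component's code: Msg, Chunk (standing in for IndexedByteBuffer), State and step are hypothetical, Op is modelled as a plain String, chunks are assumed to be concatenated in index order, and the state is assumed to reset after EndData.

```scala
object BufferLifecycleSketch {
  // Hypothetical message model; the framework's real message types may differ.
  sealed trait Msg
  final case class StartData(op: String) extends Msg
  final case class Chunk(index: Int, bytes: Array[Byte]) extends Msg // stands in for IndexedByteBuffer
  final case class EndData(op: String) extends Msg

  // None = no StartData seen yet; Some = the active op plus the chunks so far.
  final case class State(active: Option[(String, Vector[Chunk])] = None)

  // Returns the next state and, on EndData, the accumulated bytes,
  // or Left(reason) for the two failure cases.
  def step(s: State, m: Msg): Either[String, (State, Option[Array[Byte]])] =
    (s.active, m) match {
      // StartData always (re)starts and discards anything accumulated so far.
      case (_, StartData(op)) =>
        Right((State(Some((op, Vector.empty[Chunk]))), None))
      // Any other message before StartData is a failure.
      case (None, _) =>
        Left("message received before StartData")
      case (Some((op, chunks)), c: Chunk) =>
        Right((State(Some((op, chunks :+ c))), None))
      // The Op in EndData must match the Op given in StartData.
      case (Some((op, _)), EndData(end)) if end != op =>
        Left(s"Op mismatch: started with $op, ended with $end")
      case (Some((_, chunks)), EndData(_)) =>
        // Assumption: output is the chunks concatenated in index order.
        val out = chunks.sortBy(_.index).flatMap(_.bytes.toVector).toArray
        Right((State(), Some(out)))
    }
}
```

Modelling failures as Left values keeps the sketch easy to test; the real component may signal failure differently.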