Miscellaneous

Record Tagger

Definition

class RecordTagger() extends Cell[Config]

Description

Component that receives Record instances and tags each one with the result of a function that is specified in the config and applied to the Record itself. The tags can then be used to filter Records when routing them.

Contract

@CellContract(Receives[Record], Sends[Tagged[Record]])

Configuration

trait Config extends CellCfg {
  val tagFn: Record => String  // Function returning the tag to apply from the Record itself
}

Usage

Example of usage within a Reactor: records representing client trades are tagged “CLIENT” and records representing broker trades are tagged “BROKER”; the tags are then used to route the records to different destinations:

cell[RecordTagger]("RecordTagger") {
  cellConfig := new RecordTaggerComp.Config {
    override val tagFn: Record => String = r => r.findRawField[String]("origin").get
  }
  route[Tagged[Record]] whenTagged "CLIENT" to "EntityMapperCT"
  route[Tagged[Record]] whenTagged "BROKER" to "EntityMapperBT"
}
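
The tagFn in the example calls .get on the result of findRawField, which suggests an Option that would throw when the "origin" field is missing. The following is a minimal, framework-free sketch of a total tagging function and of the two routes. Record, Tagged, UnknownTag and destination are simplified stand-ins invented for illustration, not the framework's real types, and what the framework does with an unmatched tag is not stated here.

```scala
import scala.reflect.ClassTag

// Simplified stand-ins for the framework types (assumptions, not the real API).
final case class Record(rawFields: Map[String, Any]) {
  // Assumed shape of findRawField: Some(value) when the field exists and has
  // the requested type, None otherwise.
  def findRawField[A](name: String)(implicit ct: ClassTag[A]): Option[A] =
    rawFields.get(name).flatMap(value => ct.unapply(value))
}

final case class Tagged[A](tag: String, value: A)

object RecordTaggingSketch {
  // Hypothetical fallback tag for records without a usable "origin" field.
  val UnknownTag = "UNKNOWN"

  // Total variant of the tagFn above: never throws on a missing field.
  val safeTagFn: Record => String =
    r => r.findRawField[String]("origin").getOrElse(UnknownTag)

  def tag(record: Record): Tagged[Record] = Tagged(safeTagFn(record), record)

  // Models the two whenTagged routes; other tags get no destination here.
  def destination(tagged: Tagged[Record]): Option[String] = tagged.tag match {
    case "CLIENT" => Some("EntityMapperCT")
    case "BROKER" => Some("EntityMapperBT")
    case _        => None
  }
}
```

With this variant, a record lacking "origin" is tagged with the fallback value and simply matches neither route, instead of failing inside the tagging function.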

Abstract Accumulator

Definition

abstract class Accumulator[T: TypeTag](implicit TCTag: ClassTag[T]) extends Cell[AccumulatorConfig[T]]

Description

Component that accumulates messages of type T in a bounded buffer.
The accumulated buffer is sent when its size reaches the limit or when EndOfData arrives.
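
The accumulate-and-flush behaviour described above can be sketched without the framework as follows. BoundedBuffer, add and endOfData are hypothetical names chosen for illustration; in particular, whether the real component emits an empty sequence when EndOfData arrives with nothing buffered is an assumption of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-alone model of the behaviour; not the framework's Accumulator class.
final class BoundedBuffer[T](bufferSize: Int) {
  require(bufferSize > 0, "bufferSize must be positive")

  private val buffer = ArrayBuffer.empty[T]

  // Adds one message; returns the full batch once the limit is reached.
  def add(message: T): Option[Seq[T]] = {
    buffer += message
    if (buffer.size >= bufferSize) Some(flush()) else None
  }

  // Called on end of data: emits whatever is left (possibly nothing).
  def endOfData(): Seq[T] = flush()

  // Copies the buffered messages into an immutable batch and empties the buffer.
  private def flush(): Seq[T] = {
    val batch = buffer.toList
    buffer.clear()
    batch
  }
}
```

With a bufferSize of 3, the third call to add returns the batch of three messages, and a later endOfData returns only the messages received since that flush.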

Contract

@CellContract(
  Receives[T],
  Sends[Seq[T]]
)

Configuration

trait AccumulatorConfig[T] extends CellCfg {
  def bufferSize: Int // Limit for the accumulating buffer
}

Usage

Example of usage for a concrete BrokerTrade accumulator:

object BrokerTradeAccumulatorComp {
  trait Config extends AccumulatorConfig[BrokerTrade]{
    override def bufferSize: Int = 50
  }

  @CellContract(
    Receives[BrokerTrade],
    Sends[immutable.Seq[BrokerTrade]]
  )
  case class BrokerTradeAccumulator() extends Accumulator[BrokerTrade]()
}

Buffer Accumulator

Definition

case class BufferAccumulator() extends Cell[Config]

Description

Component that accumulates data received as IndexedByteBuffer messages and sends it out when the end of data is reached.
The component MUST be driven by the following sequence of messages sent to the BufferAccumulator Cell:
StartData (once), IndexedByteBuffer (zero or more times), EndData (once).
The accumulated data is sent when EndData is received.
Clients of this component CAN reset the state and discard the accumulated data by sending a further StartData message.

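This component fails in the following situations:

    • A message is received without a prior StartData (that is, before onStartData has been called).
    • The Op provided in StartData differs from the one provided in EndData.
    • The accumulated data exceeds the maximum size (see maxSendBufferSize in the configuration).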
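
The lifecycle and failure conditions above can be modelled as a small state machine. This is only a sketch under stated assumptions: the message classes below are simplified stand-ins that reuse the documented names, Op is represented as a String, buffer indexing is not modelled, and failures are returned as Left values because the way the real component signals failure is not specified here.

```scala
// Simplified stand-ins for the framework messages (assumptions, not the real types).
sealed trait Msg
final case class StartData(op: String) extends Msg
final case class IndexedByteBuffer(bytes: Array[Byte]) extends Msg
final case class EndData(op: String) extends Msg

final class BufferLifecycle(maxSize: Int) {
  private var currentOp: Option[String] = None
  private var data: Vector[Byte] = Vector.empty

  // Right(Some(bytes)) when EndData releases the accumulated data,
  // Right(None) when the message was only absorbed, Left(reason) on failure.
  def handle(msg: Msg): Either[String, Option[Vector[Byte]]] = msg match {
    case StartData(op) =>
      // A further StartData discards anything accumulated so far.
      currentOp = Some(op)
      data = Vector.empty
      Right(None)

    case IndexedByteBuffer(bytes) =>
      if (currentOp.isEmpty) Left("data received before StartData")
      else if (data.size + bytes.length > maxSize) Left("accumulated data exceeds maximum size")
      else {
        data = data ++ bytes.toVector
        Right(None)
      }

    case EndData(op) =>
      currentOp match {
        case None => Left("EndData received before StartData")
        case Some(started) if started != op =>
          Left(s"Op mismatch: started with $started, ended with $op")
        case Some(_) =>
          val out = data
          currentOp = None
          data = Vector.empty
          Right(Some(out))
      }
  }
}
```

Feeding StartData, two buffers and a matching EndData yields the concatenated bytes once; any of the three failure situations yields a Left with the reason.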

Contract

@CellContract(Receives[IndexedByteBuffer], Sends[IndexedByteBuffer])

Configuration

trait Config extends CellCfg {
  def maxSendBufferSize: Int = 16 * pow(2, 20).toInt // 16 MB max data
}
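
As a quick check of the default value, the expression evaluates to 16 * 2^20 = 16,777,216, presumably counted in bytes (the unit is an assumption based on the comment). The sketch below compares it with two integer-only spellings of the same number; BufferSizeSketch and its members are hypothetical names used only for this illustration.

```scala
import scala.math.pow

object BufferSizeSketch {
  // Same expression as the default above: 16 * 2^20.
  val defaultMax: Int = 16 * pow(2, 20).toInt

  // Equivalent integer-only forms that avoid the round trip through Double.
  val viaMultiplication: Int = 16 * 1024 * 1024
  val viaShift: Int = 1 << 24
}
```

All three values are equal, so a Config that overrides maxSendBufferSize can use whichever form reads best.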