Sorting

Abstract Accumulation Sorter

Definition

abstract class AccumulationSorter[T: TypeTag](implicit TCTag: ClassTag[T]) extends Cell[AccumulationSorterConfig[T]]

Description

Component which sorts messages of type T in the order determined by the compare function lessThan provided within the config.

This component accumulates incoming messages in a bounded buffer. The buffer is sent when its size reaches the bufferSize limit set in the config, or when an EndOfData signal arrives.
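
The buffering rule can be sketched in plain Scala, outside the framework. AccumulationBuffer, receive and endOfData are hypothetical names used only for illustration, and sorting the batch at flush time is an assumption; the actual Cell implementation may differ.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the accumulation behaviour; not the framework's implementation.
final class AccumulationBuffer[T](bufferSize: Int, lessThan: (T, T) => Boolean) {
  private val buffer = ArrayBuffer.empty[T]

  // Adds one message; returns a sorted batch once the buffer reaches its limit.
  def receive(msg: T): Option[Seq[T]] = {
    buffer += msg
    if (buffer.size >= bufferSize) Some(flush()) else None
  }

  // Called when EndOfData arrives: emits whatever is left, if anything.
  def endOfData(): Option[Seq[T]] =
    if (buffer.isEmpty) None else Some(flush())

  private def flush(): Seq[T] = {
    val sorted = buffer.toList.sortWith(lessThan)
    buffer.clear()
    sorted
  }
}

object AccumulationDemo {
  private val acc = new AccumulationBuffer[Int](3, _ < _)

  // Four messages with a limit of three: one full batch, then the remainder on EndOfData.
  val batches: Seq[Seq[Int]] =
    Seq(5, 1, 4, 2).flatMap(n => acc.receive(n).toList) ++ acc.endOfData().toList
  // batches == Seq(Seq(1, 4, 5), Seq(2))
}
```

Note that each batch is sorted on its own, so the output as a whole is only ordered within a batch.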

Contract

def contract: Contract = contract(Receives[T], Sends[Seq[T]])

Configuration

trait AccumulationSorterConfig[T] extends CellCfg {
  def bufferSize: Int
  def lessThan: (T, T) => Boolean
}

Usage

Usage in a concrete ClientTrade sorting component:

object ClientTradeAccumulationSorterComp  {
  trait Config extends AccumulationSorterConfig[ClientTrade]{
    override def bufferSize: Int = 100
    override def lessThan: (ClientTrade, ClientTrade) => Boolean = (ct1, ct2) => ct1.trade.quantity < ct2.trade.quantity
  }

  @CellContract(
    Receives[ClientTrade],
    Sends[immutable.Seq[ClientTrade]]
  )
  case class ClientTradeAccumulationSorter() extends AccumulationSorter[ClientTrade]()
}

Abstract Block Sorter

Definition

abstract class BlockSorter [T: TypeTag](implicit SeqTCTag: ClassTag[Seq[T]]) extends Cell[BlockSorterConfig[T]]

Description

Component that receives unsorted sequences and, for each one, sends out a sorted sequence containing the same elements as the original.
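
As a rough illustration, sorting one block with a lessThan function maps directly onto the standard library's sortWith. The object and method names below are hypothetical, and the component may sort differently internally.

```scala
object BlockSortSketch {
  // Hypothetical helper: sort a single block with the configured comparison.
  // The input is left untouched; a new sequence with the same elements is returned.
  def sortBlock[T](block: Seq[T], lessThan: (T, T) => Boolean): Seq[T] =
    block.sortWith(lessThan)

  val prices: Seq[Double] = Seq(3.5, 1.0, 2.25)
  val sorted: Seq[Double] = sortBlock[Double](prices, _ < _)
  // sorted == Seq(1.0, 2.25, 3.5)
}
```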

Contract

def contract: Contract = contract(Receives[Seq[T]], Sends[Seq[T]])

Configuration

trait BlockSorterConfig[T] extends CellCfg {
  def lessThan: (T, T) => Boolean // Function to determine order of 2 elements.
}

Usage

Example of concrete usage to sort blocks of BrokerTrades by price:

object BrokerTradeBlockSorterComp {
  trait Config extends BlockSorterConfig[BrokerTrade]{
    override def lessThan: (BrokerTrade, BrokerTrade) => Boolean =
      (bt1: BrokerTrade, bt2: BrokerTrade) => bt1.trade.price < bt2.trade.price
  }

  @CellContract(
    Receives[immutable.Seq[BrokerTrade]],
    Sends[immutable.Seq[BrokerTrade]]
  )
  case class BrokerTradeBlockSorter() extends BlockSorter[BrokerTrade]()
}

Abstract Converge Sorter

Definition

abstract class ConvergeSorter[T: TypeTag](implicit SeqTCTag: ClassTag[Seq[T]]) extends Cell[ConvergeSorterConfig[T]]

Description

Component which receives a number of collections of messages of type T and accumulates them.
After receiving the last collection (signalled by EndOfData), it merge-sorts all collections in the order determined by the lessThan function provided in the config and then sends out the sorted stream of messages of type T.
If sortInput is set to true, the component sorts each block of messages before storing it. If the flag is false, the input blocks are expected to be already sorted.
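
The merge step can be sketched as a repeated two-way merge over the stored blocks. This is a minimal sketch under stated assumptions: ConvergeSketch, merge and converge are hypothetical names, and the real component may use a different merge strategy (for example a k-way merge).

```scala
object ConvergeSketch {
  // Merge two already-sorted sequences into one sorted sequence.
  def merge[T](left: Seq[T], right: Seq[T])(lessThan: (T, T) => Boolean): Vector[T] = {
    val l = left.iterator.buffered
    val r = right.iterator.buffered
    val out = Vector.newBuilder[T]
    while (l.hasNext && r.hasNext) {
      // Take from the right only when it is strictly smaller, so ties keep left-first order.
      if (lessThan(r.head, l.head)) out += r.next() else out += l.next()
    }
    out ++= l // at most one of these two still has elements
    out ++= r
    out.result()
  }

  // Optionally sort each block first (the sortInput flag), then merge all blocks.
  def converge[T](blocks: Seq[Seq[T]], sortInput: Boolean)(lessThan: (T, T) => Boolean): Vector[T] = {
    val prepared = if (sortInput) blocks.map(_.sortWith(lessThan)) else blocks
    prepared.foldLeft(Vector.empty[T])((acc, block) => merge(acc, block)(lessThan))
  }
}
```

For example, ConvergeSketch.converge(Seq(Seq(1, 4, 9), Seq(2, 3, 10), Seq(0)), sortInput = false)(_ < _) yields Vector(0, 1, 2, 3, 4, 9, 10). If unsorted blocks are passed with sortInput = false, the result is not guaranteed to be ordered.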

Contract

def contract: Contract = contract(Receives[Seq[T]], Sends[T])

Configuration

trait ConvergeSorterConfig[T] extends CellCfg {
  def sortInput: Boolean = false
  def lessThan: (T, T) => Boolean
}

Usage

Example of concrete usage to merge blocks of ClientTrade:

object ClientTradeConvergeSorterComp {
  trait Config extends ConvergeSorterConfig[ClientTrade]{
    override def sortInput: Boolean = false
    override def lessThan: (ClientTrade, ClientTrade) => Boolean = (ct1, ct2) => ct1.trade.quantity < ct2.trade.quantity
  }

  @CellContract(
    Receives[immutable.Seq[ClientTrade]],
    Sends[ClientTrade]
  )
  case class ClientTradeConvergeSorter() extends ConvergeSorter[ClientTrade]()
}

Abstract File Converge Sorter

Definition

abstract class FileConvergeSorter[T: TypeTag](implicit SeqTCTag: ClassTag[Seq[T]], ct: ClassTag[T])

Description

Component which receives a number of collections of messages of type T and saves them into files. This component is similar to the Abstract Converge Sorter but stores the collections in files instead of keeping them in memory.
After receiving the last collection (signalled by EndOfData), it merge-sorts all collections in the order determined by the lessThan function provided in the config and then sends out the sorted stream of messages of type T.
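
A minimal sketch of the spill-and-merge idea, assuming one file per block with one serialised element per line. FileConvergeSketch, spill, load and mergeFiles are hypothetical names, and the on-disk layout is an assumption rather than the component's documented format. For brevity the sketch loads each file fully before merging; an implementation aiming to bound memory would more likely stream the files line by line.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.io.Source

object FileConvergeSketch {
  // Write one block to its own file, one serialised element per line.
  // Assumes serialise never produces an empty string or a line break.
  def spill[T](dir: Path, block: Seq[T], serialise: T => String): Path = {
    val file = Files.createTempFile(dir, "block-", ".txt")
    Files.write(file, block.map(serialise).mkString("\n").getBytes(StandardCharsets.UTF_8))
    file
  }

  // Read a block back from disk, closing the file even if deserialise throws.
  def load[T](file: Path, deserialise: String => T): Vector[T] = {
    val source = Source.fromFile(file.toFile, "UTF-8")
    try source.getLines().map(deserialise).toVector
    finally source.close()
  }

  // Merge two already-sorted sequences into one sorted sequence.
  def merge[T](left: Seq[T], right: Seq[T], lessThan: (T, T) => Boolean): Vector[T] = {
    val l = left.iterator.buffered
    val r = right.iterator.buffered
    val out = Vector.newBuilder[T]
    while (l.hasNext && r.hasNext) {
      if (lessThan(r.head, l.head)) out += r.next() else out += l.next()
    }
    out ++= l
    out ++= r
    out.result()
  }

  // On EndOfData: read every stored block back and merge them in order.
  def mergeFiles[T](files: Seq[Path], deserialise: String => T, lessThan: (T, T) => Boolean): Vector[T] =
    files.foldLeft(Vector.empty[T])((acc, f) => merge(acc, load(f, deserialise), lessThan))
}
```

The sketch relies on deserialise being the inverse of serialise; if the round trip loses information, the merged output will not match the input elements.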

Contract

def contract: Contract = contract(Receives[Seq[T]], Sends[T])

Configuration

FileConvergeSorterConfig extends the ConvergeSorterConfig of the Converge Sorter component and adds a few file-specific values.

trait FileConvergeSorterConfig[T] extends ConvergeSorterConfig[T] {
  def dataDirectory: Path = Paths.get(System.getProperty("java.io.tmpdir")) // A temporary location to store the files
  def serialise: T => String                                                // A serialise function for the type T to text
  def deserialise: String => T                                              // A deserialise function from text to type T
}

Usage

Example of usage for a concrete BrokerTrade sorting component:

object BrokerTradeFileConvergeSorterComp {
  trait Config extends FileConvergeSorterConfig[BrokerTrade]{
    override def sortInput: Boolean = false
    override def lessThan: (BrokerTrade, BrokerTrade) => Boolean = (bt1, bt2) => bt1.trade.price < bt2.trade.price
    override def dataDirectory: Path = Paths.get("./data/dataTest")
    override def serialise: BrokerTrade => String = bt => bt.trade.toCSVLine
    override def deserialise: String => BrokerTrade = s => BrokerTrade(Trade.fromCSV(s))
  }

  @CellContract(Receives[immutable.Seq[BrokerTrade]], Sends[BrokerTrade])
  case class BrokerTradeFileConvergeSorter() extends FileConvergeSorter[BrokerTrade]()
}

String Accumulation Sorter

Definition

This component is a concrete implementation of the Abstract Accumulation Sorter for Strings. This is the full definition of the component:

object StringAccumulationSorterComp {
  trait Config extends AccumulationSorterConfig[String]{
    override def bufferSize: Int = 100
    override def lessThan: (String, String) => Boolean = (s1, s2) => s1.length < s2.length
  }

  @CellContract(Receives[String], Sends[Seq[String]])
  case class StringAccumulationSorter() extends AccumulationSorter[String]()
}

Description

Component which receives a stream of Strings and accumulates them. When the buffer limit is reached or EndOfData is received, the accumulated Strings are sent out sorted according to the order function provided in the configuration. In this implementation the Strings are sorted by length rather than alphabetically.
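
To make the length-based ordering concrete, here is the same lessThan applied with the standard library's sortWith. The sample words are invented for illustration. sortWith is a stable sort, so equal-length Strings keep their original relative order here; whether the component itself preserves arrival order for ties is not stated in this documentation.

```scala
object LengthOrderSketch {
  // Same comparison as in the Config above: shorter Strings come first.
  val lessThan: (String, String) => Boolean = (s1, s2) => s1.length < s2.length

  val words: Seq[String] = Seq("pear", "fig", "banana", "kiwi")
  val byLength: Seq[String] = words.sortWith(lessThan)
  // byLength == Seq("fig", "pear", "kiwi", "banana")
}
```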

Contract

See full definition above.

Configuration

See full definition above.

String Block Sorter

Definition

This component is a concrete implementation of the Abstract Block Sorter for Strings. This is the full definition of the component:

object StringBlockSorterComp {
  trait Config extends BlockSorterConfig[String]{
    override def lessThan: (String, String) => Boolean = (s1: String, s2: String) => s1.length < s2.length
  }

  @CellContract(Receives[Seq[String]], Sends[Seq[String]])
  case class StringBlockSorter() extends BlockSorter[String]()
}

Description

Component which receives sequences of Strings and sends them out sorted according to the order function provided in the configuration. In this implementation the Strings are sorted by length rather than alphabetically.

Contract

See full definition above.

Configuration

See full definition above.

String Converge Sorter

Definition

This component is a concrete implementation of the Abstract Converge Sorter for Strings. This is the full definition for it:

object StringConvergeSorterComp {
  trait Config extends ConvergeSorterConfig[String]{
    override def sortInput: Boolean = false
    override def lessThan: (String, String) => Boolean = (s1, s2) => s1.length < s2.length
  }

  @CellContract(Receives[Seq[String]], Sends[String])
  case class StringConvergeSorter() extends ConvergeSorter[String]()
}

Description

Component which receives a number of collections of strings and accumulates them.
After receiving the last collection (signalled by EndOfData), it merge-sorts all collections in the order determined by the lessThan function provided in the config and then sends out the sorted stream of Strings.
If sortInput is set to true, the component sorts each block of messages before storing it. If the flag is false, the input blocks are expected to be already sorted.

Contract

See full definition above.

Configuration

See full definition above.

String File Converge Sorter

Definition

This is the concrete implementation of the component Abstract File Converge Sorter for Strings. Here is the full implementation of it:

object StringFileConvergeSorterComp {
  trait Config extends FileConvergeSorterConfig[String]{
    override def sortInput: Boolean = false
    override def lessThan: (String, String) => Boolean = (s1, s2) => s1.length < s2.length
    override def serialise: String => String = s => s
    override def deserialise: String => String = s => s
  }

  @CellContract(Receives[Seq[String]], Sends[String])
  case class StringFileConvergeSorter() extends FileConvergeSorter[String]()
}

Description

Component which receives a number of collections of Strings and saves them into files. This component is similar to the String Converge Sorter but stores the collections in files instead of keeping them in memory.
After receiving the last collection (signalled by EndOfData), it merge-sorts all collections in the order determined by the lessThan function provided in the config and then sends out the sorted stream of Strings.

Contract

See full definition above.

Configuration

See full definition above.
