Marshalling

Buffer String Unmarshaller

Definition

case class BufferStringUnmarshaller() extends Cell[Config]

Description

This component receives indexed byte buffers from a data source and interprets their content as a sequence of delimited strings encoded in a given charset.
The component tries to decode and send out the lines contained in every block of bytes. With multi-byte characters, however, a character may be split at a block boundary, so the beginning and end portions of each byte buffer must be treated specially. That work is delegated to another component downstream.

Therefore the component sends out not only lines but also IndexedStartOfByteBuffer and IndexedEndOfByteBuffer messages, plus the full received IndexedByteBuffer when it cannot find a single delimiter within a byte buffer.
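
The splitting described above can be sketched in plain Scala. This is only an illustration under stated assumptions, not the component's actual implementation: the case classes StartFragment, EndFragment and WholeBlock are hypothetical stand-ins for IndexedStartOfByteBuffer, IndexedEndOfByteBuffer and IndexedByteBuffer (whose real fields are not shown in this page), and the charset is assumed to be UTF-8, where the bytes of an ASCII delimiter such as "\n" never occur inside a multi-byte character.

```scala
import java.nio.charset.StandardCharsets

object BlockSplitSketch {
  // Hypothetical stand-ins for the framework's message types; the real fields may differ.
  final case class StartFragment(index: Long, bytes: Array[Byte])
  final case class EndFragment(index: Long, bytes: Array[Byte])
  final case class WholeBlock(index: Long, bytes: Array[Byte])

  final case class SplitResult(
    start: Option[StartFragment],
    lines: Vector[String],
    end: Option[EndFragment],
    whole: Option[WholeBlock]
  )

  // Returns every byte offset at which `delim` occurs in `bytes`.
  private def delimiterPositions(bytes: Array[Byte], delim: Array[Byte]): Vector[Int] = {
    val out = Vector.newBuilder[Int]
    var i = 0
    while (i + delim.length <= bytes.length) {
      if (delim.indices.forall(j => bytes(i + j) == delim(j))) { out += i; i += delim.length }
      else i += 1
    }
    out.result()
  }

  def split(index: Long, bytes: Array[Byte], delimiter: String = "\n"): SplitResult = {
    require(delimiter.nonEmpty, "delimiter must not be empty")
    val delim = delimiter.getBytes(StandardCharsets.UTF_8)
    val positions = delimiterPositions(bytes, delim)
    if (positions.isEmpty) {
      // No delimiter at all: nothing can be decoded safely, so pass the whole block on.
      SplitResult(None, Vector.empty, None, Some(WholeBlock(index, bytes)))
    } else {
      // Bytes before the first delimiter may complete a line begun in the previous block.
      val start = StartFragment(index, bytes.slice(0, positions.head))
      // Bytes after the last delimiter may begin a line that continues in the next block.
      val end = EndFragment(index, bytes.slice(positions.last + delim.length, bytes.length))
      // Everything between two consecutive delimiters is a complete line.
      val lines = positions.zip(positions.tail).map { case (a, b) =>
        new String(bytes.slice(a + delim.length, b), StandardCharsets.UTF_8)
      }
      SplitResult(Some(start), lines, Some(end), None)
    }
  }
}
```

For a block with index 3 containing the bytes of "lo\nfoo\nbar\nwor", this sketch yields the lines "foo" and "bar", a start fragment holding "lo" and an end fragment holding "wor". Only the two fragments need to be resolved against the neighbouring blocks.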

If the headers flag is set to true, the first non-comment line is wrapped in a Headers object and sent out as such. Otherwise all lines are sent out normally.
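
The effect of the headers flag can be illustrated with a small sketch. Several details here are assumptions, since this page does not specify them: the Headers case class is a hypothetical stand-in with a single field, a comment line is assumed to start with "#", and comment lines are assumed to be passed through unchanged.

```scala
object HeadersSketch {
  // Hypothetical stand-in for the framework's Headers type.
  final case class Headers(line: String)

  // Assumption: a comment line starts with '#'. The source does not define the comment syntax.
  def isComment(line: String): Boolean = line.startsWith("#")

  // Left(Headers) marks the header line; Right(line) marks an ordinary line.
  def emit(lines: Seq[String], headers: Boolean): Seq[Either[Headers, String]] =
    if (!headers) lines.map(l => Right[Headers, String](l))
    else {
      // Index of the first non-comment line, or -1 if there is none.
      val headerIdx = lines.indexWhere(l => !isComment(l))
      lines.zipWithIndex.map {
        case (l, i) if i == headerIdx => Left[Headers, String](Headers(l))
        case (l, _)                   => Right[Headers, String](l)
      }
    }
}
```

With headers set to true, the input lines "# c", "id,name", "1,a" produce one Headers message wrapping "id,name" and two plain lines. With headers set to false, every line is emitted as a plain String.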

Contract

@CellContract(
  Receives[IndexedByteBuffer],
  Sends[String], Sends[Headers],
  Sends[IndexedStartOfByteBuffer], Sends[IndexedEndOfByteBuffer], Sends[IndexedByteBuffer]
)

Configuration

trait Config extends CellCfg {
  def lineDelimiter:  String = "\n"   // Line delimiter sequence to look up within the block
  def charset:        String = "UTF8" // Charset used to decode the bytes
  def maxLineLength:  Int = 16535     // Max line length allowed
  def headers: Boolean = false        // Set this to true if the first line contains headers and you want it wrapped into a Headers class
}
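
Because every setting is a method with a default, a configuration only needs to override what differs. The sketch below is self-contained, so it redeclares CellCfg and Config as local stand-ins copied from the definition above; in a real Reactor you would instead extend BufferStringUnmarshallerComp.Config, as in the usage section. The chosen values (CRLF delimiter, ISO-8859-1) are illustrative only.

```scala
object ConfigOverrideSketch {
  // Local stand-ins so the example compiles on its own; the real traits come from the framework.
  trait CellCfg
  trait Config extends CellCfg {
    def lineDelimiter: String = "\n"
    def charset: String = "UTF8"
    def maxLineLength: Int = 16535
    def headers: Boolean = false
  }

  // A configuration for CRLF-delimited Latin-1 input whose first line holds the headers.
  val crlfLatin1WithHeaders: Config = new Config {
    override def lineDelimiter: String = "\r\n"
    override def charset: String = "ISO-8859-1"
    override def headers: Boolean = true
  }
}
```

Settings that are not overridden, such as maxLineLength here, keep their defaults.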

Usage

Example of usage within a Reactor, paired with a PartialBufferStringMatcher. Note that the configuration leaves headers at its default of false, so no Headers messages are ever sent and the definition simply routes them to Nowhere.

cell[BufferStringUnmarshaller]("BufferStrUnmarshaller") {
  cellConfig := new BufferStringUnmarshallerComp.Config {}
  nodeCount := 2
  route[String] inBatchesOf 500 to "DataExtractor"
  route[Headers] to Nowhere
  route[IndexedStartOfByteBuffer, IndexedEndOfByteBuffer, IndexedByteBuffer] to Next
}

cell[PartialBufferStringMatcher]("PartialBufferMatcher") {
  cellConfig := new PartialBufferStringMatcherComp.Config {}

  route[String] to Next
}

Partial Buffer String Matcher

Definition

case class PartialBufferStringMatcher() extends Cell[Config]

Description

This component will most likely be used in conjunction with BufferStringUnmarshallerComp. It allows the unmarshallers upstream to run concurrently by combining the fragments at the end and beginning of buffers that could not otherwise be decoded. The components upstream can decode the middle parts of indexed byte buffers in parallel and send here the parts they cannot work out: the beginnings and ends of buffers, or full buffers that contain no known delimiter.
This component uses continuous sequence chains to determine how to combine those parts, decodes the strings contained in them, and sends them downstream.
This component is not parallelizable, so nodeCount must be 1.
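
The core idea, joining the end of buffer i with the start of buffer i + 1 before decoding, can be sketched as follows. This is a simplified illustration and not the component's actual algorithm: StartOf and EndOf are hypothetical stand-ins for the indexed message types, UTF-8 is assumed, and full buffers without delimiters (which would extend a chain across more than two buffers) are left out.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object FragmentJoinSketch {
  // Hypothetical stand-ins; the real message types may carry different fields.
  sealed trait Fragment { def index: Long; def bytes: Array[Byte] }
  final case class StartOf(index: Long, bytes: Array[Byte]) extends Fragment
  final case class EndOf(index: Long, bytes: Array[Byte]) extends Fragment

  // Joins the end of buffer i with the start of buffer i + 1 and decodes the result.
  // The returned map is keyed by i, the index of the buffer whose end was completed.
  def joinBoundaries(fragments: Seq[Fragment]): Map[Long, String] = {
    val ends   = fragments.collect { case e: EndOf => e.index -> e.bytes }.toMap
    val starts = fragments.collect { case s: StartOf => s.index -> s.bytes }.toMap
    ends.toSeq.flatMap { case (i, endBytes) =>
      // Decode only once both halves are available, so a split character is seen whole.
      starts.get(i + 1).map(startBytes => i -> new String(endBytes ++ startBytes, UTF_8))
    }.toMap
  }
}
```

For example, the word "café" occupies five bytes in UTF-8. If buffer 0 ends with its first four bytes and buffer 1 starts with the fifth, neither fragment decodes correctly alone, but joinBoundaries returns Map(0 -> "café") regardless of the order in which the two fragments arrive. Matching fragments by index in this way needs a single view of all of them, which is consistent with the nodeCount restriction above.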

Contract

@CellContract(
  Receives[IndexedStartOfByteBuffer], Receives[IndexedEndOfByteBuffer], Receives[IndexedByteBuffer],
  Sends[String]
)

Configuration

trait Config extends CellCfg {
  def charset: String     = "UTF8" // charset the bytes are encoded with
  def charBufferSize: Int = 16384  // buffer to decode chars from bytebuffers
}

Usage

Example of usage within a Reactor paired with a BufferStringUnmarshaller

cell[BufferStringUnmarshaller]("BufferStrUnmarshaller") {
  cellConfig := new BufferStringUnmarshallerComp.Config {}
  nodeCount := 2
  route[String] inBatchesOf 500 to "DataExtractor"
  route[Headers] to Nowhere
  route[IndexedStartOfByteBuffer, IndexedEndOfByteBuffer, IndexedByteBuffer] to Next
}

cell[PartialBufferStringMatcher]("PartialBufferMatcher") {
  cellConfig := new PartialBufferStringMatcherComp.Config {}

  route[String] to Next
}