Standard component library
Marshalling
Buffer String Unmarshaller
Definition
case class BufferStringUnmarshaller() extends Cell[Config]
Description
This component receives indexed byte buffers from a data source and interprets their content as a sequence of delimited strings encoded in a given charset.
The component tries to decode and send out the lines contained in every block of bytes. With multi-byte charsets, however, a character may be split at a block boundary, so the beginning and end portions of each byte buffer must be treated specially. This is delegated to another component downstream.
Therefore the component sends out not only lines but also an IndexedStartOfByteBuffer and an IndexedEndOfByteBuffer, or the full received IndexedByteBuffer when it cannot find a single delimiter within the byte buffer.
If the headers flag is set to true, the first non-comment line is wrapped in a Headers object and sent out as such. Otherwise all lines are sent out normally.
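The splitting described above can be illustrated with a minimal sketch. It is not the component's implementation: `SplitBlock` and `BlockSplitter` are hypothetical names, UTF-8 is assumed, and the result is returned as a value, whereas the real component sends messages. The bytes before the first delimiter and after the last one are kept as raw bytes, because they may belong to lines shared with neighbouring blocks.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.regex.Pattern

// Hypothetical result type for this sketch; not one of the library's message classes.
final case class SplitBlock(
  start: Array[Byte],    // bytes before the first delimiter, left undecoded
  lines: Vector[String], // complete lines found between the first and last delimiter
  end: Array[Byte]       // bytes after the last delimiter, left undecoded
)

object BlockSplitter {
  /** Returns None when the block contains no delimiter at all. */
  def split(block: Array[Byte], delimiter: String): Option[SplitBlock] = {
    require(delimiter.nonEmpty, "delimiter must not be empty")
    val delim = delimiter.getBytes(UTF_8)
    val bytes = block.toIndexedSeq
    val first = bytes.indexOfSlice(delim.toIndexedSeq)
    if (first < 0) None
    else {
      val last = bytes.lastIndexOfSlice(delim.toIndexedSeq)
      val lines =
        if (last >= first + delim.length) {
          // Everything between two delimiters is made of whole characters in UTF-8.
          val middle = new String(block.slice(first + delim.length, last), UTF_8)
          middle.split(Pattern.quote(delimiter), -1).toVector
        } else Vector.empty[String]
      Some(SplitBlock(block.take(first), lines, block.drop(last + delim.length)))
    }
  }
}
```

For the block `ab\ncd\nef\ngh` the sketch keeps `ab` and `gh` as undecoded fragments and decodes `cd` and `ef` as lines. A block without any delimiter yields `None`, which corresponds to the case where the whole buffer is forwarded.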
Contract
@CellContract(
  Receives[IndexedByteBuffer],
  Sends[String], Sends[Headers],
  Sends[IndexedStartOfByteBuffer], Sends[IndexedEndOfByteBuffer], Sends[IndexedByteBuffer]
)
Configuration
trait Config extends CellCfg {
  def lineDelimiter: String = "\n"  // Line delimiter sequence to look up within the block
  def charset: String = "UTF8"      // Charset used to decode the bytes
  def maxLineLength: Int = 16535    // Maximum line length allowed
  def headers: Boolean = false      // Set to true if the first line contains headers and should be wrapped in a Headers object
}
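As a sketch of how these defaults might be overridden, the snippet below builds a configuration for a hypothetical Windows-style file with a header row. The `CellCfg` and `Config` traits here are stand-ins copied from the listing above so that the example compiles on its own; in a real project they come from the library, and the chosen values are illustrative assumptions only.

```scala
// Stand-ins so the sketch compiles on its own; the real traits come from the library.
trait CellCfg
trait Config extends CellCfg {
  def lineDelimiter: String = "\n"
  def charset: String = "UTF8"
  def maxLineLength: Int = 16535
  def headers: Boolean = false
}

object CsvConfigExample {
  // Hypothetical settings: CRLF line endings, Latin-1 bytes, first line holds headers.
  val csvConfig: Config = new Config {
    override def lineDelimiter: String = "\r\n"
    override def charset: String = "ISO-8859-1"
    override def headers: Boolean = true
  }
}
```

Any value that is not overridden, such as `maxLineLength`, keeps its default.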
Usage
Example of usage within a Reactor, paired with a PartialBufferStringMatcher. Note that, because this example does not use headers, the definition routes Headers messages to Nowhere (they are never sent anyway, since the headers flag defaults to false).
cell[BufferStringUnmarshaller]("BufferStrUnmarshaller") {
  cellConfig := new BufferStringUnmarshallerComp.Config {}
  nodeCount := 2
  route[String] inBatchesOf 500 to "DataExtractor"
  route[Headers] to Nowhere
  route[IndexedStartOfByteBuffer, IndexedEndOfByteBuffer, IndexedByteBuffer] to Next
}
cell[PartialBufferStringMatcher]("PartialBufferMatcher") {
  cellConfig := new PartialBufferStringMatcherComp.Config {}
  route[String] to Next
}
Partial Buffer String Matcher
Definition
case class PartialBufferStringMatcher() extends Cell[Config]
Description
This component will most likely be used in conjunction with BufferStringUnmarshallerComp. It allows concurrency in the unmarshallers upstream by combining the fragments at the end and beginning of buffers that could not otherwise be decoded. Upstream components can decode the middle parts of indexed byte buffers in parallel and send here the parts they cannot work out, such as the beginnings or ends of buffers, or full buffers that contain no known delimiter.
This component uses continuous sequence chains to determine how to combine the fragments, decodes the strings contained in them and sends them downstream.
This component is not parallelizable, so nodeCount must be 1.
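The recombination idea can be sketched as follows. This is a simplified batch version under stated assumptions, not the component's algorithm: `Fragment`, `StartOf`, `EndOf`, `Whole` and `FragmentJoiner` are hypothetical names (the fields of the real message classes are not shown here), UTF-8 is assumed, and all fragments are assumed to be present, so no continuity check is performed. Fragments are ordered by buffer index. Each start fragment closes a line, while end fragments and whole buffers accumulate bytes for the next one.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical fragment types for this sketch only.
sealed trait Fragment { def index: Long; def bytes: Array[Byte] }
final case class StartOf(index: Long, bytes: Array[Byte]) extends Fragment
final case class EndOf(index: Long, bytes: Array[Byte]) extends Fragment
final case class Whole(index: Long, bytes: Array[Byte]) extends Fragment

object FragmentJoiner {
  // Position of a fragment inside its own buffer: start, then whole, then end.
  private def rank(f: Fragment): Int = f match {
    case _: StartOf => 0
    case _: Whole   => 1
    case _: EndOf   => 2
  }

  /** Joins fragments that may arrive in any order into decoded lines. */
  def join(fragments: Seq[Fragment]): Vector[String] = {
    val ordered = fragments.sortBy(f => (f.index, rank(f)))
    val (lines, pending) =
      ordered.foldLeft((Vector.empty[String], Array.emptyByteArray)) {
        // A start fragment ends at a delimiter, so it completes the pending line.
        case ((done, acc), StartOf(_, bytes)) =>
          (done :+ new String(acc ++ bytes, UTF_8), Array.emptyByteArray)
        // End fragments and whole buffers only extend the pending line.
        case ((done, acc), other) =>
          (done, acc ++ other.bytes)
      }
    // Bytes left after the last buffer form the final line, if any.
    if (pending.isEmpty) lines else lines :+ new String(pending, UTF_8)
  }
}
```

Because bytes are concatenated before decoding, a multi-byte character split across two buffers is decoded correctly once both halves are joined. The need to see neighbouring fragments in index order also suggests why a single node has to do this work.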
Contract
@CellContract(
  Receives[IndexedStartOfByteBuffer], Receives[IndexedEndOfByteBuffer], Receives[IndexedByteBuffer],
  Sends[String]
)
Configuration
trait Config extends CellCfg {
  def charset: String = "UTF8"     // Charset the bytes are encoded with
  def charBufferSize: Int = 16384  // Size of the buffer used to decode chars from byte buffers
}
Usage
Example of usage within a Reactor, paired with a BufferStringUnmarshaller.
cell[BufferStringUnmarshaller]("BufferStrUnmarshaller") {
  cellConfig := new BufferStringUnmarshallerComp.Config {}
  nodeCount := 2
  route[String] inBatchesOf 500 to "DataExtractor"
  route[Headers] to Nowhere
  route[IndexedStartOfByteBuffer, IndexedEndOfByteBuffer, IndexedByteBuffer] to Next
}
cell[PartialBufferStringMatcher]("PartialBufferMatcher") {
  cellConfig := new PartialBufferStringMatcherComp.Config {}
  route[String] to Next
}