I/O Components

Directory Watcher Component

Definition

case class DirectoryWatcher() extends Cell[Config] with ServiceSupport

Description

This component watches one or more directories and either sends out file events or notifies the request owner, so that the owner can orchestrate operations based on events in the supervised directory. To start supervising a directory, a client sends a DirWatchRequest containing the path to supervise and the list of event types to look for (see Watch Types below). If the DirWatchRequest is wrapped in a Tagged instance, the resulting FileEvent messages are also wrapped in Tagged instances that carry the same tag as the request.

Watch Types:

    • Create: A new entry has been created within the directory.
    • Delete: An entry has been deleted from the directory.
    • Modify: An entry has been modified within the directory.
    • Overflow: An event might have been missed.
    • All: All of the above.
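
The tagging rule and the meaning of All can be illustrated with a minimal sketch. The message shapes below (field names, a String tag, and the expand and emit helpers) are assumptions made for illustration only; the real Archipelago types are not shown on this page and may differ.

```scala
object DirWatchSketch {
  // Hypothetical model of the watch types listed above.
  sealed trait WatchType
  case object Create   extends WatchType
  case object Delete   extends WatchType
  case object Modify   extends WatchType
  case object Overflow extends WatchType
  case object All      extends WatchType

  // Hypothetical message shapes; the real field names are not given here.
  final case class DirWatchRequest(path: String, types: List[WatchType])
  final case class FileEvent(path: String, kind: WatchType)
  final case class Tagged[T](tag: String, value: T)

  // Expands All into the four concrete watch types.
  def expand(types: List[WatchType]): Set[WatchType] =
    if (types.contains(All)) Set(Create, Delete, Modify, Overflow) else types.toSet

  // Returns the outgoing message for an event, or None if the request did not
  // ask for this kind of event. A tagged request yields a tagged event that
  // carries the same tag.
  def emit(request: Either[DirWatchRequest, Tagged[DirWatchRequest]],
           event: FileEvent): Option[Either[FileEvent, Tagged[FileEvent]]] = {
    val (req, tag) = request match {
      case Left(r)             => (r, None)
      case Right(Tagged(t, r)) => (r, Some(t))
    }
    if (!expand(req.types).contains(event.kind)) None
    else Some(tag match {
      case None    => Left(event)
      case Some(t) => Right(Tagged(t, event))
    })
  }
}
```

In this sketch an untagged request produces a plain FileEvent, while the same request wrapped in Tagged("job-1", ...) produces Tagged("job-1", event).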

Contract

@CellContract(
  Receives[Tagged[DirWatchRequest]], Receives[DirWatchRequest], Receives[FileEvent],
  Sends[FileEvent], Sends[Tagged[FileEvent]], Sends[Ack]
)

Configuration

def pollInterval: Long = 2  // Defines the interval between polling in seconds. Defaults to 2

File Loader

Definition

case class FileLoader() extends Cell[Config]

Description

A multi-op component that loads data from files in accessible filesystems. Each load request received creates a context object that tracks the state and progress of the associated load operation. The active contexts are processed concurrently by a configurable pool of loaders, each of which independently reads successive contiguous chunks of bytes from the file into byte buffers. The buffers are dispatched via the output links (tagged or untagged).
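
The chunked read performed by a single loader can be sketched as follows. This is not the component's implementation: IndexedChunk is a hypothetical stand-in for IndexedByteBuffer, the dispatch callback stands in for the output link, and the loader pool and context tracking are left out.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

object ChunkedLoadSketch {
  // Hypothetical stand-in for IndexedByteBuffer: a chunk plus its sequence index.
  final case class IndexedChunk(index: Long, buffer: ByteBuffer)

  // Reads `path` in contiguous chunks of at most `bufferSize` bytes, hands each
  // chunk to `dispatch` in order, and returns the number of chunks produced.
  def load(path: Path, bufferSize: Int)(dispatch: IndexedChunk => Unit): Long = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try {
      var index = 0L
      var done = false
      while (!done) {
        val buf = ByteBuffer.allocate(bufferSize)
        // Keep reading until the buffer is full or the end of the file is reached.
        var n = channel.read(buf)
        while (n > 0 && buf.hasRemaining) n = channel.read(buf)
        if (buf.position() == 0) done = true // nothing read: end of file
        else {
          buf.flip() // switch the buffer from writing to reading
          dispatch(IndexedChunk(index, buf))
          index += 1
          if (n < 0) done = true
        }
      }
      index
    } finally channel.close()
  }
}
```

Each chunk gets a fresh buffer so that a downstream consumer can hold on to it while the next chunk is read.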

Contract

@CellContract(
  Receives[FileLoadRequest], Receives[Tagged[FileLoadRequest]],
  Sends[IndexedByteBuffer],  Sends[Tagged[IndexedByteBuffer]]
)

Configuration

def bufferSize:          Int          = 32712       // Sets the data buffer max size        
def loaderCount:         Int          = 1           // Sets the number of loaders that can operate concurrently
override def maxLiveOps: Option[Int]  = Some(1)     // Sets the max concurrent operation limit
override def opMode:     OpMode       = MultiOpMode // Sets the operational mode for the component

Usage

cell[FileLoader]("FileLoader") {
    cellConfig() := new FileLoaderComp.Config {override def bufferSize: Int = 65536}
    route[IndexedByteBuffer] to Next
    route[Tagged[IndexedByteBuffer]] to Nowhere
}

File Writer

Definition

case class FileWriter() extends Cell[Config]

Description

A multi-op component that writes received data to the file specified by a write request. The data arrives wrapped in the DataWriteInstruction type, which identifies the associated operation and delivers a ByteBuffer containing the data. The write operation ends when an EndData signal is received for the operation. Data is written to the target file in the order in which the DataWriteInstruction messages are received.
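
The request, data, end lifecycle described above can be sketched like this. The message shapes (an opId field, a target path) and the Writer class are assumptions for illustration; the actual Archipelago types and operation handling are not shown on this page.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}
import scala.collection.mutable

object OrderedWriteSketch {
  // Hypothetical message shapes; the real fields are not documented here.
  final case class FileWriteRequest(opId: String, target: Path)
  final case class DataWriteInstruction(opId: String, data: ByteBuffer)
  final case class EndData(opId: String)

  final class Writer {
    // One open file channel per live operation.
    private val open = mutable.Map.empty[String, FileChannel]

    // A write request opens (and truncates) the target file for the operation.
    def onRequest(req: FileWriteRequest): Unit =
      open(req.opId) = FileChannel.open(req.target,
        StandardOpenOption.CREATE, StandardOpenOption.WRITE,
        StandardOpenOption.TRUNCATE_EXISTING)

    // Data is appended in arrival order; data for unknown operations is ignored.
    def onData(msg: DataWriteInstruction): Unit =
      open.get(msg.opId).foreach { ch =>
        while (msg.data.hasRemaining) ch.write(msg.data)
      }

    // EndData closes the file and completes the operation.
    def onEnd(end: EndData): Unit =
      open.remove(end.opId).foreach(_.close())
  }
}
```

Keying the open channels by operation is what lets several writes to different files be in progress at once in this sketch.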

Contract

  @CellContract(Receives[FileWriteRequest], Receives[DataWriteInstruction])

Configuration

    def bufferSize:       Int    = 16384        // Sets the outbound buffer max size
    def charset:          String = "UTF-8"      // Sets the charset to use
    override val opMode:  OpMode = MultiOpMode  // Sets the OpMode for the component

Usage

cell[FileWriter] {
  cellConfig := new FileWriterComp.Config {
    override def bufferSize: Int = 65536
  }
}

HTTP Server Component

Definition

class HttpServer extends Cell[Config]

Description

This component provides a simple asynchronous HTTP request / response behaviour.

External HTTP requests arrive as the HttpRequest type, which contains the HTTP method, the requested path and the headers. Incoming requests are recorded and converted to the Archipelago internal type WebRequest, which is sent via the configured links to connected Archipelago components. The internal responses to a WebRequest are received as one or more WebResponse objects, which are returned to the external client via the HTTP subsystem as HttpResponse instances.

The server component can respond with complete or chunked HTTP responses.

Contract

@CellContract(Sends[Tagged[WebRequest]], Receives[WebResponse[String]])

Configuration

def host: String = "localhost"                         // Server hostname 
def port: Int = 8888                                   // Server port 
def isSecure: Boolean = false                          // Security enabled 
def keyStorePath: String = ""                          // Keystore path 
def keyStorePassword: String = ""                      // Keystore password 
def requestLimit: Int = 100                            // Maximum concurrent requests
def tooManyRequestsText: Any => String = ...           // Overrideable response text
def pathExpressions: List[(Regex, Id)] = List(AnyPath) // Paths
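
One plausible reading of requestLimit and tooManyRequestsText is sketched below: requests beyond the limit are rejected with HTTP status 429 (Too Many Requests) and the configured text. The Limiter class, the use of status 429 and the idea that the function receives the rejected request are all assumptions; the page does not describe the actual behaviour.

```scala
object RequestLimitSketch {
  // Hypothetical admission control for concurrent requests.
  final class Limiter(requestLimit: Int, tooManyRequestsText: Any => String) {
    private var inFlight = 0

    // Admits the request, or rejects it with (429, text) when the limit is reached.
    def admit(request: Any): Either[(Int, String), Unit] = synchronized {
      if (inFlight >= requestLimit) Left((429, tooManyRequestsText(request)))
      else { inFlight += 1; Right(()) }
    }

    // Called once the response for an admitted request has been fully sent.
    def complete(): Unit = synchronized { if (inFlight > 0) inFlight -= 1 }
  }
}
```

With a limit of 1, a second request arriving before the first completes is rejected, and a request arriving after complete() is admitted again.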

Usage

cell[HttpServer]("HttpServer") {
  cellConfig := new Config {
    override val isSecure: Boolean = true
    override val keyStorePath: String = "data/httpServerTest/keyStore.jks"
    override val keyStorePassword: String = "password"
  }
}

Rabbit MQ Queue Publisher

Definition

class RMQQueuePublisher() extends Cell[Config]

Description

A simple multi-op queue publisher for Rabbit MQ. On receiving a QueuePubRequest, the component establishes a channel for the request and then awaits QueuePublication objects, each wrapping a message in byte-array form for the indicated operation.

The data is dispatched to the associated channel as an AMQP publication. The channel is closed on receipt of an EndOp signal for the associated operation.

Contract

@CellContract(Receives[QueuePubRequest], Receives[Array[Byte]])

Configuration

override val opMode:     OpMode      = MultiOpMode // Declares a multi-op component
override def maxLiveOps: Option[Int] = Some(1)     // Sets the max number of simultaneous ops 

Usage

cell[RMQQueuePublisher] {}

Rabbit MQ Queue Subscriber

Definition

class RMQQueueSubscriber() extends Cell[Config]

Description

A simple multi-op component to create a Rabbit MQ subscription.

The component receives QueueSubRequest objects, which are recorded and trigger the local declaration of the indicated queue, if necessary, and the creation of a consumer for the queue. Data received via the consumer is transmitted as byte arrays via the link associated with the operation.

The component(s) associated with the subscription asynchronously Ack or Nak the received data, and each Ack or Nak is relayed back to the Rabbit MQ subsystem.

Receiving an EndOp or CancelOp closes the subscription and completes the associated operation.

Contract

@CellContract(Receives[QueueSubRequest], Sends[Array[Byte]])

Configuration

override val opMode:     OpMode      = MultiOpMode // Declares a multi-op component
override def maxLiveOps: Option[Int] = Some(1)     // Sets the max number of simultaneous ops 

Usage

cell[RMQQueueSubscriber] {}

String Channel Writer

Definition

class StringChannelWriter() extends Cell[Config]

Description

Component which writes Strings into a byte “channel”. This component encodes a batch of strings into bytes, adding a configurable delimiter to separate them, and then generates a DataWriteInstruction. The channel in this context is the operation against which the DataWriteInstruction is constructed. The component uses the current operation if the batch does not carry a tag; otherwise it looks up the operation that corresponds to a service with the same name as the batch tag.
Therefore, if a tag is given to the batch, it is expected to be the service id of a current operation in the context in which this component is running.

This mechanism allows the component to be used as a “channel” selector, usually connected to a multi-operation writer component such as a FileWriter, which requires an operation to determine which file the data is written to.
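
The two steps described above, encoding the batch and choosing the operation, can be sketched as follows. The helper names, the String operation ids and the choice to append the delimiter after every string (rather than only between strings) are assumptions made for illustration; splitting the output according to bufferSize is not modelled.

```scala
import java.nio.ByteBuffer
import java.nio.charset.Charset

object StringBatchSketch {
  // Encodes a batch of strings into bytes, appending the delimiter after each
  // string (an assumption; the component may only place it between strings).
  def encode(batch: Seq[String],
             delimiter: String = "\n",
             charset: String = "UTF-8"): ByteBuffer = {
    val cs = Charset.forName(charset)
    ByteBuffer.wrap(batch.map(_ + delimiter).mkString.getBytes(cs))
  }

  // Chooses the operation to write against: the current operation for an
  // untagged batch, otherwise the operation registered under the tag's service id.
  def selectOp(tag: Option[String],
               currentOp: String,
               opsByService: Map[String, String]): Option[String] =
    tag match {
      case Some(serviceId) => opsByService.get(serviceId)
      case None            => Some(currentOp)
    }
}
```

In this sketch a tag that matches no running service yields None, leaving the caller to decide how to handle a batch with no matching operation.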

Contract

@CellContract(Receives[BatchOf[String]], Sends[DataWriteInstruction])

Configuration

def charset:       String = "UTF8"  // Charset used to encode strings into bytes
def lineDelimiter: String = "\n"    // Sequence of characters representing a new line delimiter
def bufferSize: Int       = 32768   // Buffer size in bytes

Usage

cell[StringChannelWriter] {
  cellConfig := new StringChannelWriterComp.Config {}
  route[DataWriteInstruction] to Next
}