AWS Kinesis Plugin

The plugin provides functionality to interact with Amazon Kinesis.

Installation

  1. Copy the below line to dependencies section of the project build.gradle file

    Example 1. build.gradle
    implementation(group: 'org.vividus', name: 'vividus-plugin-aws-kinesis', version: '0.5.3')
  2. If the project was imported to the IDE before adding new plugin, re-generate the configuration files for the used IDE and then refresh the project in the used IDE.

Configuration

Authentication

The plugin attempts to find AWS credentials by using the default credential provider chain. The provider chain looks for credentials using the provided below options one by one starting from the top. If credentials are found at some point, the search stops and further options are not evaluated.

  1. Environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (the optional variable for session token is AWS_SESSION_TOKEN).

  2. The properties: system.aws.accessKeyId and system.aws.secretKey (the optional property for session token is system.aws.sessionToken).

  3. Web Identity Token credentials from the environment or container.

  4. In the default credentials file (the location of this file varies by platform).

  5. Credentials delivered through the Amazon EC2 container service if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and security manager has permission to access the variable.

  6. In the instance profile credentials, which exist within the instance metadata associated with the IAM role for the EC2 instance. This step is available only when running your application on an Amazon EC2 instance, but provides the greatest ease of use and best security when working with Amazon EC2 instances.

  7. If the plugin still hasn’t found credentials by this point, client creation fails with an exception.

See the official "Working with AWS Credentials" guide to get more details.

Region Selection

The plugin attempts to find AWS region by using the default region provider chain. The provider chain looks for a region using the provided below options one by one starting from the top. If region is found at some point, the search stops and further options are not evaluated.

  1. Environment variable: AWS_REGION.

  2. The property: system.aws.region.

  3. AWS shared configuration file (usually located at ~/.aws/config).

  4. Use the Amazon EC2 instance metadata service to determine the region of the currently running Amazon EC2 instance.

  5. If the plugin still hasn’t found a region by this point, client creation fails with an exception.

See the official "AWS Region Selection" guide to get more details.

Producer

Steps

Put the record

Write a single data record into an Amazon Kinesis data stream. You must specify the name of the stream that captures, stores, and transports the data; a partition key; and the data blob itself. The partition key is used by Kinesis Data Streams to distribute data across shards. Kinesis Data Streams segregates the data records that belong to a stream into multiple shards, using the partition key associated with each data record to determine the shard to which a given data record belongs. Partition keys are Unicode strings, with a maximum length limit of 256 characters for each key. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards. The step logs the shard ID of where the data record was placed and the sequence number that was assigned to the data record. Sequence numbers increase over time and are specific to a shard within a stream, not across all shards within a stream. After you write a record to a stream, you cannot modify that record or its order within the stream.

When I put record `$data` with partition key `$partitionKey` to Kinesis stream `$streamName`
  • $data - The data blob to put into the record.

  • $partitionKey - The partition key determining which shard in the stream the data record is assigned to.

  • $streamName - The name of the Amazon Kinesis data stream to put the data record into.

Examples

Example 2. Put the record to the Amazon Kinesis data stream
When I put record `Hello from Vividus!` with partition key `hello` to Kinesis stream `vividus-data-stream`

Consumer

Steps

Start the consumer

Create Amazon Kinesis shard iterators. A shard iterator expires 5 minutes after it is returned to the requester. A shard iterator specifies the shard position from which to start reading data records sequentially.

When I start consuming records from Kinesis stream `$streamName`
  • $streamName - The name of the Amazon Kinesis data stream.

Drain the consumed records

Get data records from a Kinesis data stream’s shard and drain the consumed records to the specified variable. The shard iterator created at step When I start consuming records from Kinesis stream \$streamName\`` specifies the position in the shard from which you want to start reading data records sequentially. If there are no records available in the portion of the shard that the iterator points to, an empty list of records is saved. Each draining moves the iterator to the position next after the last consumed record.

When I drain consumed Kinesis records to $scopes variable `$variableName`

Examples

Example 3. Consume records from the Amazon Kinesis data stream
When I start consuming records from Kinesis stream `vividus-data-stream`
!-- Perform any actions putting the records to the Kinesis stream
When I drain consumed Kinesis records to scenario variable `consumed-records`
Then `${consumed-records[0]}` is equal to `Hello from Vividus!`