AWS Kinesis Plugin
The plugin provides functionality to interact with Amazon Kinesis.
Installation
-
Copy the below line to
dependencies
section of the projectbuild.gradle
fileExample 1. build.gradleimplementation(group: 'org.vividus', name: 'vividus-plugin-aws-kinesis', version: '0.5.4')
gradle -
If the project was imported to the IDE before adding new plugin, re-generate the configuration files for the used IDE and then refresh the project in the used IDE.
Configuration
Authentication
The plugin attempts to find AWS credentials by using the default credential provider chain. The provider chain looks for credentials using the provided below options one by one starting from the top. If credentials are found at some point, the search stops and further options are not evaluated.
-
Environment variables:
AWS_ACCESS_KEY_ID
andAWS_SECRET_ACCESS_KEY
(the optional variable for session token isAWS_SESSION_TOKEN
). -
The properties:
system.aws.accessKeyId
andsystem.aws.secretKey
(the optional property for session token issystem.aws.sessionToken
). -
Web Identity Token credentials from the environment or container.
-
In the default credentials file (the location of this file varies by platform).
-
Credentials delivered through the Amazon EC2 container service if the
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
environment variable is set and security manager has permission to access the variable. -
In the instance profile credentials, which exist within the instance metadata associated with the IAM role for the EC2 instance. This step is available only when running your application on an Amazon EC2 instance, but provides the greatest ease of use and best security when working with Amazon EC2 instances.
-
If the plugin still hasn’t found credentials by this point, client creation fails with an exception.
See the official "Working with AWS Credentials" guide to get more details.
Region Selection
The plugin attempts to find AWS region by using the default region provider chain. The provider chain looks for a region using the provided below options one by one starting from the top. If region is found at some point, the search stops and further options are not evaluated.
-
Environment variable:
AWS_REGION
. -
The property:
system.aws.region
. -
AWS shared configuration file (usually located at
~/.aws/config
). -
Use the Amazon EC2 instance metadata service to determine the region of the currently running Amazon EC2 instance.
-
If the plugin still hasn’t found a region by this point, client creation fails with an exception.
See the official "AWS Region Selection" guide to get more details.
Producer
Steps
Put the record
Write a single data record into an Amazon Kinesis data stream. You must specify the name of the stream that captures, stores, and transports the data; a partition key; and the data blob itself. The partition key is used by Kinesis Data Streams to distribute data across shards. Kinesis Data Streams segregates the data records that belong to a stream into multiple shards, using the partition key associated with each data record to determine the shard to which a given data record belongs. Partition keys are Unicode strings, with a maximum length limit of 256 characters for each key. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards. The step logs the shard ID of where the data record was placed and the sequence number that was assigned to the data record. Sequence numbers increase over time and are specific to a shard within a stream, not across all shards within a stream. After you write a record to a stream, you cannot modify that record or its order within the stream.
When I put record `$data` with partition key `$partitionKey` to Kinesis stream `$streamName`
-
$data
- The data blob to put into the record. -
$partitionKey
- The partition key determining which shard in the stream the data record is assigned to. -
$streamName
- The name of the Amazon Kinesis data stream to put the data record into.
Consumer
Steps
Start the consumer
Create Amazon Kinesis shard iterators. A shard iterator expires 5 minutes after it is returned to the requester. A shard iterator specifies the shard position from which to start reading data records sequentially.
When I start consuming records from Kinesis stream `$streamName`
-
$streamName
- The name of the Amazon Kinesis data stream.
Drain the consumed records
Get data records from a Kinesis data stream’s shard and drain the consumed records to the specified variable. The shard iterator created at step When I start consuming records from Kinesis stream \
$streamName\`` specifies the position in the shard from which you want to start reading data records sequentially. If there are no records available in the portion of the shard that the iterator points to, an empty list of records is saved. Each draining moves the iterator to the position next after the last consumed record.
When I drain consumed Kinesis records to $scopes variable `$variableName`
-
$variableName
- The variable name to store the records. The records are accessible via zero-based index, e.g.${my-var[0]}
will return the first received record.
Examples
When I start consuming records from Kinesis stream `vividus-data-stream`
!-- Perform any actions putting the records to the Kinesis stream
When I drain consumed Kinesis records to scenario variable `consumed-records`
Then `${consumed-records[0]}` is equal to `Hello from Vividus!`