Kafka Plugin
The plugin provides integration with Apache Kafka, an open-source distributed event streaming platform.
Installation
- Copy the line below to the dependencies section of the project build.gradle file. Please make sure to use the same version for all VIVIDUS dependencies.
  Example 1. build.gradle
  implementation(group: 'org.vividus', name: 'vividus-plugin-kafka', version: '0.6.0')
- If the project was imported to the IDE before adding the new dependency, re-generate the configuration files for the used IDE and then refresh the project in the used IDE.
Producer
Properties
The properties marked with bold are mandatory.

| Property Name | Acceptable values | Default | Description |
|---|---|---|---|
| | | | A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself. |
All other optional producer configs can be set by prefixing regular Kafka properties with `kafka.producer.<producer-key>.`, where `<producer-key>` is the key of the producer configuration that should be used as the step parameter.
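The prefixing rule above can be illustrated with a minimal Java sketch. It is not the plugin's actual implementation: the class `ProducerConfigSketch`, the method `forProducerKey`, the host names and the dot separator after the producer key are assumptions made for illustration. `bootstrap.servers` and `acks` are regular Kafka producer properties.

```java
import java.util.Properties;

// Hypothetical helper, not part of VIVIDUS: shows how prefixed entries
// could be mapped to plain Kafka producer configs for one producer key.
final class ProducerConfigSketch {

    // Returns the entries starting with "kafka.producer.<producerKey>."
    // with that prefix removed, so only regular Kafka property names remain.
    static Properties forProducerKey(Properties all, String producerKey) {
        String prefix = "kafka.producer." + producerKey + ".";
        Properties result = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.setProperty(name.substring(prefix.length()), all.getProperty(name));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties all = new Properties();
        // Assumed example values; "dev" and "qa" are producer keys.
        all.setProperty("kafka.producer.dev.bootstrap.servers", "localhost:9092");
        all.setProperty("kafka.producer.dev.acks", "all");
        all.setProperty("kafka.producer.qa.bootstrap.servers", "qa-host:9092");

        // Only the two "dev" entries are returned, without the prefix.
        System.out.println(forProducerKey(all, "dev"));
    }
}
```

With such a layout, the producer key `dev` is the value that would be passed as the producer key parameter of the steps.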
Examples
When I send event with value `my-data` to `dev` Kafka topic `my-topic`
Send event with key and value
Sends the event with the key and the value to the provided topic. Events with the same event key are written to the [same partition](https://kafka.apache.org/documentation/#intro_concepts_and_terms), and Kafka guarantees that any consumer of a given topic-partition will always read that partition’s events in exactly the same order as they were written.
When I send event with key `$key` and value `$value` to `$producerKey` Kafka topic `$topic`
- $key - The event key. All events with the same event key are written to the same partition.
- $value - The event value.
- $producerKey - The key of the Kafka producer configuration.
- $topic - The topic name.
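To see why events with the same key end up in the same partition, consider the following Java sketch of key-based partition selection. It is a simplified illustration under stated assumptions: Kafka's default partitioner hashes the key bytes with murmur2, whereas this sketch uses `Arrays.hashCode` as a stand-in, and the class name, method name and partition count are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: Arrays.hashCode stands in for the murmur2 hash
// used by Kafka's default partitioner, so the partition numbers differ from Kafka's.
final class KeyPartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Math.floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // assumed partition count of the topic
        int first = partitionFor("order-42", partitions);
        int second = partitionFor("order-42", partitions);
        // The mapping is deterministic, so the same key always selects the same partition.
        System.out.println(first == second); // prints true
    }
}
```

Because the partition depends only on the key and the partition count, the relative order of events sharing a key is preserved for consumers of that partition.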
Consumer
Properties
The properties marked with bold are mandatory.

| Property Name | Acceptable values | Default | Description |
|---|---|---|---|
| | | | A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself. |
| | | | A unique string that identifies the consumer group this consumer belongs to. |
All other optional consumer configs can be set by prefixing regular Kafka properties with `kafka.consumer.<consumer-key>.`, where `<consumer-key>` is the key of the consumer configuration that should be used as the step parameter.
Steps
Start the consumer
Starts the Kafka consumer with the provided configuration to listen to the specified topics. The consumer must be stopped when it is no longer needed.
When I start consuming events from `$consumerKey` Kafka topics `$topics`
- $consumerKey - The key of the Kafka consumer configuration.
- $topics - The comma-separated set of topics to listen to.
Drain/Peek the consumed events
Drains or peeks the consumed events into the specified variable. If the consumer is not stopped, new events might arrive after the draining. If the consumer is stopped, all the events received since the consumer start or since the last draining operation are stored in the variable.
When I $queueOperation consumed `$consumerKey` Kafka events to $scopes variable `$variableName`
- $queueOperation - One of:
  - DRAIN - saves the events consumed since the last drain or from the consumption start and moves the consumer cursor to the position after the last consumed event
  - PEEK - saves the events consumed since the last drain or from the consumption start and doesn’t change the consumer cursor position
- $consumerKey - The key of the Kafka consumer configuration.
- $variableName - The variable name to store the events. The events are accessible via zero-based index, e.g. ${my-var[0]} will return the first received event.
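The difference between the two queue operations can be sketched in Java as follows. This is a simplified model, not the plugin's code: the class `ConsumedEventsSketch` and its methods are hypothetical, and the "consumer cursor" is modeled as an in-memory buffer of received events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of the events buffered for one consumer key.
final class ConsumedEventsSketch {
    private final BlockingQueue<String> events = new LinkedBlockingQueue<>();

    // Called whenever the running consumer receives an event.
    void onEvent(String event) {
        events.add(event);
    }

    // DRAIN: returns the buffered events and removes them from the buffer.
    List<String> drain() {
        List<String> result = new ArrayList<>();
        events.drainTo(result);
        return result;
    }

    // PEEK: returns a copy of the buffered events and leaves the buffer untouched.
    List<String> peek() {
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        ConsumedEventsSketch buffer = new ConsumedEventsSketch();
        buffer.onEvent("event-1");
        buffer.onEvent("event-2");
        System.out.println(buffer.peek());  // [event-1, event-2], buffer unchanged
        System.out.println(buffer.drain()); // [event-1, event-2], buffer emptied
        System.out.println(buffer.drain()); // []
    }
}
```

In this model, the first list element corresponds to index 0 of the stored variable, i.e. the first received event.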
Wait for the events
Waits until the count of the consumed events (since the consumer start or since the last draining operation) matches the rule or until the timeout is exceeded.
When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka events is $comparisonRule `$expectedCount`
- $timeout - The maximum time to wait for the events in ISO-8601 duration format.
- $consumerKey - The key of the Kafka consumer configuration.
- $comparisonRule - The comparison rule.
- $expectedCount - The expected count of the events to be matched by the rule.
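The waiting behavior described above can be approximated by a polling loop, shown here as a minimal Java sketch. It is an illustration rather than the plugin's implementation: the class `WaitForEventsSketch`, the method `waitUntil` and the 50 ms polling interval are assumptions. `Duration.parse` is the standard Java API for ISO-8601 durations such as `PT30S`.

```java
import java.time.Duration;
import java.util.function.IntPredicate;
import java.util.function.IntSupplier;

// Hypothetical polling loop: waits until the event count satisfies the rule or the timeout expires.
final class WaitForEventsSketch {

    // Returns true if the rule matched before the timeout, false otherwise.
    static boolean waitUntil(Duration timeout, IntSupplier eventCount, IntPredicate rule) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (rule.test(eventCount.getAsInt())) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(50); // assumed polling interval
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // "PT1S" is an ISO-8601 duration of one second.
        Duration timeout = Duration.parse("PT1S");
        // Two events are already available, so "greater than 1" matches immediately.
        boolean matched = waitUntil(timeout, () -> 2, count -> count > 1);
        System.out.println(matched); // prints true
    }
}
```

The count supplier stands for the number of events consumed since the consumer start or the last drain, and the predicate plays the role of the comparison rule applied to the expected count.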
Examples
When I start consuming events from `dev` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of events to Kafka
When I wait with `PT30S` timeout until count of consumed `dev` Kafka events is greater than `1`
When I stop consuming events from `dev` Kafka
When I drain consumed `dev` Kafka events to scenario variable `consumed-events`
Then `${consumed-events[0]}` is equal to `some-expected-event`

When I start consuming events from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of events to Kafka
When I drain consumed `prod` Kafka events to scenario variable `events-after-action-X`
!-- Perform more actions triggering the publishing of events to Kafka
When I drain consumed `prod` Kafka events to scenario variable `events-after-action-Y`
When I stop consuming events from `prod` Kafka

When I start consuming events from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of events to Kafka
When I drain consumed `prod` Kafka events to scenario variable `events-after-action-X`
!-- Perform more actions triggering the publishing of events to Kafka
When I peek consumed `prod` Kafka events to scenario variable `events-after-action-Y`
When I stop consuming events from `prod` Kafka