Kafka Plugin
The plugin provides integration with an open-source distributed event streaming platform Apache Kafka.
Installation
-
Copy the below line to
dependencies
section of the projectbuild.gradle
fileExample 1. build.gradleimplementation(group: 'org.vividus', name: 'vividus-plugin-kafka', version: '0.5.2')
-
If the project was imported to the IDE before adding new plugin, re-generate the configuration files for the used IDE and then refresh the project in the used IDE.
Producer
Properties
The properties marked with bold are mandatory. |
Property Name | Acceptable values | Default | Description |
---|---|---|---|
|
|
A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself. |
All other optional producer configs can be set by prefixing regular Kafka properties with kafka.producer.<producer-key>
.
Where <producer-key>
is the key of the producer configuration which should be used as steps parameter.
Consumer
Properties
The properties marked with bold are mandatory. |
Property Name | Acceptable values | Default | Description |
---|---|---|---|
|
|
A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself. |
|
|
|
A unique string that identifies the consumer group this consumer belongs to. |
All other optional consumer configs can be set by prefixing regular Kafka properties with kafka.consumer.<consumer-key>.
.
Where <consumer-key>
is the key of the consumer configuration which should be used as steps parameter.
Steps
Start the consumer
Starts the Kafka consumer with the provided configuration to listen the specified topics. The consumer must be stopped when it’s not needed.
When I start consuming messages from `$consumerKey` Kafka topics `$topics`
-
$consumerKey
- the key of the Kafka consumer configuration -
$topics
- the comma-separated set of topics to listen
Drain/Peek the consumed messages
Drains/Peeks the consumed messaged to the specified variable. If the consumer is not stopped, the new messages might arrive after the draining. If the consumer is stopped, all the messages received from the consumer start or after the last draining operation are stored to the variable.
When I $queueOperation consumed `$consumerKey` Kafka messages to $scopes variable `$variableName`
-
$queueOperation
-DRAIN
- saves the messages consumed since the last drain or from the consumption start and moves the consumer cursor to the position after the last consumed message,PEEK
- saves the messages consumed since the last drain or from the consumption start and doesn’t change the consumer cursor position -
$consumerKey
- the key of the Kafka consumer configuration -
$variableName
- the variable name to store the messages. The messages are accessible via zero-based index, e.g.${my-var[0]}
will return the first received message.
Wait for the messages
Waits until the count of the consumed messaged (from the consumer start or after the last draining operation) matches to the rule or until the timeout is exceeded.
When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka messages is $comparisonRule `$expectedCount`
-
$timeout
- the maximum time to wait for the messages in ISO-8601 format -
$consumerKey
- the key of the Kafka consumer configuration -
$comparisonRule
- the comparison rule -
$expectedCount
- the expected count of the messages to be matched by the rule
Stop the consumer
Stops the Kafka consumer started by the corresponding step before. All recorded messages are kept and can be drained into the variable using the step described above.
When I stop consuming messages from `$consumerKey` Kafka
-
$consumerKey
- the key of the Kafka consumer configuration
Examples
When I start consuming messages from `dev` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I wait with `PT30S` timeout until count of consumed `dev` Kafka messages is greater than `1`
When I stop consuming messages from `dev` Kafka
When I drain consumed Kafka messages to scenario variable `consumed-messages`
Then `${consumed-messages[0]}` is equal to `some-expected-message`
When I start consuming messages from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-X`
!-- Perform more actions triggering the publishing of messages to Kafka
When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-Y`
When I stop consuming messages from `prod` Kafka
When I start consuming messages from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-X`
!-- Perform more actions triggering the publishing of messages to Kafka
When I peek consumed `prod` Kafka messages to scenario variable `messages-after-action-Y`
When I stop consuming messages from `prod` Kafka