Kafka Plugin

The plugin provides integration with an open-source distributed event streaming platform Apache Kafka.

Installation

Example 1. build.gradle
implementation(group: 'org.vividus', name: 'vividus-plugin-kafka', version: '0.2.12')

Producer

Properties

The properties marked with bold are mandatory.
Property Name Acceptable values Default Description

kafka.producer.bootstrap.servers

<host>:<port>

A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

All other optional producer configs can be set by prefixing regular Kafka properties with kafka.producer..

Steps

Send the data

Sends the data to the provided topic with no key or partition.

When I send data `$data` to Kafka topic `$topic`
  • $data - the data to send

  • $topic - the topic name

Examples

Example 2. Send the data to the Kafka topic
When I send data `my-data` to Kafka topic `my-topic`

Consumer

Properties

The properties marked with bold are mandatory.
Property Name Acceptable values Default Description

kafka.consumer.bootstrap.servers

<host>:<port>

A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

kafka.consumer.group.id

<any-string>

A unique string that identifies the consumer group this consumer belongs to.

All other optional consumer configs can be set by prefixing regular Kafka properties with kafka.consumer..

Steps

Start the consumer

Starts the Kafka consumer with the provided configuration to listen the specified topics. The consumer must be stopped when it’s not needed.

When I start consuming messages from Kafka topics `$topics`
  • $topics - the comma-separated set of topics to listen

Drain the consumed messages

Drains the consumed messaged to the specified variable. If the consumer is not stopped, the new messages might arrive after the draining. If the consumer is stopped, all the messages received from the consumer start or after the last draining operation are stored to the variable.

When I drain consumed Kafka messages to $scopes variable `$variableName`

Wait for the messages

Waits until the count of the consumed messaged (from the consumer start or after the last draining operation) matches to the rule or until the timeout is exceeded.

When I wait with `$timeout` timeout until count of consumed Kafka messages is $comparisonRule `$expectedCount`
  • $timeout - the maximum time to wait for the messages in ISO-8601 format

  • $comparisonRule - the comparison rule

  • $expectedCount - the expected count of the messages to be matched by the rule

Stop the consumer

Stops the Kafka consumer started by the corresponding step before. All recorded messages are kept and can be drained into the variable using the step described above.

When I stop consuming messages from Kafka

Examples

Example 3. Consume messages from the Kafka topic
When I start consuming messages from Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I wait with `PT30S` timeout until count of consumed Kafka messages is greater than `1`
When I stop consuming messages from Kafka
When I drain consumed Kafka messages to scenario variable `consumed-messages`
Then `${consumed-messages[0]}` is equal to `some-expected-message`
Example 4. Drain messages while listener is rinning
When I start consuming messages from Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I drain consumed Kafka messages to scenario variable `messages-after-action-X`
!-- Perform more actions triggering the publishing of messages to Kafka
When I drain consumed Kafka messages to scenario variable `messages-after-action-Y`
When I stop consuming messages from Kafka