Kafka Plugin

The plugin provides integration with an open-source distributed event streaming platform Apache Kafka.

Installation

Example 1. build.gradle
implementation(group: 'org.vividus', name: 'vividus-plugin-kafka', version: '0.5.0')

Producer

Properties

The properties marked with bold are mandatory.
Property Name Acceptable values Default Description

kafka.producer.<producer-key>.bootstrap.servers

<host>:<port>

A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

All other optional producer configs can be set by prefixing regular Kafka properties with kafka.producer.<producer-key>. Where <producer-key> is the key of the producer configuration which should be used as steps parameter.

Steps

Send the data

Sends the data to the provided topic with no key or partition.

When I send data `$data` to `$producerKey` Kafka topic `$topic`
  • $producerKey - the key of Kafka producer configuration

  • $data - the data to send

  • $topic - the topic name

Examples

Example 2. Send the data to the Kafka topic
When I send data `my-data` to `dev` Kafka topic `my-topic`

Consumer

Properties

The properties marked with bold are mandatory.
Property Name Acceptable values Default Description

kafka.consumer.<consumer-key>.bootstrap.servers

<host>:<port>

A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

kafka.consumer.<consumer-key>.group.id

<any-string>

A unique string that identifies the consumer group this consumer belongs to.

All other optional consumer configs can be set by prefixing regular Kafka properties with kafka.consumer.<consumer-key>.. Where <consumer-key> is the key of the consumer configuration which should be used as steps parameter.

Steps

Start the consumer

Starts the Kafka consumer with the provided configuration to listen the specified topics. The consumer must be stopped when it’s not needed.

When I start consuming messages from `$consumerKey` Kafka topics `$topics`
  • $consumerKey - the key of the Kafka consumer configuration

  • $topics - the comma-separated set of topics to listen

Drain/Peek the consumed messages

Drains/Peeks the consumed messaged to the specified variable. If the consumer is not stopped, the new messages might arrive after the draining. If the consumer is stopped, all the messages received from the consumer start or after the last draining operation are stored to the variable.

When I $queueOperation consumed `$consumerKey` Kafka messages to $scopes variable `$variableName`
  • $queueOperation - DRAIN - saves the messages consumed since the last drain or from the consumption start and moves the consumer cursor to the position after the last consumed message, PEEK - saves the messages consumed since the last drain or from the consumption start and doesn’t change the consumer cursor position

  • $consumerKey - the key of the Kafka consumer configuration

  • $scopes - The comma-separated set of the variables scopes.

  • $variableName - the variable name to store the messages. The messages are accessible via zero-based index, e.g. ${my-var[0]} will return the first received message.

Wait for the messages

Waits until the count of the consumed messaged (from the consumer start or after the last draining operation) matches to the rule or until the timeout is exceeded.

When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka messages is $comparisonRule `$expectedCount`
  • $timeout - the maximum time to wait for the messages in ISO-8601 format

  • $consumerKey - the key of the Kafka consumer configuration

  • $comparisonRule - the comparison rule

  • $expectedCount - the expected count of the messages to be matched by the rule

Stop the consumer

Stops the Kafka consumer started by the corresponding step before. All recorded messages are kept and can be drained into the variable using the step described above.

When I stop consuming messages from `$consumerKey` Kafka
  • $consumerKey - the key of the Kafka consumer configuration

Examples

Example 3. Consume messages from the Kafka topic
When I start consuming messages from `dev` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I wait with `PT30S` timeout until count of consumed `dev` Kafka messages is greater than `1`
When I stop consuming messages from `dev` Kafka
When I drain consumed Kafka messages to scenario variable `consumed-messages`
Then `${consumed-messages[0]}` is equal to `some-expected-message`
Example 4. Drain messages while listener is rinning
When I start consuming messages from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-X`
!-- Perform more actions triggering the publishing of messages to Kafka
When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-Y`
When I stop consuming messages from `prod` Kafka
Example 5. Peek messages while listener is rinning
When I start consuming messages from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of messages to Kafka
When I drain consumed `prod` Kafka messages to scenario variable `messages-after-action-X`
!-- Perform more actions triggering the publishing of messages to Kafka
When I peek consumed `prod` Kafka messages to scenario variable `messages-after-action-Y`
When I stop consuming messages from `prod` Kafka