Kafka Plugin

The plugin provides integration with Apache Kafka, an open-source distributed event streaming platform.

Installation

  1. Copy the line below to the dependencies section of the project build.gradle file (a fuller sketch of the dependencies block follows this list)

    Example 1. build.gradle
    implementation(group: 'org.vividus', name: 'vividus-plugin-kafka', version: '0.5.12')
  2. If the project was imported into the IDE before the new dependency was added, re-generate the configuration files for the IDE in use and then refresh the project.
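
For reference, a minimal sketch of the dependencies block in build.gradle with the plugin dependency in place (using the version from Example 1; use the version matching your project):

dependencies {
    // VIVIDUS Kafka plugin from step 1
    implementation(group: 'org.vividus', name: 'vividus-plugin-kafka', version: '0.5.12')
}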

Producer

Properties

The following properties are mandatory.

kafka.producer.<producer-key>.bootstrap.servers
  Acceptable values: <host>:<port>
  Description: A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

All other optional producer configs can be set by prefixing regular Kafka properties with kafka.producer.<producer-key>., where <producer-key> is the key of the producer configuration to be used as a step parameter.
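
For example, assuming a producer key named dev and a placeholder broker address (both arbitrary values for this sketch), the producer configuration in the project properties might look like this:

kafka.producer.dev.bootstrap.servers=kafka-host:9092
# Any regular Kafka producer config can be set under the same prefix,
# e.g. the standard acks setting:
kafka.producer.dev.acks=all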

Steps

Send event with value

Sends the event with the value to the provided topic with no key or partition.

When I send event with value `$value` to `$producerKey` Kafka topic `$topic`

Deprecated syntax (will be removed in VIVIDUS 0.6.0):

When I send data `$data` to `$producerKey` Kafka topic `$topic`
  • $value - The event value.

  • $producerKey - The key of the Kafka producer configuration.

  • $topic - The topic name.

Examples

Example 2. Send the event to the Kafka topic
When I send event with value `my-data` to `dev` Kafka topic `my-topic`

Send event with key and value

Sends the event with the key and the value to the provided topic. Events with the same event key are written to the same partition (see https://kafka.apache.org/documentation/#intro_concepts_and_terms), and Kafka guarantees that any consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written.

When I send event with key `$key` and value `$value` to `$producerKey` Kafka topic `$topic`
  • $key - The event key; all the events with the same event key are written to the same partition.

  • $value - The event value.

  • $producerKey - The key of the Kafka producer configuration.

  • $topic - The topic name.

Examples

Example 3. Send the event to the Kafka topic
When I send event with key `passenger car` and value `BMW x7` to `dev` Kafka topic `cars`

Consumer

Properties

The following properties are mandatory.

kafka.consumer.<consumer-key>.bootstrap.servers
  Acceptable values: <host>:<port>
  Description: A comma-separated list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that a Kafka client connects to initially to bootstrap itself.

kafka.consumer.<consumer-key>.group.id
  Acceptable values: <any-string>
  Description: A unique string that identifies the consumer group this consumer belongs to.

All other optional consumer configs can be set by prefixing regular Kafka properties with kafka.consumer.<consumer-key>., where <consumer-key> is the key of the consumer configuration to be used as a step parameter.
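
For example, assuming a consumer key named dev, a placeholder broker address, and a group id chosen for the tests (all arbitrary values for this sketch):

kafka.consumer.dev.bootstrap.servers=kafka-host:9092
kafka.consumer.dev.group.id=vividus-tests
# Any regular Kafka consumer config can be set under the same prefix,
# e.g. the standard auto.offset.reset setting:
kafka.consumer.dev.auto.offset.reset=earliest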

Steps

Start the consumer

Starts the Kafka consumer with the provided configuration to listen to the specified topics. The consumer must be stopped when it is no longer needed.

When I start consuming events from `$consumerKey` Kafka topics `$topics`

Deprecated syntax (will be removed in VIVIDUS 0.6.0):

When I start consuming messages from `$consumerKey` Kafka topics `$topics`
  • $consumerKey - The key of the Kafka consumer configuration.

  • $topics - The comma-separated set of topics to listen to.

Drain/Peek the consumed events

Drains or peeks the consumed events into the specified variable. If the consumer is not stopped, new events might arrive after the draining. If the consumer is stopped, all the events received since the consumer start or since the last draining operation are stored in the variable.

When I $queueOperation consumed `$consumerKey` Kafka events to $scopes variable `$variableName`

Deprecated syntax (will be removed in VIVIDUS 0.6.0):

When I $queueOperation consumed `$consumerKey` Kafka messages to $scopes variable `$variableName`
  • $queueOperation - DRAIN saves the events consumed since the last drain or from the consumption start and moves the consumer cursor to the position after the last consumed event; PEEK saves the same events but does not change the consumer cursor position.

  • $consumerKey - The key of the Kafka consumer configuration.

  • $scopes - The comma-separated set of the variables scopes.

  • $variableName - The variable name to store the events. The events are accessible via a zero-based index, e.g. ${my-var[0]} will return the first received event.

Wait for the events

Waits until the count of the consumed events (from the consumer start or after the last draining operation) matches the rule, or until the timeout is exceeded.

When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka events is $comparisonRule `$expectedCount`

Deprecated syntax (will be removed in VIVIDUS 0.6.0):

When I wait with `$timeout` timeout until count of consumed `$consumerKey` Kafka messages is $comparisonRule `$expectedCount`
  • $timeout - The maximum time to wait for the events, in the ISO-8601 duration format (e.g. PT30S, as in the examples below).

  • $consumerKey - The key of the Kafka consumer configuration.

  • $comparisonRule - The comparison rule to match the count against (e.g. greater than, as in the examples below).

  • $expectedCount - The expected count of the events to be matched by the rule.

Stop the consumer

Stops the Kafka consumer started earlier by the corresponding step. All recorded events are kept and can be drained into the variable using the step described above.

When I stop consuming events from `$consumerKey` Kafka

Deprecated syntax (will be removed in VIVIDUS 0.6.0):

When I stop consuming messages from `$consumerKey` Kafka
  • $consumerKey - The key of the Kafka consumer configuration.

Examples

Example 4. Consume events from the Kafka topic
When I start consuming events from `dev` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of events to Kafka
When I wait with `PT30S` timeout until count of consumed `dev` Kafka events is greater than `1`
When I stop consuming events from `dev` Kafka
When I drain consumed `dev` Kafka events to scenario variable `consumed-events`
Then `${consumed-events[0]}` is equal to `some-expected-event`

Example 5. Drain events while listener is running
When I start consuming events from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of events to Kafka
When I drain consumed `prod` Kafka events to scenario variable `events-after-action-X`
!-- Perform more actions triggering the publishing of events to Kafka
When I drain consumed `prod` Kafka events to scenario variable `events-after-action-Y`
When I stop consuming events from `prod` Kafka

Example 6. Peek events while listener is running
When I start consuming events from `prod` Kafka topics `my-topic-1, my-topic-2`
!-- Perform any actions triggering the publishing of events to Kafka
When I peek consumed `prod` Kafka events to scenario variable `events-after-action-X`
!-- Perform more actions triggering the publishing of events to Kafka
When I peek consumed `prod` Kafka events to scenario variable `events-after-action-Y`
When I stop consuming events from `prod` Kafka