Skip to content

Toro Cloud Dev Center


Kafka listener endpoint

The Kafka listener endpoint enables developers to write applications that subscribe to Kafka topics and invoke a registered service when messages are received from these topics.

Unsupported by Martini Online

You cannot create nor edit a Kafka listener endpoint in Martini Online.

Properties

The Kafka listener endpoint has Martini and Kafka-specific properties, grouped accordingly and described in the following sub-sections.

For more information...

The Kafka-specific property descriptions below are, for the most part, derived from the Kafka consumer configuration and Spring for Kafka official documentation pages. For further reading about each property, visit these resources.

General configuration

Property Default Description
Name (required) The name of the endpoint.
Service (required) The service to execute when the endpoint is triggered.
Run As Anonymous The user to run the service in behalf of. This is logged to Tracker.
Document Type <Name of endpoint type> The document type to be used when adding documents to Tracker as this endpoint is triggered.
Auto Start true Whether or not to automatically start the endpoint upon package startup.
Log To Tracker false Flag determining whether executions should be logged to Tracker.
Replicated true If this endpoint is configured on a Martini instance that's running in a cluster, replicated will determine whether to run the endpoint on all instances, or only the elected leader node in the cluster. When this is checked, all instances will run the endpoint. When it's unchecked, only the leader node will run the endpoint.

Kafka listener-specific configuration

Name Default Choices Description
Bootstrap Servers localhost:9092 The list of host-port pairs to use for establishing the initial connection to the Kafka cluster.
Topics The list of topics to subscribe to.
Group ID The consumer group to which this/these listener/s belong/s to.
Key Deserializer byte, byte_buffer, double, integer, long, string, json, xml, or the fully qualified name of a class that implements org.apache.kafka.common.serialization.Deserializer; just make sure the class exists in the classpath. The deserializer to use for message key.
Value Deserializer byte, byte_buffer, double, integer, long, string, json, xml. or the fully qualified name of a class that implements org.apache.kafka.common.serialization.Deserializer; just make sure the class exists in the classpath. The deserializer to use for message value.

Commit strategy configuration

Name Default Choices Description
Enable Auto Commit true true, false Whether or not to commit the offset periodically in the background.
Auto Commit Interval (ms) 5000 The frequency in milliseconds that the offsets are auto-committed to Kafka when Enable Auto Commit is set to true.
Acknowledgement Mode BATCH
  • RECORD - commit the offset after each message has been passed to the listener(s)
  • BATCH - commit the offset when all the messages have been processed
  • TIME - commit the offset when all the messages have been processed as long as the Acknowledgement Time (ms) since the last commit has been exceeded
  • COUNT - commit the offset when all the messages have been processed as long as Acknowledgement Count messages have been received since the last commit
  • COUNT_TIME - similar to TIME and COUNT but the commit is performed if either condition is true
  • MANUAL - the registered service is responsible for committing the offset by calling ack.acknowledge() manually; after which, the same semantics as BATCH are applied
  • MANUAL_IMMEDIATE - commit the offset immediately when ack.acknowledge() method is called by the registered service.
If Enable Auto Commit is false, this will be the offset commit behavior or strategy.
Acknowledgement Count 1 [0,...] The number of outstanding record count after which offsets should be committed when Acknowledgement Mode is set to COUNT or COUNT_TIME.
Acknowledgement Time (ms) 1 [0,...] The time (ms) after which outstanding offsets should be committed when Acknowledgement Mode is set to TIME or COUNT_TIME.
Acknowledgement On Error false true, false If Enable Auto Commit is false, this will determine whether or not to commit offsets when the listener/s throws exceptions.

Data configuration

Name Default Choices Description
Fetch Max Bytes 52428800 [0,...] The maximum amount of data the server should return for a fetch request.
Fetch Min Bytes 1 [0,...] The minimum amount of data the server should return for a fetch request.
Max Partition Fetch Bytes 1048576 [0,...] The maximum amount of data per-partition the server will return.
Receive Buffer Bytes 65536 [0,...] The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.
Send Buffer Bytes 131072 [0,...] The size of the TCP send buffer (SO_SNDBUF) to use when sending data.

Metrics configuration

Name Default Choices Description
Reporters The list of fully qualified class names; where each class will be use as metrics reporter. The class must implement the org.apache.kafka.common.metrics.MetricsReporter interface. Note that the class must exists in the classpath.
Recording Level Info Info, Debug The highest recording level for metrics.
Number Samples 2 [0,...] The number of samples maintained to compute metrics.
Sample Window (ms) 30000 [0,...] The window of time a metrics sample is computed over.

Networking configuration

Name Default Choices Description
Isolation Level Read Uncommitted Read Uncommitted, Read Committed Controls how to read messages written transactionally.
Connections Max Idle (seconds) 540 [0,...] Close idle connections after the number of seconds specified by this configuration.
Fetch Max Wait (ms) 500 [0,...] The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by Fetch Min Bytes.
Heartbeat Interval (ms) 3000 [0,...] The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities.
Max Poll Interval (seconds) 300 [0,...] The maximum delay between messages requests to Kafka when using consumer group management.
Max Poll Records 500 [0,...] The maximum number of messages returned in a single message request to Kafka.
Monitor Interval (seconds) 30 [0,...] The interval between checks for a non-responsive listener/s in seconds.
Poll Timeout (seconds) 1 [0,...] The maximum time to block in the listener/s waiting for messages.
Request Timeout (seconds) 305 [0,...] The configuration controls the maximum amount of time the listener/s will wait for the response of a request.
Session Timeout (seconds) 10 [0,...] The timeout used to detect consumer failures when using Kafka's group management facility.
Shutdown Timeout (seconds) 10 [0,...] The timeout for shutting down the listener/s.
Batch false true, false If true, the registered service will be invoked on each batch of messages. Otherwise, it will be invoked on each message in the batch.

Resiliency configuration

Name Default Choices Description
Auto Offset Reset Latest Latest, Earliest, None What to do when there is no initial offset in Kafka or if the current offset does not exist anymore on the server (e.g. because that data has been deleted).
Reconnect Backoff (ms) 50 [0,...] The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect.
Reconnect Backoff Max (ms) 1000 [0,...] The base amount of time to wait before attempting to reconnect to a given host.
Retry Backoff (ms) 100 [0,...] The amount of time to wait before attempting to retry a failed request to a given topic partition.

SASL configuration

Name Default Choices Description
JAAS Config JAAS login context parameters for SASL connections in the format used by JAAS configuration files.
Kerberos Kinit Command /usr/bin/kinit Kerberos kinit command path.
Kerberos Service Name The Kerberos principal name that Kafka runs as.
Mechanism GSSAPI SASL mechanism used for client connections.
Kerberos Minimum Time Before Login (ms) 60000 [0,...] Login thread sleep time between refresh attempts.
Kerberos Ticket Renew Jitter 0.05 [0.00-1.00] Percentage of random jitter added to the renewal time.
Kerberos Ticket Renew Window Factor 0.80 [0,...] Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.

SSL configuration

Name Default Description
Cipher Suites The list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.
Enabled Protocols TLSv1,TLSv1.2,TLSv1.1 The list of protocols enabled for SSL connections.
Endpoint Identification Algorithm The endpoint identification algorithm to validate server hostname using server certificate.
Keymanager Algorithm SunX509 The algorithm used by key manager factory for SSL connections.
Key Password The password of the private key in the key store file.
Keystore Location The location of the key store file.
Keystore Password The store password for the key store file.
Keystore Type JKS The file format of the key store file. This is optional for client.
Protocol TLS The SSL protocol used to generate the SSLContext.
Provider The name of the security provider used for SSL connections.
Secure Random Implementation The SecureRandom PRNG implementation to use for SSL cryptography operations.
Trustmanager Algorithm PKIX The algorithm used by trust manager factory for SSL connections.
Truststore Location The location of the trust store file.
Truststore Password The password for the trust store file.
Truststore Type JKS The file format of the trust store file.

Advanced configuration

Name Default Choices Description
Partition Assignment Strategies org.apache.kafka.clients.consumer.RangeAssignor (not shown in UI) The list of fully qualified class names, where each class will be used by the client to distribute partition ownership amongst listener instances when group management is used. Note that these classes must exist in the classpath.
Interceptor Classes The list of fully qualified class names, where each class will be used as interceptors. The classes must implement the org.apache.kafka.clients.consumer.ConsumerInterceptor interface which allows it to intercept (and possibly mutate) messages received by the consumer. Note that these classes must exist in the classpath.
Client ID An ID string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just IP/port by allowing a logical application name to be included in server-side request logging.
Security Protocol PlainText PlainText, Ssl, Sasl PlainText, Sasl Ssl Protocol used to communicate with brokers.
Metadata Max Age (ms) 300000 [0,...] The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
Concurrency 1 [1,...] The maximum number of concurrent listeners running. Messages from within the same partition will be processed sequentially.
Log Container Config true true, false Whether or not to instruct each listener to log this configuration.
Sync Commits true true, false Whether or not to commit offsets synchronously when Enable Auto Commit is false.
Exclude Internal Topics true true, false Whether messages from internal topics (such as offsets) should be exposed to the listener/s.
Check Crcs true true, false Automatically check the CRC32 of the messages consumed.

Service

When the endpoint is triggered, the following variables are exposed to the configured service:

General parameters

Name Type Description
$trackerId java.lang.String The Tracker document internal ID. If the endpoint was configured to not track, this value will be null.
$tracker io.toro.martini.tracker.Tracker The Tracker object. If the endpoint was configured to not track, this value will be null.
martiniPackage MartiniPackage The Martini package that contains the endpoint.
parameters java.util.Map A map containing all the endpoint specific parameters.
properties java.util.Map A map containing containing all the properties associated with the endpoint.

Kafka listener-specific parameters

Name Type Description
messages java.util.List The list containing the key-value pairs received from Kafka.
consumer org.apache.kafka.clients.consumer.Consumer The client that consume messages from Kafka.
ack org.springframework.kafka.support.Acknowledgment When Enable Auto Commit is set tofalse and Acknowledgment Mode is set to MANUAL or MANUAL_IMMEDIATE, the registered service is responsible for committing the offset by calling ack.acknowledge() manually. Otherwise, this variable can be just ignored.

Message key/value failed deserialization

A io.toro.martini.endpoint.DeserializationException object is returned in the message key/value whenever deserialization failure occurs, so it is best to check if the key/value in a particular message is an instance of io.toro.martini.endpoint.DeserializationException and take appropriate action.

Example

Consider the following Kafka listener endpoint configuration:

Kafka listener endpoint configuration

This registers the Kafka listener endpoint to subscribe to the stocks topic and expect the message's key to be a string and the value to be a valid json string. The service assigned to this endpoint has the following inputs:

Kafka listener service example inputs

The core package contains helper models for Kafka which are located at models.io.toro.martini.kafka. These models can be reused to map your data with a Gloop model, like in the example above where messages.value was mapped with stock Gloop model.

Kafka listener service example process

(1) Note that messages will only contain only a single message if the Batch property is set to false. (3-4) Mock services that process the received messages. (6) This is only necessary if Enable Auto Commit is false and Acknowledgment Mode is set to MANUAL or MANUAL_IMMEDIATE


  1. The type of endpoint is the default Tracker document type for all endpoints.