
It’s been quite a while since my last post. That’s because I’ve been busy learning some DevOps skills (mainly Kubernetes), and I hope to include some of that content in this blog soon. This time we will look at how to consume data from Kafka and send it to Splunk for analysis.
Kafka Connect is a framework that uses Kafka topics to collect data from various sources and distribute it to different sinks. It comes bundled with the Kafka installation but can run independently of the Kafka brokers and access them remotely. Here is an explanation of what Kafka Connect is and its architecture. It is also a good candidate for running on Kubernetes, since it only uses outgoing communication.
The framework uses plugins to talk to different sources and sinks. There are many ready-made plugins for a variety of systems. Some of them are free and some are licensed by companies like Confluent or Debezium; many of them can be found here. Some systems can be a source of data, some can be a sink and some can be both. Basically, a source adapter polls the source system for changes, pulls the data and publishes it to a Kafka topic. A sink adapter subscribes to a Kafka topic, receives incoming events and exports them into the target system.
As I mentioned, there are several dozen supported adapters. For this demonstration we will capture events from a Kafka topic and store them in Splunk for visualization and investigation.
Running the framework
Kafka Connect was developed by Confluent but is present in every Kafka distribution. I do not use the Confluent platform, so I tested it on Apache Kafka and on Cloudera.
There are three files you will need, and here’s where to find them in different Kafka distributions:
File: connect-distributed.sh
Location in Apache Kafka: $KAFKA_HOME/bin
Location in Confluent: $CONFLUENT_HOME/bin
Location in Cloudera: /opt/cloudera/parcels/CDH/lib/kafka/bin
Files: connect-distributed.properties and connect-log4j.properties
Location in Apache Kafka: $KAFKA_HOME/config
Location in Confluent: $CONFLUENT_HOME/etc/kafka
Location in Cloudera: /opt/cloudera/parcels/CDH/etc/kafka/conf.dist
connect-distributed.sh runs the framework (there is also a standalone version, connect-standalone.sh), connect-distributed.properties contains parameters for the runtime, and connect-log4j.properties contains log-related parameters.
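To make the difference concrete, here is roughly how each mode is launched (paths follow the Apache Kafka layout listed above; the standalone connector file name is just a placeholder):

```shell
# Distributed mode: workers coordinate through Kafka itself, connectors
# are created afterwards via the REST API.
$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties

# Standalone mode: a single process, offsets kept in a local file, and the
# connector configuration passed on the command line instead of via REST.
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties my-connector.properties
```

For this post we use distributed mode, which is what you would run in production.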
So first we need to edit connect-distributed.properties. Some property changes are needed specifically for Splunk and are not necessary with other sinks (you should read the documentation for those).
Add or change the following properties in connect-distributed.properties:
bootstrap.servers=[your kafka server 1:9092,your kafka server 2:9092...]
group.id=kafka-connect-splunk
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
# (These values are needed specifically for Splunk, other sinks may require other converters)
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
plugin.path=/opt/connect-plugins
Take note of the plugin path.
The file also tells Connect which topics to use for holding its configuration, status and offsets, and how many replicas and partitions each of them will have. For production you may want more than one replica and some more partitions, according to your load and the number of Connect workers.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
In case your Kafka cluster is secured with TLS and Kerberos, you will have to add this section to the file.
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
security.protocol=SASL_SSL
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  storeKey=true \
  keyTab="[path to keytab]" \
  principal="kafka/[host fqdn]";
ssl.key.password=[password]
ssl.keystore.location=[path to keystore file]
ssl.keystore.password=[password]
ssl.truststore.location=[path to truststore file]
ssl.truststore.password=[password]
ssl.protocol=TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
#########################################
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true \
  storeKey=true \
  keyTab="[path to keytab]" \
  principal="kafka/[host fqdn]";
consumer.ssl.key.password=[password]
consumer.ssl.keystore.location=[path to keystore file]
consumer.ssl.keystore.password=[password]
consumer.ssl.truststore.location=[path to truststore file]
consumer.ssl.truststore.password=[password]
consumer.ssl.protocol=TLS
consumer.ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
consumer.ssl.keystore.type=JKS
consumer.ssl.truststore.type=JKS
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
Now we need to create the plugin path and put the needed plugins there. A plugin can come in two formats: a directory with files or a single jar file. For our example we will download the Splunk plugin from here (extract it, take the splunk-kafka-connect-v2.0.2.jar file from the lib directory and copy it to the plugins directory pointed to by plugin.path).
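The install step boils down to something like this (assuming the plugin.path value /opt/connect-plugins from the properties file above, and that the extracted archive sits in the current directory):

```shell
# Create the plugin directory configured as plugin.path
mkdir -p /opt/connect-plugins

# Copy the connector jar from the extracted archive into it;
# Connect scans this directory on startup.
cp lib/splunk-kafka-connect-v2.0.2.jar /opt/connect-plugins/
```

If you add or upgrade a plugin later, restart the Connect workers so they pick it up.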
Now start Kafka Connect by running the following command. Prefix the script and the properties file with their respective paths from the file locations listed above:
connect-distributed.sh connect-distributed.properties
This will run Kafka Connect in the console. In production you will probably want to add the “-daemon” parameter to make Connect run in the background. If you run more than one instance (on different hosts) they will form a cluster and work together. It takes a few minutes to finish initialization, but if Kafka Connect started without errors we can go on to the next step.
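An easy way to verify the worker is up is to query its REST API, which listens on port 8083 by default (localhost here assumes you run this on the Connect host itself):

```shell
# Returns the worker version, commit and Kafka cluster id as JSON
curl http://localhost:8083/

# Returns the list of running connectors -- empty ([]) until we create one
curl http://localhost:8083/connectors
```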
Splunk sink
First we need to create an index. Go to Settings->Data->Indexes and create an index. I wasn’t too creative and just called mine “guy”.
Now create an HTTP Event Collector (HEC). You can create the HEC on an indexer or on a heavy forwarder in your Splunk cluster. Go to Settings->Data->Data inputs and select HTTP Event Collector. Now click “Global Settings” and make sure all tokens are enabled:

Now click “New Token”. In the first wizard page, just give the new token a name, then click Next. In the next page, you can leave the source type automatic so Splunk will try to infer it; if the data has a known format such as syslog, you can choose that specific type. Then, underneath, choose the index we want the data to go to:

Then click review and submit.
Take note of the token value of the HEC you just created; you will need it later:
We interact with the Kafka Connect server via its REST API. Run this to create a new connector within Kafka Connect:
curl http://clouderal1:8083/connectors -X POST -H "Content-Type:application/json" -d '{
  "name": "kafka-connect-splunk",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "3",
    "splunk.indexes": "guy",
    "topics": "guy",
    "splunk.hec.uri": "http://splunk:8088",
    "splunk.hec.token": "3647d96b-5705-4109-a5c5-67c5a9325f85",
    "splunk.hec.validate.certs": "false",
    "splunk.hec.ack.enabled": "false"
  }
}'
Many parameters are self-explanatory; the others are:
- splunk.hec.token is the token you got when creating the HEC in Splunk.
- splunk.hec.uri is the address where you created your HEC (indexer or heavy forwarder) plus the port from the global settings (8088 by default).
- In case you enabled acknowledgments when creating the HEC, you should also set splunk.hec.ack.enabled to true.
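Before POSTing, it can pay off to sanity-check the JSON payload locally; a small sketch (the file path is arbitrary, and the values are the example ones from this post):

```shell
# Write the connector config to a file so it can be validated and reused.
cat > /tmp/splunk-sink.json <<'EOF'
{
  "name": "kafka-connect-splunk",
  "config": {
    "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
    "tasks.max": "3",
    "splunk.indexes": "guy",
    "topics": "guy",
    "splunk.hec.uri": "http://splunk:8088",
    "splunk.hec.token": "3647d96b-5705-4109-a5c5-67c5a9325f85",
    "splunk.hec.validate.certs": "false",
    "splunk.hec.ack.enabled": "false"
  }
}
EOF

# Fail fast on malformed JSON before it ever reaches the REST API.
python3 -m json.tool /tmp/splunk-sink.json > /dev/null && echo "JSON OK"
```

You can then submit it with curl using -d @/tmp/splunk-sink.json instead of an inline string, which avoids shell-quoting mistakes.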
If you ever need to delete the connector, this is how to do it:
curl -X DELETE http://clouderal1:8083/connectors/kafka-connect-splunk
You can also check its status:
curl http://clouderal1:8083/connectors/kafka-connect-splunk/status
And perform many other actions, as documented here.
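A few of those endpoints that I find myself using most often (same host and port as above):

```shell
# List the plugins the workers have loaded from plugin.path
curl http://clouderal1:8083/connector-plugins

# List all running connectors
curl http://clouderal1:8083/connectors

# Temporarily stop consuming, then resume, then force a restart
curl -X PUT  http://clouderal1:8083/connectors/kafka-connect-splunk/pause
curl -X PUT  http://clouderal1:8083/connectors/kafka-connect-splunk/resume
curl -X POST http://clouderal1:8083/connectors/kafka-connect-splunk/restart
```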
Now let’s test it. Create a topic called guy:
kafka-topics.sh --create --topic guy --bootstrap-server clouderal1:9092,clouderal2:9092,clouderal3:9092 --replication-factor 3 --partitions 3
Then produce some events using the console producer bundled with Kafka:
kafka-console-producer.sh --topic guy --bootstrap-server clouderal1:9092,clouderal2:9092,clouderal3:9092
21/07/21 12:44:08 INFO clients.Metadata: [Producer clientId=console-producer] Cluster ID: Pm-td-NYRTS1qjs0FCFN_g
>Hello
>1234
>12345
>Bye
>21/07/21 12:44:25 INFO producer.KafkaProducer: [Producer clientId=console-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
You can see in the Kafka Connect log that it is consuming the data:
[2021-07-21 12:42:55,780] INFO [Consumer clientId=connector-consumer-kafka-connect-splunk-0, groupId=connect-kafka-connect-splunk] Setting offset for partition guy-0 to the committed offset FetchPosition{offset=17, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[clouderal2:9092 (id: 1546333889 rack: null)], epoch=4}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:792)
[2021-07-21 12:44:54,058] INFO WorkerSinkTask{id=kafka-connect-splunk-0} Committing offsets asynchronously using sequence number 12: {guy-0=OffsetAndMetadata{offset=21, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:349)
Now go to Splunk and query guy index:

The data we just entered is present in the Splunk index.
This is a somewhat simplistic scenario, just for the sake of demonstration. In the real world, data types and security considerations will be more complex.
* If you are using Cloudera CDP (the newer versions), Kafka Connect can be configured and managed via Cloudera Manager, and there is no need to deal directly with configuration files and the CLI.