
Handling streaming data in Kafka isn’t an easy task. Kafka itself does not offer mechanisms for transforming data or combining data from different streams. You have to write your own programs to do it.
Confluent recognized this problem and developed an open source tool called KSQL. It reads Kafka topics and exposes them as streams and tables, on which you can run standard SQL queries. You can manipulate the data with SQL functions and join data from different streams as you would join RDBMS tables, all in good old, widely used SQL.
So far, Confluent has only released a developer preview with a quickstart demo and some documentation, but a stable version should be out soon, probably this April. To learn more, you can read this introduction and the documentation from Confluent.
Ksql supports Kafka 0.10.0.1 and later. I successfully ran it against the Confluent platform Kafka and Apache Kafka 0.10.0.1. It did not run well with the built-in Kafka on my Cloudera cluster, presumably because Cloudera embeds Kafka 0.9 in CDH. If you still want to run Ksql with your existing Cloudera cluster, Landoop offers Fast Data, which should enable running Kafka 0.10 on Cloudera CDH, but I did not have the chance to actually test it.
For this test I will use the Confluent platform, which comes with Zookeeper, Kafka and some extra components that we won’t be using. I created three servers: “kafka” will run … well, Kafka, and “ksql” and “ksql2” will run ksql to simulate a multi-server installation.
Install confluent platform
I installed it on “kafka” server.
rpm --import https://packages.confluent.io/rpm/4.0/archive.key
echo "[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.0/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1" > /etc/yum.repos.d/confluent.repo
yum clean all
yum install confluent-platform-oss-2.11
Now start Confluent server:
/usr/bin/confluent start
This will start Zookeeper and Kafka (along with some other components).
Now let’s install ksql on the two servers (ksql and ksql2) to form a cluster:
Install KSQL
Confluent offers a Docker image that includes Ksql, but I preferred to do it the hard way and install everything myself.
First, download ksql from here. Then untar the file and enter the ksql directory. Ksql has two operation modes: standalone and cluster. In a real-world, production system, we would want a cluster of several Ksql servers to which the client connects remotely, so we will run in cluster mode. For this demo I used two servers: ksql and ksql2.
After untarring the tar file, edit config/ksqlserver.properties. Here is how the file looks in my setup, with explanations (on the ksql2 server, the listener address should change according to the server name):
# The Kafka servers (at least one of them, but it's best to specify more than one for high availability)
bootstrap.servers=kafka:9092
ksql.command.topic.suffix=commands
# The zookeeper that your Kafka cluster is using
zookeeper.connect=kafka:2181
# ksql is configured by default to send metrics back to Confluent servers.
# Set this parameter to false if you want to stop this behavior
confluent.support.metrics.enable=false
# The name and port of the current machine that ksql server runs on
listeners=http://ksql:8090
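For reference, here is how the same file would look on the second server, ksql2, assuming the same Kafka and Zookeeper addresses as above (only the listener line changes):

```properties
bootstrap.servers=kafka:9092
ksql.command.topic.suffix=commands
zookeeper.connect=kafka:2181
confluent.support.metrics.enable=false
# Only this line differs: it must carry this server's own name
listeners=http://ksql2:8090
```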
Now start the Ksql server:
cd /ksql/ksql
./bin/ksql-server-start ./config/ksqlserver.properties
Ksql server does not write any output to the console; it writes all logs to /tmp.
If everything goes well you will see a message in /tmp/ksql.log saying that the server is up and running:
[2018-03-14 01:41:53,185] INFO Starting server (io.confluent.ksql.rest.server.KsqlRestApplication:208)
[2018-03-14 01:41:53,349] INFO Logging initialized @8265ms (org.eclipse.jetty.util.log:186)
[2018-03-14 01:41:53,481] INFO Adding listener: http://ksql:8080 (io.confluent.rest.Application:186)
[2018-03-14 01:41:53,774] INFO jetty-9.2.22.v20170606 (org.eclipse.jetty.server.Server:327)
[2018-03-14 01:41:55,422] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-03-14 01:41:55,767] INFO Started [email protected]{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-03-14 01:41:55,801] INFO Started [email protected]{HTTP/1.1}{ksql:8080} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-03-14 01:41:55,802] INFO Started @10719ms (org.eclipse.jetty.server.Server:379)
[2018-03-14 01:41:55,818] WARN The version check feature of KSQL is disabled. (io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent:68)
[2018-03-14 01:41:55,818] INFO Server up and running (io.confluent.ksql.rest.server.KsqlRestApplication:210)
In the logs, I could not see any evidence of the two servers communicating with each other, or even being aware of each other. This leads me to think this is not a real cluster. I think these servers can provide scalability and load balancing, but not fault tolerance: if a ksql server goes down, all connections to it will be dropped and will not be transferred automatically to another ksql server. I’m not absolutely sure about this, but that is how it looks.
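If that is indeed the case, a client can compensate on its own: keep an ordered list of ksql server URLs and fail over manually when one stops responding. This is not a ksql feature, just a sketch of the idea in Python; the probe function is injectable (a real one might issue an HTTP request to the server, which is an assumption, not a documented API), so the failover logic itself can be tested without a live server.

```python
def pick_server(servers, is_alive):
    """Return the first reachable server, or None if all are down.

    servers  - ordered list of ksql REST endpoints
    is_alive - function(url) -> bool; in real code this would probe
               the server (hypothetical, e.g. an HTTP health request)
    """
    for url in servers:
        if is_alive(url):
            return url
    return None

# Simulate the first server being down: the client falls over to ksql2
servers = ["http://ksql:8090", "http://ksql2:8090"]
down = {"http://ksql:8090"}
print(pick_server(servers, lambda u: u not in down))  # http://ksql2:8090
```

The client would call pick_server again whenever its current connection drops, re-probing the list from the top.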
You can see that ksql created some topics of its own in order to manage offsets, keep metadata and general bookkeeping:

You can now connect from ksql client to any of the ksql servers:
./bin/ksql-cli remote http://ksql:8090
If everything goes well you will get a “ksql>” prompt.
The simulator program
For our test we need a program that will generate our data stream. Ksql itself comes with two built-in data generators: one generates a clickstream table, the other a users stream. You can see their usage in the quickstart guide. However, I decided to write my own very simple data generator.
Suppose we have 10 temperature sensors and 10 pressure sensors, each sending its readings every two seconds. So we will have two data streams to work on. Here is the Python program that simulates it:
from kafka import KafkaProducer
import random
import time

kafka_server = "kafka:9092"
producer = KafkaProducer(bootstrap_servers=kafka_server)

def send(data_type, data, key1):
    # kafka-python expects bytes for value and key, so encode the strings
    if data_type == "temp":
        producer.send(topic="temperature", value=data.encode(), key=key1.encode())
    if data_type == "press":
        producer.send(topic="pressure", value=data.encode(), key=key1.encode())

while True:
    for i in range(1, 11):
        # generate random temperature and pressure readings
        temp = random.randint(-20, 101)
        press = random.randint(30, 80)
        # construct csv lines with sensor id, timestamp and sensor reading
        # and send them down the matching topic
        temp_line = str(i) + "," + str(time.ctime()) + "," + str(temp)
        send("temp", temp_line, str(i))
        press_line = str(i) + "," + str(time.ctime()) + "," + str(press)
        send("press", press_line, str(i))
        time.sleep(2)
So I created two topics, temperature and pressure, and ran the above program to generate data.
Playing with Ksql
Now let’s create the stream based on the temperature sensors:
CREATE STREAM temperature (sensor_id int, datetime varchar, temp int) \
WITH (kafka_topic='temperature', value_format='DELIMITED');

 Message
----------------
 Stream created
----------------
Now we can run a simple select statement on the stream and get the data:
ksql> select * from temperature;
1521236042150 | null | 1 | Fri Mar 16 23:34:02 2018 | 77
1521236044168 | null | 2 | Fri Mar 16 23:34:04 2018 | 69
1521236046181 | null | 3 | Fri Mar 16 23:34:06 2018 | 63
1521236048190 | null | 4 | Fri Mar 16 23:34:08 2018 | 88
1521236050194 | null | 5 | Fri Mar 16 23:34:10 2018 | 22
1521236052199 | null | 6 | Fri Mar 16 23:34:12 2018 | 44
1521236054214 | null | 7 | Fri Mar 16 23:34:14 2018 | 58
1521236056225 | null | 8 | Fri Mar 16 23:34:16 2018 | 42
Unlike a regular query from a RDBMS, a query from a stream returns infinite results since the stream is always open and new items are flowing through it.
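Conceptually, a streaming select is like iterating over an unbounded generator: there is no end to reach, you only decide how much to consume. A hypothetical Python analogy (the generator and its readings are made up for illustration):

```python
import itertools

def temperature_stream():
    # stands in for an endless stream of sensor readings; yields forever
    reading = 0
    while True:
        reading += 1
        yield ("sensor-1", reading)

# like interrupting a ksql select after a while: take only the first 3 rows
rows = list(itertools.islice(temperature_stream(), 3))
print(rows)  # [('sensor-1', 1), ('sensor-1', 2), ('sensor-1', 3)]
```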
Now we will create a table for the pressure data. Note that unlike a stream, a table must have a key:
CREATE table pressure (sensor_id int, datetime varchar, press int) \
WITH (kafka_topic='pressure', value_format='DELIMITED', key='sensor_id');
The main difference between streams and tables is that stream data is immutable while table data can be updated. A stream can be joined with a table, but two streams cannot be joined; this is why I created a stream and a table rather than two streams.
Another thing to note is that when sending data to a topic on which a table is built, you must specify a key. If you do not specify a key, any query from the table will just hang, as described here.
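The stream/table distinction can be illustrated in plain Python: a stream is an append-only sequence of events, while a table keeps only the latest value per key, which is exactly why the key is mandatory. The sample events below are invented for illustration:

```python
events = [("1", 57), ("2", 43), ("1", 61)]  # (sensor_id, pressure) updates

# stream view: every event is kept, immutable, in arrival order
stream = list(events)

# table view: a new event with an existing key overwrites the old value
table = {}
for key, value in events:
    table[key] = value

print(stream)  # [('1', 57), ('2', 43), ('1', 61)]
print(table)   # {'1': 61, '2': 43}
```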
ksql> select * from pressure;
1522095508187 | 7 | 7 | Mon Mar 26 23:18:28 2018 | 57
1522095510200 | 8 | 8 | Mon Mar 26 23:18:30 2018 | 43
1522095512214 | 9 | 9 | Mon Mar 26 23:18:32 2018 | 32
1522095514228 | 1 | 1 | Mon Mar 26 23:18:34 2018 | 43
1522095516228 | 2 | 2 | Mon Mar 26 23:18:36 2018 | 64
Now let’s create a new stream which joins the stream and table we created earlier:
create stream all_data as \
select a.sensor_id, \
a.datetime, \
a.temp, \
b.press \
from temperature a \
left join pressure b on a.sensor_id=b.sensor_id;

 Message
----------------------------
 Stream created and running
----------------------------

ksql> select * from all_data;
1522096450841 | 1 | 1 | Mon Mar 26 23:34:10 2018 | 27 | 43
1522096452857 | 2 | 2 | Mon Mar 26 23:34:12 2018 | 2 | 54
1522096454873 | 3 | 3 | Mon Mar 26 23:34:14 2018 | 15 | 59
1522096456888 | 4 | 4 | Mon Mar 26 23:34:16 2018 | 54 | 53
1522096458904 | 5 | 5 | Mon Mar 26 23:34:18 2018 | 70 | 30
1522096460904 | 6 | 6 | Mon Mar 26 23:34:20 2018 | 82 | 57
1522096462920 | 7 | 7 | Mon Mar 26 23:34:22 2018 | 77 | 49
You can see that now the output contains both temperature and pressure.
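Conceptually, a stream-table left join works like this: for every record arriving on the stream, look up the current value for that key in the table (null when the key has no row yet). A toy Python version, with made-up records:

```python
pressure_table = {"1": 43, "2": 54}  # latest pressure per sensor_id

def left_join(stream_record, table):
    # one stream record in, one enriched record out
    sensor_id, datetime, temp = stream_record
    press = table.get(sensor_id)  # None when the key has no table row yet
    return (sensor_id, datetime, temp, press)

print(left_join(("1", "Mon Mar 26 23:34:10 2018", 27), pressure_table))
# ('1', 'Mon Mar 26 23:34:10 2018', 27, 43)
print(left_join(("9", "Mon Mar 26 23:34:12 2018", 15), pressure_table))
# ('9', 'Mon Mar 26 23:34:12 2018', 15, None)
```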
We will drop this stream and recreate it with an added column which is the product of temperature and pressure:
drop stream all_data;

create stream all_data as \
select a.sensor_id, \
a.datetime, \
a.temp, \
b.press, \
a.temp*b.press \
from temperature a \
left join pressure b on a.sensor_id=b.sensor_id;

 Message
----------------------------
 Stream created and running
----------------------------

ksql> select * from all_data;
1522182652958 | 4 | 4 | Tue Mar 27 23:30:52 2018 | -13 | 50 | -650
1522182654972 | 5 | 5 | Tue Mar 27 23:30:54 2018 | 87 | 66 | 5742
1522182656987 | 6 | 6 | Tue Mar 27 23:30:56 2018 | 2 | 50 | 100
1522182659001 | 7 | 7 | Tue Mar 27 23:30:59 2018 | 92 | 51 | 4692
1522182661004 | 8 | 8 | Tue Mar 27 23:31:01 2018 | 50 | 72 | 3600
1522182663018 | 9 | 9 | Tue Mar 27 23:31:03 2018 | 64 | 51 | 3264
So, as you can see, Ksql can also do on-the-fly data transformation and enrichment.
Finally, I wanted to run some aggregate functions. The first thing that popped into my mind was average, but unfortunately Confluent left the avg function out of ksql. So I tried sum()/count(), but this produced an error: “No SUM aggregate function with Schema{INT32} argument type exists!”. Searching the web, I found that this is a known issue. Although the issue is reported as resolved, I still couldn’t run those aggregate functions. I tried changing the data type from int to bigint and to double, but the error remained. Even a simple count(*) produced the same error.
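Until avg works in ksql, an average can be kept on the consumer side by tracking a running sum and count per sensor, which is all an AVG aggregate does internally anyway. A minimal Python sketch of the workaround (the records are invented for illustration):

```python
from collections import defaultdict

sums = defaultdict(int)
counts = defaultdict(int)

def update(sensor_id, temp):
    # incrementally maintain sum and count, the two pieces avg needs,
    # and return the running average for this sensor
    sums[sensor_id] += temp
    counts[sensor_id] += 1
    return sums[sensor_id] / counts[sensor_id]

update("1", 10)
update("1", 20)
print(update("1", 30))  # 20.0
```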
Conclusions
The idea behind Ksql is awesome, and it makes consuming data streams via Kafka much easier. Its tables and streams, data manipulation and joins can encapsulate complex logic and yet expose a simple interface to the developer or analyst. In this post I explored only a small part of its capabilities.
In the real world, you will probably want to connect to ksql from your programs, not from the ksql client. For this purpose there is a JDBC driver. It seems to be developed independently of the ksql project and I did not test it myself, but it is worth trying.
However, as a young product which is still just a developer preview, it has some rough edges, such as the aggregate-function issue and poor documentation. As Confluent states, it is not ready for production use. I hope these problems will be addressed in the upcoming official release.