Use OpenEBS Underneath your Apache Kafka Brokers

I'm a Developer Advocate at MayaData, a company dedicated to enabling data agility through the use of Kubernetes as a data layer and the sponsor of the CNCF project OpenEBS and other open source projects. Take it for a test drive via free and easy to use management software by registering here.

Simplifying the use of Apache Kafka on Kubernetes

Sometimes you get a nice big chunk of data, all structured and ready to read in, process, and do something with. Sometimes. More commonly, your data comes in dribbles, bursts, and sometimes a fire-hose. So what do you do with this kind of streaming data? How do you make sure you’re processing efficiently and close to real-time, while not dropping data due to lack of capacity?

Well, you might want to look into a streaming pub/sub system like Apache Kafka. It works great inside of Kubernetes -- with the help of OpenEBS Dynamic LocalPVs -- and gives you a message bus that all of your components can talk on. With Apache Kafka, you can have data streaming in on one topic. You can consume that topic with a job that will categorize the bits of data and put them on different topics. You might have other jobs that process and create their own artifacts that they push to a topic, whatever you might like. And then, maybe you have a final topic you use to put all of the incoming data along with related data generated by your app components into your data lake.
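Here is a rough sketch of what that "categorize and republish" job could look like with kafka-python, the same client used in the examples further down. The topic names (`incoming`, `orders`, `clicks`, `uncategorized`) and the `type` field are made up for illustration, so swap in whatever your data actually looks like.

```python
import json

# Hypothetical category -> topic mapping, chosen only for illustration.
ROUTES = {"order": "orders", "click": "clicks"}
DEFAULT_TOPIC = "uncategorized"


def pick_topic(record):
    """Return the destination topic for one decoded record."""
    return ROUTES.get(record.get("type"), DEFAULT_TOPIC)


def run(bootstrap="kafka-svc", source="incoming"):
    """Consume `source` and republish each record on its category topic."""
    # Imported here so pick_topic() can be tried without kafka-python installed.
    from kafka import KafkaConsumer, KafkaProducer

    consumer = KafkaConsumer(
        source,
        bootstrap_servers=bootstrap,
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    )
    producer = KafkaProducer(
        bootstrap_servers=bootstrap,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    # Iterating the consumer blocks and yields records as they arrive.
    for message in consumer:
        producer.send(pick_topic(message.value), value=message.value)
```

Keeping the routing decision in a plain function like `pick_topic` means you can test it without a broker; calling `run()` is what actually needs a reachable Kafka service.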


You can use it as a reliable message queue, but with multiple subscribers, data persistence, and the ability to run code in the data stream. Just throw some data at it and let Apache Kafka organize it for you and get the data where it needs to be. Because of its design, it’s capable of buffering for a while if you get a burst of data that comes in faster than your processors can process it. This turns out to be a big deal when you’re trying to keep your software components loosely coupled.
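If the buffering and multiple-subscriber idea feels abstract, this toy model may help. It is not how Kafka is implemented, just a few lines of plain Python (the `TopicLog` class is invented for this post) showing an append-only log where each subscriber keeps its own position, so a slow reader falls behind without losing anything or holding up a fast one.

```python
class TopicLog:
    """Toy in-memory stand-in for one topic partition (illustration only)."""

    def __init__(self):
        self.records = []   # append-only; reading never removes a record
        self.offsets = {}   # subscriber name -> index of next record to read

    def publish(self, record):
        self.records.append(record)

    def poll(self, subscriber, max_records=1):
        """Return up to max_records unread records for this subscriber."""
        start = self.offsets.get(subscriber, 0)
        batch = self.records[start:start + max_records]
        self.offsets[subscriber] = start + len(batch)
        return batch

    def lag(self, subscriber):
        """Number of records buffered but not yet read by this subscriber."""
        return len(self.records) - self.offsets.get(subscriber, 0)


log = TopicLog()
for i in range(5):          # a burst arrives before anyone has read anything
    log.publish(i)

fast = log.poll("fast", max_records=5)   # reads the whole burst at once
slow = log.poll("slow", max_records=2)   # only gets through part of it
```

After this runs, the "slow" subscriber still has unread records waiting for it in the log, which is the loose coupling described above in miniature.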

Producing to a real topic takes very little code, as this Python example shows:

#!/usr/bin/env python

from time import sleep
from json import dumps
from kafka import KafkaProducer
import os

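# MY_POD_NAME is expected to be set in the pod's environment.
print('\n\nrunning on ', os.environ['MY_POD_NAME'], '\n')

# Serialize each value to UTF-8 encoded JSON before it goes on the wire.
producer = KafkaProducer(bootstrap_servers=['kafka-svc'],
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))

for i in range(200):
    data = {'key' : 'string'}
    producer.send('random', value=data)
    print("sent ", i, "th clump of random data\n")
    sleep(.1)

# send() is asynchronous; flush so buffered messages go out before exit.
producer.flush()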

That’s a pretty easy way to output data. In this example, ‘random’ is a topic name, and Python will dutifully put each JSON-encoded message onto that Kafka topic for consumption by anyone who asks. It’s pretty much just as easy to catch it on the other end:

#!/usr/bin/env python

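from json import loads
from kafka import KafkaConsumer
import os

print('\n\nrunning on ', os.environ['MY_POD_NAME'], '\n')

# Decode the UTF-8 JSON written by the producer back into Python objects.
consumer = KafkaConsumer('random',
                         bootstrap_servers='kafka-svc',
                         api_version=(0, 10),
                         value_deserializer=lambda b: loads(b.decode('utf-8')))

# Iterating blocks and yields messages as they arrive on the topic.
for m in consumer:
    print(m.value, "\n")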

Apache Kafka can keep multiple replicas for resiliency

While our data is “in-flight”, Kafka can keep multiple replicas for resiliency. So, you can effectively use Kafka as a structured, buffered output for your program: output more or less whatever you like and receive it on the other side. Kafka will keep it around for as long as the topic's retention settings allow, whether or not every consumer has read it yet. Um, where are we going to keep it?

Persistence in Kubernetes is also easy to solve with OpenEBS. You can install it on your cluster and create EBS-style volumes using instance storage, use it to orchestrate existing disks on your nodes, or even use a local ZFS pool to create volumes in.

Cool. So, we have the need for Apache Kafka so our code can talk to other pieces of code as well as databases and other components (did I mention there’s a wealth of ready-to-use connectors out there?). We want it in Kubernetes because that’s where we’re running our app, and that way, it’s easy to manage. And we have OpenEBS to handle this new persistence requirement that Kafka brings with it.
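About those replicas mentioned earlier: the replication factor is set per topic, and it can't be larger than the number of brokers you have. Once the brokers are running, something like the sketch below could create a replicated topic using kafka-python's admin client. The helper names and the default of three replicas are my own choices for illustration, not anything Kafka or OpenEBS prescribes.

```python
def replication_for(broker_count, desired=3):
    """Pick a replication factor that does not exceed the broker count."""
    return max(1, min(desired, broker_count))


def create_replicated_topic(name, broker_count, bootstrap="kafka-svc"):
    """Create `name` with one partition and as many replicas as allowed."""
    # Imported here so replication_for() works without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap)
    try:
        topic = NewTopic(
            name=name,
            num_partitions=1,
            replication_factor=replication_for(broker_count),
        )
        admin.create_topics(new_topics=[topic])
    finally:
        admin.close()
```

With three brokers, `create_replicated_topic('random', 3)` would ask for three copies of each message, each landing on a different broker's volume.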

So, let’s spin up a test cluster in our favorite cloud and see what we can do with this piece of technology.
  1. Set up a cluster with some nodes that have local SSDs, like AWS i3 instances.
    $ kubectl get nodes
    NAME                                         STATUS   ROLES    AGE     VERSION
    ip-172-20-34-74.us-east-2.compute.internal   Ready    node     6d18h   v1.14.10
    ip-172-20-45-3.us-east-2.compute.internal    Ready    node     6d18h   v1.14.10
    ip-172-20-53-19.us-east-2.compute.internal   Ready    node     6d18h   v1.14.10
    ip-172-20-59-66.us-east-2.compute.internal   Ready    node     6d18h   v1.14.10
    ip-172-20-63-65.us-east-2.compute.internal   Ready    master   6d18h   v1.14.10
    
  2. Install OpenEBS on the cluster (you can do this right from OpenEBS Director now)
  3. Use OpenEBS to make a new StorageClass that targets that local SSD, or just use `openebs-device`. I’ll use ZFS LocalPV in this example:
    ---
    apiVersion: apps/v1
    kind: DaemonSet
    metadata:
      namespace: openebs
      name: system-setup
      labels:
        openebs.io/component-name: system-setup
        openebs.io/version: 1.4.0
      annotations:
        command: &cmd sed -i 's/main/main contrib/' /etc/apt/sources.list ; apt -y install ; apt -y update ; apt -y install open-iscsi zfsutils-linux; sudo sh -c 'systemctl enable iscsid ; sudo systemctl start iscsid ; sudo systemctl status iscsid ; for d in /dev/nvme0n1; do umount $d ; wipefs -f -a $d ; lsblk ; done; modprobe zfs ; zpool create zfspv-pool /dev/nvme0n1 ; zfs list'
    spec:
      selector:
        matchLabels:
          openebs.io/component-name: system-setup
      template:
        metadata:
          labels:
            openebs.io/component-name: system-setup
        spec:
          hostNetwork: true
          initContainers:
          - name: init-node
            command:
              - nsenter
              - --mount=/proc/1/ns/mnt
              - --
              - sh
              - -c
              - *cmd
            image: alpine:3.7
            securityContext:
              privileged: true
          hostPID: true
          containers:
          - name: wait
            image: k8s.gcr.io/pause:3.1
          tolerations:
          - effect: NoSchedule
            key: node-role.kubernetes.io/master
    ---
    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: openebs-zfs
    allowVolumeExpansion: true
    parameters:
      fstype: "zfs"
      poolname: "zfspv-pool"
    provisioner: zfs.csi.openebs.io
    volumeBindingMode: WaitForFirstConsumer
    
  4. Install Zookeeper and Kafka (we like the KUDO operator for it) using that StorageClass
    $ kubectl get pods -l heritage=kudo
    NAME                             READY   STATUS    RESTARTS   AGE
    kafka-kafka-0                    2/2     Running   0          11m
    kafka-kafka-1                    2/2     Running   0          11m
    kafka-kafka-2                    2/2     Running   0          11m
    zookeeper-instance-zookeeper-0   1/1     Running   0          11m
    zookeeper-instance-zookeeper-1   1/1     Running   0          11m
    zookeeper-instance-zookeeper-2   1/1     Running   0          11m
    $ kubectl get pvc
    NAME                                                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
    kafka-datadir-kafka-kafka-0                                 Bound    pvc-9e408ab2-8bd8-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    kafka-datadir-kafka-kafka-1                                 Bound    pvc-b24f6bac-8bd8-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    kafka-datadir-kafka-kafka-2                                 Bound    pvc-c682fbd7-8bd8-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    zookeeper-instance-datadir-zookeeper-instance-zookeeper-0   Bound    pvc-ef99ec6c-8bd7-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    zookeeper-instance-datadir-zookeeper-instance-zookeeper-1   Bound    pvc-efa0ca56-8bd7-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    zookeeper-instance-datadir-zookeeper-instance-zookeeper-2   Bound    pvc-efaaf239-8bd7-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    
  5. Set up the Grafana dashboard and kick the tires on it a bit.
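Going back to step 3 for a moment: if you want to check the new StorageClass on its own before handing it to Kafka, you can request a volume from it directly. This is a minimal sketch that builds a PersistentVolumeClaim as JSON (kubectl accepts JSON as well as YAML). It assumes the `openebs-zfs` class from step 3, and the claim name `scratch-claim` is just a placeholder.

```python
import json


def pvc_manifest(name, size="5Gi", storage_class="openebs-zfs"):
    """Build a PersistentVolumeClaim that requests storage from the class."""
    return {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": {"name": name},
        "spec": {
            "storageClassName": storage_class,
            "accessModes": ["ReadWriteOnce"],
            "resources": {"requests": {"storage": size}},
        },
    }


# "scratch-claim" is a hypothetical name used only for this example.
manifest = pvc_manifest("scratch-claim")
print(json.dumps(manifest, indent=2))
```

You could pipe the output into `kubectl apply -f -`. Because the StorageClass uses `volumeBindingMode: WaitForFirstConsumer`, expect the claim to sit in Pending until a pod that mounts it gets scheduled.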

You can get more detail on these steps here.

Check back in this space for more information on Kafka in the future.

Important Links

Access Director online - https://director.mayadata.io
For help - https://help.mayadata.io
Join OpenEBS community - https://slack.openebs.io
