Use OpenEBS Underneath your Apache Kafka Brokers

I'm a Developer Advocate at MayaData, a company dedicated to enabling data agility through the use of Kubernetes as a data layer and the sponsor of the CNCF project OpenEBS and other open source projects. Take it for a test drive via free and easy to use management software by registering here.

Simplifying the use of Apache Kafka on Kubernetes

Sometimes you get a nice big chunk of data, all structured and ready to read in, process, and do something with. Sometimes. More commonly, your data comes in dribbles, bursts, and sometimes a fire-hose. So what do you do with this kind of streaming data? How do you make sure you’re processing efficiently and close to real-time, while not dropping data due to lack of capacity?

Well, you might want to look into a streaming pub/sub system like Apache Kafka. It works great inside of Kubernetes -- with the help of OpenEBS Dynamic LocalPVs -- and gives you a message bus that all of your components can talk on. With Apache Kafka, you can have data streaming in on one topic. You can consume that topic with a job that will categorize the bits of data and put them on different topics. You might have other jobs that process and create their own artifacts that they push to a topic, whatever you might like. And then, maybe you have a final topic you use to put all of the incoming data along with related data generated by your app components into your data lake.

Kafka

You can use it as a reliable message queue, but with multiple subscribers, data persistence, and the ability to run code in the data stream. Just throw some data at it and let Apache Kafka organize it for you and get the data where it needs to be. Because of its design, it’s capable of buffering for a while if you get a burst of data that comes in faster than your processors can process it. This turns out to be a big deal when you’re trying to keep your software components loosely coupled.

You can see that the code is pretty straightforward from this python example:

#!/usr/bin/env python

from time import sleep
from json import dumps
from kafka import KafkaProducer
import os

print('\n\nrunning on ', os.environ['MY_POD_NAME'], '\n')

producer = KafkaProducer(bootstrap_servers=['kafka-svc'],
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))

for i in range(200):
    data = {'key' : 'string'}
    producer.send('random', value=data)
    print("sent ", i, "th clump of random data\n")
    sleep(.1)

That’s a pretty easy way to output data. In this example, ‘random’ is a topic name, and Python will dutifully put our sequential integer onto that Kafka topic for consumption by anyone who asks. It’s pretty much just as easy to catch it on the other end:

#!/usr/bin/env python

from kafka import KafkaConsumer
import os

print('\n\nrunning on ', os.environ['MY_POD_NAME'], '\n')

consumer = KafkaConsumer('random',
                         bootstrap_servers = 'kafka-svc',
                         api_version = (0, 10))

for m in consumer:
    print(m.value, "\n")

Apache Kafka can keep multiple replicas for resiliency

While our data is “in-flight”, Kafka can keep multiple replicas for resiliency. So, you can effectively use Kafka as a structured, buffered output for your program, and you can output more or less whatever you like, receive it on the other side. Kafka will keep it around until none of the consumers need it anymore. Um, where are we going to keep it?

Persistence in Kubernetes is also easy to solve with OpenEBS. You can install it on your cluster and create EBS style volumes using instance storage, use it to orchestrate existing disks on your nodes, and even use a local ZFS pool to create volumes in.

Cool. So, we have the need for Apache Kafka so our code can talk to other pieces of code as well as databases and other components (did I mention there’s a wealth of ready-to-use connectors out there?). We want it in Kubernetes because that’s where we’re running our app, and that way, it’s easy to manage. And we have OpenEBS to handle this new persistence requirement that Kafka brings with it.

So, let’s spin up a test cluster in our favorite cloud and see what we can do with this piece of technology.
  1. Set up a cluster with some nodes that have local ssds, like aws i3 nodes.
    $ kubectl get nodes
    NAME                                         STATUS   ROLES    AGE     VERSION
    ip-172-20-34-74.us-east-2.compute.internal   Ready    node     6d18h   v1.14.10
    ip-172-20-45-3.us-east-2.compute.internal    Ready    node     6d18h   v1.14.10
    ip-172-20-53-19.us-east-2.compute.internal   Ready    node     6d18h   v1.14.10
    ip-172-20-59-66.us-east-2.compute.internal   Ready    node     6d18h   v1.14.10
    ip-172-20-63-65.us-east-2.compute.internal   Ready    master   6d18h   v1.14.10
    
  2. Install OpenEBS on the cluster (you can do this right from OpenEBS Director now)
    Install OpenEBS on the cluster
  3. Use OpenEBS to make a new StorageClass that targets that local ssd, or just use `openebs-device.’ I’ll use ZFS LocalPV in this example:
    ---
    apiVersion: extensions/v1beta1
    kind: DaemonSet
    metadata:
      namespace: openebs
      name: system-setup
      labels:
        openebs.io/component-name: system-setup
        openebs.io/version: 1.4.0
      annotations:
        command: &cmd sed -i 's/main/main contrib/' /etc/apt/sources.list ; apt -y install ; apt -y update ; apt -y install open-iscsi zfsutils-linux; sudo sh -c 'systemctl enable iscsid ; sudo systemctl start iscsid ; sudo systemctl status iscsid ; for d in /dev/nvme0n1; do umount $d ; wipefs -f -a $d ; lsblk ; done; modprobe zfs ; zpool create zfspv-pool /dev/nvme0n1 ; zfs list'
    spec:
      selector:
        matchLabels:
          openebs.io/component-name: system-setup
      template:
        metadata:
          labels:
            openebs.io/component-name: system-setup
        spec:
          hostNetwork: true
          initContainers:
          - name: init-node
            command:
              - nsenter
              - --mount=/proc/1/ns/mnt
              - --
              - sh
              - -c
              - *cmd
            image: alpine:3.7
            securityContext:
              privileged: true
          hostPID: true
          containers:
          - name: wait
            image: k8s.gcr.io/pause:3.1
          hostPID: true
          hostNetwork: true
          tolerations:
          - effect: NoSchedule
            key: node-role.kubernetes.io/master
    ---
    apiVersion: storage.k8s.io/v1
    kind: StorageClass
    metadata:
      name: openebs-zfs
    allowVolumeExpansion: true
    parameters:
      fstype: "zfs"
      poolname: "zfspv-pool"
    provisioner: zfs.csi.openebs.io
    volumeBindingMode: WaitForFirstConsumer
    
  4. Install Zookeeper and Kafka (we like the KUDO operator for it) using that StorageClass
    $ kubectl get pods -l heritage=kudo
    NAMEkafka-kafka-0                    2/2     Running   0          11m
    kafka-kafka-1                    2/2     Running   0          11m
    kafka-kafka-2                    2/2     Running   0          11m
    zookeeper-instance-zookeeper-0   1/1     Running   0          11m
    zookeeper-instance-zookeeper-1   1/1     Running   0          11m
    zookeeper-instance-zookeeper-2   1/1     Running   0          11m
    $ kubectl get pvc
    NAME                                                        STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
    kafka-datadir-kafka-kafka-0                                 Bound    pvc-9e408ab2-8bd8-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    kafka-datadir-kafka-kafka-1                                 Bound    pvc-b24f6bac-8bd8-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    kafka-datadir-kafka-kafka-2                                 Bound    pvc-c682fbd7-8bd8-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    zookeeper-instance-datadir-zookeeper-instance-zookeeper-0   Bound    pvc-ef99ec6c-8bd7-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    zookeeper-instance-datadir-zookeeper-instance-zookeeper-1   Bound    pvc-efa0ca56-8bd7-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    zookeeper-instance-datadir-zookeeper-instance-zookeeper-2   Bound    pvc-efaaf239-8bd7-11ea-b4f1-060d94d0ae04   5Gi        RWO            openebs-zfs    11m
    
    Install Zookeeper & Kafka
  5. Set up the Grafana dashboard and kick the tires on it a bit.

You can get more detail on these steps here.

Check back in this space for more information on Kafka in the future.

Important Links

Access Director online - https://director.mayadata.io
For help - https://help.mayadata.io
Join OpenEBS community — https://slack.openebs.io

Shubham Chaudhary
Contributor to LitmusChaos at MayaData
Udit Gaurav
ChaosEngineer at MayaData
Uma Mukkara
Uma Mukkara is the co-founder and COO at MayaData. At MayaData co-created two open source projects OpenEBS and LitmusChaos at MayaData. Uma is also a maintainer of the LitmusChaos project. Uma's interests include research and contributions to the areas of cloud-native data management and cloud-native chaos engineering. Uma holds a Masters degree in Telecommunications and software engineering from Illinois Institute of Technology, Chicago and a bachelor’s degree in Communications from S.V.University, Tirupati, India.