In the last episode, we wrote some simple code to produce messages in Apache Kafka via a RESTful API, implemented using the amazing Flask framework. If you have followed the post and written some code yourself, you might now want to put it into action. We assume you have Kafka running and that you have written and started a simple Flask app as described before.

Testing Client

To give our setup a test drive, let’s write a simple Kafka consumer in [Python][python] using the pykafka package. For the sake of simplicity, let’s just consume all messages in the topic named “test”, like so:

from pykafka import KafkaClient

def get_kafka_client():
    return KafkaClient(hosts='127.0.0.1:9092')

if __name__ == '__main__':
    client = get_kafka_client()
    topicname = 'test'
    topic = client.topics[topicname]
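    for i in topic.get_simple_consumer():
        # i.value holds the raw message bytes; decoding works on Python 2 and 3
        print("Message: {0}".format(i.value.decode('utf-8')))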

Running this script should print out all messages that get written to our test topic until the script is terminated. You could also use the console consumer (and producer) included with Kafka, but in the spirit of DIY, let’s not do that.

Producing Messages

To produce messages, we could write another Python script using pykafka, but this is not what we want. We wrote our Flask app so that we can produce messages over HTTP. And being true hipsters, we will do it old school, using the console and cURL.

curl -X POST 'http://127.0.0.1:5000/post/test' -H "Content-Type: application/json" -d '{
"Hello": "World"
}'

This should trigger the output

Message: {"Hello": "World"}

from your client script.
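
If you would rather script the request than type it, the same call can be sketched with nothing but the Python 3 standard library. This is only a rough equivalent of the cURL command above, not part of the Flask app: the helper names `build_post_request` and `send` are made up for illustration, and the base URL assumes the app listens on 127.0.0.1:5000 as in the cURL example.

```python
import json
import urllib.request


def build_post_request(topicname, payload, base_url='http://127.0.0.1:5000'):
    """Build (but do not send) the JSON POST that the cURL call issues.

    base_url is an assumption: it matches the address used in the cURL example.
    """
    body = json.dumps(payload).encode('utf-8')
    return urllib.request.Request(
        url='{0}/post/{1}'.format(base_url, topicname),
        data=body,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send(request):
    """Send the request and return the HTTP status code.

    Requires the Flask app from the previous post to be running.
    """
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling `send(build_post_request('test', {'Hello': 'World'}))` while the consumer script is running should make the message show up there, just like with cURL.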

Consuming With Flask

Now let’s do something more exciting than writing messages to the console. Let’s spit them out as HTML5 server-sent events. This way we’ll be able to consume them later in a webpage using JavaScript and ultimately create a live dashboard, for example to monitor the temperature of a machine. For this we need to add one more route to our Flask app.

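from flask import Response  # add this to the imports of the Flask app

@app.route('/topic/<topicname>')
def get_messages(topicname):
    client = get_kafka_client()
    def events():
        # Stream every message in the topic as one server-sent event
        for i in client.topics[topicname.encode('ascii')].get_simple_consumer():
            # i.value holds raw bytes; decode it before putting it into the event
            yield 'data: {0}\n\n'.format(i.value.decode('utf-8'))
    return Response(events(), mimetype="text/event-stream")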
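
The `data: ...` line followed by a blank line is the whole wire format of a server-sent event. One catch is that a message containing a line break would end the event early, so each line of the payload needs its own `data:` field. Here is a minimal sketch of a helper that takes care of this. The name `format_sse` and the optional `event` parameter are our own inventions for illustration, not something Flask or pykafka provide.

```python
def format_sse(data, event=None):
    """Format one message as a server-sent event frame.

    data may be bytes (as delivered by the Kafka consumer) or text.
    event is an optional event name (hypothetical use: separate dashboards).
    """
    if isinstance(data, bytes):
        data = data.decode('utf-8')
    lines = []
    if event is not None:
        lines.append('event: {0}'.format(event))
    # Normalise line endings, then emit one "data:" field per payload line
    payload = data.replace('\r\n', '\n').replace('\r', '\n')
    for line in payload.split('\n'):
        lines.append('data: {0}'.format(line))
    # A blank line terminates the event
    return '\n'.join(lines) + '\n\n'
```

Inside the `events()` generator you could then write `yield format_sse(i.value)` instead of building the string by hand.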

I’ll give you some time now to implement this, and I’ll check in with you next time, when we’ll write some JavaScript to consume our messages.