Tamas Szilagyi

Coding with Data

Lightweight streaming analytics with NATS
Oct 2, 2018
8 minutes read

Go in the fast lane

Fast data is the new big data. But how difficult is it really to set up a complete streaming analytics solution from the ground up? It turns out it is not that hard, at least not if you are using NATS Streaming. The project is written in Go and describes itself like this:

“…NATS Streaming is an extremely performant, lightweight reliable streaming platform built on NATS.”

I have been wanting to experiment with Go for some time, so building a toy service on top of NATS Streaming seemed like an interesting project to start with. To get familiar with the basics, I worked through about two thirds of the Tour of Go - a dope interactive introduction to the fundamentals of the language. At this point, I figured I knew enough for the fifty or so lines of Go code I was about to write and headed for the IDE. What I had in mind was the classic streaming analytics demo: the real-time Twitter dashboard. After some initial research I was able to break down the task at hand into 4 subtasks:

  1. Communicate with the Twitter Streaming API.
  2. Ingest tweets into NATS Streaming.
  3. Provision a MySQL database where the tweets will be written to.
  4. Create a Shiny App as a (near) real-time NLP dashboard.

To ensure everything is working as expected I’ll use Docker containers in conjunction with Docker Compose as the orchestration tool.

About NATS Streaming

Undeniably, Kafka is the most widely used streaming solution right now. But is it the only option out there? Is it even the best option? It depends, of course. But if you prefer a lightweight footprint and simplicity without sacrificing performance, NATS is very, very hard to beat. NATS Streaming is a service layer on top of the original NATS framework. The latter was originally conceived as a distributed messaging system with few guarantees, but blazing fast performance. NATS Streaming extends the original framework through the introduction of at-least-once delivery, durable storage, message replay and a couple of other enhanced quality of service features.

The central piece is the NATS (Streaming) Server. It manages subscriptions on specific subjects and handles communications between clients. Once the server is up and running, we can publish messages to subjects, and on the receiving end subscribe to them.
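To make the idea of subjects and message replay concrete, here is a minimal sketch of a toy in-memory broker. It is not how NATS Streaming is implemented and does not use its client API; the Broker, Msg, Publish and Subscribe names are made up for illustration. It only models a per-subject log with sequence numbers that a late subscriber can replay.

```go
package main

import "fmt"

// Msg is a stored message with the sequence number it received on publish.
type Msg struct {
	Seq  uint64
	Data []byte
}

// Broker is a toy stand-in for a streaming server (hypothetical, for
// illustration only). It is not safe for concurrent use.
type Broker struct {
	logs map[string][]Msg      // per-subject message log
	subs map[string][]func(Msg) // per-subject live subscribers
}

func NewBroker() *Broker {
	return &Broker{
		logs: make(map[string][]Msg),
		subs: make(map[string][]func(Msg)),
	}
}

// Publish appends the message to the subject's log and delivers it
// to every handler currently subscribed to that subject.
func (b *Broker) Publish(subject string, data []byte) uint64 {
	msg := Msg{Seq: uint64(len(b.logs[subject]) + 1), Data: data}
	b.logs[subject] = append(b.logs[subject], msg)
	for _, handler := range b.subs[subject] {
		handler(msg)
	}
	return msg.Seq
}

// Subscribe first replays stored messages with Seq >= startSeq,
// then registers the handler for messages published afterwards.
func (b *Broker) Subscribe(subject string, startSeq uint64, handler func(Msg)) {
	for _, m := range b.logs[subject] {
		if m.Seq >= startSeq {
			handler(m)
		}
	}
	b.subs[subject] = append(b.subs[subject], handler)
}

func main() {
	b := NewBroker()
	b.Publish("trump", []byte("first tweet"))
	b.Publish("trump", []byte("second tweet"))

	// A late subscriber still receives the two earlier messages,
	// followed by anything published after it subscribed.
	b.Subscribe("trump", 1, func(m Msg) {
		fmt.Printf("%d: %s\n", m.Seq, m.Data)
	})
	b.Publish("trump", []byte("third tweet"))
}
```

The real server adds what this sketch leaves out: acknowledgements and redelivery for at-least-once semantics, durable storage, and network transport.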

Installing the package creates an executable, nats-streaming-server, that we can run to start the server.

[12505] 2018/10/01 11:53:13.037897 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.11.0
[12505] 2018/10/01 11:53:13.038015 [INF] STREAM: ServerID: iDV733mTgjWieVayqCLiG2
[12505] 2018/10/01 11:53:13.038022 [INF] STREAM: Go version: go1.11
[12505] 2018/10/01 11:53:13.038880 [INF] Starting nats-server version 1.3.0
[12505] 2018/10/01 11:53:13.038894 [INF] Git commit [not set]
[12505] 2018/10/01 11:53:13.039199 [INF] Listening for client connections on 0.0.0.0:4222
[12505] 2018/10/01 11:53:13.039208 [INF] Server is ready
[12505] 2018/10/01 11:53:13.068118 [INF] STREAM: Recovering the state...
[12505] 2018/10/01 11:53:13.068165 [INF] STREAM: No recovered state
[12505] 2018/10/01 11:53:13.320178 [INF] STREAM: Message store is MEMORY
[12505] 2018/10/01 11:53:13.320295 [INF] STREAM: ---------- Store Limits ----------
[12505] 2018/10/01 11:53:13.320305 [INF] STREAM: Channels:                  100 *
[12505] 2018/10/01 11:53:13.320312 [INF] STREAM: --------- Channels Limits --------
[12505] 2018/10/01 11:53:13.320320 [INF] STREAM:   Subscriptions:          1000 *
[12505] 2018/10/01 11:53:13.320329 [INF] STREAM:   Messages     :       1000000 *
[12505] 2018/10/01 11:53:13.320337 [INF] STREAM:   Bytes        :     976.56 MB *
[12505] 2018/10/01 11:53:13.320343 [INF] STREAM:   Age          :     unlimited *
[12505] 2018/10/01 11:53:13.320349 [INF] STREAM:   Inactivity   :     unlimited *
[12505] 2018/10/01 11:53:13.320356 [INF] STREAM: ----------------------------------

By default, NATS Streaming keeps messages in memory, as the "Message store is MEMORY" line in the log above shows. It suffices to start publishing messages to a subject, and they will be held there. For durable storage, the server can also write to files or to a SQL database such as MySQL, which is what we will use here. Using the flags --store, --sql_driver and --sql_source when starting nats-streaming-server, we can configure access to the database; alternatively, we can supply a .conf file.
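As a small sketch of how the three flags mentioned above fit together, the snippet below assembles them into a command line. The helper names sqlSource and sqlStoreArgs are hypothetical, and the credentials are the placeholder values used in the docker-compose.yml later in this post; the connection string follows the user:password@tcp(host:port)/dbname form used by the Go MySQL driver.

```go
package main

import (
	"fmt"
	"strings"
)

// sqlSource formats a MySQL connection string in the
// user:password@tcp(host:port)/dbname form.
func sqlSource(user, password, host string, port int, db string) string {
	return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", user, password, host, port, db)
}

// sqlStoreArgs returns the flags that point nats-streaming-server
// at a SQL store backed by MySQL.
func sqlStoreArgs(source string) []string {
	return []string{
		"--store", "SQL",
		"--sql_driver", "mysql",
		"--sql_source", source,
	}
}

func main() {
	args := sqlStoreArgs(sqlSource("root", "pwd", "db", 3306, "nss_db"))
	fmt.Println("nats-streaming-server " + strings.Join(args, " "))
}
```

When typing the resulting command into a shell by hand, the --sql_source value needs to be quoted because of the parentheses.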

Once the server is up and running, we can create subjects and publish messages. A simple worker program written in Go that ingests data from the Twitter Streaming API and funnels it into the database via NATS Streaming takes barely 50 lines of code.

package main

import (
    "log"
    "os"
    "stream/keys"

    "github.com/dghubble/go-twitter/twitter"
    "github.com/dghubble/oauth1"
    stan "github.com/nats-io/go-nats-streaming"
)

func main() {
    // Word to filter the Twitter stream on; it doubles as the NATS subject
    word := os.Getenv("TWITTER")

    // OAuth1 credentials come from the local stream/keys package
    config := oauth1.NewConfig(keys.Key, keys.Secret)
    token := oauth1.NewToken(keys.Token, keys.TokenSecret)
    httpClient := config.Client(oauth1.NoContext, token)

    // Twitter client
    twitterClient := twitter.NewClient(httpClient)
    // NATS Streaming client: cluster ID "test-cluster", client ID "test"
    natsClient, err := stan.Connect("test-cluster", "test",
        stan.NatsURL("nats://nats:4222"))
    if err != nil {
        log.Fatal(err)
    }
    defer natsClient.Close()

    // The demux routes each stream message to a handler by type;
    // here the text of every tweet is published to the subject
    demux := twitter.NewSwitchDemux()
    demux.Tweet = func(tweet *twitter.Tweet) {
        if err := natsClient.Publish(word, []byte(tweet.Text)); err != nil {
            log.Printf("publish failed: %v", err)
        }
    }

    // Filter parameters for Twitter stream
    filterParams := &twitter.StreamFilterParams{
        Track:         []string{word},
        StallWarnings: twitter.Bool(true),
        Language:      []string{"en"},
    }
    
    stream, err := twitterClient.Streams.Filter(filterParams)
    if err != nil {
        log.Fatal(err)
    }
    for message := range stream.Messages {
        demux.Handle(message)
    }
}

That’s it. Of course this is only two pieces of the puzzle. We still need to provision a SQL database for the message store and build a Shiny App to munge and visualize the data.

Meet the architect: Docker Compose

Instead of creating the remaining services one by one and linking them up, it is better to deploy this mini infrastructure in its entirety. With Docker containers, we can package each piece with all its dependencies. Using Docker Compose, we can configure how the containers should work in tandem and communicate with each other if needed.

With a Docker image for everything nowadays, our Dockerfiles won’t be long. Apart from adding a couple of environment variables here or installing an additional package there, most configuration will already be taken care of by the read-only layers of the base images.

Our infra consists of 4 containers: one each for the MySQL database, the NATS Streaming server, the NATS worker that will publish the messages, and finally the Shiny app. A couple of pointers with regards to the docker-compose.yml file below:

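  • build: context: is the directory that contains the Dockerfile.
  • restart: always is set because depends_on only controls the start order; it does not wait for a service to be ready to accept connections, so a container that starts too early simply exits and is restarted until its dependency is up.
  • ports: publishes container ports on the host. The short form ("3306") maps the container port to a random host port, while "8222:8222" and "80:3838" pin a specific host port. Services in the same Compose project can reach each other by service name regardless.
  • environment: - TWITTER=${TWITTER} passes the $TWITTER environment variable (as defined in the .env file) into the containers that need it.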

Without further ado, this is what the final docker-compose.yml file looks like:

version: "3"

services:
  db:
    build:
      context: "./db"
    restart: always
    command: --default-authentication-plugin=mysql_native_password
    ports:
      - "3306"
  nats:
    image: nats-streaming:latest
    restart: always
    depends_on:
      - db
    command: -m 8222 --store SQL --sql_driver mysql --sql_source "root:pwd@tcp(db:3306)/nss_db"
    ports:
      - "4222"
      - "8222:8222"
  nats-worker:
    build:
      context: "./nats"
    environment:
    - TWITTER=${TWITTER}
    restart: always
    entrypoint: /go/main
    depends_on:
      - nats
  shiny:
    build:
      context: "./shiny"
    environment:
    - TWITTER=${TWITTER}
    ports:
      - "80:3838"
    depends_on:
      - db

To build all the images we can use docker-compose build; the command to spin up the services is docker-compose -f docker-compose.yml up. Similarly, to stop the containers we have docker-compose stop, and docker-compose rm -fv to get rid of the stopped containers.

Shiny Apps and streaming data

I have written about Shiny Apps and how to containerize them before, so I will only briefly touch upon dealing with real-time data here. As we have seen before, NATS Streaming is continuously dumping new data into our MySQL database according to a predefined schema. On the R side it turns out we have pretty sweet tools for dealing with databases, such as pool and DBI. Specifically for Shiny apps, there is also a function called shiny::reactivePoll() that is used to:

“…create a reactive data source, which works by periodically polling a non-reactive data source.”

Two of the required arguments are functions: checkFunc, a cheap check of whether some value in our database has changed, and valueFunc, which pulls the updated data from the database when it has. The other two required arguments are the number of milliseconds to wait between checks, and the user session.
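The check-then-fetch pattern is not specific to R. As a rough sketch of the same idea in Go (the Poller type and its fields are hypothetical and only mimic the behaviour described above, they are not a port of Shiny's implementation), the expensive fetch runs only when the cheap check returns a new value:

```go
package main

import "fmt"

// Poller re-runs the expensive value function only when the cheap
// check function returns something different from last time.
type Poller struct {
	check  func() int64    // cheap, e.g. the latest timestamp in a table
	value  func() []string // expensive, e.g. fetching and transforming rows
	last   int64
	cached []string
	primed bool
}

// Tick runs one polling cycle and reports whether the data was refreshed.
func (p *Poller) Tick() bool {
	stamp := p.check()
	if p.primed && stamp == p.last {
		return false
	}
	p.last, p.primed = stamp, true
	p.cached = p.value()
	return true
}

func main() {
	// In-memory stand-in for the database (made-up data).
	rows := []string{"hello"}
	latest := int64(1)
	fetches := 0

	p := &Poller{
		check: func() int64 { return latest },
		value: func() []string {
			fetches++
			return append([]string(nil), rows...)
		},
	}

	p.Tick() // first cycle always fetches
	p.Tick() // nothing changed, so no fetch
	rows, latest = append(rows, "world"), 2
	p.Tick() // timestamp moved, so fetch again

	fmt.Println(fetches, p.cached) // prints: 2 [hello world]
}
```

In a long-running service, Tick would be driven by a timer, which corresponds to the interval argument of reactivePoll().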

This is the relevant bit from the shiny app:

library(pool)
library(DBI)
library(shiny)
library(dplyr)     # %>%, tbl(), filter(), arrange(), collect(), joins
library(anytime)
library(tidytext)

pool <- dbPool(
        drv = RMySQL::MySQL(),
        dbname = "nss_db",
        host = "db", 
        port = 3306,
        username = "nss",
        password = "password" 
)

data <- reactivePoll(1000, session,
             # This function returns the latest timestamp from the DB
             checkFunc = function() {
                     pool %>% tbl("Messages") %>%
                             summarise(max_time = max(timestamp, na.rm = TRUE)) %>%
                             collect() %>%
                             unlist()
                     
             },
             # This function returns a data.frame ready for text mining
             valueFunc = function() {
                     pool %>% tbl("Messages") %>%
                             filter(!data %like% "%http%") %>% 
                             arrange(desc(timestamp)) %>%
                             head(20000) %>%
                             collect() %>%
                             mutate(data = gsub("[^[:alnum:][:space:]]","",data)) %>%
                             unnest_tokens(word, data) %>%
                             anti_join(stop_words) %>% 
                             mutate(timestamp = anytime(timestamp/1e+9)) %>%
                             inner_join(get_sentiments("bing")) 
                   
             }
        )

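After establishing the pool connection, it is used to check whether the latest timestamp is different from the previous one. If that’s the case, we pull the last 20,000 tweets that contain no links from the database, collect them as an R data.frame and transform them using the tidytext package: punctuation is stripped, the text is split into words, stop words are dropped, and the remaining words are matched against the Bing sentiment lexicon.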
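For readers more at home in Go than in R, the transformation can be approximated as below. This is only a sketch: the stop word list and the sentiment lexicon are tiny made-up samples rather than the real tidytext data, the score and toTime helpers are hypothetical names, and the character filter uses Unicode classes, which is close to but not identical to the [:alnum:] and [:space:] classes in the R code. The timestamp helper mirrors the division by 1e+9 above, which implies that timestamps are stored as nanoseconds since the epoch.

```go
package main

import (
	"fmt"
	"strings"
	"time"
	"unicode"
)

// Tiny, made-up word lists for illustration only.
var stopWords = map[string]bool{"the": true, "is": true, "a": true, "and": true}
var sentiments = map[string]string{"great": "positive", "win": "positive", "sad": "negative"}

// Scored is a token that survived filtering, with its sentiment label.
type Scored struct {
	Word      string
	Sentiment string
}

// score keeps only letters, digits and whitespace, lowercases and
// splits the text into words, drops stop words, and returns the
// words that appear in the sentiment lexicon.
func score(text string) []Scored {
	cleaned := strings.Map(func(r rune) rune {
		if unicode.IsLetter(r) || unicode.IsDigit(r) || unicode.IsSpace(r) {
			return r
		}
		return -1 // a negative value drops the character
	}, text)

	var out []Scored
	for _, w := range strings.Fields(strings.ToLower(cleaned)) {
		if stopWords[w] {
			continue
		}
		if s, ok := sentiments[w]; ok {
			out = append(out, Scored{Word: w, Sentiment: s})
		}
	}
	return out
}

// toTime converts a nanosecond epoch timestamp to a UTC time.
func toTime(ns int64) time.Time {
	return time.Unix(0, ns).UTC()
}

func main() {
	fmt.Println(score("The rally is GREAT, and a sad day!"))
	// prints: [{great positive} {sad negative}]
	fmt.Println(toTime(1538395993037897000).Year())
}
```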

Postscript

The app I have outlined in this post is currently live on stream.tamaszilagyi.com plotting a few metrics for tweets containing the word “trump”, for demonstration purposes. It is running on a small Linux VM on Azure so don’t be intimidated by slow load times. I only have so much free Azure credit.

With minor modifications though, we could deploy our containers onto a cluster of computers and scale the crap out of this little streaming service. Such is the beauty of cloud resources and using cloud-native technologies like Docker and NATS.

As always, all the code is on my GitHub, including instructions on how to try it for yourself.


