github twitter linkedin stackoverflow soundcloud email
Parallelizing R code on Kubernetes
Aug 7, 2018
10 minutes read

Kubernetes who?

The hype around kubernetes is real, but likely also justified. Kubernetes is an open-source tool that facilitates deployment of jobs and services onto computer clusters. It provides different patterns for different type of workloads, be it API servers, databases or running batch jobs. Not only makes kubernetes running workloads and services easy, it also keeps them running.

At the core of the technology are containers, which kubernetes skillfully manages inside so-called pods. A pod represents a single instance of an application and contains one or sometimes more containers. Pods in turn live on worker nodes - actual servers - and are managed by a controller on the master node. We can interact with pods indirectly via instructions to controller.

Mark Edmondson has already written a fantastic blog post about different use cases for running R application inside kubernetes. I’ll dive into the one topic he didn’t expand upon: the parallel execution of R code on kubernetes.

I will similarly use GCP’s kubernetes engine to deploy my jobs, however all major cloud providers have similar offerings. It’s worth mentioning that Google provides 300$ worth of credit free to spend on any of their cloud products, so you can freely experiment without burning a hole in your pocket.

Single job with static parameters

The simplest use case of parallelization is running the same script over and over again, but in parallel instead of in a sequential order. A classic example is simulation, i.e. the random generation of numbers given a fixed set of parameters.

I am taking an example from Azure’s tutorial on running R code in parallel, simulating stock prices after a year (365 days) given a fixed value for standard deviation and average stock price movement per day.

mean_change = 1.001 
volatility = 0.01 
opening_price = 100 

getClosingPrice <- function(days) { 
        movement <- rnorm(days, mean=mean_change, sd=volatility) 
        path <- cumprod(c(opening_price, movement)) 
        closingPrice <- path[days] 

replicate(1000, getClosingPrice(365)) 

Let’s save the above script into an executable file, in our case monte-carlo.R, and write a minimal Dockerfile encapsulating the script. Remember kubernetes works with containers and can access them directly from Dockerhub.

FROM rocker/r-base
COPY monte-carlo.R ./

We build the image and upload it to Dockerhub using the docker command line tool.

# build image
docker build -t mtoto/mc-demo:latest .
# upload to docker hub
docker push mtoto/mc-demo:latest

Now comes the kubernetes bit in the form of a job.yaml file, that contains the instructions for the controller. Note that under spec: we specify the number of pods to run our job on in parallel (distribution of pods over nodes is handled by kubernetes), and the number of completions. Each pod picks up a single run and exists after the script has finished. By the end of this workload 100 pods have been created, run and terminated.

apiVersion: batch/v1
kind: Job
  name: static-demo
  parallelism: 10
  completions: 100
      name: static-example
        jobgroup: static-example
      - name: birthday
        image: mtoto/mc-demo
        command: ["Rscript", "monte-carlo.R"]
      restartPolicy: Never

With everything in place (R script, Dockerfile, .yaml file), we are ready to deploy our first job to kubernetes. Assuming you have enabled the relevant services in the google cloud console, downloaded the google cloud SDK and have kubectl installed, we can create our cluster and deploy our first the workload on GCP in the following way:

# create 4 node cluster "kubepar" on google kubernetes engine
gcloud container clusters create kubepar --machine-type n1-standard-1 --num-nodes 4
# get credentials to point kubectl to our cluster
gcloud container clusters get-credentials kubepar
# create job
kubectl create -f job.yaml

We can monitor the progress of our job using the command kubectl get pods, to see how many pods have successfully run.

Similarly we can look at the state of the nodes with kubectl get nodes or the overall status of the job with kubectl get jobs static-demo. For a more detailed output, substitute get with describe, such as kubectl describe pods.

Once the job has finished, we collect the output of our simulation from the logs of each pod and write it to a .txt file.

for p in $(kubectl get pods -l jobgroup=static-example -o name)
  kubectl logs $p >> output.txt

Reading the output into R we can plot the results:


Common template and multiple parameters using expansion

Moving on, we want to parallelize a script with different parameters at each run. Again, I am taking an example from a doAzureParallel tutorial where

… we calculate for a room of N people the probability that someone in the room shares a birthday with someone else in the room.

Below is the simulation script for 100.000 rooms where we supply the number of people in the room as a command line argument.

#!/usr/bin/env Rscript
args = commandArgs(trailingOnly=TRUE)
n <- as.double(args[1])

pbirthdaysim <- function(n) { 
        ntests <- 100000 
        pop <- 1:365 
        anydup <- function(i) 
                    sample(pop, n, replace=TRUE)))
        sum(sapply(seq(ntests), anydup)) / ntests 


Unlike before, we are not creating a single representation of our Job object in a .yaml file, but a Job template with placeholders. The Dockerfile is the same as before, except for the script. Don’t forget to build and upload the image before continuing.

apiVersion: batch/v1
kind: Job
  name: par-demo-$ITEM
      name: par-example
        jobgroup: par-example
      - name: birthday
        image: mtoto/birthday-demo
        command: ["Rscript", "birthday.R $ITEM"]
      restartPolicy: Never

Notice that we didn’t specify parallelization parameters nor the number of completions. It’s because we are going to expand the above template into 100 different job.yaml files, one for each run with a different n parameter ($ITEM in the .yaml fie) for the birthday simulation.

# create folder for jobs
mkdir jobs
# create job.yaml files
for i in {1..100}
  cat job.yaml | sed "s/\$ITEM/$i/" > ./jobs/job-$i.yaml

With the same command as before, we create all the jobs at once: kubectl create -f ./jobs. Kubernetes will automatically create, distribute and run our jobs in parallel across pods on the nodes of our cluster.

Using the same bash script as before, we can retrieve the output from each run and after read it into R.

Plotting the results, the probability that 2 or more people will have the same birthday is 99% after 60 people are in the room.

plot(probabiliy, xlab="People in room", 
     ylab="Probability of shared birthday")

Fine parallel processing using a work queue

In the previous example, we created all the jobs at once, which can overload the scheduler if the number of jobs is very large. A smarter approach is to create a work queue and let the pods pick them off one by one as they go along. Unlike before, each pod will work on multiple items until the queue is empty instead of creating a pod for each task.

To illustrate the last approach, we will parallelize the training of different regression models, a common use case for parallelization in R.

The function below takes the name of an algorithm, loads the dataset, creates a training set, runs a model using the caret package and finally uploads the result to google cloud storage as an .rds file. This way the work queue only needs to contain the names of the models to run.

# modeling function
run_save_model <- function(method) {
        # load pkgs and data
        # split data
        train_index <- createDataPartition(BostonHousing$medv,1, p = .7)
        train <- BostonHousing[train_index[[1]],]
        # train model
        model <- train(medv ~., 
                       data = train, 
                       method = method)
        # upload to storage bucket
        file <- sprintf("%s_model.rds", method)
        saveRDS(model, file)
                   name = file,
                   bucket = "bostonmodels")

Setting up Redis on kubernetes

We’ll be using Redis for the work queue, so we need an additional pod running Redis and a service so other pods can find it. The recipes for both are within redis-master.yaml and redis-service.yaml. Similarly to jobs, we can use kubectl create command to start the instances and then use the Redis command line tool to add the work items to the queue.

# create redis pod and redis service
kubectl create -f ./redis-pod.yaml
kubectl create -f ./redis-service.yaml
# create temporary interactive pod
kubectl run temp -i --rm --tty  --image redis --command "/bin/sh"
# initiate redis cli
redis-cli -h redis
# push items into queue named "test"
rpush test "lm" "rf" "gbm" "enet" "brnn" "bridge"
# doublecheck queue
lrange test 0 -1

On the consumer side, I re-implemented the Redis client from the official docs in R using the redux package. The file rediswq.R contains all the building blocks.

Giving access to google cloud storage from kubernetes

Before we could extract the output from the logs, now we will save the models as .rds files on cloud storage. For this, the containers running on our cluster need write access to our storage bucket.

Using GCP, we need to create a new service account inside our project and under Roles give it full access to cloud storage by selecting Storage Object Admin. Make sure to check the box for Furnish a new private key and click SAVE.

Back to the terminal, we can save our credentials as a Secret that will be directly accessible to the kubernetes engine.

# create secret named "gcs-key"
kubectl create secret generic gcs-key --from-file=key.json=PATH-TO-KEY-FILE.json

We’ll see how to use this secret in the job.yaml file shortly.

Worker program

Finally, we write a worker program that takes the work items from the Redis work queue and executes run_save_model(). While the pods have no knowledge of the number of work items in the queue, they notice when the queue is empty and will automatically terminate.


# connect to redis host
host <- Sys.getenv("REDIS_SERVICE_HOST")
db <- redis_init(host = host)

# authenticate gcs

print(paste0("Worker with sessionID: ", session))
print(paste0("Initial queue state: empty=", as.character(empty())))

while (!empty()) {
        item <- lease(lease_secs=10,
                        block = TRUE,
                        timeout = 2)
        if (!is.null(item)) {
                print(paste0("working on: ", item))
                # actual work
        } else {
          print("waiting for work")       
print("queue emtpy, finished")

Now that we have all the scripts in place, let’s not forget to build a Docker image and upload it to Dockerhub. The Dockerfie is going to be a bit longer this time given the numerous dependencies our program needs.

As for the .yaml file, it is very similar to what we have written before with the addition of mounting our Secret gcs-key as a volume so that the containers have access. We name this variable GCS_AUTH_FILE, which the googlegoogleCloudStorageR package looks for when loading the library to authenticate the client.

apiVersion: batch/v1
kind: Job
  name: fine-demo
  parallelism: 4
      name: fine-example
        jobgroup: fine-example
      - name: google-cloud-key
          secretName: gcs-key
      - name: c
        image: mtoto/ml-demo
        - name: google-cloud-key
          mountPath: /var/secrets/google
        - name: GCS_AUTH_FILE
          value: /var/secrets/google/key.json
        command: ["Rscript", "worker.R"]
      restartPolicy: OnFailure

Just like before, we hit kubectl create -f job.yaml to start the job and monitor the status of the 4 pods with kubectl get pods. You will notice that the pods don’t exit until the queue is finished. Once they are done working on one item they pick up the next one, saving additional overhead compared to the previous two approaches.

Trade-offs to keep in mind

Going from static workloads to setting up work queues that feed into the workers, we are introducing additional complexity. It’s not always a good thing, especially not if modifying existing applications is costly. We could’ve done parallel machine learning just as well using parameter expansion (the second approach).

On the other hand, having one Job object for each work item creates some overhead that a single Job object for all work items does not. Again, the difference will become more apparent the more work we have.

Lastly, the first two approaches create as many pods as work items, requiring less modification to existing code. With the last approach however each pod can process multiple items, which is a gain in efficiency.

Back to posts

comments powered by Disqus