Flink Job Cluster on Kubernetes - File Based High Availability

How to achieve high availability on Kubernetes without using ZooKeeper by utilizing a custom, file-based high availability implementation

In my previous post, I explained a bit about Flink and the difference between job and session clusters. In addition, I showed how to deploy a job cluster in the way that, in my opinion, works best.

In this blog post, I will talk about how to achieve high availability on Kubernetes without using ZooKeeper by utilizing a custom, file-based high availability implementation. You can find the implementation here.

When running Flink on Kubernetes, I think we should strive to use the powers Kubernetes gives us. One of them is the ReplicaSet, which lets us deploy a pod with a specified number of replicas and keeps that number of pods running even if a node fails.

Flink uses ZooKeeper to support job manager high availability: in case a job manager fails, a new one can be started and become the leader.
With Kubernetes pod scheduling, we don't need ZooKeeper to manage job manager high availability.

By using a StatefulSet for the job manager and a Deployment for the task managers, Kubernetes keeps the requested number of pods running (via a ReplicaSet behind the scenes in the Deployment's case), making sure our managers stay up even without ZooKeeper.

How to Implement a Custom High Availability Service?

To build our custom HA service, we need to implement a few components:

Leader Retrievers and Election Services

These tell Flink how to elect a job manager as the leader and where to retrieve the current leader from.

In our case, we have a single job manager and it is always the leader, so we simply tell Flink to always choose the same leader, with no election. This is why I used StandaloneLeaderElectionService for all the election services.
From the documentation: "The standalone implementation assumes that there is only a single LeaderContender and thus directly grants him the leadership upon start up".

For the leader retrievers, I used StandaloneLeaderRetrievalService with a constant address, which we can rely on thanks to the Kubernetes Services we deploy.
From the documentation: "This implementation assumes that there is only a single contender for leadership (e.g., a single JobManager or ResourceManager process) and that this process is reachable under a constant address."
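To make this concrete, here is a minimal sketch of how the two standalone services could be constructed. The RPC URL and Service name are assumptions standing in for whatever address your Kubernetes Service exposes, and exact package locations depend on your Flink version:

    import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
    import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;

    public class LeaderServicesSketch {

        // Constant address provided by the Kubernetes Service in front of the
        // job manager; "flink-jobmanager" is an illustrative Service name.
        private static final String JOB_MANAGER_RPC_URL =
                "akka.tcp://flink@flink-jobmanager:6123/user/jobmanager";

        // Election: the standalone service grants leadership to the single
        // contender on startup, so no real election happens.
        public LeaderElectionService electionService() {
            return new StandaloneLeaderElectionService();
        }

        // Retrieval: always report the same well-known leader address.
        public LeaderRetrievalService retrievalService() {
            return new StandaloneLeaderRetrievalService(
                    JOB_MANAGER_RPC_URL, HighAvailabilityServices.DEFAULT_LEADER_ID);
        }
    }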

Checkpoint Recovery Factory

This is a factory providing two classes that allow Flink to save checkpoints, retrieve them, and keep a checkpoint counter.

In our case, both the checkpoints and the counter are saved on disk. You can find the implementations in FsCheckpointRecoveryFactory.java and FsCheckpointIDCounter.java.
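The core idea behind a file-based ID counter is simple enough to sketch: persist the next checkpoint ID on the shared volume, so a restarted job manager continues the sequence instead of starting from scratch. The following is a simplified illustration of that mechanism, not the actual FsCheckpointIDCounter from the repository:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    // Simplified illustration: keep the next checkpoint ID in a file on the
    // shared volume. Not the actual FsCheckpointIDCounter implementation.
    public class FileBasedCounterSketch {

        private final Path counterFile;

        public FileBasedCounterSketch(Path counterFile) {
            this.counterFile = counterFile;
        }

        public synchronized long getAndIncrement() throws IOException {
            long current = Files.exists(counterFile)
                    ? Long.parseLong(new String(
                            Files.readAllBytes(counterFile), StandardCharsets.UTF_8).trim())
                    : 1L;
            // Write to a temp file and move it into place atomically, so a
            // crash mid-write never leaves a corrupt counter behind.
            Path tmp = counterFile.resolveSibling(counterFile.getFileName() + ".tmp");
            Files.write(tmp, Long.toString(current + 1).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, counterFile,
                    StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
            return current;
        }
    }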

Submitted Job Graph Store

This provides a way to save and retrieve job graphs.
As we are running a job cluster, we have only one job, which means we can always provide the same job graph.

Flink has a class that helps us with that, SingleJobSubmittedJobGraphStore. You can find the implementation here.
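Using it is essentially a one-liner. Here is a sketch, assuming the job cluster's JobGraph is already at hand; note that package locations vary between Flink versions:

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
    import org.apache.flink.runtime.dispatcher.SingleJobSubmittedJobGraphStore;

    public class JobGraphStoreSketch {

        // Wrap the single, known job graph; every recovery request returns it.
        public SubmittedJobGraphStore create(JobGraph jobGraph) {
            return new SingleJobSubmittedJobGraphStore(jobGraph);
        }
    }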

Running Jobs Registry

Documentation explains the need for that component quite well: "This registry is used in highly-available setups with multiple master nodes, to determine whether a new leader should attempt to recover a certain job (because the job is still running), or whether the job has already finished successfully (in case of a finite job) and the leader has only been granted leadership because the previous leader quit cleanly after the job was finished."

I used the provided FsNegativeRunningJobsRegistry class.
Its documentation can be found here and implementation here.
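Here is a short, hedged sketch of how the registry is used; the path matches the shared HA directory described below, and the job ID is illustrative:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
    import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;

    public class RunningJobsRegistrySketch {

        public static void main(String[] args) throws Exception {
            // The registry keeps marker files under the shared HA directory.
            FsNegativeRunningJobsRegistry registry =
                    new FsNegativeRunningJobsRegistry(new Path("file:///flink-shared/ha"));

            JobID jobId = new JobID();
            registry.setJobRunning(jobId);
            // ...the job executes...
            registry.setJobFinished(jobId);

            // DONE tells a newly elected leader not to attempt recovery.
            JobSchedulingStatus status = registry.getJobSchedulingStatus(jobId);
            System.out.println("Job " + jobId + ": " + status);
        }
    }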

Kubernetes Setup

To make all of this work, we need to make a few adjustments in our k8s YAMLs.

  • Change our Deployment to a StatefulSet. This tells Kubernetes that the pods are stateful and that we have a persistent volume attached to each one of them (currently one). A trimmed manifest is sketched after this list.
  • We want to make sure our shared volume is accessible by the user Flink runs as. To achieve that, I added an init container that changes the ownership of that directory.
  • We add a StorageClass and a PersistentVolume to be able to mount a volume to our pods. We mount the shared volume at /flink-shared.
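As mentioned above, here is a trimmed sketch of the job manager StatefulSet with the init container included; the metadata names, image, and claim name are illustrative, not the exact ones from the repository:

    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: flink-jobmanager
    spec:
      serviceName: flink-jobmanager
      replicas: 1
      selector:
        matchLabels:
          app: flink-jobmanager
      template:
        metadata:
          labels:
            app: flink-jobmanager
        spec:
          initContainers:
            - name: fix-permissions
              image: busybox
              # The flink user in the official image runs as UID/GID 9999.
              command: ["sh", "-c", "chown -R 9999:9999 /flink-shared"]
              volumeMounts:
                - name: flink-shared
                  mountPath: /flink-shared
          containers:
            - name: jobmanager
              image: my-flink-job:latest  # illustrative image name
              args: ["job-cluster"]
              volumeMounts:
                - name: flink-shared
                  mountPath: /flink-shared
          volumes:
            - name: flink-shared
              persistentVolumeClaim:
                claimName: flink-shared-pvc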

We also need to change a few things in our Flink configuration to utilize our brand-new HA service (the resulting block is shown after the list):

  • Set high-availability to our factory class
    high-availability: com.ronlut.flinkjobcluster.filesystemha.SingleFsHaServicesFactory
  • Set the HA storage dir to our mounted persistent volume
    high-availability.storageDir: file:///flink-shared/ha
  • Set the state backend to filesystem
    state.backend: filesystem
  • Set the checkpoints and savepoints dirs to a shared directory that is mounted on both the job and task managers
    state.checkpoints.dir: file:///flink-shared/checkpoints
    state.savepoints.dir: file:///flink-shared/savepoints
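Put together, the HA-related part of flink-conf.yaml looks like this:

    high-availability: com.ronlut.flinkjobcluster.filesystemha.SingleFsHaServicesFactory
    high-availability.storageDir: file:///flink-shared/ha
    state.backend: filesystem
    state.checkpoints.dir: file:///flink-shared/checkpoints
    state.savepoints.dir: file:///flink-shared/savepoints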

Complete config file can be found here.


Final Notes

  • In this example, I used the aws-ebs provisioner in the StorageClass to show how this works in a cloud environment. Change it to the equivalent provisioner for your cloud provider.
  • If you are using EBS, the job manager and task managers must run on the same node, as an EBS volume can't be mounted on more than one node.
    This is achieved by setting pod affinity on the task manager Deployment's pods; see the sketch below.
    This might not be needed with other cloud providers, depending on your storage.
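For reference, the affinity rule on the task manager pod spec could look roughly like this, assuming the job manager pods carry an app: flink-jobmanager label (the label name is illustrative):

    affinity:
      podAffinity:
        requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchLabels:
                app: flink-jobmanager
            topologyKey: kubernetes.io/hostname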

Full working example can be found here.

Let me know if you have any questions or if something is missing.