I love Flink. I think it's an amazing product, with great documentation and community.

For readers who aren't familiar with Flink, it is a framework for computations over unbounded and bounded data streams. It runs in a distributed manner and is designed to perform exceptionally well at scale.
You can read more about Flink here.

I didn't think I would struggle with something as straightforward as deploying a job cluster on k8s, let alone deploying it on k8s with file-based high availability configured, which will be covered in the next post.

TL;DR: GitHub repo

Just to be on the same page, let's explain what a job cluster is and how it differs from a session cluster.

Job vs. Session Cluster

A session cluster is a long-running Flink cluster that executes the jobs submitted to it.
A job cluster, on the other hand, is a Flink cluster dedicated to running a single predefined job, with no job submission step.

Why would you choose one over the other?

In my opinion, a session cluster is better suited to situations where you submit multiple short-running jobs that deal with bounded data. The cluster's resources are shared among all the jobs running on it.
A job that deals with unbounded data, on the other hand, is never intended to end. You want to be able to upgrade the job and redeploy the cluster with the new version instead of dealing with resubmitting jobs, so a job cluster feels more appropriate.

Now, let's continue with our adventure (using Flink 1.9.2).

Kubernetes: Job or Deployment?

Flink's official example advises using a Kubernetes Job for the job manager. This makes no sense IMHO, as you want your job manager to be a long-running application that automatically restarts and continues from where it stopped if the pod gets deleted.

This is why I decided to change the job to a deployment.
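
As a rough illustration of that change, here is a minimal sketch of a job manager defined as a Deployment rather than a Job. It is not copied from the repo: the resource name, labels, image name and job class (`com.example.MyJob`) are hypothetical placeholders, and the `job-cluster` / `--job-classname` arguments follow the entrypoint convention of the official Flink 1.9 job cluster example.

```yaml
# Sketch only: names, labels, image and job class are placeholders.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1                      # a single job manager pod
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: my-flink-job:latest # hypothetical image containing the job jar
        args: ["job-cluster", "--job-classname", "com.example.MyJob"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
```

Unlike a Job, a Deployment recreates the pod whenever it is deleted, which is the behaviour wanted for a long-running job manager.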

Probes

Probes are a useful Kubernetes feature that helps us make sure the application is running.

With Flink it's pretty easy to configure a liveness probe by accessing the Flink dashboard UI.

You can find that in the jobmanager-ha.yaml file.
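
For readers who don't want to open the file, a liveness probe of this kind could look roughly like the fragment below. It is a sketch under assumptions, not the exact content of jobmanager-ha.yaml: it assumes the dashboard and REST API are served on Flink's default port 8081, uses the REST API's `/overview` endpoint as the path, and the timing values are arbitrary.

```yaml
# Fragment of the job manager container spec (sketch; values are assumptions).
livenessProbe:
  httpGet:
    path: /overview        # REST endpoint served on the dashboard port
    port: 8081             # Flink's default REST/web UI port
  initialDelaySeconds: 30  # give the job manager time to start
  periodSeconds: 10
```

If the endpoint stops responding, Kubernetes restarts the container.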

Another thing I didn't like was that the configuration is passed to Flink via CLI arguments in the k8s container spec.
This is why I created a ConfigMap and used it to set Flink's configuration for both the job manager and the task managers.
You can find the definition in the flink-configuration-ha.yaml file.
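
To give an idea of the shape, a ConfigMap carrying Flink's configuration might look like the sketch below. The ConfigMap name and all values are placeholders rather than the contents of flink-configuration-ha.yaml; the keys themselves are standard Flink options.

```yaml
# Sketch only: the name and values are assumptions.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    blob.server.port: 6124
    parallelism.default: 1
    taskmanager.numberOfTaskSlots: 1
```

The job manager and task manager pods would then mount this ConfigMap as a volume over Flink's conf directory (assumed here to be /opt/flink/conf, as in the official image). Note that a plain volume mount replaces the whole directory, so any logging configuration the image shipped there would need to be included in the ConfigMap as well.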

Web UI

I added a REST service to be able to access Flink's web UI.
You can find the definition in the jobmanager-rest-service.yaml file.
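
A service of this kind could be sketched as follows. The service name, the NodePort type and the selector labels are assumptions, not taken from jobmanager-rest-service.yaml; the selector has to match whatever labels the job manager pod actually carries.

```yaml
# Sketch only: name, type and labels are assumptions.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort           # exposes the UI outside the cluster
  selector:
    app: flink
    component: jobmanager
  ports:
  - name: rest
    port: 8081
    targetPort: 8081       # Flink's default REST/web UI port
```

On minikube, running `minikube service flink-jobmanager-rest` should then open the web UI, assuming the service is named as above.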


You can find my fully working example here.

To run it in a non-minikube environment, don't forget to remove the imagePullPolicy: Never setting and set a real image name in the job manager and task manager YAML files.

In the next blog post I cover the details of deploying a highly available Flink job cluster on k8s without ZooKeeper, using a file-based high availability implementation.

Enjoy your Flink adventures!