Kubernetes Task Runner

Run tasks as Kubernetes pods.

Overview

This plugin is available only in the Enterprise Edition (EE) and Kestra Cloud. The task runner is container-based, so the containerImage property must be set. To access the task’s working directory, use either the {{workingDir}} Pebble expression or the WORKING_DIR environment variable. Input files and namespace files will be available in this directory.

To generate output files, you can do either of the following (a sketch follows the list):

  • Use the outputFiles property of the task and create a file with the same name in the task’s working directory, or
  • Create any file in the output directory, which can be accessed with the {{outputDir}} Pebble expression or the OUTPUT_DIR environment variable.
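For illustration, here is a hypothetical flow showing both approaches; the flow id and file names are made up, and the cluster config is omitted for brevity:

id: output_files_example
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.scripts.shell.Commands
    containerImage: ubuntu
    taskRunner:
      type: io.kestra.plugin.ee.kubernetes.runner.Kubernetes
    outputFiles:
      - "result.txt" # option 1: declared here and created in the working directory
    commands:
      - echo "hello" > {{ workingDir }}/result.txt # option 1
      - echo "world" > {{ outputDir }}/extra.txt # option 2: any file in the output directory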

When the Kestra Worker running this task is terminated, the pod continues to run until completion. Once the Worker restarts, it resumes processing from the existing pod unless resume is set to false.
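For example, to always start from a fresh pod rather than reattaching, disable resume (a minimal excerpt):

taskRunner:
  type: io.kestra.plugin.ee.kubernetes.runner.Kubernetes
  resume: false # do not resume from an existing pod after a Worker restart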

If your cluster is configured with RBAC, the service account running your pod must have the following authorizations:

  • pods: get, create, delete, watch, list
  • pods/log: get, watch
  • pods/exec: get, watch

Here is an example role that grants these authorizations:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: task-runner
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "create", "delete", "watch", "list"]
  - apiGroups: [""]
    resources: ["pods/exec"]
    verbs: ["get", "watch"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get", "watch"]
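To grant these authorizations, bind the role to the service account used by your pods. Here is a minimal RoleBinding sketch, assuming the pods run as the default service account in the default namespace:

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: task-runner
subjects:
  - kind: ServiceAccount
    name: default # assumption: replace with the service account running your pods
    namespace: default
roleRef:
  kind: Role
  name: task-runner
  apiGroup: rbac.authorization.k8s.io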

How to use the Kubernetes task runner

The Kubernetes task runner executes tasks as pods in a Kubernetes cluster of your choice. It is particularly useful when a task needs explicit resource requests and limits.

Here is an example of a workflow with a task running Shell commands in a Kubernetes pod:

id: kubernetes_task_runner
namespace: company.team

description: |
  To get the kubeconfig file, run: `kubectl config view --minify --flatten`.
  Then, copy the values to the configuration below.
  Here is how Kubernetes task runner properties (on the left) map to the kubeconfig file's properties (on the right):
  - clientKeyData: client-key-data
  - clientCertData: client-certificate-data
  - caCertData: certificate-authority-data
  - masterUrl: server, e.g. https://docker-for-desktop:6443
  - oauthToken: token, if you are using OAuth (GKE/EKS)

inputs:
  - id: file
    type: FILE

tasks:
  - id: shell
    type: io.kestra.plugin.scripts.shell.Commands
    inputFiles:
      data.txt: "{{ inputs.file }}"
    outputFiles:
      - "*.txt"
    containerImage: centos
    taskRunner:
      type: io.kestra.plugin.ee.kubernetes.runner.Kubernetes
      config:
        clientKeyData: client-key-data
        clientCertData: client-certificate-data
        caCertData: certificate-authority-data
        masterUrl: https://docker-for-desktop:6443
    commands:
      - echo "Hello from a Kubernetes task runner!"
      - cp data.txt out.txt

File handling

If your script task has inputFiles or namespaceFiles configured, an init container will be added to upload files into the main container.

Similarly, if your script task has outputFiles configured, a sidecar container will be added to download files from the main container.

All containers will use an in-memory emptyDir volume for file exchange.
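Conceptually, the generated pod resembles the excerpt below. The container names, images, and mount path are illustrative assumptions, not the exact spec Kestra produces:

apiVersion: v1
kind: Pod
spec:
  initContainers:
    - name: init # uploads input and namespace files before the main container starts
      image: busybox
      volumeMounts:
        - name: files
          mountPath: /kestra/working-dir # hypothetical path
  containers:
    - name: main # runs the task's commands or script
      image: centos
      volumeMounts:
        - name: files
          mountPath: /kestra/working-dir
    - name: sidecar # downloads output files once the main container finishes
      image: busybox
      volumeMounts:
        - name: files
          mountPath: /kestra/working-dir
  volumes:
    - name: files
      emptyDir:
        medium: Memory # in-memory volume shared by all containers for file exchange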

Failure scenarios

If a task is resubmitted (e.g., due to a retry or a Worker crash), the new Worker will reattach to the already running (or an already finished) pod instead of starting a new one.

Specifying resource requests for Python scripts

Some Python scripts may require more resources than others. You can specify the resources required by the Python script in the resources property of the task runner.

id: kubernetes_resources
namespace: company.team

tasks:
  - id: python_script
    type: io.kestra.plugin.scripts.python.Script
    containerImage: ghcr.io/kestra-io/pydata:latest
    taskRunner:
      type: io.kestra.plugin.ee.kubernetes.runner.Kubernetes
      namespace: default
      pullPolicy: ALWAYS
      config:
        username: docker-desktop
        masterUrl: https://docker-for-desktop:6443
        caCertData: xxx
        clientCertData: xxx
        clientKeyData: xxx
      resources:
        request: # the resources the container is guaranteed to get
          cpu: "500m" # request 1/2 of a CPU (500 milliCPU)
          memory: "128Mi" # request 128 MB of memory
    outputFiles:
      - "*.json"
    script: |
      import platform
      import sys
      import json
      from kestra import Kestra

      print("Hello from a Kubernetes runner!")

      host = platform.node()
      py_version = platform.python_version()
      # use a distinct name so the platform module is not shadowed
      platform_info = platform.platform()
      os_arch = f"{sys.platform}/{platform.machine()}"

      def print_environment_info():
          print(f"Host's network name: {host}")
          print(f"Python version: {py_version}")
          print(f"Platform info: {platform_info}")
          print(f"OS/Arch: {os_arch}")

          env_info = {
              "host": host,
              "platform": platform_info,
              "os_arch": os_arch,
              "python_version": py_version,
          }
          Kestra.outputs(env_info)

          filename = "environment_info.json"
          with open(filename, "w") as json_file:
              json.dump(env_info, json_file, indent=4)

      if __name__ == '__main__':
          print_environment_info()
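The resources property can also cap consumption with a limit block alongside request; a minimal excerpt with placeholder values:

resources:
  request: # resources the container is guaranteed to get
    cpu: "500m"
    memory: "128Mi"
  limit: # resources the container may not exceed
    cpu: "1000m"
    memory: "256Mi"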

Using plugin defaults to avoid repetition

You can use pluginDefaults to avoid repeating the same configuration across multiple tasks. For example, set the pullPolicy to ALWAYS for all tasks in a namespace:

id: k8s_taskrunner
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: run_command
        type: io.kestra.plugin.scripts.python.Commands
        containerImage: ghcr.io/kestra-io/kestrapy:latest
        commands:
          - pip show kestra

      - id: run_python
        type: io.kestra.plugin.scripts.python.Script
        containerImage: ghcr.io/kestra-io/pydata:latest
        script: |
          import socket

          # resolve the pod's own hostname to an IP address
          hostname = socket.gethostname()
          ip_address = socket.gethostbyname(hostname)
          print("Hello from AWS EKS and kestra!")
          print(f"Host IP Address: {ip_address}")

pluginDefaults:
  - type: io.kestra.plugin.scripts.python
    forced: true
    values:
      taskRunner:
        type: io.kestra.plugin.ee.kubernetes.runner.Kubernetes
        namespace: default
        pullPolicy: ALWAYS
        config:
          username: docker-desktop
          masterUrl: https://docker-for-desktop:6443
          caCertData: |-
            placeholder
          clientCertData: |-
            placeholder
          clientKeyData: |-
            placeholder

Guides

Below are a number of guides to help you set up the Kubernetes Task Runner on different platforms.

Google Kubernetes Engine (GKE)

Before you begin

Before you start, you need to have the following:

  1. A Google Cloud account.
  2. A Kestra instance in version 0.18.0 or later, with Google credentials stored as secrets or environment variables.

Set up Google Cloud

Inside Google Cloud, you’ll need to do the following (example gcloud commands are sketched after the list):

  1. Create and select a project
  2. Create a GKE cluster
  3. Enable the Kubernetes Engine API
  4. Set up the gcloud CLI with kubectl
  5. Create a service account
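As a sketch, the corresponding gcloud commands could look like the following; the project, cluster, and service account names are placeholders:

# illustrative commands; replace the names with your own
gcloud config set project my-project-id
gcloud services enable container.googleapis.com
gcloud container clusters create my-gke-cluster --zone europe-west1-b
gcloud iam service-accounts create kestra-task-runner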

Creating a flow

Here’s an example flow using the Kubernetes Task Runner with GKE. To authenticate, you need to use OAuth with a service account.

id: gke_task_runner
namespace: company.team

tasks:
  - id: metadata
    type: io.kestra.plugin.gcp.gke.ClusterMetadata
    clusterId: kestra-dev-gke
    clusterZone: "europe-west1"
    clusterProjectId: kestra-dev

  - id: auth
    type: io.kestra.plugin.gcp.auth.OauthAccessToken

  - id: pod
    type: io.kestra.plugin.scripts.shell.Commands
    containerImage: ubuntu
    commands:
      - echo "Hello from a Kubernetes task runner!"
    taskRunner:
      type: io.kestra.plugin.ee.kubernetes.runner.Kubernetes
      namespace: default
      config:
        caCertData: "{{ outputs.metadata.masterAuth.clusterCertificat }}"
        masterUrl: "https://{{ outputs.metadata.endpoint }}"
        oauthToken: "{{ outputs.auth.accessToken['tokenValue'] }}"

You’ll need the gcloud CLI to retrieve cluster credentials such as masterUrl and caCertData. You can do this by running the following command:

gcloud container clusters get-credentials clustername --region myregion --project projectid

You’ll need to update the following properties in the flow above with your own values:

  • clusterId is the name of your cluster.
  • clusterZone is the zone or region your cluster is in, e.g., europe-west2.
  • clusterProjectId is the ID of your Google Cloud project.

Once you’ve run this command, you can view your configuration with kubectl config view --minify --flatten and copy the values for caCertData, masterUrl, and username.

Amazon Elastic Kubernetes Service (EKS)

Here’s an example flow using the Kubernetes Task Runner with AWS EKS. To authenticate, you need an OAuth token.

id: eks_task_runner
namespace: company.team

tasks:
  - id: shell
    type: io.kestra.plugin.scripts.shell.Commands
    containerImage: centos
    taskRunner:
      type: io.kestra.plugin.ee.kubernetes.runner.Kubernetes
      config:
        caCertData: "{{ secret('certificate-authority-data') }}"
        masterUrl: https://xxx.xxx.region.eks.amazonaws.com
        username: arn:aws:eks:region:xxx:cluster/cluster_name
        oauthToken: xxx
    commands:
      - echo "Hello from a Kubernetes task runner!"
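The OAuth token for EKS is short-lived. One common way to fetch it, assuming the AWS CLI is installed and configured, is:

aws eks get-token --cluster-name cluster_name

The token value appears in the status.token field of the command's JSON output; in practice, store it as a secret rather than hardcoding it in the flow.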