Dynamic Flows
Implement dynamic flows in Kestra.
In this guide, we will look at ways to implement dynamic flows in Kestra.
Dynamic Flow using Inputs
In this method, we will create a flow as a template, and the dynamic values in the template can then be filled using Kestra inputs to generate the desired flow. Let us see this with the help of an example.
Here, we will create a sample flow that downloads CSV data using the HTTP Download task and then loads the data to a PostgreSQL table. Such a dynamic flow can be helpful when you have new HTTP URLs getting generated on a regular cadence, and you need to pull in the latest data from the new HTTP URL to upload to a table.
The flow will take the HTTP URL and the PostgreSQL database connection details as inputs. This leads to a dynamic flow, as the same flow can then be utilized with different HTTP URLs and different PostgreSQL databases and tables.
```yaml
id: dynamic_flow
namespace: company.team

inputs:
  - id: http_url
    type: STRING
    defaults: "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv"

  - id: postgres_host
    type: STRING
    defaults: "localhost"

  - id: postgres_port
    type: STRING
    defaults: "5432"

  - id: postgres_db
    type: STRING
    defaults: "postgres"

  - id: postgres_table
    type: STRING

  - id: postgres_username
    type: STRING

  - id: postgres_password
    type: STRING

tasks:
  - id: http_download
    type: io.kestra.plugin.core.http.Download
    uri: "{{ inputs.http_url }}"

  - id: copyin
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    url: "jdbc:postgresql://{{ inputs.postgres_host }}:{{ inputs.postgres_port }}/{{ inputs.postgres_db }}"
    username: "{{ inputs.postgres_username }}"
    password: "{{ inputs.postgres_password }}"
    format: CSV
    from: "{{ outputs.http_download.uri }}"
    table: "{{ inputs.postgres_table }}"
    header: true
```
As can be seen above, the flow is dynamic because all of its important parameters are controlled via inputs.
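Each execution of this flow can target a different URL and table simply by passing different input values. As a minimal sketch, the helper below builds the URL and multipart form fields for Kestra's executions API (the `main` path segment is the tenant used elsewhere in this guide, and `execution_request` is a hypothetical helper name; adjust both for your deployment):

```python
def execution_request(host, namespace, flow_id, inputs):
    """Build the URL and multipart form fields for triggering a flow
    execution with the given inputs (one form field per input)."""
    url = f"http://{host}/api/v1/main/executions/{namespace}/{flow_id}"
    form = [(key, str(value)) for key, value in inputs.items()]
    return url, form

url, form = execution_request(
    "localhost:8080", "company.team", "dynamic_flow",
    {
        "http_url": "https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv",
        "postgres_table": "orders",
        "postgres_username": "postgres",
        "postgres_password": "example-password",
    },
)
# With the `requests` library installed, you would then POST it:
#   requests.post(url, files=form)
```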
Dynamic Flow using Code
We can write code in any language to generate the dynamic flow, and then upload the flow to Kestra. Let us understand this with the help of an example.
We will create a dynamic flow using Python that downloads a CSV file using the HTTP Download task and uploads its contents into a PostgreSQL table. Say we want to extract the data from multiple HTTP URLs and upload the data to corresponding PostgreSQL tables. We can start the download-and-upload process for each URL in parallel. For two items, products and orders, this is how our flow should look:
```yaml
id: dynamic_flow
namespace: company.team

tasks:
  - id: parallel
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: sequential_task_0
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: http_download_0
            type: io.kestra.plugin.core.http.Download
            uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/products.csv

          - id: postgres_upload_0
            type: io.kestra.plugin.jdbc.postgresql.CopyIn
            url: jdbc:postgresql://{{ kv('postgres_host', 'company.infra') }}:{{ kv('postgres_port', 'company.infra') }}/{{ kv('postgres_db', 'company.infra') }}
            username: "{{ secret('POSTGRES_USERNAME') }}"
            password: "{{ secret('POSTGRES_PASSWORD') }}"
            format: CSV
            from: '{{ outputs.http_download_0.uri }}'
            table: products
            header: true

      - id: sequential_task_1
        type: io.kestra.plugin.core.flow.Sequential
        tasks:
          - id: http_download_1
            type: io.kestra.plugin.core.http.Download
            uri: https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv

          - id: postgres_upload_1
            type: io.kestra.plugin.jdbc.postgresql.CopyIn
            url: jdbc:postgresql://{{ kv('postgres_host', 'company.infra') }}:{{ kv('postgres_port', 'company.infra') }}/{{ kv('postgres_db', 'company.infra') }}
            username: "{{ secret('POSTGRES_USERNAME') }}"
            password: "{{ secret('POSTGRES_PASSWORD') }}"
            format: CSV
            from: '{{ outputs.http_download_1.uri }}'
            table: orders
            header: true
```
In order to generate this flow dynamically for any number of items, we will use the following Python code.
```python
import os
from ruamel.yaml import YAML

# Get the items from the environment variable and split it by comma (,)
items = os.getenv('EXTRACT_ITEMS', "products,orders").split(",")


def http_download_task(idx, item):
    """Create HTTP Download task based on the item.
    The task id and uri will get dynamically generated based on
    `idx` and `item` respectively.
    """
    return {
        "id": f"http_download_{str(idx)}",
        "type": "io.kestra.plugin.core.http.Download",
        "uri": f"https://huggingface.co/datasets/kestra/datasets/raw/main/csv/{item}.csv"
    }


def postgres_upload_task(idx, item):
    """Create postgres CopyIn task to upload the data from CSV to the
    corresponding postgres table.
    """
    return {
        "id": f"postgres_upload_{str(idx)}",
        "type": "io.kestra.plugin.jdbc.postgresql.CopyIn",
        "url": "jdbc:postgresql://" + "{{ kv('postgres_host', 'company.infra') }}:{{ kv('postgres_port', 'company.infra') }}/{{ kv('postgres_db', 'company.infra') }}",
        "username": "{{ secret('POSTGRES_USERNAME') }}",
        "password": "{{ secret('POSTGRES_PASSWORD') }}",
        "format": "CSV",
        "from": "{{ outputs.http_download_" + str(idx) + ".uri }}",
        "table": f"{item}",
        "header": True
    }


def create_sequential_task(idx, task_list):
    """Create Sequential task for every item which will have two tasks:
    1. Download the CSV data using the HTTP Download task
    2. Upload the CSV file into the corresponding postgres table using the CopyIn task
    """
    return {
        "id": f"sequential_task_{str(idx)}",
        "type": "io.kestra.plugin.core.flow.Sequential",
        "tasks": task_list
    }


tasks_per_item = []

# Iterate over the items, generate a Sequential task for each item,
# and append it to `tasks_per_item`
for idx, item in enumerate(items):
    sequential_tasks = []
    sequential_tasks.append(http_download_task(idx, item))
    sequential_tasks.append(postgres_upload_task(idx, item))
    sequential_task = create_sequential_task(idx, sequential_tasks)
    tasks_per_item.append(sequential_task)

# Generate the dynamic flow
kestra_flow = {
    "id": os.getenv('FLOW_ID', "postgres_upload_flow"),
    "namespace": os.getenv('FLOW_NAMESPACE', "company.team"),
    "tasks": [
        {
            "id": "parallel",
            "type": "io.kestra.plugin.core.flow.Parallel",
            "tasks": tasks_per_item
        }
    ]
}

yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)
yaml.preserve_quotes = True

# Write the generated dynamic flow in YAML format to the `kestra_flow.yaml` file
output_path = "kestra_flow.yaml"
with open(output_path, "w") as f:
    yaml.dump(kestra_flow, f)
```
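Before uploading, the generated flow dict can be sanity-checked. A minimal sketch, assuming the structure produced by the script above (`validate_flow` and `sample` are hypothetical names introduced here for illustration):

```python
def validate_flow(flow, items):
    """Check that the generated flow has a top-level Parallel task
    containing one Sequential task per item, with the expected ids."""
    parallel = flow["tasks"][0]
    if parallel.get("type") != "io.kestra.plugin.core.flow.Parallel":
        return False
    seq_ids = [task["id"] for task in parallel["tasks"]]
    expected = [f"sequential_task_{i}" for i in range(len(items))]
    return seq_ids == expected

# A trimmed-down stand-in for the `kestra_flow` dict built above
sample = {
    "tasks": [{
        "id": "parallel",
        "type": "io.kestra.plugin.core.flow.Parallel",
        "tasks": [{"id": "sequential_task_0"}, {"id": "sequential_task_1"}],
    }]
}
print(validate_flow(sample, ["products", "orders"]))  # → True
```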
The above Python code generates a dynamic flow with multiple Sequential tasks, each of which downloads the data from an HTTP URL and uploads the CSV into the corresponding PostgreSQL table. You can save this code as a namespace file, say `dynamic_flow.py`.

Next, we will write a Kestra flow that invokes the `dynamic_flow.py` script and loads the generated dynamic flow into Kestra.
```yaml
id: generate_dynamic_flow
namespace: company.team

inputs:
  - id: flow_id
    type: STRING
    description: Name for the dynamic flow to be created
    defaults: dynamic_flow

  - id: flow_namespace
    type: STRING
    description: Namespace in which the dynamic flow is to be created
    defaults: company.team

  - id: extract_items
    type: STRING
    description: Comma separated list of items to be extracted
    defaults: products,orders

  - id: kestra_host
    type: STRING
    description: Your Kestra hostname
    defaults: "localhost:8080"

tasks:
  - id: generate_kestra_flow
    type: io.kestra.plugin.scripts.python.Commands
    env:
      FLOW_ID: "{{ inputs.flow_id }}"
      FLOW_NAMESPACE: "{{ inputs.flow_namespace }}"
      EXTRACT_ITEMS: "{{ inputs.extract_items }}"
    beforeCommands:
      - pip install -q ruamel.yaml
    namespaceFiles:
      enabled: true
    inputFiles:
      script.py: "{{ read('dynamic_flow.py') }}"
    commands:
      - python script.py
    outputFiles:
      - "*.yaml"

  - id: create_flow
    type: io.kestra.plugin.scripts.shell.Commands
    inputFiles:
      flow.yaml: "{{ outputs.generate_kestra_flow.outputFiles['kestra_flow.yaml'] }}"
    beforeCommands:
      - apt-get update
      - apt-get -y install curl
    commands:
      - curl -X POST http://{{inputs.kestra_host}}/api/v1/main/flows/import -F fileUpload=@flow.yaml
      - echo "Executing the flow from http://{{inputs.kestra_host}}/ui/flows/edit/{{ inputs.flow_namespace }}/{{ inputs.flow_id }}"

  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: "{{ inputs.flow_namespace }}"
    flowId: "{{ inputs.flow_id }}"
    wait: true
    transmitFailed: true
```
The flow has the following tasks:

- `generate_kestra_flow`: This task invokes `dynamic_flow.py` and generates the dynamic flow by running the Python code. The environment variables `FLOW_ID`, `FLOW_NAMESPACE`, and `EXTRACT_ITEMS` are provided in the task and used by the Python script to generate the flow dynamically.
- `create_flow`: This task creates the flow in Kestra by uploading the YAML file containing the dynamically generated flow.
- `subflow`: This task triggers the newly created dynamic flow.
Thus, you can generate a dynamic flow in the language of your choice and then load it into Kestra.
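Whatever language generates the flow, the upload step is the same HTTP call the `create_flow` task makes with curl. A minimal Python sketch of that request (the `main` path segment is the tenant used in the curl command above, and `import_flow_request` is a hypothetical helper name):

```python
def import_flow_request(host, yaml_path):
    """Build the URL and multipart field name for Kestra's
    flows/import endpoint, mirroring the curl call:
    curl -X POST http://<host>/api/v1/main/flows/import -F fileUpload=@flow.yaml
    """
    url = f"http://{host}/api/v1/main/flows/import"
    # With the `requests` library this would be posted as:
    #   requests.post(url, files={"fileUpload": open(yaml_path, "rb")})
    return url, {"field": "fileUpload", "path": yaml_path}

url, upload = import_flow_request("localhost:8080", "kestra_flow.yaml")
print(url)
```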
Dynamic Flow using Terraform
Yet another way of generating dynamic flows in Kestra is using Terraform templates. Check out the detailed guide on implementing dynamic flows using Terraform templating.