Subflows
Subflows let you build modular and reusable workflow components.
They work like function calls: executing a subflow creates a new flow run from within another flow.
Why use a subflow?
Subflows allow you to build modular and reusable components that you can use across multiple flows. For example, you might define a subflow that handles error alerts by posting to Slack and sending an email. By using a subflow, you can reuse these two tasks together in every flow that needs error notifications, instead of copying the individual tasks into each flow.
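For illustration, a minimal sketch of such a notification subflow might look like the following; it uses plain Log tasks as stand-ins for the actual Slack and email tasks, and the flow name, namespace, and input are made up for this example.

```yaml
id: error_alerts               # hypothetical subflow name
namespace: company.team

inputs:
  - id: failed_flow
    type: STRING

tasks:
  # Stand-in for a Slack notification task
  - id: notify_slack
    type: io.kestra.plugin.core.log.Log
    message: "Slack alert: {{ inputs.failed_flow }} failed"

  # Stand-in for an email notification task
  - id: notify_email
    type: io.kestra.plugin.core.log.Log
    message: "Email alert: {{ inputs.failed_flow }} failed"
```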
How to declare a subflow
To call a flow from another flow, use the io.kestra.plugin.core.flow.Subflow task and, in that task, specify the flowId and namespace of the subflow that you want to execute. You can also pass custom inputs, similar to passing arguments to a function.
The optional properties wait and transmitFailed control the execution behavior: when wait is false, the parent flow continues without waiting for the subflow to finish, while transmitFailed determines whether a failure of the subflow execution should also fail the parent flow.
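As a quick illustration, a Subflow task declaration has the following shape; the flow name my_subflow, the input some_input, and its value are made up for this sketch, and the full runnable examples follow below.

```yaml
# Minimal sketch of a Subflow call; my_subflow and some_input are example names
- id: call_my_subflow
  type: io.kestra.plugin.core.flow.Subflow
  namespace: company.team
  flowId: my_subflow
  inputs:
    some_input: some value     # passed to the subflow like function arguments
  wait: true                   # wait for the subflow execution to finish
  transmitFailed: true         # fail the parent flow if the subflow fails
```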
Practical example
A subflow can encapsulate critical business logic, making it reusable across flows and easier to test in isolation.
Here is a simple example of a subflow:
```yaml
id: critical_service
namespace: company.team

tasks:
  - id: return_data
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT sum(total) as total, avg(quantity) as avg_quantity
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

outputs:
  - id: some_output
    type: STRING
    value: "{{ outputs.return_data.uri }}"
```
In this example, the return_data task exposes the uri of the query output. That URI is a reference to the internal storage location of the stored file. This output can be used in the parent flow to perform further processing.
```yaml
id: parent_service
namespace: company.team

tasks:
  - id: subflow_call
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: critical_service
    wait: true
    transmitFailed: true

  - id: log_subflow_output
    type: io.kestra.plugin.scripts.shell.Commands
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - cat "{{ outputs.subflow_call.outputs.some_output }}"
```
The outputs map task IDs to their results. Here, the parent flow reads the some_output value emitted by the subflow called in the subflow_call task, following the pattern outputs.<subflow_task_id>.outputs.<output_id>.
Subflow properties
Below is a full list of the properties of the io.kestra.plugin.core.flow.Subflow task. You don't need to memorize them all; the task documentation always lists them.
| Field | Description |
|---|---|
| flowId | The subflow's identifier |
| namespace | The namespace where the subflow is located |
| inheritLabels | Whether the subflow inherits labels from the parent flow (default: false) |
| inputs | Inputs passed to the subflow |
| labels | Labels assigned to the subflow |
| outputs (deprecated) | Passes outputs from the subflow execution to the parent flow |
| revision | The subflow revision to execute (defaults to the latest) |
| scheduleDate | Schedules the subflow execution on a specific date rather than immediately |
| transmitFailed | If true, a failure of the subflow also fails the parent flow (requires wait to be true) |
| wait | If true, the parent flow waits for the subflow to complete (default: true) |
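To make these properties concrete, here is a hedged sketch of a Subflow task that combines several of them; the subflow name reporting, the revision number, and the input value are invented for this example.

```yaml
- id: call_reporting
  type: io.kestra.plugin.core.flow.Subflow
  namespace: company.team       # where the subflow lives
  flowId: reporting             # hypothetical subflow identifier
  revision: 2                   # pin a specific revision instead of the latest
  inheritLabels: true           # also copy the parent execution's labels
  inputs:
    report_date: "2024-07-01"   # example input value
  wait: true                    # wait for the subflow to complete
  transmitFailed: true          # fail the parent if the subflow fails
```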
Passing data between parent and child flows
Flows can emit outputs that can be accessed by the parent flow. Using the io.kestra.plugin.core.flow.Subflow task, you can call any flow as a subflow and access its outputs in downstream tasks. For more details and examples, check the Outputs page.
Accessing Outputs from a subflow execution
The outputs of a Subflow task include the execution ID, the extracted outputs, and the final state (the latter only if wait is true).
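For example, assuming the Subflow task is called subflow and has wait: true, a downstream task in the parent flow could reference these values as sketched below; the exact set of output fields can vary between Kestra versions.

```yaml
# Assumes a preceding Subflow task with id: subflow and wait: true
- id: inspect_subflow
  type: io.kestra.plugin.core.log.Log
  message: |
    Execution ID: {{ outputs.subflow.executionId }}
    Final state: {{ outputs.subflow.state }}
    Subflow outputs: {{ outputs.subflow.outputs }}
```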
To sum up, subflows improve maintainability of complex workflows. They allow you to build modular and reusable workflow components and share them across multiple namespaces, projects, and teams.
Here’s an example of a subflow with explicitly defined outputs.
```yaml
id: flow_outputs
namespace: company.team

tasks:
  - id: mytask
    type: io.kestra.plugin.core.debug.Return
    format: this is a task output used as a final flow output

outputs:
  - id: final
    type: STRING
    value: "{{ outputs.mytask.value }}"
```
We can access these outputs from the parent flow, as shown in the example below:
```yaml
id: parent_flow
namespace: company.team

tasks:
  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    flowId: flow_outputs
    namespace: company.team
    wait: true

  - id: log_subflow_output
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.final }}"
```
For more details, see the subflow outputs documentation.
Passing inputs to a subflow
You can pass inputs to a Subflow task. The example below passes an input from the parent flow to a subflow.
Subflow:
```yaml
id: subflow_example
namespace: company.team

inputs:
  - id: http_uri
    type: STRING

tasks:
  - id: download
    type: io.kestra.plugin.core.http.Request
    uri: "{{ inputs.http_uri }}"

  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.download.body }}"

outputs:
  - id: data
    type: STRING
    value: "{{ outputs.download.body }}"
```
Parent flow:
```yaml
id: inputs_subflow
namespace: company.team

inputs:
  - id: url
    type: STRING

tasks:
  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    flowId: subflow_example
    namespace: company.team
    inputs:
      http_uri: "{{ inputs.url }}"
    wait: true

  - id: hello
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.subflow.outputs.data }}"
```
In this example, the parent flow passes its url input to the subflow's http_uri input.
Nested inputs
In the example below, the flow extracts JSON data from a REST API and passes it to a subflow as a nested input:
```yaml
id: extract_json
namespace: company.team

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: https://dummyjson.com/users

  - id: read_json
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.api.body }}"

  - id: subflow
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: subflow
    inputs:
      users.firstName: "{{ outputs.api.body | jq('.users') | first | first | jq('.firstName') | first }}"
      users.lastName: "{{ outputs.api.body | jq('.users') | first | first | jq('.lastName') | first }}"
    wait: true
    transmitFailed: true
```
To provide type validation to extracted JSON fields, you can use nested inputs in the subflow definition:
```yaml
id: subflow
namespace: company.team

inputs:
  - id: users.firstName
    type: STRING
    defaults: Rick

  - id: users.lastName
    type: STRING
    defaults: Astley

tasks:
  - id: process_user_data
    type: io.kestra.plugin.core.log.Log
    message: hello {{ inputs.users }}
```
You can then pass the entire users object, including nested fields, to any task in the subflow.
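For instance, assuming the nested inputs above resolve to a users object with firstName and lastName keys, a task in the subflow could reference an individual field like this:

```yaml
# Hypothetical task showing access to a single nested input field
- id: greet_user
  type: io.kestra.plugin.core.log.Log
  message: "hello {{ inputs.users.firstName }} {{ inputs.users.lastName }}"
```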