# Outputs
Outputs let you pass data between tasks and flows.
Tasks and flows can generate outputs that are passed to downstream processes. These outputs can be variables or files stored in the internal storage.
## How to retrieve outputs
Similar to inputs, use expressions to access outputs in downstream tasks. Use the syntax `{{ outputs.task_id.output_property }}` to retrieve a specific output value of a task. If your task id contains one or more hyphens (`-`), wrap the task id in square brackets, for example: `{{ outputs['task-id'].output_property }}`.
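Here is a minimal sketch of that syntax in action, assuming the core `Return` and `Log` tasks (the `Return` task exposes its result under the `value` output property):

```yaml
id: outputs_demo
namespace: company.team

tasks:
  - id: produce
    type: io.kestra.plugin.core.debug.Return
    format: "Hello from the first task"

  # reference the upstream task's output via an expression
  - id: consume
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.produce.value }}"
```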
To see which outputs have been generated during a flow execution, go to the Outputs tab on the Execution page.
Outputs are useful for troubleshooting and auditing. Additionally, you can use outputs to:
- share downloadable artifacts with business stakeholders (e.g., a table generated by a SQL query or a CSV file generated by a Python script)
- pass data between decoupled processes (e.g., pass a subflow's outputs or a file detected by an S3 trigger to downstream tasks, as sketched below)
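For the subflow case, a hedged sketch: assuming a child flow `child_flow` that declares a flow-level output with id `report` (both ids here are hypothetical), a parent flow could pass it downstream like this:

```yaml
id: parent
namespace: company.team

tasks:
  - id: child
    type: io.kestra.plugin.core.flow.Subflow
    namespace: company.team
    flowId: child_flow   # hypothetical child flow declaring an output `report`
    wait: true           # wait for the subflow so its outputs are available

  # assumption: the subflow's flow-level outputs are exposed under `outputs`
  - id: use_child_output
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.child.outputs.report }}"
```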
## Use outputs in your flow
When fetching data from a REST API, Kestra stores the fetched data in the internal storage and makes it available to downstream tasks via the `body` output. Use the `{{ outputs.task_id.body }}` syntax to process that fetched data in a downstream task, as shown in the Python script task below.
```yaml
id: getting_started_output
namespace: company.team

inputs:
  - id: api_url
    type: STRING
    defaults: https://dummyjson.com/products

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: "{{ inputs.api_url }}"

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:slim
    beforeCommands:
      - pip install polars
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{ outputs.api.body | jq('.products') | first }}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")
```
This flow processes data using Polars and stores the result as a CSV file.
To avoid package dependency conflicts, the Python task runs in an independent Docker container. You can optionally provide a custom Docker image from a private container registry or use a public Python image from DockerHub. The `beforeCommands` argument lets you install any custom package dependencies (here, we install Polars); use as many commands as needed to prepare the containerized environment for script execution.
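For instance, a sketch of a `beforeCommands` list with several setup steps (the extra package names are illustrative):

```yaml
beforeCommands:
  - pip install --upgrade pip       # prepare the environment first
  - pip install polars requests     # then install the packages the script needs
```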
## Debug expressions
When referencing the output from the previous task, this flow uses the `jq` language to extract the `products` array from the API response. `jq` is available in all Kestra tasks without having to install it.

You can test `{{ outputs.task_id.body | jq('.products') | first }}` and any other output parsing expression using the built-in expressions evaluator on the Outputs page.
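If you prefer debugging inside the flow itself, a minimal sketch (assuming the core `Log` task) that prints the parsed value during execution:

```yaml
  - id: debug
    type: io.kestra.plugin.core.log.Log
    message: "{{ outputs.api.body | jq('.products') | first }}"
```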
## Passing data between tasks
Let’s add another task to the flow to process the CSV file generated by the Python script task. We use the `io.kestra.plugin.jdbc.duckdb.Query` task to run a SQL query on the CSV file and store the result as a downloadable artifact in the internal storage.
```yaml
id: getting_started
namespace: company.team

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: https://dummyjson.com/products

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:slim
    beforeCommands:
      - pip install polars
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{ outputs.api.body | jq('.products') | first }}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")

  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{ workingDir }}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true
```
This example flow passes data between tasks using outputs. The `inputFiles` argument of the `io.kestra.plugin.jdbc.duckdb.Query` task allows you to pass files from internal storage to the task. The `store: true` property ensures that the result of the SQL query is stored in the internal storage and can be previewed and downloaded from the Outputs tab.
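If a downstream task needs that stored result, a hedged sketch, assuming the Query task exposes the stored file under a `uri` output when `store: true` is set:

```yaml
  - id: logResult
    type: io.kestra.plugin.core.log.Log
    message: "Query result stored at {{ outputs.sqlQuery.uri }}"
```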
To sum up, our flow extracts data from an API, uses that data in a Python script, executes a SQL query, and generates a downloadable artifact.
If you encounter any issues while executing the above flow, they might be Docker-related (e.g., a Docker-in-Docker setup, which can be difficult to configure on Windows). Set the task runner type to `io.kestra.plugin.core.runner.Process` to run the Python script task in the same process as the flow rather than in a Docker container, as shown in the example below. This avoids any Docker-related issues.
```yaml
id: getting_started
namespace: company.team

inputs:
  - id: api_url
    type: STRING
    defaults: https://dummyjson.com/products

tasks:
  - id: api
    type: io.kestra.plugin.core.http.Request
    uri: "{{ inputs.api_url }}"

  - id: python
    type: io.kestra.plugin.scripts.python.Script
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    beforeCommands:
      - pip install polars
    outputFiles:
      - "products.csv"
    script: |
      import polars as pl
      data = {{ outputs.api.body | jq('.products') | first }}
      df = pl.from_dicts(data)
      df.glimpse()
      df.select(["brand", "price"]).write_csv("products.csv")

  - id: sqlQuery
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      in.csv: "{{ outputs.python.outputFiles['products.csv'] }}"
    sql: |
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_csv_auto('{{ workingDir }}/in.csv', header=True)
      GROUP BY brand
      ORDER BY avg_price DESC;
    store: true
```
To learn more about outputs, check out the full outputs documentation.