# Workflow API Reference
The Workflow API in the Kubiya SDK enables you to create, configure, and execute workflows that orchestrate multiple steps and tools.
## Core Concepts
A workflow in the Kubiya SDK is a directed acyclic graph (DAG) of nodes, where each node represents a step in the workflow. Workflows can be as simple as a linear sequence of steps or as complex as a branching, conditional graph with parallel execution paths.
## Key Components
| Component | Description |
|---|---|
| Workflow Creation | Creating and configuring workflows |
| Workflow Nodes | Defining the steps in a workflow |
| Node Types | Different types of nodes for various operations |
| Parameters | Defining and passing parameters to workflows |
| Visualization | Visualizing workflows as diagrams |
## Basic Workflow Example
Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
# Minimal example: two sequential steps, where the second step declares a
# dependency on the first and consumes its result.
workflow = Workflow(
    id="data-processing",
    name="Data Processing Workflow",
    description="Process and analyze data",
    nodes=[
        # Entry step: has no depends_on. Its "endpoint" input references
        # $params.api_endpoint, matching the key supplied to execute() below.
        WorkflowNode(
            name="fetch_data",
            description="Fetch data from API",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "fetch_api_data",
                "input_mapping": {
                    "endpoint": "$params.api_endpoint",
                },
            },
        ),
        # Follow-up step: listed as depending on "fetch_data" and mapping
        # that node's result into its own "data" input.
        WorkflowNode(
            name="analyze_data",
            description="Analyze the data",
            node_type=NodeType.TOOL,
            depends_on=["fetch_data"],
            tool_config={
                "tool_name": "analyze_data",
                "input_mapping": {
                    "data": "$fetch_data.result",
                },
            },
        ),
    ],
)

# Run the workflow, passing the input parameters as a dict.
result = workflow.execute(
    {
        "api_endpoint": "https://api.example.com/data",
    }
)
## Advanced Workflow Features
Kubiya workflows support a wide range of advanced features:
### Conditional Execution
Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
# Branching example: a CONDITIONAL node selects one of two follow-up nodes
# by name, based on the output of an earlier check.
workflow = Workflow(
    id="conditional-workflow",
    name="Conditional Workflow",
    description="Execute different paths based on conditions",
    nodes=[
        # Step 1: run the check whose "exists" output drives the branch.
        WorkflowNode(
            name="check_data",
            description="Check if data exists",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "check_data",
                "input_mapping": {
                    "source": "$params.data_source",
                },
            },
        ),
        # Step 2: the branch itself. if_true / if_false name the node to
        # continue with for each outcome of the condition.
        WorkflowNode(
            name="data_exists_check",
            description="Check if data exists",
            node_type=NodeType.CONDITIONAL,
            depends_on=["check_data"],
            condition="${check_data.exists} == true",
            if_true="process_data",
            if_false="create_data",
        ),
        # Branch target named by if_true above.
        WorkflowNode(
            name="process_data",
            description="Process existing data",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "process_data",
                "input_mapping": {
                    "data_source": "$params.data_source",
                },
            },
        ),
        # Branch target named by if_false above; additionally takes a
        # "template" parameter.
        WorkflowNode(
            name="create_data",
            description="Create new data",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "create_data",
                "input_mapping": {
                    "data_source": "$params.data_source",
                    "template": "$params.template",
                },
            },
        ),
    ],
)
### Parallel Execution
Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
# Fan-out / fan-in example: one fetch step, two independent consumers of its
# result, and a final step that depends on both consumers.
workflow = Workflow(
    id="parallel-workflow",
    name="Parallel Workflow",
    description="Execute multiple steps in parallel",
    nodes=[
        # Root step: produces the data both branches read.
        WorkflowNode(
            name="fetch_data",
            description="Fetch initial data",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "fetch_data",
                "input_mapping": {
                    "source": "$params.data_source",
                },
            },
        ),
        # The next two nodes each depend only on fetch_data and not on one
        # another, so they are eligible to run in parallel.
        WorkflowNode(
            name="analyze_trends",
            description="Analyze trends in the data",
            node_type=NodeType.TOOL,
            depends_on=["fetch_data"],
            tool_config={
                "tool_name": "analyze_trends",
                "input_mapping": {
                    "data": "$fetch_data.result",
                },
            },
        ),
        WorkflowNode(
            name="generate_report",
            description="Generate a report from the data",
            node_type=NodeType.TOOL,
            depends_on=["fetch_data"],
            tool_config={
                "tool_name": "generate_report",
                "input_mapping": {
                    "data": "$fetch_data.result",
                    "template": "$params.report_template",
                },
            },
        ),
        # Join step: lists both parallel nodes in depends_on, so it waits
        # until each of them has completed.
        WorkflowNode(
            name="notify_completion",
            description="Send notification when everything is done",
            node_type=NodeType.TOOL,
            depends_on=["analyze_trends", "generate_report"],
            tool_config={
                "tool_name": "send_notification",
                "input_mapping": {
                    "message": "Analysis and report generation complete",
                    "trends": "$analyze_trends.result",
                    "report_url": "$generate_report.report_url",
                },
            },
        ),
    ],
)
### Error Handling
Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
from kubiya_sdk.workflows.models import RetryPolicy, ContinueOnPolicy
# Error-handling example: a retried fetch, a processing step that is allowed
# to fail, and a conditional that routes to an error or success notification.
workflow = Workflow(
    id="error-handling-workflow",
    name="Error Handling Workflow",
    description="Handle errors in workflow execution",
    nodes=[
        WorkflowNode(
            name="fetch_data",
            description="Fetch data from API",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "fetch_api_data",
                "input_mapping": {
                    "endpoint": "$params.api_endpoint",
                },
            },
            # At most 3 retries, with exponential backoff (5 s base interval,
            # multiplier 2), for the listed status codes.
            retry_policy=RetryPolicy(
                limit=3,
                interval_sec=5,
                backoff_multiplier=2,
                status_codes=[500, 502, 503, 504],
            ),
        ),
        WorkflowNode(
            name="process_data",
            description="Process the data",
            node_type=NodeType.TOOL,
            depends_on=["fetch_data"],
            tool_config={
                "tool_name": "process_data",
                "input_mapping": {
                    "data": "$fetch_data.result",
                },
            },
            # Let the workflow carry on when this step fails or is skipped,
            # so the conditional below can inspect its status.
            continue_on=ContinueOnPolicy(
                failure=True,
                skipped=True,
            ),
        ),
        # Route on the processing step's status: if_true / if_false name the
        # notification node to continue with.
        WorkflowNode(
            name="handle_error",
            description="Handle errors in data processing",
            node_type=NodeType.CONDITIONAL,
            depends_on=["process_data"],
            condition="${process_data.status} == 'failed'",
            if_true="send_error_notification",
            if_false="send_success_notification",
        ),
        # Failure branch: the message embeds the error reported by
        # process_data.
        WorkflowNode(
            name="send_error_notification",
            description="Send error notification",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "send_notification",
                "input_mapping": {
                    "message": "Data processing failed: ${process_data.error}",
                    "level": "error",
                },
            },
        ),
        # Success branch.
        WorkflowNode(
            name="send_success_notification",
            description="Send success notification",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "send_notification",
                "input_mapping": {
                    "message": "Data processing successful",
                    "level": "info",
                },
            },
        ),
    ],
)
## Workflow Execution
Workflows can be executed with input parameters:
Python
# Input parameters are handed to execute() as a single dict.
result = workflow.execute(
    {
        "api_endpoint": "https://api.example.com/data",
        "template": "default",
    }
)
The result will contain the outputs from all workflow nodes.