Skip to content

Workflow API Reference

The Workflow API in the Kubiya SDK enables you to create, configure, and execute workflows that orchestrate multiple steps and tools.

Core Concepts

A workflow in the Kubiya SDK is a directed acyclic graph (DAG) of nodes, where each node represents a step in the workflow. Workflows can be as simple as a linear sequence of steps or as complex as a branching, conditional graph with parallel execution paths.

Key Components

| Component | Description |
| --- | --- |
| Workflow Creation | Creating and configuring workflows |
| Workflow Nodes | Defining the steps in a workflow |
| Node Types | Different types of nodes for various operations |
| Parameters | Defining and passing parameters to workflows |
| Visualization | Visualizing workflows as diagrams |

Basic Workflow Example

Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType

# First step: a TOOL node that invokes "fetch_api_data". Its "endpoint" input
# is mapped to "$params.api_endpoint" (presumably resolved from the parameters
# passed to execute() below -- confirm against the SDK).
fetch_data_node = WorkflowNode(
    name="fetch_data",
    description="Fetch data from API",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "fetch_api_data",
        "input_mapping": {"endpoint": "$params.api_endpoint"},
    },
)

# Second step: declared as depending on "fetch_data" and mapping that node's
# "result" to its own "data" input.
analyze_data_node = WorkflowNode(
    name="analyze_data",
    description="Analyze the data",
    node_type=NodeType.TOOL,
    depends_on=["fetch_data"],
    tool_config={
        "tool_name": "analyze_data",
        "input_mapping": {"data": "$fetch_data.result"},
    },
)

# Assemble the two sequential steps into a workflow
workflow = Workflow(
    id="data-processing",
    name="Data Processing Workflow",
    description="Process and analyze data",
    nodes=[fetch_data_node, analyze_data_node],
)

# Run the workflow, supplying the "api_endpoint" parameter by name
workflow_params = {"api_endpoint": "https://api.example.com/data"}
result = workflow.execute(workflow_params)

Advanced Workflow Features

Kubiya workflows support a wide range of advanced features:

Conditional Execution

Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType

# Entry step: runs the "check_data" tool against the "data_source" parameter.
check_data_node = WorkflowNode(
    name="check_data",
    description="Check if data exists",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "check_data",
        "input_mapping": {"source": "$params.data_source"},
    },
)

# Branch step: a CONDITIONAL node that names "process_data" as its if_true
# target and "create_data" as its if_false target.
# NOTE(review): this description is identical to the one on check_data above;
# consider wording it to describe the branching instead.
data_exists_check_node = WorkflowNode(
    name="data_exists_check",
    description="Check if data exists",
    node_type=NodeType.CONDITIONAL,
    depends_on=["check_data"],
    condition="${check_data.exists} == true",
    if_true="process_data",
    if_false="create_data",
)

# Branch target named by if_true.
# NOTE(review): neither branch target declares depends_on; presumably they are
# reached only through the conditional node's if_true/if_false -- confirm.
process_data_node = WorkflowNode(
    name="process_data",
    description="Process existing data",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "process_data",
        "input_mapping": {"data_source": "$params.data_source"},
    },
)

# Branch target named by if_false; additionally takes the "template" parameter.
create_data_node = WorkflowNode(
    name="create_data",
    description="Create new data",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "create_data",
        "input_mapping": {
            "data_source": "$params.data_source",
            "template": "$params.template",
        },
    },
)

workflow = Workflow(
    id="conditional-workflow",
    name="Conditional Workflow",
    description="Execute different paths based on conditions",
    nodes=[
        check_data_node,
        data_exists_check_node,
        process_data_node,
        create_data_node,
    ],
)

Parallel Execution

Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType

# Root step: every other node in this example depends on it, directly or
# indirectly.
fetch_data_node = WorkflowNode(
    name="fetch_data",
    description="Fetch initial data",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "fetch_data",
        "input_mapping": {"source": "$params.data_source"},
    },
)

# The next two nodes will run in parallel since they both depend on fetch_data
# but don't depend on each other
analyze_trends_node = WorkflowNode(
    name="analyze_trends",
    description="Analyze trends in the data",
    node_type=NodeType.TOOL,
    depends_on=["fetch_data"],
    tool_config={
        "tool_name": "analyze_trends",
        "input_mapping": {"data": "$fetch_data.result"},
    },
)

generate_report_node = WorkflowNode(
    name="generate_report",
    description="Generate a report from the data",
    node_type=NodeType.TOOL,
    depends_on=["fetch_data"],
    tool_config={
        "tool_name": "generate_report",
        "input_mapping": {
            "data": "$fetch_data.result",
            "template": "$params.report_template",
        },
    },
)

# This node will wait for both parallel steps to complete: it lists both of
# them in depends_on and maps an output from each into the notification.
notify_completion_node = WorkflowNode(
    name="notify_completion",
    description="Send notification when everything is done",
    node_type=NodeType.TOOL,
    depends_on=["analyze_trends", "generate_report"],
    tool_config={
        "tool_name": "send_notification",
        "input_mapping": {
            "message": "Analysis and report generation complete",
            "trends": "$analyze_trends.result",
            "report_url": "$generate_report.report_url",
        },
    },
)

workflow = Workflow(
    id="parallel-workflow",
    name="Parallel Workflow",
    description="Execute multiple steps in parallel",
    nodes=[
        fetch_data_node,
        analyze_trends_node,
        generate_report_node,
        notify_completion_node,
    ],
)

Error Handling

Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
from kubiya_sdk.workflows.models import RetryPolicy, ContinueOnPolicy

# Fetch step with a retry policy attached.
fetch_data_node = WorkflowNode(
    name="fetch_data",
    description="Fetch data from API",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "fetch_api_data",
        "input_mapping": {"endpoint": "$params.api_endpoint"},
    },
    # Retry up to 3 times with exponential backoff
    retry_policy=RetryPolicy(
        limit=3,
        interval_sec=5,
        backoff_multiplier=2,
        status_codes=[500, 502, 503, 504],
    ),
)

# Processing step that is allowed to fail without stopping the workflow.
process_data_node = WorkflowNode(
    name="process_data",
    description="Process the data",
    node_type=NodeType.TOOL,
    depends_on=["fetch_data"],
    tool_config={
        "tool_name": "process_data",
        "input_mapping": {"data": "$fetch_data.result"},
    },
    # Continue workflow even if this step fails or is skipped
    continue_on=ContinueOnPolicy(
        failure=True,
        skipped=True,
    ),
)

# Branch on the recorded status of process_data: the error notification is the
# if_true target, the success notification the if_false target.
handle_error_node = WorkflowNode(
    name="handle_error",
    description="Handle errors in data processing",
    node_type=NodeType.CONDITIONAL,
    depends_on=["process_data"],
    condition="${process_data.status} == 'failed'",
    if_true="send_error_notification",
    if_false="send_success_notification",
)

# NOTE(review): the message below embeds "${process_data.error}" (the same
# "${...}" form the condition expressions use) rather than the bare
# "$node.field" form used for whole-value mappings -- presumably required for
# interpolation inside a longer string; confirm against the SDK.
send_error_notification_node = WorkflowNode(
    name="send_error_notification",
    description="Send error notification",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "send_notification",
        "input_mapping": {
            "message": "Data processing failed: ${process_data.error}",
            "level": "error",
        },
    },
)

send_success_notification_node = WorkflowNode(
    name="send_success_notification",
    description="Send success notification",
    node_type=NodeType.TOOL,
    tool_config={
        "tool_name": "send_notification",
        "input_mapping": {
            "message": "Data processing successful",
            "level": "info",
        },
    },
)

workflow = Workflow(
    id="error-handling-workflow",
    name="Error Handling Workflow",
    description="Handle errors in workflow execution",
    nodes=[
        fetch_data_node,
        process_data_node,
        handle_error_node,
        send_error_notification_node,
        send_success_notification_node,
    ],
)

Workflow Execution

Workflows can be executed with input parameters:

Python
# Input parameters are passed to execute() as a dict keyed by parameter name
execution_params = {
    "api_endpoint": "https://api.example.com/data",
    "template": "default",
}
result = workflow.execute(execution_params)

The result will contain the outputs from all workflow nodes.