Building a Workflow

This tutorial guides you through creating workflows with the Kubiya SDK, from simple linear sequences to complex DAGs.

Prerequisites

Before you begin, make sure you:

  1. Have completed the Creating Your First Tool tutorial
  2. Understand the basic concepts of Kubiya's Docker-based architecture

What is a Workflow?

A workflow in Kubiya is a collection of tools that work together to accomplish a task. Workflows can be:

  • Simple linear sequences of tools executed one after another
  • Complex directed acyclic graphs (DAGs) with dependencies between tools
  • Hybrid combinations of tools and teammates (AI agents)

Creating a Simple Linear Workflow

Let's start by creating a simple workflow that processes data in a linear sequence:

Python
# simple_workflow.py
from kubiya_sdk import Workflow, tool

@tool(image="python:3.12-slim", requirements=["requests"])
def fetch_data(url: str) -> dict:
    """Fetch data from an API"""
    import requests
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # Surface HTTP errors instead of returning bad JSON
    return response.json()

@tool(image="python:3.12-slim", requirements=["pandas"])
def analyze_data(data: dict) -> dict:
    """Analyze the data"""
    import pandas as pd

    # Convert data to DataFrame
    df = pd.DataFrame(data)

    # Perform analysis
    analysis = {
        "shape": df.shape,
        "columns": df.columns.tolist(),
        "summary": df.describe().to_dict()
    }

    return analysis

@tool(image="python:3.12-slim", requirements=["matplotlib", "pandas"])
def visualize_data(data: dict, analysis: dict) -> dict:
    """Create visualizations of the data"""
    import matplotlib.pyplot as plt
    import base64
    import io
    import pandas as pd

    # Convert data to DataFrame
    df = pd.DataFrame(data)

    # Create histogram for each numeric column
    visualizations = {}
    for column in df.select_dtypes(include=['number']).columns:
        plt.figure(figsize=(8, 4))
        plt.hist(df[column], bins=10)
        plt.title(f"Histogram of {column}")
        plt.xlabel(column)
        plt.ylabel("Frequency")

        # Convert plot to base64 image
        buffer = io.BytesIO()
        plt.savefig(buffer, format="png")
        buffer.seek(0)
        visualizations[column] = base64.b64encode(buffer.read()).decode("utf-8")
        plt.close()

    return {
        "visualizations": visualizations,
        "analysis": analysis
    }

# Create the workflow
data_workflow = Workflow(
    id="data-analysis",
    description="Fetch, analyze, and visualize data",
    tools=[fetch_data, analyze_data, visualize_data]
)

# Execute the workflow
if __name__ == "__main__":
    result = data_workflow.execute({
        "url": "https://api.example.com/data"
    })
    print("Workflow completed")
    print(f"Analysis: {result['analysis']}")
    print(f"Visualizations for columns: {list(result['visualizations'].keys())}")

In this example:

  1. We define three tools: fetch_data, analyze_data, and visualize_data
  2. We create a workflow that includes these tools
  3. When executed, the workflow runs the tools in sequence:
     • fetch_data retrieves data from an API
     • analyze_data processes the data
     • visualize_data generates visualizations
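
Note how data flows between the tools by parameter name: visualize_data receives both the original data and the upstream analysis because its parameters match those outputs. The examples in this tutorial rely on this name-based wiring; the exact matching rules may vary by SDK version.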

Creating a DAG Workflow

For more complex scenarios, Kubiya supports directed acyclic graph (DAG) workflows, where tools have explicit dependencies:

Python
# dag_workflow.py
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
from kubiya_sdk import tool

@tool(image="python:3.12-slim", requirements=["requests"])
def fetch_sales_data(date_range: str) -> dict:
    """Fetch sales data for the specified date range"""
    # Fetch data implementation
    return {"sales": [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}]}

@tool(image="python:3.12-slim", requirements=["requests"])
def fetch_marketing_data(date_range: str) -> dict:
    """Fetch marketing data for the specified date range"""
    # Fetch data implementation
    return {"campaigns": [{"id": "A", "spend": 50}, {"id": "B", "spend": 75}]}

@tool(image="python:3.12-slim", requirements=["pandas"])
def analyze_sales(sales_data: dict) -> dict:
    """Analyze sales data"""
    # Analysis implementation
    return {"total_sales": sum(item["amount"] for item in sales_data["sales"])}

@tool(image="python:3.12-slim", requirements=["pandas"])
def analyze_marketing(marketing_data: dict) -> dict:
    """Analyze marketing data"""
    # Analysis implementation
    return {"total_spend": sum(item["spend"] for item in marketing_data["campaigns"])}

@tool(image="python:3.12-slim", requirements=["pandas"])
def calculate_roi(sales_analysis: dict, marketing_analysis: dict) -> dict:
    """Calculate ROI based on sales and marketing data"""
    total_sales = sales_analysis["total_sales"]
    total_spend = marketing_analysis["total_spend"]
    roi = (total_sales - total_spend) / total_spend if total_spend > 0 else 0
    return {"roi": roi, "sales": total_sales, "spend": total_spend}

@tool(image="python:3.12-slim", requirements=["matplotlib"])
def generate_report(roi_data: dict) -> dict:
    """Generate a report with the ROI data"""
    # Report generation implementation
    return {
        "report": f"Sales: ${roi_data['sales']}, Marketing: ${roi_data['spend']}, ROI: {roi_data['roi']:.2f}",
        "timestamp": "2023-01-01T12:00:00Z"
    }

# Create DAG workflow
dag_workflow = Workflow(
    id="marketing-roi-analysis",
    name="Marketing ROI Analysis",
    description="Analyze marketing ROI",
    nodes=[
        WorkflowNode(
            id="fetch_sales",
            name="Fetch Sales Data",
            node_type=NodeType.TOOL,
            tool_name="fetch_sales_data"
        ),
        WorkflowNode(
            id="fetch_marketing",
            name="Fetch Marketing Data",
            node_type=NodeType.TOOL,
            tool_name="fetch_marketing_data"
        ),
        WorkflowNode(
            id="analyze_sales",
            name="Analyze Sales",
            node_type=NodeType.TOOL,
            tool_name="analyze_sales",
            depends_on=["fetch_sales"]
        ),
        WorkflowNode(
            id="analyze_marketing",
            name="Analyze Marketing",
            node_type=NodeType.TOOL,
            tool_name="analyze_marketing",
            depends_on=["fetch_marketing"]
        ),
        WorkflowNode(
            id="calculate_roi",
            name="Calculate ROI",
            node_type=NodeType.TOOL,
            tool_name="calculate_roi",
            depends_on=["analyze_sales", "analyze_marketing"]
        ),
        WorkflowNode(
            id="generate_report",
            name="Generate Report",
            node_type=NodeType.TOOL,
            tool_name="generate_report",
            depends_on=["calculate_roi"]
        )
    ]
)

# Execute the workflow
if __name__ == "__main__":
    result = dag_workflow.execute({"date_range": "2023-01-01/2023-01-31"})
    print(f"Report: {result['report']}")

In this example:

  1. We define six tools for different aspects of the analysis
  2. We create a workflow with explicit dependencies:
     • analyze_sales depends on fetch_sales
     • analyze_marketing depends on fetch_marketing
     • calculate_roi depends on both analyze_sales and analyze_marketing
     • generate_report depends on calculate_roi
  3. The workflow executes tools in the correct order and can run independent tools (such as the two fetch nodes) in parallel

Adding Workflow Parameters

You can define explicit parameters for your workflows:

Python
from kubiya_sdk.workflows.models import WorkflowParameter, WorkflowParameterSet, ParameterType

# Define workflow parameters
params = WorkflowParameterSet(parameters=[
    WorkflowParameter(
        name="DATE_RANGE",
        type=ParameterType.STRING,
        description="Date range for analysis (YYYY-MM-DD/YYYY-MM-DD)",
        default="2023-01-01/2023-01-31"
    ),
    WorkflowParameter(
        name="INCLUDE_VISUALIZATIONS",
        type=ParameterType.BOOLEAN,
        description="Whether to include visualizations in the report",
        default=True
    )
])

# Create workflow with parameters
parameterized_workflow = Workflow(
    id="parameterized-analysis",
    name="Parameterized Analysis",
    description="Analysis with parameters",
    parameters=params,
    # nodes and other configuration...
)
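
Once defined, parameters are supplied at execution time, with declared defaults filling in anything omitted. A minimal sketch, assuming parameter names map directly to the keys passed to execute():

Python
# Execute with explicit parameter values; omitted parameters
# fall back to their declared defaults (assumed behavior)
result = parameterized_workflow.execute({
    "DATE_RANGE": "2023-02-01/2023-02-28",
    "INCLUDE_VISUALIZATIONS": False
})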

Configuring Retry Policies

For more robust workflows, you can configure retry policies:

Python
from kubiya_sdk.workflows.models import RetryPolicy

# Node with retry policy
fetch_node = WorkflowNode(
    id="fetch_data",
    name="Fetch Data",
    node_type=NodeType.TOOL,
    tool_name="fetch_data",
    retry_policy=RetryPolicy(
        max_attempts=3,
        backoff_factor=2.0,
        initial_delay_seconds=5
    )
)
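
With this policy the node runs up to three times in total: the first retry waits 5 seconds and the second waits 10, each delay being the previous one multiplied by the backoff factor of 2.0 (assuming max_attempts counts total attempts, which is the common convention).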

Adding Preconditions

You can add preconditions to nodes so they only execute when certain conditions are met:

Python
from kubiya_sdk.workflows.models import Precondition

# Node with precondition
alert_node = WorkflowNode(
    id="send_alert",
    name="Send Alert",
    node_type=NodeType.TOOL,
    tool_name="send_alert",
    precondition=Precondition(
        condition="${TOTAL_SALES} < ${SALES_THRESHOLD}",
        description="Only send alert if sales are below threshold"
    )
)
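
The ${...} placeholders are template variables; here they are assumed to resolve at runtime from workflow parameters or upstream node outputs, so the alert node is skipped whenever the condition evaluates to false.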

Integrating Teammates into Workflows

You can include teammates (AI agents) in your workflows:

Python
from kubiya_sdk import Teammate, tool
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType

@tool(image="python:3.12-slim")
def process_data(data: dict) -> dict:
    """Process data"""
    # Processing implementation
    return {"processed": True, "data": data}

# Create an analyst teammate
data_analyst = Teammate(
    id="data-analyst",
    description="Analyzes complex data and provides insights",
    tools=[...]  # Tools available to this teammate
)

# Create a workflow with a tool and teammate
analysis_workflow = Workflow(
    id="team-analysis",
    name="Team Analysis",
    description="Data analysis with AI assistant",
    nodes=[
        WorkflowNode(
            id="process",
            name="Process Data",
            node_type=NodeType.TOOL,
            tool_name="process_data"
        ),
        WorkflowNode(
            id="analyze",
            name="Analyze Data",
            node_type=NodeType.TEAMMATE,
            teammate_id="data-analyst",
            depends_on=["process"]
        )
    ]
)
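
Executing a hybrid workflow is no different from executing any other. A minimal sketch, where the input key matches process_data's data parameter and the sample payload is made up for illustration:

Python
# Run the hybrid workflow; the teammate node receives the
# processed output once the tool node completes
result = analysis_workflow.execute({"data": {"records": [1, 2, 3]}})
print(result)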

Visualizing Workflows

You can generate Mermaid diagrams to visualize your workflows:

Python
# Generate Mermaid diagram for the workflow
mermaid_diagram = dag_workflow.to_mermaid()
print(mermaid_diagram)

This generates a diagram like:

Text Only
graph TD
    fetch_sales["Fetch Sales Data"]
    fetch_marketing["Fetch Marketing Data"]
    analyze_sales["Analyze Sales"]
    analyze_marketing["Analyze Marketing"]
    calculate_roi["Calculate ROI"]
    generate_report["Generate Report"]

    fetch_sales --> analyze_sales
    fetch_marketing --> analyze_marketing
    analyze_sales --> calculate_roi
    analyze_marketing --> calculate_roi
    calculate_roi --> generate_report
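
You can paste this output into any Mermaid renderer (for example, the Mermaid Live Editor at mermaid.live) to view the graph.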

Serializing Workflows

You can save workflows as YAML or JSON and load them later:

Python
from kubiya_sdk.workflows.adapters import EnhancedFormatAdapter

# Convert workflow to YAML
workflow_yaml = EnhancedFormatAdapter.to_yaml(dag_workflow)
print(workflow_yaml)

# Save to file
with open("workflow.yaml", "w") as f:
    f.write(workflow_yaml)

# Load from file
with open("workflow.yaml", "r") as f:
    loaded_yaml = f.read()

loaded_workflow = EnhancedFormatAdapter.from_yaml(loaded_yaml)
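
As a quick sanity check, you can render the reloaded workflow to confirm the round trip preserved the graph:

Python
# The reloaded workflow should produce the same diagram as the original
print(loaded_workflow.to_mermaid())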

Best Practices for Workflows

  1. Design for Reusability: Create workflows that can be reused across different scenarios
  2. Keep Tools Focused: Each tool should do one thing well
  3. Handle Errors Gracefully: Use retry policies and structured error handling for robust workflows (see the sketch after this list)
  4. Document Thoroughly: Add clear descriptions to workflows, nodes, and parameters
  5. Test Edge Cases: Verify your workflows work correctly under various conditions
  6. Optimize Data Flow: Minimize data transfer between nodes
  7. Use Parameterization: Make workflows flexible with parameters
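
As an example of practice 3, a tool can catch transient failures itself and return a structured result that downstream nodes or preconditions can branch on, complementing the retry policies shown earlier. A minimal sketch; fetch_data_safely and its return shape are hypothetical:

Python
@tool(image="python:3.12-slim", requirements=["requests"])
def fetch_data_safely(url: str) -> dict:
    """Fetch data, returning a structured error instead of raising"""
    import requests
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return {"ok": True, "data": response.json()}
    except requests.RequestException as exc:
        # Downstream nodes (or a precondition) can branch on "ok"
        return {"ok": False, "error": str(exc)}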

Next Steps

Now that you've learned how to build workflows, you can:

  1. Learn about Docker image integration to use specialized Docker images
  2. Explore file mounting for workflows that work with files
  3. Learn how to run your workflows on Kubernetes for scalability