Building a Workflow¶
This tutorial guides you through creating workflows with the Kubiya SDK, from simple linear flows to complex DAG workflows.
Prerequisites¶
Before you begin, make sure you:
- Have completed the Creating Your First Tool tutorial
- Understand the basic concepts of Kubiya's Docker-based architecture
What is a Workflow?¶
A workflow in Kubiya is a collection of tools that work together to accomplish a task. Workflows can be:
- Simple linear sequences of tools executed one after another
- Complex directed acyclic graphs (DAGs) with dependencies between tools
- Hybrid combinations of tools and teammates (AI agents)
Creating a Simple Linear Workflow¶
Let's start by creating a simple workflow that processes data in a linear sequence:
# simple_workflow.py
from kubiya_sdk import Workflow, tool

@tool(image="python:3.12-slim", requirements=["requests"])
def fetch_data(url: str) -> dict:
    """Fetch data from an API"""
    import requests

    response = requests.get(url)
    return response.json()

@tool(image="python:3.12-slim", requirements=["pandas"])
def analyze_data(data: dict) -> dict:
    """Analyze the data"""
    import pandas as pd

    # Convert data to DataFrame
    df = pd.DataFrame(data)

    # Perform analysis
    analysis = {
        "shape": df.shape,
        "columns": df.columns.tolist(),
        "summary": df.describe().to_dict()
    }
    return analysis

@tool(image="python:3.12-slim", requirements=["matplotlib", "pandas"])
def visualize_data(data: dict, analysis: dict) -> dict:
    """Create visualizations of the data"""
    import base64
    import io

    import matplotlib.pyplot as plt
    import pandas as pd

    # Convert data to DataFrame
    df = pd.DataFrame(data)

    # Create a histogram for each numeric column
    visualizations = {}
    for column in df.select_dtypes(include=["number"]).columns:
        plt.figure(figsize=(8, 4))
        plt.hist(df[column], bins=10)
        plt.title(f"Histogram of {column}")
        plt.xlabel(column)
        plt.ylabel("Frequency")

        # Convert plot to a base64-encoded PNG
        buffer = io.BytesIO()
        plt.savefig(buffer, format="png")
        buffer.seek(0)
        visualizations[column] = base64.b64encode(buffer.read()).decode("utf-8")
        plt.close()

    return {
        "visualizations": visualizations,
        "analysis": analysis
    }

# Create the workflow
data_workflow = Workflow(
    id="data-analysis",
    description="Fetch, analyze, and visualize data",
    tools=[fetch_data, analyze_data, visualize_data]
)

# Execute the workflow
if __name__ == "__main__":
    result = data_workflow.execute({
        "url": "https://api.example.com/data"
    })
    print("Workflow completed")
    print(f"Analysis: {result['analysis']}")
    print(f"Visualizations for columns: {list(result['visualizations'].keys())}")
In this example:
- We define three tools: fetch_data, analyze_data, and visualize_data
- We create a workflow that includes these tools
- When executed, the workflow runs the tools in sequence:
  - fetch_data retrieves data from an API
  - analyze_data processes the data
  - visualize_data generates visualizations, returned as base64-encoded PNGs (see the decoding sketch below)
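Because visualize_data returns each chart as a base64-encoded PNG string, the caller can decode the images back to files once the workflow finishes. A minimal sketch, reusing the result variable from the script above:

import base64

# Decode each base64-encoded histogram and write it to a PNG file
for column, encoded_png in result["visualizations"].items():
    with open(f"{column}_histogram.png", "wb") as f:
        f.write(base64.b64decode(encoded_png))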
Creating a DAG Workflow¶
For more complex scenarios, Kubiya supports Directed Acyclic Graph (DAG) workflows, where tools have explicit dependencies:
# dag_workflow.py
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
from kubiya_sdk import tool

@tool(image="python:3.12-slim", requirements=["requests"])
def fetch_sales_data(date_range: str) -> dict:
    """Fetch sales data for the specified date range"""
    # Fetch data implementation
    return {"sales": [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}]}

@tool(image="python:3.12-slim", requirements=["requests"])
def fetch_marketing_data(date_range: str) -> dict:
    """Fetch marketing data for the specified date range"""
    # Fetch data implementation
    return {"campaigns": [{"id": "A", "spend": 50}, {"id": "B", "spend": 75}]}

@tool(image="python:3.12-slim", requirements=["pandas"])
def analyze_sales(sales_data: dict) -> dict:
    """Analyze sales data"""
    # Analysis implementation
    return {"total_sales": sum(item["amount"] for item in sales_data["sales"])}

@tool(image="python:3.12-slim", requirements=["pandas"])
def analyze_marketing(marketing_data: dict) -> dict:
    """Analyze marketing data"""
    # Analysis implementation
    return {"total_spend": sum(item["spend"] for item in marketing_data["campaigns"])}

@tool(image="python:3.12-slim", requirements=["pandas"])
def calculate_roi(sales_analysis: dict, marketing_analysis: dict) -> dict:
    """Calculate ROI based on sales and marketing data"""
    total_sales = sales_analysis["total_sales"]
    total_spend = marketing_analysis["total_spend"]
    roi = (total_sales - total_spend) / total_spend if total_spend > 0 else 0
    return {"roi": roi, "sales": total_sales, "spend": total_spend}

@tool(image="python:3.12-slim", requirements=["matplotlib"])
def generate_report(roi_data: dict) -> dict:
    """Generate a report with the ROI data"""
    # Report generation implementation
    return {
        "report": f"Sales: ${roi_data['sales']}, Marketing: ${roi_data['spend']}, ROI: {roi_data['roi']:.2f}",
        "timestamp": "2023-01-01T12:00:00Z"
    }

# Create DAG workflow
dag_workflow = Workflow(
    id="marketing-roi-analysis",
    name="Marketing ROI Analysis",
    description="Analyze marketing ROI",
    nodes=[
        WorkflowNode(
            id="fetch_sales",
            name="Fetch Sales Data",
            node_type=NodeType.TOOL,
            tool_name="fetch_sales_data"
        ),
        WorkflowNode(
            id="fetch_marketing",
            name="Fetch Marketing Data",
            node_type=NodeType.TOOL,
            tool_name="fetch_marketing_data"
        ),
        WorkflowNode(
            id="analyze_sales",
            name="Analyze Sales",
            node_type=NodeType.TOOL,
            tool_name="analyze_sales",
            depends_on=["fetch_sales"]
        ),
        WorkflowNode(
            id="analyze_marketing",
            name="Analyze Marketing",
            node_type=NodeType.TOOL,
            tool_name="analyze_marketing",
            depends_on=["fetch_marketing"]
        ),
        WorkflowNode(
            id="calculate_roi",
            name="Calculate ROI",
            node_type=NodeType.TOOL,
            tool_name="calculate_roi",
            depends_on=["analyze_sales", "analyze_marketing"]
        ),
        WorkflowNode(
            id="generate_report",
            name="Generate Report",
            node_type=NodeType.TOOL,
            tool_name="generate_report",
            depends_on=["calculate_roi"]
        )
    ]
)

# Execute the workflow
if __name__ == "__main__":
    result = dag_workflow.execute({"date_range": "2023-01-01/2023-01-31"})
    print(f"Report: {result['report']}")
In this example:
- We define six tools for different aspects of the analysis
- We create a workflow with explicit dependencies:
  - analyze_sales depends on fetch_sales
  - analyze_marketing depends on fetch_marketing
  - calculate_roi depends on both analyze_sales and analyze_marketing
  - generate_report depends on calculate_roi
- The workflow executes tools in the correct order and can run independent tools in parallel, as the sketch below illustrates
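To build intuition for how a DAG scheduler derives this order, here is a plain-Python sketch using the standard library's graphlib (independent of the Kubiya SDK). It groups the nodes above into "waves": every node in a wave has all of its dependencies satisfied, so nodes within a wave can run in parallel.

from graphlib import TopologicalSorter

# The dependency map declared in the marketing-roi-analysis workflow above
deps = {
    "fetch_sales": set(),
    "fetch_marketing": set(),
    "analyze_sales": {"fetch_sales"},
    "analyze_marketing": {"fetch_marketing"},
    "calculate_roi": {"analyze_sales", "analyze_marketing"},
    "generate_report": {"calculate_roi"},
}

sorter = TopologicalSorter(deps)
sorter.prepare()
while sorter.is_active():
    wave = sorter.get_ready()  # every node whose dependencies have completed
    print(sorted(wave))        # nodes within a wave are safe to run in parallel
    sorter.done(*wave)

Running this prints four waves: the two fetches, then the two analyses, then calculate_roi, then generate_report.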
Adding Workflow Parameters¶
You can define explicit parameters for your workflows:
from kubiya_sdk.workflows.models import WorkflowParameter, WorkflowParameterSet, ParameterType

# Define workflow parameters
params = WorkflowParameterSet(parameters=[
    WorkflowParameter(
        name="DATE_RANGE",
        type=ParameterType.STRING,
        description="Date range for analysis (YYYY-MM-DD/YYYY-MM-DD)",
        default="2023-01-01/2023-01-31"
    ),
    WorkflowParameter(
        name="INCLUDE_VISUALIZATIONS",
        type=ParameterType.BOOLEAN,
        description="Whether to include visualizations in the report",
        default=True
    )
])

# Create workflow with parameters
parameterized_workflow = Workflow(
    id="parameterized-analysis",
    name="Parameterized Analysis",
    description="Analysis with parameters",
    parameters=params,
    # nodes and other configuration...
)
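With parameters defined, you supply values at execution time. The call below is a sketch rather than verified API: it assumes execute() accepts parameter values keyed by parameter name, mirroring how inputs were passed in the earlier examples.

# Hypothetical invocation -- assumes parameter values are passed by name
result = parameterized_workflow.execute({
    "DATE_RANGE": "2023-02-01/2023-02-28",
    "INCLUDE_VISUALIZATIONS": False
})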
Configuring Retry Policies¶
For more robust workflows, you can configure retry policies:
from kubiya_sdk.workflows.models import RetryPolicy

# Node with retry policy
fetch_node = WorkflowNode(
    id="fetch_data",
    name="Fetch Data",
    node_type=NodeType.TOOL,
    tool_name="fetch_data",
    retry_policy=RetryPolicy(
        max_attempts=3,
        backoff_factor=2.0,
        initial_delay_seconds=5
    )
)
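Assuming the conventional exponential-backoff semantics, where each wait equals initial_delay_seconds multiplied by backoff_factor once per prior attempt (an assumption worth confirming against the SDK reference), the policy above yields this wait schedule:

# Sketch of the implied wait schedule: delays before attempts 2 and 3
initial_delay, backoff_factor, max_attempts = 5, 2.0, 3
delays = [initial_delay * backoff_factor ** n for n in range(max_attempts - 1)]
print(delays)  # [5.0, 10.0] -- assuming delay_n = initial_delay * factor**n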
Adding Preconditions¶
You can add preconditions to nodes so they only execute when certain conditions are met:
from kubiya_sdk.workflows.models import Precondition

# Node with precondition
alert_node = WorkflowNode(
    id="send_alert",
    name="Send Alert",
    node_type=NodeType.TOOL,
    tool_name="send_alert",
    precondition=Precondition(
        condition="${TOTAL_SALES} < ${SALES_THRESHOLD}",
        description="Only send alert if sales are below threshold"
    )
)
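At runtime the ${...} placeholders are resolved from workflow values before the condition is evaluated. As a rough illustration of the substitute-then-evaluate idea (this is not the SDK's actual evaluator):

import re

# Hypothetical runtime values; in practice the SDK resolves these from workflow state
context = {"TOTAL_SALES": 300, "SALES_THRESHOLD": 500}
condition = "${TOTAL_SALES} < ${SALES_THRESHOLD}"

# Replace each ${NAME} placeholder with its value, then evaluate the comparison
expr = re.sub(r"\$\{(\w+)\}", lambda m: repr(context[m.group(1)]), condition)
print(expr, "->", eval(expr))  # 300 < 500 -> True, so the node would run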
Integrating Teammates into Workflows¶
You can include teammates (AI agents) in your workflows:
from kubiya_sdk import Teammate, Workflow, tool, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType

@tool(image="python:3.12-slim")
def process_data(data: dict) -> dict:
    """Process data"""
    # Processing implementation
    return {"processed": True, "data": data}

# Create an analyst teammate
data_analyst = Teammate(
    id="data-analyst",
    description="Analyzes complex data and provides insights",
    tools=[...]  # Tools available to this teammate
)

# Create a workflow with a tool and teammate
analysis_workflow = Workflow(
    id="team-analysis",
    name="Team Analysis",
    description="Data analysis with AI assistant",
    nodes=[
        WorkflowNode(
            id="process",
            name="Process Data",
            node_type=NodeType.TOOL,
            tool_name="process_data"
        ),
        WorkflowNode(
            id="analyze",
            name="Analyze Data",
            node_type=NodeType.TEAMMATE,
            teammate_id="data-analyst",
            depends_on=["process"]
        )
    ]
)
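Executing a mixed workflow looks the same as a tool-only one; here the process node runs first, and the data-analyst teammate runs once it completes. A sketch with a hypothetical input payload:

# Hypothetical input; process_data executes, then the teammate node runs
result = analysis_workflow.execute({"data": {"region": "EMEA", "records": [1, 2, 3]}})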
Visualizing Workflows¶
You can generate Mermaid diagrams to visualize your workflows:
# Generate Mermaid diagram for the workflow
mermaid_diagram = dag_workflow.to_mermaid()
print(mermaid_diagram)
This generates a diagram like:
graph TD
fetch_sales["Fetch Sales Data"]
fetch_marketing["Fetch Marketing Data"]
analyze_sales["Analyze Sales"]
analyze_marketing["Analyze Marketing"]
calculate_roi["Calculate ROI"]
generate_report["Generate Report"]
fetch_sales --> analyze_sales
fetch_marketing --> analyze_marketing
analyze_sales --> calculate_roi
analyze_marketing --> calculate_roi
calculate_roi --> generate_report
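You can paste this output into any Mermaid-compatible renderer, such as the Mermaid Live Editor or documentation tools that support Mermaid diagrams, to view the graph.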
Serializing Workflows¶
You can save workflows as YAML or JSON and load them later:
from kubiya_sdk.workflows.adapters import EnhancedFormatAdapter

# Convert workflow to YAML
workflow_yaml = EnhancedFormatAdapter.to_yaml(dag_workflow)
print(workflow_yaml)

# Save to file
with open("workflow.yaml", "w") as f:
    f.write(workflow_yaml)

# Load from file
with open("workflow.yaml", "r") as f:
    loaded_yaml = f.read()

loaded_workflow = EnhancedFormatAdapter.from_yaml(loaded_yaml)
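To sanity-check the round trip, you can compare identity fields on the two objects. This assumes Workflow exposes its constructor fields as attributes, which is an assumption rather than documented behavior:

# Round-trip check -- attribute access on Workflow is assumed here
assert loaded_workflow.id == dag_workflow.id
print(f"Reloaded workflow: {loaded_workflow.id}")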
Best Practices for Workflows¶
- Design for Reusability: Create workflows that can be reused across different scenarios
- Keep Tools Focused: Each tool should do one thing well
- Handle Errors Gracefully: Use retry policies and error handling for robust workflows
- Document Thoroughly: Add clear descriptions to workflows, nodes, and parameters
- Test Edge Cases: Verify your workflows work correctly under various conditions
- Optimize Data Flow: Minimize data transfer between nodes
- Use Parameterization: Make workflows flexible with parameters
Next Steps¶
Now that you've learned how to build workflows, you can:
- Learn about Docker image integration to use specialized Docker images
- Explore file mounting for workflows that work with files
- Learn how to run your workflows on Kubernetes for scalability