Quick Start Guide
This guide will help you quickly get started with Kubiya SDK by creating your first Docker-based tools and workflows.
Installation
First, install the Kubiya SDK:
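The SDK ships as a Python package; assuming it is published on PyPI under the name `kubiya-sdk` (the import name used throughout this guide is `kubiya_sdk`), a typical installation looks like:

```bash
# Package name assumed; adjust if your organization distributes the SDK differently
pip install kubiya-sdk
```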
Ensure you have Docker installed and running on your system, as Kubiya relies on Docker to execute tools.
Creating Your First Tool
Let's create a simple tool that uses a Docker image to process text:
```python
# simple_tool.py
from kubiya_sdk import tool

@tool(image="python:3.12-slim")
def hello_world(name: str) -> str:
    """A simple hello world tool"""
    return f"Hello, {name}! Welcome to Kubiya."
```
This tool uses the official Python Docker image. When executed, Kubiya will:
- Pull the `python:3.12-slim` image if it is not already available
- Create a container from this image
- Execute your function inside the container
- Return the results
Creating a Workflow
Now, let's create a simple workflow that uses our tool:
```python
# simple_workflow.py
from kubiya_sdk import Workflow
from simple_tool import hello_world

# Create a workflow
workflow = Workflow(
    id="greeting-workflow",
    description="A simple greeting workflow",
    tools=[hello_world]
)

# Execute the workflow
result = workflow.execute({"name": "Developer"})
print(result)  # Output: Hello, Developer! Welcome to Kubiya.
```
Building a Real-World Tool
Let's create a more practical tool that leverages an existing Docker image to scan container images for vulnerabilities:
```python
# security_tool.py
from kubiya_sdk import tool, Workflow

@tool(
    name="vulnerability-scanner",
    description="Scans container images for vulnerabilities",
    image="aquasec/trivy:latest",
    command=["image", "--format", "json", "${IMAGE_TO_SCAN}"],
    environment={
        "IMAGE_TO_SCAN": "${image_name}"  # Maps to input parameter
    }
)
def scan_image(image_name: str) -> dict:
    """Scan a container image for vulnerabilities"""
    # No code needed here - execution happens in the container
    pass

# Create a workflow
security_workflow = Workflow(
    id="security-workflow",
    description="Security scanning workflow",
    tools=[scan_image]
)

# Execute the workflow
if __name__ == "__main__":
    result = security_workflow.execute({
        "image_name": "python:3.12-slim"
    })

    # Print vulnerability counts
    vulnerabilities = result.get("Results", [])
    total_vulns = sum(
        len(vuln.get("Vulnerabilities", []))
        for vuln in vulnerabilities
        if "Vulnerabilities" in vuln
    )
    print(f"Found {total_vulns} vulnerabilities")
```
This example shows how to use a specialized Docker image (Trivy) for security scanning without writing any of the scanning logic yourself.
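If you need more than a raw count, the same report can be broken down by severity. The following is a minimal sketch, assuming the standard Trivy JSON layout (a top-level `Results` list whose entries each carry a `Vulnerabilities` list with a `Severity` field); `summarize_by_severity` is an illustrative helper, not part of the SDK:

```python
from collections import Counter

def summarize_by_severity(report: dict) -> Counter:
    """Count vulnerabilities per severity level in a Trivy JSON report."""
    counts = Counter()
    for entry in report.get("Results", []):
        for vuln in entry.get("Vulnerabilities", []) or []:
            counts[vuln.get("Severity", "UNKNOWN")] += 1
    return counts

# Example usage with the result returned by security_workflow.execute(...):
# print(summarize_by_severity(result))
```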
Data Processing Example
Here's a more complex example that demonstrates a data processing pipeline:
```python
# data_pipeline.py
from kubiya_sdk import tool, Workflow

@tool(
    name="data-processor",
    image="python:3.12-slim",
    requirements=["pandas", "numpy", "matplotlib"]
)
def process_data(data: list) -> dict:
    """Process numeric data and generate statistics"""
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import base64
    import io

    # Convert input data to DataFrame
    df = pd.DataFrame(data)

    # Calculate statistics
    stats = {
        "mean": df.mean().to_dict(),
        "median": df.median().to_dict(),
        "std": df.std().to_dict(),
        "min": df.min().to_dict(),
        "max": df.max().to_dict()
    }

    # Generate a histogram
    plt.figure(figsize=(10, 6))
    for column in df.columns:
        plt.hist(df[column], alpha=0.5, label=column)
    plt.legend()
    plt.title("Data Distribution")

    # Convert plot to base64
    buffer = io.BytesIO()
    plt.savefig(buffer, format='png')
    buffer.seek(0)
    image_base64 = base64.b64encode(buffer.read()).decode('utf-8')

    return {
        "statistics": stats,
        "histogram": image_base64
    }

# Create a workflow
data_workflow = Workflow(
    id="data-analysis",
    description="Analyze numeric data",
    tools=[process_data]
)

# Execute the workflow
if __name__ == "__main__":
    # Sample data
    sample_data = [
        {"x": 1, "y": 5},
        {"x": 2, "y": 8},
        {"x": 3, "y": 11},
        {"x": 4, "y": 14},
        {"x": 5, "y": 17}
    ]

    result = data_workflow.execute({"data": sample_data})
    print("Statistics:", result["statistics"])
    print("Histogram generated:", bool(result["histogram"]))
```
Integrating with External Services
Let's integrate with an external API using a Docker-based tool:
```python
# weather_tool.py
from kubiya_sdk import tool, Workflow, config_model
from pydantic import BaseModel

@config_model(name="weather_api_config", description="Weather API Configuration")
class WeatherAPIConfig(BaseModel):
    """Weather API Configuration"""
    api_key: str
    base_url: str = "https://api.openweathermap.org/data/2.5"

@tool(
    name="weather-service",
    image="python:3.12-slim",
    requirements=["requests"],
    required_configs=["weather_api_config"]
)
def get_weather(city: str, config=None) -> dict:
    """Get current weather for a city"""
    import requests

    if not config:
        raise ValueError("Weather API configuration is required")

    api_key = config.get("api_key")
    base_url = config.get("base_url")

    url = f"{base_url}/weather?q={city}&appid={api_key}&units=metric"
    response = requests.get(url)
    data = response.json()

    if response.status_code != 200:
        return {"error": data.get("message", "Unknown error")}

    return {
        "city": data["name"],
        "country": data["sys"]["country"],
        "temperature": data["main"]["temp"],
        "feels_like": data["main"]["feels_like"],
        "description": data["weather"][0]["description"],
        "humidity": data["main"]["humidity"],
        "wind_speed": data["wind"]["speed"]
    }

# Create a workflow
weather_workflow = Workflow(
    id="weather-service",
    description="Get weather information",
    tools=[get_weather]
)

# Configure the tool (normally done through the SDK config system)
from kubiya_sdk.tools.registry import tool_registry

tool_registry.set_dynamic_config({
    "weather_api_config": {
        "api_key": "YOUR_API_KEY",  # Replace with your actual API key
        "base_url": "https://api.openweathermap.org/data/2.5"
    }
})

# Execute the workflow
if __name__ == "__main__":
    result = weather_workflow.execute({"city": "London"})
    print(f"Weather in {result['city']}, {result['country']}:")
    print(f"Temperature: {result['temperature']}°C (feels like {result['feels_like']}°C)")
    print(f"Description: {result['description']}")
    print(f"Humidity: {result['humidity']}%")
    print(f"Wind Speed: {result['wind_speed']} m/s")
```
Running on Kubernetes
To run your tools on Kubernetes instead of local Docker:
```python
# kubernetes_tool.py
from kubiya_sdk import tool, Workflow
from kubiya_sdk.infrastructure import KubernetesConfig

# Define Kubernetes configuration
k8s_config = KubernetesConfig(
    namespace="kubiya-tools",
    service_account="tool-runner",
    resources={
        "requests": {
            "memory": "256Mi",
            "cpu": "100m"
        },
        "limits": {
            "memory": "512Mi",
            "cpu": "200m"
        }
    }
)

@tool(
    name="k8s-data-processor",
    image="python:3.12-slim",
    requirements=["pandas", "numpy", "scikit-learn"],
    infrastructure=k8s_config  # Specify Kubernetes execution
)
def process_large_dataset(dataset_url: str) -> dict:
    """Process large datasets on Kubernetes"""
    import pandas as pd
    import numpy as np
    from sklearn.cluster import KMeans

    # Download and process large dataset
    df = pd.read_csv(dataset_url)

    # Perform clustering
    features = df.select_dtypes(include=['float64', 'int64']).fillna(0)
    kmeans = KMeans(n_clusters=5, random_state=0).fit(features)

    # Return results
    return {
        "clusters": kmeans.cluster_centers_.tolist(),
        "labels": kmeans.labels_.tolist(),
        "sample_count": len(df)
    }

# Create a workflow
k8s_workflow = Workflow(
    id="kubernetes-data-processing",
    description="Process data on Kubernetes",
    tools=[process_large_dataset]
)

# Execute the workflow
# Note: This requires a configured Kubernetes environment
if __name__ == "__main__":
    result = k8s_workflow.execute({
        "dataset_url": "https://example.com/large-dataset.csv"
    })
    print(f"Processed {result['sample_count']} samples into {len(result['clusters'])} clusters")
```
Next Steps
Now that you've created your first Docker-based tools and workflows with Kubiya SDK, you can:
- Learn more about the Docker-based Architecture
- Explore Tool Building techniques
- Dive into Workflow Composition
- Try Kubernetes Deployment for scaling
Happy building with Kubiya!