File Handling Tools¶
This page demonstrates how to correctly work with files and directories in Kubiya SDK.
Understanding File Handling Models¶
Kubiya SDK provides three distinct ways to work with files:
- FileSpec - Include specific files within your container
- Volume - Mount persistent directories between your host and container
- GitRepoSpec - Clone Git repositories into your container
Let's explore each approach with real-world examples.
FileSpec for Static File Content¶
FileSpec is used when you need to include specific file content in your container. This is ideal for configuration files, scripts, or any static content needed by your tool.
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec
import json
# Define a helper script to include in the container
helper_script = """#!/bin/bash
# Process input data
echo "Processing data: $1"
echo "$1" | tr '[:lower:]' '[:upper:]'
"""
@kubiya.tool(
    name="file_processor",
    description="Process text using a helper script",
    with_files=[
        FileSpec(
            destination="/app/scripts/helper.sh",
            content=helper_script  # Inline content is provided
        ),
        FileSpec(
            destination="/app/config/settings.json",
            content=json.dumps({
                "timeout": 30,
                "retries": 3,
                "version": "1.0.0"
            })
        )
    ]
)
def process_with_helper(text: str) -> dict:
    """
    Process text using an included helper script

    Args:
        text: The text to process

    Returns:
        Processing results
    """
    import subprocess
    import json
    import os

    # Execute helper script with text as argument
    subprocess.run(["chmod", "+x", "/app/scripts/helper.sh"], check=True)
    result = subprocess.run(
        ["/app/scripts/helper.sh", text],
        capture_output=True,
        text=True,
        check=True
    )

    # Load configuration
    with open("/app/config/settings.json", "r") as f:
        config = json.load(f)

    # Return results
    return {
        "original": text,
        "processed": result.stdout.strip(),
        "length": len(text),
        "config_version": config["version"]
    }
You can also use FileSpec to include files from the host system by specifying the source parameter:
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec
@kubiya.tool(
    name="config_reader",
    description="Read a configuration file",
    requirements=["pyyaml"],  # needed to parse YAML in the function body
    with_files=[
        FileSpec(
            source="${HOME}/configs/app.yaml",   # Source file on host
            destination="/app/config/app.yaml"   # Destination in container
        )
    ]
)
def read_config() -> dict:
    """Read and parse the configuration file"""
    import yaml

    with open("/app/config/app.yaml", "r") as f:
        config = yaml.safe_load(f)
    return config
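To make the host-to-container flow concrete, here is a small standalone sketch of what a file like ${HOME}/configs/app.yaml might contain and the dictionary yaml.safe_load would return for it. The keys and values are illustrative, not part of the SDK:

import yaml

# Hypothetical contents of the host-side app.yaml (illustrative only)
sample_yaml = """
database:
  host: db.internal
  port: 5432
retries: 3
"""

config = yaml.safe_load(sample_yaml)
print(config["database"]["host"])  # -> "db.internal"
print(config["retries"])           # -> 3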
Volume for Persistent Storage¶
Volume is used when you need to mount a directory for persistent storage or accessing a collection of files. This is ideal for data that needs to be preserved between tool executions or when processing multiple files.
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import Volume
from typing import Dict, List
@kubiya.tool(
    name="log_analyzer",
    description="Analyze log files in a directory",
    requirements=["pandas", "matplotlib"],
    with_volumes=[
        Volume(
            name="logs_volume",  # Volume name - identifies the storage
            path="/app/logs"     # Mount path inside the container
        )
    ]
)
def analyze_logs(pattern: str = "*.log", days: int = 7) -> Dict[str, List]:
    """
    Analyze log files matching a pattern

    Args:
        pattern: Glob pattern to match log files
        days: Number of days of logs to analyze

    Returns:
        Analysis results with error counts by type
    """
    import os
    import glob
    import pandas as pd
    from datetime import datetime, timedelta
    import re

    # Path where the volume is mounted
    logs_dir = "/app/logs"

    # Find all log files matching the pattern
    log_files = glob.glob(os.path.join(logs_dir, pattern))

    # Calculate the cutoff date
    cutoff_date = datetime.now() - timedelta(days=days)

    # Error patterns to look for
    error_patterns = {
        "connection_error": r"connection (?:failed|refused|reset|timeout)",
        "permission_error": r"permission denied|access forbidden|unauthorized",
        "not_found": r"404 not found|resource not found|no such file",
        "internal_error": r"internal server error|500 error|crashed|exception",
        "validation_error": r"invalid (input|format|data)|validation failed"
    }

    # Initialize counters
    error_counts = {error_type: 0 for error_type in error_patterns}
    error_examples = {error_type: [] for error_type in error_patterns}

    # Analyze each log file
    for log_file in log_files:
        # Get file modification time
        mod_time = datetime.fromtimestamp(os.path.getmtime(log_file))

        # Skip files older than the cutoff date
        if mod_time < cutoff_date:
            continue

        # Process the file
        with open(log_file, 'r', errors='ignore') as f:
            for line in f:
                # Check each error pattern
                for error_type, error_pattern in error_patterns.items():
                    if re.search(error_pattern, line, re.IGNORECASE):
                        error_counts[error_type] += 1
                        # Store a sample of errors (up to 3 per type)
                        if len(error_examples[error_type]) < 3:
                            error_examples[error_type].append(line.strip())

    # Generate results
    return {
        "error_counts": error_counts,
        "error_examples": error_examples,
        "files_analyzed": len(log_files),
        "date_range": f"Last {days} days"
    }
The key difference between Volume and FileSpec is that volumes persist data across container runs and are designed for directories, while FileSpec is for individual files that are created fresh each time the container runs.
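As a minimal sketch of that difference (paths, names, and content here are illustrative), the two models describe storage with very different lifecycles; only the Volume keeps data between runs:

from kubiya_sdk.tools.models import FileSpec, Volume

# Recreated on every run: the content is baked into the tool definition
snapshot = FileSpec(
    destination="/app/config/thresholds.json",
    content='{"cpu": 80, "memory": 90}'
)

# Mounted on every run: whatever was written here previously is still present
scratch = Volume(
    name="analysis_cache",   # identifies the persistent storage
    path="/app/cache"        # mount point inside the container
)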
Using Volumes for File Uploads¶
When you need to share files between the host and a container, mount a Volume through the separate with_volumes parameter:
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import Volume
from typing import Dict, Any
@kubiya.tool(
    name="csv_processor",
    description="Process a CSV file and return statistics",
    requirements=["pandas", "numpy"],
    with_volumes=[  # Note this is using with_volumes, not with_files
        Volume(
            name="data_volume",
            path="/app/data"
        )
    ]
)
def process_csv(filename: str) -> Dict[str, Any]:
    """
    Process a CSV file and return statistics

    Args:
        filename: Name of the CSV file in the data volume

    Returns:
        Statistical summary of the CSV data
    """
    import os
    import pandas as pd

    # Full path to the CSV file
    file_path = os.path.join("/app/data", filename)

    # Check if the file exists
    if not os.path.exists(file_path):
        available_files = os.listdir("/app/data")
        return {
            "error": f"File not found: {filename}",
            "available_files": available_files
        }

    # Read the CSV file
    df = pd.read_csv(file_path)

    # Generate statistical summary
    return {
        "filename": filename,
        "rows": len(df),
        "columns": list(df.columns),
        "summary": df.describe().to_dict(),
        "missing_values": df.isnull().sum().to_dict()
    }
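If it helps to see the shape of the returned dictionary, here is a small standalone pandas snippet (the sample data is made up) that produces the same summary and missing_values structures the tool returns:

import pandas as pd

# A tiny stand-in for a CSV loaded from the data volume
df = pd.DataFrame({"id": [1, 2, 3], "value": [10.5, None, 15.7]})

print(len(df))                      # rows -> 3
print(list(df.columns))             # columns -> ['id', 'value']
print(df.describe().to_dict())      # per-column statistics keyed by column name
print(df.isnull().sum().to_dict())  # missing values -> {'id': 0, 'value': 1}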
Git Repository Integration¶
For accessing code from a Git repository, use GitRepoSpec:
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import GitRepoSpec
from typing import Dict, Any
@kubiya.tool(
    name="git_code_analyzer",
    description="Analyze code from a Git repository",
    requirements=["pygit2", "radon"],
    with_files=[  # GitRepoSpec is added to with_files
        GitRepoSpec(
            url="${GIT_REPO_URL}",    # URL to the Git repository
            branch="main",            # Branch to clone
            destination="/app/repo"   # Where to clone the repo
        )
    ]
)
def analyze_code(file_pattern: str = "*.py") -> Dict[str, Any]:
    """
    Analyze code complexity and quality metrics

    Args:
        file_pattern: Pattern to match files for analysis

    Returns:
        Code quality metrics
    """
    import os
    import glob
    import radon.complexity as cc
    from radon.metrics import h_visit, mi_visit

    # Path where the repo is cloned
    repo_path = "/app/repo"

    # Check if repository was cloned
    if not os.path.exists(repo_path):
        return {
            "error": "Repository not found. Please ensure GIT_REPO_URL environment variable is set."
        }

    # Find all files matching the pattern
    files = glob.glob(os.path.join(repo_path, "**", file_pattern), recursive=True)

    # Initialize results
    results = {
        "file_count": len(files),
        "complexities": {},
        "maintainability": {},
        "summary": {
            "average_complexity": 0,
            "average_maintainability": 0,
            "highest_complexity": {"file": "", "value": 0},
            "lowest_maintainability": {"file": "", "value": 100}
        }
    }

    # Process each file
    total_complexity = 0
    total_maintainability = 0

    for filepath in files:
        rel_path = os.path.relpath(filepath, repo_path)

        # Read file content
        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()

        # Calculate code complexity
        try:
            complexity_results = list(cc.cc_visit(content))
            if complexity_results:
                avg_complexity = sum(result.complexity for result in complexity_results) / len(complexity_results)
                results["complexities"][rel_path] = avg_complexity
                total_complexity += avg_complexity

                # Update highest complexity
                if avg_complexity > results["summary"]["highest_complexity"]["value"]:
                    results["summary"]["highest_complexity"] = {
                        "file": rel_path,
                        "value": avg_complexity
                    }

            # Calculate maintainability index
            mi_result = mi_visit(content, multi=True)
            if mi_result:
                maintainability = mi_result
                results["maintainability"][rel_path] = maintainability
                total_maintainability += maintainability

                # Update lowest maintainability
                if maintainability < results["summary"]["lowest_maintainability"]["value"]:
                    results["summary"]["lowest_maintainability"] = {
                        "file": rel_path,
                        "value": maintainability
                    }
        except Exception:
            # Skip files that can't be analyzed
            pass

    # Calculate averages
    if files:
        results["summary"]["average_complexity"] = total_complexity / len(files)
        results["summary"]["average_maintainability"] = total_maintainability / len(files)

    return results
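To get a feel for the radon calls used above, here is a tiny standalone snippet (the source string is made up) that runs the same cc_visit and mi_visit functions over a short piece of code, assuming radon is installed:

import radon.complexity as cc
from radon.metrics import mi_visit

source = """
def classify(n):
    if n < 0:
        return "negative"
    elif n == 0:
        return "zero"
    return "positive"
"""

blocks = list(cc.cc_visit(source))       # one entry per function/class found
for block in blocks:
    print(block.name, block.complexity)  # e.g. "classify 3"

print(mi_visit(source, multi=True))      # maintainability index as a float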
Combining File Handling Approaches¶
You can combine different file handling approaches in a single tool by using both with_files and with_volumes:
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec, Volume, GitRepoSpec
from typing import Dict, Any
@kubiya.tool(
    name="comprehensive_file_handler",
    description="Demonstrates all file handling approaches",
    requirements=["pandas", "pygit2"],
    with_files=[
        # Static script included directly in the tool
        FileSpec(
            destination="/app/scripts/process.sh",
            content="""#!/bin/bash
echo "Processing data..."
cat $1 | sort | uniq -c | sort -nr > $2
echo "Done!"
"""
        ),
        # Git repository for code
        GitRepoSpec(
            url="${GIT_REPO_URL}",
            branch="main",
            destination="/app/repo"
        )
    ],
    with_volumes=[
        # Persistent volume for data storage
        Volume(
            name="data_volume",
            path="/app/data"
        )
    ]
)
def handle_files(input_file: str, output_file: str) -> Dict[str, Any]:
    """
    Process files using different file handling approaches

    Args:
        input_file: Name of input file in data volume
        output_file: Name of output file to create in data volume

    Returns:
        Processing results
    """
    import os
    import subprocess

    # Make process script executable
    subprocess.run(["chmod", "+x", "/app/scripts/process.sh"], check=True)

    # Check if input file exists
    input_path = os.path.join("/app/data", input_file)
    if not os.path.exists(input_path):
        return {"error": f"Input file not found: {input_file}"}

    # Set output path
    output_path = os.path.join("/app/data", output_file)

    # Process the file using the script
    result = subprocess.run(
        ["/app/scripts/process.sh", input_path, output_path],
        capture_output=True,
        text=True
    )

    # Check if Git repo exists and count files
    repo_path = "/app/repo"
    repo_files = []
    if os.path.exists(repo_path):
        for root, _, files in os.walk(repo_path):
            for file in files:
                repo_files.append(os.path.join(root, file))

    return {
        "input_file": input_file,
        "output_file": output_file,
        "process_stdout": result.stdout,
        "process_stderr": result.stderr,
        "exit_code": result.returncode,
        "repo_file_count": len(repo_files)
    }
Real-World Example: Just-In-Time Access Tool¶
Here's a real-world example inspired by the just-in-time access tool implementation:
import inspect
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec, Volume
from typing import Dict, Any
# Import handler script
from scripts import access_approval_handler
@kubiya.tool(
    name="approve_access_request",
    description="Handle the approval or rejection of an access request",
    with_files=[
        FileSpec(
            destination="/opt/scripts/access_approval_handler.py",
            content=inspect.getsource(access_approval_handler),
        ),
    ],
    with_volumes=[
        Volume(name="db_data", path="/var/lib/database")
    ],
    env=[
        "KUBIYA_AGENT_NAME",
        "SLACK_CHANNEL_ID",
    ],
    secrets=[
        "SLACK_API_TOKEN",
        "KUBIYA_API_KEY",
    ]
)
def approve_access(request_id: str, approval_action: str, ttl: str = None) -> Dict[str, Any]:
    """
    Process an access request approval or rejection

    Args:
        request_id: The unique identifier of the access request
        approval_action: Either 'approve' or 'reject'
        ttl: If approving, how long the access should be valid (e.g., '1h', '30m')

    Returns:
        Results of the approval/rejection process
    """
    import subprocess
    import json

    # Validate inputs
    if approval_action not in ['approve', 'reject']:
        return {
            "error": f"Invalid approval action: {approval_action}. Must be 'approve' or 'reject'."
        }

    if approval_action == 'approve' and not ttl:
        return {
            "error": "TTL is required when approving an access request."
        }

    # Run the approval handler script
    cmd = [
        "python",
        "/opt/scripts/access_approval_handler.py",
        request_id,
        approval_action
    ]
    if ttl:
        cmd.append(ttl)

    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True
    )

    # Parse the output
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Fall back to raw output if the handler didn't print JSON
        return {
            "success": result.returncode == 0,
            "output": result.stdout,
            "error": result.stderr if result.stderr else None
        }
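The access_approval_handler module imported above is not shown on this page. As a rough idea of the contract the tool relies on, here is a minimal, hypothetical stand-in: it reads request_id, action, and an optional TTL from argv and prints a JSON result to stdout, which is what approve_access parses. The real handler (Slack notification, database update, Kubiya API calls) would do considerably more.

# scripts/access_approval_handler.py - illustrative sketch, not the real implementation
import json
import sys


def main() -> int:
    if len(sys.argv) < 3:
        print(json.dumps({"success": False, "error": "usage: handler <request_id> <action> [ttl]"}))
        return 1

    request_id, action = sys.argv[1], sys.argv[2]
    ttl = sys.argv[3] if len(sys.argv) > 3 else None

    # Placeholder for the real work: look up the request, update its state,
    # grant time-bound access, and notify the requester.
    result = {
        "success": True,
        "request_id": request_id,
        "action": action,
        "ttl": ttl,
    }
    print(json.dumps(result))
    return 0


if __name__ == "__main__":
    sys.exit(main())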
Running Examples in Google Colab¶
You can run these examples in Google Colab. Here's how to set up a Colab notebook for Kubiya SDK:
# Install Kubiya SDK
!pip install kubiya-sdk
# Import the necessary modules
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec, Volume
from typing import Dict, Any
# Define your tool
@kubiya.tool(
    name="colab_file_demo",
    description="Process files in a Colab environment",
    with_files=[
        FileSpec(
            destination="/app/script.py",
            content="""
import pandas as pd
import os


def process_csv(filename):
    df = pd.read_csv(filename)
    return {
        "rows": len(df),
        "columns": list(df.columns),
        "summary": df.describe().to_dict()
    }
"""
        )
    ],
    with_volumes=[
        Volume(
            name="colab_data",
            path="/app/data"
        )
    ],
    requirements=["pandas", "numpy"]
)
def process_colab_data(filename: str) -> Dict[str, Any]:
    """Process data in a Colab notebook"""
    import sys
    import os

    # Add script directory to path
    sys.path.append("/app")

    # Import the function from our script
    from script import process_csv

    # Process the file
    file_path = os.path.join("/app/data", filename)
    if os.path.exists(file_path):
        return process_csv(file_path)
    else:
        return {"error": f"File not found: {filename}"}
# In a new Colab cell, create a sample CSV file
# (the %%writefile magic must be the first line of its cell):
%%writefile sample_data.csv
id,name,value
1,Item 1,10.5
2,Item 2,20.3
3,Item 3,15.7

# Call the tool with the sample file
# Note: In Colab, you'll need to use Docker to run this properly
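If you would rather stay in plain Python than use the %%writefile magic above, an equivalent way to create the same sample file is:

from pathlib import Path

# Write the sample CSV without notebook cell magic
Path("sample_data.csv").write_text(
    "id,name,value\n"
    "1,Item 1,10.5\n"
    "2,Item 2,20.3\n"
    "3,Item 3,15.7\n"
)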
Real-World Example: Kubernetes File Operations¶
Here's a real-world example for a Kubernetes tool:
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec
from typing import Dict, Any
# Based on the kubiya/community-tools kubernetes implementation
@kubiya.tool(
    name="kubernetes_logs",
    description="Collect and analyze Kubernetes logs",
    image="kubiya/kubectl-light:latest",
    with_files=[
        FileSpec(
            source="/var/run/secrets/kubernetes.io/serviceaccount/token",
            destination="/tmp/kubernetes_context_token"
        ),
        FileSpec(
            source="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
            destination="/tmp/kubernetes_context_cert"
        )
    ]
)
def collect_pod_logs(namespace: str, pod: str, container: str = None, lines: int = 100) -> Dict[str, Any]:
    """
    Collect logs from a Kubernetes pod

    Args:
        namespace: Kubernetes namespace
        pod: Pod name
        container: Container name (optional)
        lines: Number of log lines to retrieve

    Returns:
        Logs and metadata
    """
    import subprocess
    import os
    import json

    # Set up the Kubernetes context using the mounted token and CA certificate.
    # These commands share shell variables, so they must run in a single shell
    # invocation rather than one subprocess per command.
    setup_commands = [
        "TOKEN_LOCATION='/tmp/kubernetes_context_token'",
        "CERT_LOCATION='/tmp/kubernetes_context_cert'",
        "KUBE_TOKEN=$(cat $TOKEN_LOCATION)",
        "kubectl config set-cluster in-cluster --server=https://kubernetes.default.svc --certificate-authority=$CERT_LOCATION",
        "kubectl config set-credentials in-cluster --token=$KUBE_TOKEN",
        "kubectl config set-context in-cluster --cluster=in-cluster --user=in-cluster",
        "kubectl config use-context in-cluster"
    ]

    # Execute setup commands in one shell so the variables persist between steps
    subprocess.run(" && ".join(setup_commands), shell=True, check=True)

    # Build the logs command
    logs_cmd = ["kubectl", "logs", f"--namespace={namespace}", pod]
    if container:
        logs_cmd.extend(["--container", container])
    logs_cmd.extend(["--tail", str(lines)])

    # Get the logs
    logs_result = subprocess.run(
        logs_cmd,
        capture_output=True,
        text=True
    )

    # Get pod details
    describe_cmd = ["kubectl", "describe", f"--namespace={namespace}", f"pod/{pod}"]
    describe_result = subprocess.run(
        describe_cmd,
        capture_output=True,
        text=True
    )

    return {
        "pod": pod,
        "namespace": namespace,
        "container": container,
        "logs": logs_result.stdout,
        "error": logs_result.stderr if logs_result.stderr else None,
        "metadata": describe_result.stdout,
        "success": logs_result.returncode == 0
    }
Visualizing Workflows with Mermaid¶
You can visualize workflows that use file handling tools with Mermaid diagrams:
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
# Create a data processing workflow
data_processing_workflow = Workflow(
    name="Data Processing Pipeline",
    description="Process data files through multiple stages",
    nodes=[
        WorkflowNode(
            name="extract_data",
            description="Extract data from CSV files",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "csv_processor",
                "input_mapping": {
                    "filename": "input.csv"
                }
            }
        ),
        WorkflowNode(
            name="transform_data",
            description="Transform the extracted data",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "data_transformer",
                "input_mapping": {
                    "data": "$extract_data.result"
                }
            },
            depends_on=["extract_data"]
        ),
        WorkflowNode(
            name="analyze_data",
            description="Perform data analysis",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "data_analyzer",
                "input_mapping": {
                    "data": "$transform_data.result"
                }
            },
            depends_on=["transform_data"]
        )
    ]
)
# Generate a Mermaid diagram
mermaid_diagram = data_processing_workflow.to_mermaid()
print(mermaid_diagram)
This will generate a Mermaid diagram like:
graph TD
extract_data[Extract Data]
transform_data[Transform Data]
analyze_data[Analyze Data]
extract_data --> transform_data
transform_data --> analyze_data
classDef default fill:#f9f9f9,stroke:#333,stroke-width:1px;
classDef active fill:#d3f9d8,stroke:#333,stroke-width:1px;
classDef completed fill:#87ceeb,stroke:#333,stroke-width:1px;
classDef failed fill:#ffcccc,stroke:#333,stroke-width:1px;
Next Steps¶
Now that you've learned about file handling in Kubiya SDK, explore these related topics:
- Basic Tool Creation - Getting started with Kubiya tools
- Dynamic Configuration Tools - Using configurations with tools
- Service Integration Tools - Integrating with external services