
File Handling Tools

This page demonstrates how to work with files and directories in the Kubiya SDK.

Understanding File Handling Models

Kubiya SDK provides three distinct ways to work with files:

  1. FileSpec - Include specific files within your container
  2. Volume - Mount persistent directories between your host and container
  3. GitRepoSpec - Clone Git repositories into your container

Let's explore each approach with real-world examples.
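
At a glance, each model attaches to the tool decorator through the parameters used throughout this page: FileSpec and GitRepoSpec are passed in with_files, while Volume is passed in with_volumes. The following minimal sketch only combines pieces that appear in the detailed sections below.

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec, Volume, GitRepoSpec

@kubiya.tool(
    name="file_handling_overview",
    description="Shows where each file handling model is declared",
    with_files=[
        FileSpec(destination="/app/config.json", content="{}"),                     # static file content
        GitRepoSpec(url="${GIT_REPO_URL}", branch="main", destination="/app/repo")  # cloned repository
    ],
    with_volumes=[
        Volume(name="data_volume", path="/app/data")  # persistent directory mount
    ]
)
def file_handling_overview() -> dict:
    """Placeholder body; the sections below show working examples."""
    return {"status": "ok"}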

FileSpec for Static File Content

FileSpec is used when you need to include specific file content in your container. This is ideal for configuration files, scripts, or any static content needed by your tool.

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec
import json

# Define a helper script to include in the container
helper_script = """#!/bin/bash
# Process input data
echo "Processing data: $1"
echo "$1" | tr '[:lower:]' '[:upper:]'
"""

@kubiya.tool(
    name="file_processor",
    description="Process text using a helper script",
    with_files=[
        FileSpec(
            destination="/app/scripts/helper.sh",
            content=helper_script  # Inline content is provided
        ),
        FileSpec(
            destination="/app/config/settings.json",
            content=json.dumps({
                "timeout": 30,
                "retries": 3,
                "version": "1.0.0"
            })
        )
    ]
)
def process_with_helper(text: str) -> dict:
    """
    Process text using an included helper script

    Args:
        text: The text to process

    Returns:
        Processing results
    """
    import subprocess
    import json
    import os

    # Execute helper script with text as argument
    subprocess.run(["chmod", "+x", "/app/scripts/helper.sh"], check=True)
    result = subprocess.run(
        ["/app/scripts/helper.sh", text],
        capture_output=True,
        text=True,
        check=True
    )

    # Load configuration
    with open("/app/config/settings.json", "r") as f:
        config = json.load(f)

    # Return results
    return {
        "original": text,
        "processed": result.stdout.strip(),
        "length": len(text),
        "config_version": config["version"]
    }

You can also use FileSpec to include files from the host system by specifying the source parameter:

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec

@kubiya.tool(
    name="config_reader",
    description="Read a configuration file",
    with_files=[
        FileSpec(
            source="${HOME}/configs/app.yaml",  # Source file on host
            destination="/app/config/app.yaml"  # Destination in container
        )
    ],
    requirements=["pyyaml"]  # needed for the yaml import in the function body
)
def read_config() -> dict:
    """Read and parse the configuration file"""
    import yaml

    with open("/app/config/app.yaml", "r") as f:
        config = yaml.safe_load(f)

    return config

Volume for Persistent Storage

Volume is used when you need to mount a directory for persistent storage or accessing a collection of files. This is ideal for data that needs to be preserved between tool executions or when processing multiple files.

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import Volume
from typing import Dict, List

@kubiya.tool(
    name="log_analyzer",
    description="Analyze log files in a directory",
    requirements=["pandas", "matplotlib"],
    with_volumes=[
        Volume(
            name="logs_volume",  # Volume name - identifies the storage
            path="/app/logs"     # Mount path inside the container
        )
    ]
)
def analyze_logs(pattern: str = "*.log", days: int = 7) -> Dict[str, List]:
    """
    Analyze log files matching a pattern

    Args:
        pattern: Glob pattern to match log files
        days: Number of days of logs to analyze

    Returns:
        Analysis results with error counts by type
    """
    import os
    import glob
    import pandas as pd
    from datetime import datetime, timedelta
    import re

    # Path where the volume is mounted
    logs_dir = "/app/logs"

    # Find all log files matching the pattern
    log_files = glob.glob(os.path.join(logs_dir, pattern))

    # Calculate the cutoff date
    cutoff_date = datetime.now() - timedelta(days=days)

    # Error patterns to look for
    error_patterns = {
        "connection_error": r"connection (?:failed|refused|reset|timeout)",
        "permission_error": r"permission denied|access forbidden|unauthorized",
        "not_found": r"404 not found|resource not found|no such file",
        "internal_error": r"internal server error|500 error|crashed|exception",
        "validation_error": r"invalid (input|format|data)|validation failed"
    }

    # Initialize counters
    error_counts = {error_type: 0 for error_type in error_patterns}
    error_examples = {error_type: [] for error_type in error_patterns}

    # Analyze each log file
    for log_file in log_files:
        # Get file modification time
        mod_time = datetime.fromtimestamp(os.path.getmtime(log_file))

        # Skip files older than the cutoff date
        if mod_time < cutoff_date:
            continue

        # Process the file
        with open(log_file, 'r', errors='ignore') as f:
            for line in f:
                # Check each error pattern
                for error_type, pattern in error_patterns.items():
                    if re.search(pattern, line, re.IGNORECASE):
                        error_counts[error_type] += 1

                        # Store a sample of errors (up to 3 per type)
                        if len(error_examples[error_type]) < 3:
                            error_examples[error_type].append(line.strip())

    # Generate results
    return {
        "error_counts": error_counts,
        "error_examples": error_examples,
        "files_analyzed": len(log_files),
        "date_range": f"Last {days} days"
    }

The key difference between Volume and FileSpec is that volumes persist data across container runs and are designed for directories, while FileSpec is for individual files that are created fresh each time the container runs.
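
As a small illustration of that difference at runtime (a sketch reusing container paths from the examples above), content declared through FileSpec is recreated from the tool definition on every run, while anything written under a volume mount is still there on the next execution:

Python
# A FileSpec file is recreated from its declared content on every run,
# so changes made at runtime do not survive to the next execution:
with open("/app/config/settings.json", "w") as f:
    f.write("{}")

# A file written under a Volume mount persists across runs:
with open("/app/logs/analysis_history.txt", "a") as f:
    f.write("log analysis completed\n")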

Using Volumes for File Uploads

When you need to share files between the host and the container, declare the Volume in the separate with_volumes parameter:

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import Volume
from typing import Dict, Any

@kubiya.tool(
    name="csv_processor",
    description="Process a CSV file and return statistics",
    requirements=["pandas", "numpy"],
    with_volumes=[  # Note this is using with_volumes, not with_files
        Volume(
            name="data_volume",
            path="/app/data"
        )
    ]
)
def process_csv(filename: str) -> Dict[str, Any]:
    """
    Process a CSV file and return statistics

    Args:
        filename: Name of the CSV file in the data volume

    Returns:
        Statistical summary of the CSV data
    """
    import os
    import pandas as pd

    # Full path to the CSV file
    file_path = os.path.join("/app/data", filename)

    # Check if the file exists
    if not os.path.exists(file_path):
        available_files = os.listdir("/app/data")
        return {
            "error": f"File not found: {filename}",
            "available_files": available_files
        }

    # Read the CSV file
    df = pd.read_csv(file_path)

    # Generate statistical summary
    return {
        "filename": filename,
        "rows": len(df),
        "columns": list(df.columns),
        "summary": df.describe().to_dict(),
        "missing_values": df.isnull().sum().to_dict()
    }

Git Repository Integration

For accessing code from a Git repository, use GitRepoSpec:

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import GitRepoSpec
from typing import Dict, Any

@kubiya.tool(
    name="git_code_analyzer",
    description="Analyze code from a Git repository",
    requirements=["pygit2", "radon"],
    with_files=[  # GitRepoSpec is added to with_files
        GitRepoSpec(
            url="${GIT_REPO_URL}",  # URL to the Git repository
            branch="main",          # Branch to clone
            destination="/app/repo" # Where to clone the repo
        )
    ]
)
def analyze_code(file_pattern: str = "*.py") -> Dict[str, Any]:
    """
    Analyze code complexity and quality metrics

    Args:
        file_pattern: Pattern to match files for analysis

    Returns:
        Code quality metrics
    """
    import os
    import glob
    import radon.complexity as cc
    from radon.metrics import h_visit, mi_visit

    # Path where the repo is cloned
    repo_path = "/app/repo"

    # Check if repository was cloned
    if not os.path.exists(repo_path):
        return {
            "error": "Repository not found. Please ensure GIT_REPO_URL environment variable is set."
        }

    # Find all files matching the pattern
    files = glob.glob(os.path.join(repo_path, "**", file_pattern), recursive=True)

    # Initialize results
    results = {
        "file_count": len(files),
        "complexities": {},
        "maintainability": {},
        "summary": {
            "average_complexity": 0,
            "average_maintainability": 0,
            "highest_complexity": {"file": "", "value": 0},
            "lowest_maintainability": {"file": "", "value": 100}
        }
    }

    # Process each file
    total_complexity = 0
    total_maintainability = 0

    for filepath in files:
        rel_path = os.path.relpath(filepath, repo_path)

        # Read file content
        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()

        # Calculate code complexity
        try:
            complexity_results = list(cc.cc_visit(content))
            if complexity_results:
                avg_complexity = sum(result.complexity for result in complexity_results) / len(complexity_results)
                results["complexities"][rel_path] = avg_complexity
                total_complexity += avg_complexity

                # Update highest complexity
                if avg_complexity > results["summary"]["highest_complexity"]["value"]:
                    results["summary"]["highest_complexity"] = {
                        "file": rel_path,
                        "value": avg_complexity
                    }

            # Calculate maintainability index
            mi_result = mi_visit(content, multi=True)
            if mi_result:
                maintainability = mi_result
                results["maintainability"][rel_path] = maintainability
                total_maintainability += maintainability

                # Update lowest maintainability
                if maintainability < results["summary"]["lowest_maintainability"]["value"]:
                    results["summary"]["lowest_maintainability"] = {
                        "file": rel_path,
                        "value": maintainability
                    }
        except Exception as e:
            # Skip files that can't be analyzed
            pass

    # Calculate averages
    if files:
        results["summary"]["average_complexity"] = total_complexity / len(files)
        results["summary"]["average_maintainability"] = total_maintainability / len(files)

    return results

Combining File Handling Approaches

You can combine different file handling approaches in a single tool by using both with_files and with_volumes:

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec, Volume, GitRepoSpec
from typing import Dict, Any

@kubiya.tool(
    name="comprehensive_file_handler",
    description="Demonstrates all file handling approaches",
    requirements=["pandas", "pygit2"],
    with_files=[
        # Static script included directly in the tool
        FileSpec(
            destination="/app/scripts/process.sh",
            content="""#!/bin/bash
echo "Processing data..."
cat "$1" | sort | uniq -c | sort -nr > "$2"
echo "Done!"
"""
        ),
        # Git repository for code
        GitRepoSpec(
            url="${GIT_REPO_URL}",
            branch="main",
            destination="/app/repo"
        )
    ],
    with_volumes=[
        # Persistent volume for data storage
        Volume(
            name="data_volume",
            path="/app/data"
        )
    ]
)
def handle_files(input_file: str, output_file: str) -> Dict[str, Any]:
    """
    Process files using different file handling approaches

    Args:
        input_file: Name of input file in data volume
        output_file: Name of output file to create in data volume

    Returns:
        Processing results
    """
    import os
    import subprocess

    # Make process script executable
    subprocess.run(["chmod", "+x", "/app/scripts/process.sh"], check=True)

    # Check if input file exists
    input_path = os.path.join("/app/data", input_file)
    if not os.path.exists(input_path):
        return {"error": f"Input file not found: {input_file}"}

    # Set output path
    output_path = os.path.join("/app/data", output_file)

    # Process the file using the script
    result = subprocess.run(
        ["/app/scripts/process.sh", input_path, output_path],
        capture_output=True,
        text=True
    )

    # Check if Git repo exists and count files
    repo_path = "/app/repo"
    repo_files = []
    if os.path.exists(repo_path):
        for root, _, files in os.walk(repo_path):
            for file in files:
                repo_files.append(os.path.join(root, file))

    return {
        "input_file": input_file,
        "output_file": output_file,
        "process_stdout": result.stdout,
        "process_stderr": result.stderr,
        "exit_code": result.returncode,
        "repo_file_count": len(repo_files)
    }

Real-World Example: Just-In-Time Access Tool

Here's a real-world example inspired by the just-in-time access tool implementation:

Python
import inspect
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec, Volume
from typing import Dict, Any, Optional

# Import handler script
from scripts import access_approval_handler

@kubiya.tool(
    name="approve_access_request",
    description="Handle the approval or rejection of an access request",
    with_files=[
        FileSpec(
            destination="/opt/scripts/access_approval_handler.py",
            content=inspect.getsource(access_approval_handler),
        ),
    ],
    with_volumes=[
        Volume(name="db_data", path="/var/lib/database")
    ],
    env=[
        "KUBIYA_AGENT_NAME",
        "SLACK_CHANNEL_ID",
    ],
    secrets=[
        "SLACK_API_TOKEN",
        "KUBIYA_API_KEY",
    ]
)
def approve_access(request_id: str, approval_action: str, ttl: Optional[str] = None) -> Dict[str, Any]:
    """
    Process an access request approval or rejection

    Args:
        request_id: The unique identifier of the access request
        approval_action: Either 'approve' or 'reject'
        ttl: If approving, how long the access should be valid (e.g., '1h', '30m')

    Returns:
        Results of the approval/rejection process
    """
    import subprocess
    import json

    # Validate inputs
    if approval_action not in ['approve', 'reject']:
        return {
            "error": f"Invalid approval action: {approval_action}. Must be 'approve' or 'reject'."
        }

    if approval_action == 'approve' and not ttl:
        return {
            "error": "TTL is required when approving an access request."
        }

    # Run the approval handler script
    cmd = [
        "python", 
        "/opt/scripts/access_approval_handler.py",
        request_id,
        approval_action
    ]

    if ttl:
        cmd.append(ttl)

    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True
    )

    # Parse the output
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        return {
            "success": result.returncode == 0,
            "output": result.stdout,
            "error": result.stderr if result.stderr else None
        }
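
The access_approval_handler module itself is not shown above. As an illustration of the contract the tool relies on (positional arguments request_id, action, and an optional ttl, with a JSON result printed to stdout), a hypothetical scripts/access_approval_handler.py could look like this:

Python
# scripts/access_approval_handler.py -- hypothetical handler matching the
# command line built by approve_access: <request_id> <action> [<ttl>]
import json
import sys


def main() -> int:
    request_id = sys.argv[1]
    action = sys.argv[2]
    ttl = sys.argv[3] if len(sys.argv) > 3 else None

    # A real handler would update the request record in the database volume
    # and notify Slack; this sketch only echoes the decision back as JSON.
    print(json.dumps({
        "request_id": request_id,
        "action": action,
        "ttl": ttl,
        "success": True
    }))
    return 0


if __name__ == "__main__":
    sys.exit(main())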

Running Examples in Google Colab

You can run these examples in Google Colab. Here's how to set up a Colab notebook for Kubiya SDK:

Python
# Install Kubiya SDK
!pip install kubiya-sdk

# Import the necessary modules
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec, Volume
from typing import Dict, Any

# Define your tool
@kubiya.tool(
    name="colab_file_demo",
    description="Process files in a Colab environment",
    with_files=[
        FileSpec(
            destination="/app/script.py",
            content="""
import pandas as pd
import os

def process_csv(filename):
    df = pd.read_csv(filename)
    return {
        "rows": len(df),
        "columns": list(df.columns),
        "summary": df.describe().to_dict()
    }
"""
        )
    ],
    with_volumes=[
        Volume(
            name="colab_data",
            path="/app/data"
        )
    ],
    requirements=["pandas", "numpy"]
)
def process_colab_data(filename: str) -> Dict[str, Any]:
    """Process data in a Colab notebook"""
    import sys
    import os

    # Add script directory to path
    sys.path.append("/app")

    # Import the function from our script
    from script import process_csv

    # Process the file
    file_path = os.path.join("/app/data", filename)
    if os.path.exists(file_path):
        return process_csv(file_path)
    else:
        return {"error": f"File not found: {filename}"}

# Create a sample CSV file to process (plain Python instead of the %%writefile
# magic, so it runs in the same cell as the code above)
sample_csv = """id,name,value
1,Item 1,10.5
2,Item 2,20.3
3,Item 3,15.7
"""
with open("sample_data.csv", "w") as f:
    f.write(sample_csv)

# Call the tool with the sample file
# Note: In Colab, you'll need to use Docker to run this properly


Real-World Example: Kubernetes File Operations

Here's a real-world example for a Kubernetes tool:

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.models import FileSpec
from typing import Dict, Any, Optional

# Based on the kubiya/community-tools kubernetes implementation
@kubiya.tool(
    name="kubernetes_logs",
    description="Collect and analyze Kubernetes logs",
    image="kubiya/kubectl-light:latest",
    with_files=[
        FileSpec(
            source="/var/run/secrets/kubernetes.io/serviceaccount/token",
            destination="/tmp/kubernetes_context_token"
        ),
        FileSpec(
            source="/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
            destination="/tmp/kubernetes_context_cert"
        )
    ]
)
def collect_pod_logs(namespace: str, pod: str, container: Optional[str] = None, lines: int = 100) -> Dict[str, Any]:
    """
    Collect logs from a Kubernetes pod

    Args:
        namespace: Kubernetes namespace
        pod: Pod name
        container: Container name (optional)
        lines: Number of log lines to retrieve

    Returns:
        Logs and metadata
    """
    import subprocess
    import os
    import json

    # Set up the Kubernetes context using the mounted token and certificate.
    # The commands are joined and run in a single shell so that the variable
    # assignments persist across the kubectl invocations.
    setup_commands = [
        "TOKEN_LOCATION='/tmp/kubernetes_context_token'",
        "CERT_LOCATION='/tmp/kubernetes_context_cert'",
        "KUBE_TOKEN=$(cat $TOKEN_LOCATION)",
        "kubectl config set-cluster in-cluster --server=https://kubernetes.default.svc --certificate-authority=$CERT_LOCATION",
        "kubectl config set-credentials in-cluster --token=$KUBE_TOKEN",
        "kubectl config set-context in-cluster --cluster=in-cluster --user=in-cluster",
        "kubectl config use-context in-cluster"
    ]

    # Execute the setup in a single shell invocation
    subprocess.run(" && ".join(setup_commands), shell=True, check=True)

    # Build the logs command
    logs_cmd = ["kubectl", "logs", f"--namespace={namespace}", pod]
    if container:
        logs_cmd.extend(["--container", container])
    logs_cmd.extend(["--tail", str(lines)])

    # Get the logs
    logs_result = subprocess.run(
        logs_cmd,
        capture_output=True,
        text=True
    )

    # Get pod details
    describe_cmd = ["kubectl", "describe", f"--namespace={namespace}", f"pod/{pod}"]
    describe_result = subprocess.run(
        describe_cmd,
        capture_output=True,
        text=True
    )

    return {
        "pod": pod,
        "namespace": namespace,
        "container": container,
        "logs": logs_result.stdout,
        "error": logs_result.stderr if logs_result.stderr else None,
        "metadata": describe_result.stdout,
        "success": logs_result.returncode == 0
    }

Visualizing Workflows with Mermaid

You can visualize workflows that use file handling tools with Mermaid diagrams:

Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType

# Create a data processing workflow
data_processing_workflow = Workflow(
    name="Data Processing Pipeline",
    description="Process data files through multiple stages",
    nodes=[
        WorkflowNode(
            name="extract_data",
            description="Extract data from CSV files",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "csv_processor",
                "input_mapping": {
                    "filename": "input.csv"
                }
            }
        ),
        WorkflowNode(
            name="transform_data",
            description="Transform the extracted data",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "data_transformer",
                "input_mapping": {
                    "data": "$extract_data.result"
                }
            },
            depends_on=["extract_data"]
        ),
        WorkflowNode(
            name="analyze_data",
            description="Perform data analysis",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "data_analyzer",
                "input_mapping": {
                    "data": "$transform_data.result"
                }
            },
            depends_on=["transform_data"]
        )
    ]
)

# Generate a Mermaid diagram
mermaid_diagram = data_processing_workflow.to_mermaid()
print(mermaid_diagram)

This will generate a Mermaid diagram like:

graph TD
    extract_data[Extract Data]
    transform_data[Transform Data]
    analyze_data[Analyze Data]

    extract_data --> transform_data
    transform_data --> analyze_data

    classDef default fill:#f9f9f9,stroke:#333,stroke-width:1px;
    classDef active fill:#d3f9d8,stroke:#333,stroke-width:1px;
    classDef completed fill:#87ceeb,stroke:#333,stroke-width:1px;
    classDef failed fill:#ffcccc,stroke:#333,stroke-width:1px;

Next Steps

Now that you've learned about file handling in Kubiya SDK, explore these related topics: