Skip to content

Dynamic Configuration Tools

This example demonstrates how to create tools that use dynamic configuration for improved flexibility and security.

Overview

Dynamic configurations allow you to:

  1. Define reusable configuration schemas
  2. Use configurations across multiple tools
  3. Securely store and share sensitive values
  4. Validate configuration values against schemas

Basic Configuration Schema

First, let's define a configuration schema using Pydantic models:

Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.decorators import config_model
from pydantic import BaseModel, Field

# Registers the model under the name "api_credentials"; tools refer to it by
# that name in their `required_configs` list.
@config_model(name="api_credentials", description="API connection credentials")
class APICredentials(BaseModel):
    """Configuration for API access.

    `api_key` and `base_url` are required (declared with `...`);
    `timeout` and `max_retries` are optional and default to 30 and 3.
    """
    # Required. Sensitive value: keep it in the configuration, not in code.
    api_key: str = Field(..., description="API key for authentication")
    # Required. Root URL that endpoint paths are appended to.
    base_url: str = Field(..., description="Base URL for the API")
    # Optional, in seconds; defaults to 30.
    timeout: int = Field(30, description="Request timeout in seconds")
    # Optional; defaults to 3.
    max_retries: int = Field(3, description="Maximum number of retry attempts")

Using Configuration in a Tool

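Now, let's create a tool that uses this configuration. The tool declares the configuration by name in required_configs and reads it at runtime as a JSON string from the KUBIYA_CONFIG_api_credentials environment variable: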

Python
@kubiya.tool(
    description="Fetch data from an external API",
    required_configs=["api_credentials"]
)
def fetch_api_data(endpoint: str, params: dict = None):
    """
    Fetch data from an external API using stored credentials

    Args:
        endpoint: API endpoint path
        params: Optional query parameters

    Returns:
        API response data on success; otherwise a dict with "error" and
        "status_code" keys describing the last failed attempt

    Raises:
        ValueError: If the api_credentials configuration is missing or is
            not valid JSON
        KeyError: If the configuration lacks "api_key" or "base_url"
    """
    import json
    import os
    from time import sleep

    import requests

    # Get configuration values (a JSON document in an environment variable)
    raw_config = os.environ.get("KUBIYA_CONFIG_api_credentials")
    if not raw_config:
        raise ValueError("API credentials configuration is required")

    try:
        config = json.loads(raw_config)
    except json.JSONDecodeError as exc:
        raise ValueError(
            "API credentials configuration is not valid JSON"
        ) from exc

    # Extract configuration values
    api_key = config["api_key"]
    base_url = config["base_url"]
    timeout = config.get("timeout", 30)
    max_retries = config.get("max_retries", 3)

    # Always make at least one attempt: with max_retries <= 0 the loop below
    # would otherwise never run and the function would return None.
    attempts = max(1, int(max_retries))

    # Build the request URL (exactly one slash between base and endpoint)
    url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"

    # Set up headers with authentication
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    # Make the request with retry logic
    for attempt in range(attempts):
        try:
            response = requests.get(
                url,
                headers=headers,
                params=params or {},
                timeout=timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            status_code = getattr(
                getattr(e, "response", None), "status_code", None
            )
            # Client errors (bad credentials, unknown endpoint, ...) will not
            # succeed on retry; only retry transport failures, server errors,
            # request timeouts (408) and rate limiting (429).
            retryable = (
                status_code is None
                or status_code >= 500
                or status_code in (408, 429)
            )
            if retryable and attempt < attempts - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                sleep(2 ** attempt)
                continue
            return {
                "error": str(e),
                "status_code": status_code
            }

Alternative Dictionary-Based Configuration

You can also define configuration schemas using dictionaries:

Python
from kubiya_sdk.tools.decorators import config_dict

# Register a "database_connection" configuration schema from a plain dict.
# Each key is a field name; its value describes the field's type, description,
# and whether it is required or has a default.
config_dict(
    name="database_connection",
    config_dict={
        "host": {
            "type": "string",
            "description": "Database server hostname",
            "required": True
        },
        # Optional: falls back to 5432 when not supplied.
        "port": {
            "type": "integer",
            "description": "Database server port",
            "default": 5432
        },
        "username": {
            "type": "string",
            "description": "Database username",
            "required": True
        },
        # Flagged with "secret": True -- presumably tells the platform to
        # treat the value as sensitive; confirm against the SDK reference.
        "password": {
            "type": "string",
            "description": "Database password",
            "required": True,
            "secret": True
        },
        "database": {
            "type": "string",
            "description": "Database name",
            "required": True
        }
    },
    description="PostgreSQL database connection settings"
)

Using a Database Tool with Configuration

Here's a tool that uses the database configuration:

Python
@kubiya.tool(
    description="Query a PostgreSQL database",
    requirements=["psycopg2-binary"],
    required_configs=["database_connection"]
)
def query_database(sql_query: str):
    """
    Execute a SQL query against a PostgreSQL database

    Args:
        sql_query: SQL query to execute

    Returns:
        A list of row dicts for statements that produce rows; otherwise a
        dict with "affected_rows" and "status". If the statement fails, a
        dict with "error" and "status" set to "error".

    Raises:
        ValueError: If the database_connection configuration is missing
        KeyError: If the configuration lacks a required field
    """
    import os
    import json
    import psycopg2
    import psycopg2.extras

    # Get configuration from environment. Fail with a clear message rather
    # than letting json.loads(None) raise an opaque TypeError.
    raw_config = os.environ.get("KUBIYA_CONFIG_database_connection")
    if not raw_config:
        raise ValueError("Database connection configuration is required")
    config = json.loads(raw_config)

    # Extract configuration values
    host = config["host"]
    port = config.get("port", 5432)
    username = config["username"]
    password = config["password"]
    database = config["database"]

    # Connect to the database
    conn = psycopg2.connect(
        host=host,
        port=port,
        user=username,
        password=password,
        dbname=database
    )

    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            # NOTE(security): sql_query is executed verbatim. Only expose this
            # tool to trusted callers and use a least-privilege database role.
            cursor.execute(sql_query)

            # A non-None description means the statement produced rows.
            # That is not only SELECT: INSERT/UPDATE/DELETE ... RETURNING
            # also produce rows, so commit on both paths or those writes
            # would be rolled back when the connection closes.
            if cursor.description is not None:
                results = cursor.fetchall()
                conn.commit()
                return list(results)

            # Statements without a result set (INSERT, UPDATE, DELETE, DDL)
            conn.commit()
            return {
                "affected_rows": cursor.rowcount,
                "status": "success"
            }
    except Exception as e:
        # Closing the connection in `finally` discards the open transaction.
        return {
            "error": str(e),
            "status": "error"
        }
    finally:
        conn.close()

Using Multiple Configurations

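Tools can require multiple configurations by listing each configuration name in required_configs; each one is then read from its own KUBIYA_CONFIG_<name> environment variable: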

Python
@kubiya.tool(
    description="Send product data to an external API",
    required_configs=["database_connection", "api_credentials"]
)
def sync_products_to_api():
    """
    Fetch product data from the database and send it to an external API

    Returns:
        Synchronization status: a dict with "products_synced",
        "api_response" and "status" on success, or "error" and "status"
        set to "error" if the query or the API call fails

    Raises:
        ValueError: If either required configuration is missing
        KeyError: If a configuration lacks a required field
    """
    import os
    import json
    import psycopg2
    import psycopg2.extras
    import requests

    # Load configurations. Check for presence first so a missing variable
    # gives a clear error instead of a TypeError from json.loads(None).
    raw_db_config = os.environ.get("KUBIYA_CONFIG_database_connection")
    if not raw_db_config:
        raise ValueError("Database connection configuration is required")
    raw_api_config = os.environ.get("KUBIYA_CONFIG_api_credentials")
    if not raw_api_config:
        raise ValueError("API credentials configuration is required")

    db_config = json.loads(raw_db_config)
    api_config = json.loads(raw_api_config)

    # Connect to database
    conn = psycopg2.connect(
        host=db_config["host"],
        port=db_config.get("port", 5432),
        user=db_config["username"],
        password=db_config["password"],
        dbname=db_config["database"]
    )

    try:
        # Fetch products from the database, then release the connection
        # immediately so it is not held open during the HTTP request.
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute("SELECT id, name, price, stock FROM products WHERE active = TRUE")
                products = cursor.fetchall()
        finally:
            conn.close()

        # Prepare API request
        api_url = f"{api_config['base_url'].rstrip('/')}/products/sync"
        headers = {
            "Authorization": f"Bearer {api_config['api_key']}",
            "Content-Type": "application/json"
        }

        # Send products to the API
        # NOTE(review): if `price` is a NUMERIC column, psycopg2 returns
        # Decimal values, which the standard json encoder cannot serialize.
        # Confirm the column types or convert before sending.
        response = requests.post(
            api_url,
            headers=headers,
            json={"products": products},
            timeout=api_config.get("timeout", 30)
        )

        response.raise_for_status()

        return {
            "products_synced": len(products),
            "api_response": response.json(),
            "status": "success"
        }

    except Exception as e:
        return {
            "error": str(e),
            "status": "error"
        }

Manual Configuration with @with_config Decorator

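For more control, you can use the @with_config decorator to handle configurations manually. In the example below, the decorated helper declares a config parameter and the tool calls it without arguments, leaving the decorator to supply the named configuration: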

Python
from kubiya_sdk.tools.decorators import config_model, with_config
from pydantic import BaseModel, Field

# Registers the model under the name "aws_credentials"; that name is what
# @with_config("aws_credentials") refers to.
@config_model(name="aws_credentials")
class AWSCredentials(BaseModel):
    """AWS credentials configuration.

    `access_key` and `secret_key` are required (declared with `...`);
    `region` is optional and defaults to "us-east-1".
    """
    # Required.
    access_key: str = Field(..., description="AWS Access Key ID")
    # Required. Sensitive value: keep it in the configuration, not in code.
    secret_key: str = Field(..., description="AWS Secret Access Key")
    # Optional; defaults to "us-east-1".
    region: str = Field("us-east-1", description="AWS Region")

@with_config("aws_credentials")
def list_s3_buckets(config):
    """
    Return the names of all S3 buckets in the AWS account

    Note: This is a plain helper, not a Kubiya tool; it is meant to be
    called from inside a tool.

    Args:
        config: Mapping with "access_key", "secret_key" and "region" entries

    Returns:
        List of bucket names
    """
    import boto3

    # Gather the client credentials from the config first
    client_options = {
        "aws_access_key_id": config["access_key"],
        "aws_secret_access_key": config["secret_key"],
        "region_name": config["region"],
    }
    s3_client = boto3.client("s3", **client_options)

    # Collect the name of every bucket in the listing
    bucket_names = []
    for bucket in s3_client.list_buckets()["Buckets"]:
        bucket_names.append(bucket["Name"])
    return bucket_names

@kubiya.tool(description="Manage S3 buckets")
def manage_s3_buckets(action: str = "list", bucket_name: str = None):
    """
    Manage S3 buckets

    Args:
        action: Action to perform (list, create, delete)
        bucket_name: Name of the bucket (for create/delete actions)

    Returns:
        Action result, or a dict with an "error" key when the action is
        unknown or a required bucket name is missing
    """
    # Listing needs no bucket name; the helper is called without arguments
    if action == "list":
        return {"buckets": list_s3_buckets()}

    # Every remaining action operates on a named bucket
    if not bucket_name:
        return {"error": "Invalid action or missing bucket name"}

    if action == "create":
        # Implementation omitted for brevity
        return {"status": "created", "bucket": bucket_name}

    if action == "delete":
        # Implementation omitted for brevity
        return {"status": "deleted", "bucket": bucket_name}

    return {"error": "Invalid action or missing bucket name"}

Working with Configuration in Workflows

You can use tools with dynamic configurations in workflows:

Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType

# Create a workflow that uses tools with configurations.
# The workflow has two tool nodes: "extract_data" runs first, and
# "send_to_api" is declared to depend on it.
data_pipeline_workflow = Workflow(
    name="Data Pipeline",
    description="Extract data from the database and send it to the API",
    nodes=[
        # Node 1: run the query_database tool with a fixed SQL statement
        # selecting customers updated within the last day.
        WorkflowNode(
            name="extract_data",
            description="Extract data from the database",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "query_database",
                "input_mapping": {
                    "sql_query": "SELECT * FROM customers WHERE updated_at > NOW() - INTERVAL '1 day'"
                }
            }
        ),
        # Node 2: call the fetch_api_data tool on the "customers/sync" endpoint.
        # "$extract_data.result" presumably resolves to the output of the
        # "extract_data" node -- confirm against the workflow reference.
        # NOTE(review): fetch_api_data issues a GET and passes `params` as
        # query parameters, so the data is sent in the URL, not in a body.
        WorkflowNode(
            name="send_to_api",
            description="Send data to the API",
            node_type=NodeType.TOOL,
            tool_config={
                "tool_name": "fetch_api_data",
                "input_mapping": {
                    "endpoint": "customers/sync",
                    "params": {
                        "data": "$extract_data.result"
                    }
                }
            },
            # Must not start until "extract_data" has completed.
            depends_on=["extract_data"]
        )
    ]
)

# Execute the workflow
result = data_pipeline_workflow.execute()

Conclusion

Dynamic configurations provide a powerful way to manage settings and credentials for your tools. They help with:

  1. Reusability: Define configurations once and use them across multiple tools
  2. Security: Keep sensitive information like API keys and passwords separate from your code
  3. Validation: Ensure that configurations contain the required fields and correct data types
  4. Flexibility: Easily update configurations without changing tool code