# Dynamic Configuration Tools
This example demonstrates how to create tools that use dynamic configuration for improved flexibility and security.
## Overview
Dynamic configurations allow you to:
- Define reusable configuration schemas
- Use configurations across multiple tools
- Securely store and share sensitive values
- Validate configuration values against schemas
## Basic Configuration Schema
First, let's define a configuration schema using Pydantic models:
Python
from kubiya_sdk import kubiya
from kubiya_sdk.tools.decorators import config_model
from pydantic import BaseModel, Field
@config_model(name="api_credentials", description="API connection credentials")
class APICredentials(BaseModel):
    """Schema for the credentials a tool needs to call an external HTTP API.

    Registered under the name ``api_credentials``; tools opt in with
    ``required_configs=["api_credentials"]``.
    """

    api_key: str = Field(..., description="API key for authentication")
    base_url: str = Field(..., description="Base URL for the API")
    timeout: int = Field(default=30, description="Request timeout in seconds")
    max_retries: int = Field(default=3, description="Maximum number of retry attempts")
## Using Configuration in a Tool
Now, let's create a tool that uses this configuration:
Python
@kubiya.tool(
    description="Fetch data from an external API",
    required_configs=["api_credentials"]
)
def fetch_api_data(endpoint: str, params: dict = None):
    """Fetch JSON data from an external API using stored credentials.

    The ``api_credentials`` configuration is injected by the platform as a
    JSON document in the ``KUBIYA_CONFIG_api_credentials`` environment
    variable.

    Args:
        endpoint: API endpoint path (leading slash optional).
        params: Optional query parameters.

    Returns:
        The parsed JSON response on success, or a dict with ``error`` and
        ``status_code`` keys if every attempt fails.

    Raises:
        ValueError: If the credentials configuration is missing.
    """
    import json
    import os
    from time import sleep

    import requests

    raw_config = os.environ.get("KUBIYA_CONFIG_api_credentials")
    if not raw_config:
        raise ValueError("API credentials configuration is required")
    config = json.loads(raw_config)

    api_key = config["api_key"]
    base_url = config["base_url"]
    timeout = config.get("timeout", 30)
    # Clamp to at least one attempt: with a misconfigured max_retries <= 0
    # the loop below would never run and the function would return None.
    max_retries = max(1, config.get("max_retries", 3))

    # Normalize slashes so "base/" + "/endpoint" does not double up.
    url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    for attempt in range(max_retries):
        try:
            response = requests.get(
                url,
                headers=headers,
                params=params or {},
                timeout=timeout,
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                sleep(2 ** attempt)
                continue
            # Final attempt failed — report the error instead of raising.
            return {
                "error": str(e),
                "status_code": getattr(e.response, "status_code", None),
            }
## Alternative Dictionary-Based Configuration
You can also define configuration schemas using dictionaries:
Python
from kubiya_sdk.tools.decorators import config_dict
# Register a configuration schema from a plain dictionary instead of a
# Pydantic model. Each entry maps a field name to its schema attributes:
# "type", "description", and optionally "required", "default", or
# "secret" (marks the value as sensitive, e.g. a password).
config_dict(
    name="database_connection",
    config_dict={
        "host": {
            "type": "string",
            "description": "Database server hostname",
            "required": True
        },
        "port": {
            "type": "integer",
            "description": "Database server port",
            "default": 5432
        },
        "username": {
            "type": "string",
            "description": "Database username",
            "required": True
        },
        "password": {
            "type": "string",
            "description": "Database password",
            "required": True,
            "secret": True
        },
        "database": {
            "type": "string",
            "description": "Database name",
            "required": True
        }
    },
    description="PostgreSQL database connection settings"
)
## Using a Database Tool with Configuration
Here's a tool that uses the database configuration:
Python
@kubiya.tool(
    description="Query a PostgreSQL database",
    requirements=["psycopg2-binary"],
    required_configs=["database_connection"]
)
def query_database(sql_query: str):
    """Execute a SQL query against a PostgreSQL database.

    Connection settings come from the ``database_connection`` configuration,
    injected as JSON in the ``KUBIYA_CONFIG_database_connection`` environment
    variable.

    Args:
        sql_query: SQL query to execute.

    Returns:
        For SELECT-like queries, a list of rows as dicts; for data-modifying
        statements, a dict with ``affected_rows``; on failure, a dict with
        an ``error`` key.

    Raises:
        ValueError: If the database configuration is missing.
    """
    import json
    import os

    import psycopg2
    import psycopg2.extras

    raw_config = os.environ.get("KUBIYA_CONFIG_database_connection")
    if not raw_config:
        # Fail fast with a clear message instead of the opaque TypeError
        # json.loads(None) would raise when the config was not injected.
        raise ValueError("Database connection configuration is required")
    config = json.loads(raw_config)

    conn = psycopg2.connect(
        host=config["host"],
        port=config.get("port", 5432),
        user=config["username"],
        password=config["password"],
        dbname=config["database"],
    )
    try:
        # RealDictCursor returns each row as a dict keyed by column name.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            cursor.execute(sql_query)
            if cursor.description is not None:
                # SELECT-like statement: description holds column metadata.
                return list(cursor.fetchall())
            # Non-SELECT statement (INSERT, UPDATE, DELETE): commit and
            # report the number of rows affected.
            conn.commit()
            return {
                "affected_rows": cursor.rowcount,
                "status": "success"
            }
    except Exception as e:
        # Best-effort contract: surface the error to the caller as data
        # rather than propagating the exception.
        return {
            "error": str(e),
            "status": "error"
        }
    finally:
        conn.close()
## Using Multiple Configurations
Tools can require multiple configurations:
Python
@kubiya.tool(
    description="Send product data to an external API",
    required_configs=["database_connection", "api_credentials"]
)
def sync_products_to_api():
    """Fetch product data from the database and send it to an external API.

    Requires both the ``database_connection`` and ``api_credentials``
    configurations, injected as JSON in the corresponding
    ``KUBIYA_CONFIG_*`` environment variables.

    Returns:
        A dict with the number of products synced and the API response on
        success, or a dict with an ``error`` key on failure.

    Raises:
        ValueError: If either required configuration is missing.
    """
    import json
    import os

    import psycopg2
    import psycopg2.extras
    import requests

    # Validate both configurations up front so a missing one produces a
    # clear error instead of a TypeError from json.loads(None).
    raw_db_config = os.environ.get("KUBIYA_CONFIG_database_connection")
    if not raw_db_config:
        raise ValueError("Database connection configuration is required")
    raw_api_config = os.environ.get("KUBIYA_CONFIG_api_credentials")
    if not raw_api_config:
        raise ValueError("API credentials configuration is required")
    db_config = json.loads(raw_db_config)
    api_config = json.loads(raw_api_config)

    conn = psycopg2.connect(
        host=db_config["host"],
        port=db_config.get("port", 5432),
        user=db_config["username"],
        password=db_config["password"],
        dbname=db_config["database"],
    )
    try:
        # Fetch active products; RealDictCursor yields dicts, which
        # serialize directly into the JSON payload below.
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            cursor.execute("SELECT id, name, price, stock FROM products WHERE active = TRUE")
            products = cursor.fetchall()

        # Normalize slashes so "base/" does not produce a double slash.
        api_url = f"{api_config['base_url'].rstrip('/')}/products/sync"
        headers = {
            "Authorization": f"Bearer {api_config['api_key']}",
            "Content-Type": "application/json"
        }

        response = requests.post(
            api_url,
            headers=headers,
            json={"products": products},
            timeout=api_config.get("timeout", 30),
        )
        response.raise_for_status()

        return {
            "products_synced": len(products),
            "api_response": response.json(),
            "status": "success"
        }
    except Exception as e:
        # Best-effort contract: report failures as data, not exceptions.
        return {
            "error": str(e),
            "status": "error"
        }
    finally:
        conn.close()
## Manual Configuration with the @with_config Decorator
For more control, you can use the @with_config decorator to manually handle configurations:
Python
from kubiya_sdk.tools.decorators import config_model, with_config
from pydantic import BaseModel, Field
@config_model(name="aws_credentials")
class AWSCredentials(BaseModel):
    """Schema for AWS credentials, registered as ``aws_credentials``.

    ``region`` is optional and falls back to ``us-east-1``.
    """

    access_key: str = Field(..., description="AWS Access Key ID")
    secret_key: str = Field(..., description="AWS Secret Access Key")
    region: str = Field(default="us-east-1", description="AWS Region")
@with_config("aws_credentials")
def list_s3_buckets(config):
    """Return the names of all S3 buckets in the configured AWS account.

    Not a Kubiya tool by itself — the ``@with_config`` decorator injects
    the ``aws_credentials`` configuration so tools can call it directly.
    """
    import boto3

    # Build an S3 client from the injected credentials.
    client = boto3.client(
        's3',
        aws_access_key_id=config["access_key"],
        aws_secret_access_key=config["secret_key"],
        region_name=config["region"],
    )

    # Extract just the bucket names from the API response.
    return [entry['Name'] for entry in client.list_buckets()['Buckets']]
@kubiya.tool(description="Manage S3 buckets")
def manage_s3_buckets(action: str = "list", bucket_name: str = None):
    """Manage S3 buckets.

    Args:
        action: Action to perform (list, create, delete).
        bucket_name: Name of the bucket (for create/delete actions).

    Returns:
        Action result, or an error dict for an unknown action or a
        missing bucket name.
    """
    # "list" needs no bucket name; the configuration is injected into
    # list_s3_buckets by its @with_config decorator.
    if action == "list":
        return {"buckets": list_s3_buckets()}

    if bucket_name:
        if action == "create":
            # Implementation omitted for brevity
            return {"status": "created", "bucket": bucket_name}
        if action == "delete":
            # Implementation omitted for brevity
            return {"status": "deleted", "bucket": bucket_name}

    return {"error": "Invalid action or missing bucket name"}
## Working with Configuration in Workflows
You can use tools with dynamic configurations in workflows:
Python
from kubiya_sdk.workflows.workflow import Workflow, WorkflowNode
from kubiya_sdk.workflows.node_types import NodeType
# Create a workflow that uses tools with configurations
data_pipeline_workflow = Workflow(
name="Data Pipeline",
description="Extract data from the database and send it to the API",
nodes=[
WorkflowNode(
name="extract_data",
description="Extract data from the database",
node_type=NodeType.TOOL,
tool_config={
"tool_name": "query_database",
"input_mapping": {
"sql_query": "SELECT * FROM customers WHERE updated_at > NOW() - INTERVAL '1 day'"
}
}
),
WorkflowNode(
name="send_to_api",
description="Send data to the API",
node_type=NodeType.TOOL,
tool_config={
"tool_name": "fetch_api_data",
"input_mapping": {
"endpoint": "customers/sync",
"params": {
"data": "$extract_data.result"
}
}
},
depends_on=["extract_data"]
)
]
)
# Execute the workflow
result = data_pipeline_workflow.execute()
## Conclusion
Dynamic configurations provide a powerful way to manage settings and credentials for your tools. They help with:
- Reusability: Define configurations once and use them across multiple tools
- Security: Keep sensitive information like API keys and passwords separate from your code
- Validation: Ensure that configurations contain the required fields and correct data types
- Flexibility: Easily update configurations without changing tool code