Workflow API¶
Introduction¶
The Datus Agent Workflow API Service is a RESTful API that exposes Datus Agent's natural-language-to-SQL capabilities over HTTP endpoints. Applications can use it to integrate SQL generation, execution, and workflow management into their own systems.
Quick Start¶
Starting the Service¶
# Start the API service
python -m datus.api.server --host 0.0.0.0 --port 8000
# Start with multiple workers
python -m datus.api.server --workers 4 --port 8000
# Start in daemon mode (background)
python -m datus.api.server --daemon --port 8000
Authentication¶
OAuth2 Client Credentials Flow¶
The API uses OAuth2 client credentials authentication:
1. Get Authentication Token¶
curl -X POST "http://localhost:8000/auth/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "client_id=your_client_id&client_secret=your_client_secret&grant_type=client_credentials"
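The same token request can be issued from Python. The following is a minimal sketch using only the standard library; the helper names `build_token_request` and `fetch_token` are illustrative and not part of Datus Agent, and the base URL assumes the local server started above.

```python
import json
import urllib.parse
import urllib.request


def build_token_request(base_url, client_id, client_secret):
    """Build (but do not send) the form-encoded POST for /auth/token."""
    form = urllib.parse.urlencode({
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "client_credentials",
    }).encode("ascii")
    return urllib.request.Request(
        f"{base_url}/auth/token",
        data=form,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def fetch_token(request, timeout=10):
    """Send the request and return the parsed JSON token response."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

Calling `fetch_token(build_token_request("http://localhost:8000", "your_client_id", "your_client_secret"))` against a running server should return the JSON body containing the access token.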
2. Use Token in Requests¶
curl -X POST "http://localhost:8000/workflows/run" \
-H "Authorization: Bearer your_jwt_token" \
-H "Content-Type: application/json" \
-d '{"workflow": "fixed", "namespace": "your_db", "task": "Show me users"}'
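For Python clients, the authenticated call might look like the sketch below. `build_workflow_request` is a hypothetical helper name; it only attaches the bearer token and JSON body shown in the curl example, and leaves sending to `urllib.request.urlopen`.

```python
import json
import urllib.request


def build_workflow_request(base_url, token, payload):
    """Build (but do not send) an authenticated POST for /workflows/run."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/workflows/run",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

The returned object can be passed to `urllib.request.urlopen(request, timeout=...)` to execute the call.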
Configuration¶
Create auth_clients.yml to configure clients:
clients:
  your_client_id: your_client_secret
  another_client: another_secret
jwt:
  secret_key: your-jwt-secret-key-change-in-production
  algorithm: HS256
  expiration_hours: 2
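Before deploying, it can help to sanity-check this file. The sketch below operates on the configuration after it has been parsed into a Python dict (YAML loading is omitted because it needs a third-party parser). The function names are illustrative, and the checks are assumptions about what a reasonable configuration looks like, not rules enforced by Datus Agent.

```python
import secrets

# Placeholder value from the sample configuration; it should not reach production.
PLACEHOLDER_SECRET = "your-jwt-secret-key-change-in-production"


def check_auth_config(config):
    """Return a list of problems found in a parsed auth_clients.yml mapping."""
    problems = []
    if not config.get("clients"):
        problems.append("no clients defined")
    jwt = config.get("jwt") or {}
    if jwt.get("secret_key") in (None, "", PLACEHOLDER_SECRET):
        problems.append("jwt.secret_key is missing or still the placeholder")
    hours = jwt.get("expiration_hours")
    if not isinstance(hours, (int, float)) or hours <= 0:
        problems.append("jwt.expiration_hours must be a positive number")
    return problems


def token_lifetime_seconds(config):
    """Convert expiration_hours to seconds (2 hours gives 7200)."""
    return int(config["jwt"]["expiration_hours"] * 3600)


def new_secret_key():
    """Generate a random URL-safe string suitable as a signing secret."""
    return secrets.token_urlsafe(32)
```

With the sample values, `token_lifetime_seconds` yields 7200, which matches the `expires_in` value in the token response documented for `/auth/token`.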
API Endpoints¶
Authentication¶
POST /auth/token¶
Obtain a JWT access token.
Request:
POST /auth/token
Content-Type: application/x-www-form-urlencoded
client_id=your_client_id&client_secret=your_client_secret&grant_type=client_credentials
Response:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "Bearer",
"expires_in": 7200
}
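Because the response includes `expires_in`, a client can cache the token and request a new one shortly before it expires. The following is one possible sketch; `TokenCache`, its `fetch` callable, and the 60-second safety margin are assumptions made for illustration, not part of the API.

```python
import time


class TokenCache:
    """Cache a bearer token and refresh it shortly before it expires."""

    def __init__(self, fetch, clock=time.monotonic, margin=60):
        self._fetch = fetch      # callable returning the /auth/token JSON as a dict
        self._clock = clock      # injectable clock, useful for testing
        self._margin = margin    # seconds of safety margin before expiry
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return a valid access token, fetching a new one when needed."""
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            response = self._fetch()
            self._token = response["access_token"]
            self._expires_at = now + response["expires_in"] - self._margin
        return self._token
```

Each request then calls `cache.get()` for its `Authorization` header instead of hitting `/auth/token` every time.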
Workflow Execution¶
POST /workflows/run¶
Execute a workflow to convert natural language to SQL.
Common Request Parameters:
Parameter | Type | Required | Description |
---|---|---|---|
workflow | string | ✅ | Workflow name (nl2sql, reflection, fixed, metric_to_sql) |
namespace | string | ✅ | Database namespace |
task | string | ✅ | Natural language task description |
mode | string | ✅ | Execution mode (sync or async) |
task_id | string | ❌ | Custom task ID for idempotency |
catalog_name | string | ❌ | Database catalog |
database_name | string | ❌ | Database name |
schema_name | string | ❌ | Schema name |
current_date | string | ❌ | Reference date for time expressions |
domain | string | ❌ | Business domain |
layer1 | string | ❌ | Business layer 1 |
layer2 | string | ❌ | Business layer 2 |
ext_knowledge | string | ❌ | Additional business context |
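A client may want to validate the request body against this table before sending it. The sketch below is a hypothetical client-side helper (`build_payload` is not part of the API); it checks required fields, rejects unknown parameter names, and restricts `workflow` and `mode` to the values listed above.

```python
REQUIRED = ("workflow", "namespace", "task", "mode")
OPTIONAL = (
    "task_id", "catalog_name", "database_name", "schema_name",
    "current_date", "domain", "layer1", "layer2", "ext_knowledge",
)
WORKFLOWS = {"nl2sql", "reflection", "fixed", "metric_to_sql"}
MODES = {"sync", "async"}


def build_payload(**fields):
    """Validate keyword arguments and return a JSON-ready request body."""
    unknown = set(fields) - set(REQUIRED) - set(OPTIONAL)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    missing = [name for name in REQUIRED if not fields.get(name)]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    if fields["workflow"] not in WORKFLOWS:
        raise ValueError(f"unsupported workflow: {fields['workflow']!r}")
    if fields["mode"] not in MODES:
        raise ValueError(f"unsupported mode: {fields['mode']!r}")
    # Drop optional fields that were passed explicitly as None.
    return {name: value for name, value in fields.items() if value is not None}
```

For example, `build_payload(workflow="nl2sql", namespace="your_db", task="Show me users", mode="sync")` returns a dict that can be serialized with `json.dumps`.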
Synchronous Mode (mode: "sync")¶
Request Headers:
Authorization: Bearer your_jwt_token
Content-Type: application/json
Request Body:
{
"workflow": "nl2sql",
"namespace": "your_database_namespace",
"task": "Show me monthly revenue by product category",
"mode": "sync",
"catalog_name": "your_catalog",
"database_name": "your_database"
}
Response:
{
"task_id": "client_20240115143000",
"status": "completed",
"workflow": "nl2sql",
"sql": "SELECT DATE_TRUNC('month', order_date) as month, product_category, SUM(amount) as revenue FROM orders WHERE order_date >= '2023-01-01' GROUP BY month, product_category ORDER BY month, revenue DESC",
"result": [
{
"month": "2023-01-01",
"product_category": "Electronics",
"revenue": 150000.00
},
{
"month": "2023-01-01",
"product_category": "Clothing",
"revenue": 85000.00
}
],
"metadata": {
"execution_time": 12.5,
"nodes_executed": 5,
"reflection_rounds": 0
},
"error": null,
"execution_time": 12.5
}
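A client would typically check `status` and `error` before using `sql` and `result`. The sketch below shows one way to do that; `WorkflowError` and `unpack_sync_response` are illustrative names, and treating every status other than "completed" as a failure is an assumption, since only the successful response is documented here.

```python
class WorkflowError(Exception):
    """Raised when a workflow response reports a failure."""


def unpack_sync_response(response):
    """Return (sql, rows) from a parsed sync response, or raise WorkflowError."""
    # Assumption: any status other than "completed" means the task did not succeed.
    if response.get("error") is not None or response.get("status") != "completed":
        raise WorkflowError(
            f"task {response.get('task_id')}: "
            f"status={response.get('status')!r}, error={response.get('error')!r}"
        )
    return response["sql"], response.get("result") or []
```

The caller receives the generated SQL together with the result rows, and failures surface as a single exception type that can be logged or retried.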
Asynchronous Mode (mode: "async")¶
Request Headers:
Authorization: Bearer your_jwt_token
Content-Type: application/json
Accept: text/event-stream
Cache-Control: no-cache
Request Body:
{
"workflow": "nl2sql",
"namespace": "your_database_namespace",
"task": "Show me monthly revenue by product category",
"mode": "async",
"catalog_name": "your_catalog",
"database_name": "your_database"
}
Response (Server-Sent Events stream):
Content-Type: text/event-stream
event: started
data: {"task_id": "client_20240115143000", "workflow": "nl2sql"}
event: progress
data: {"message": "Initializing workflow", "progress": 10}
event: node_progress
data: {"node": "schema_linking", "status": "processing", "progress": 25}
event: node_detail
data: {"node": "schema_linking", "description": "Analyzing user query and finding relevant tables", "details": {"tables_found": ["orders", "products"]}}
event: sql_generated
data: {"sql": "SELECT DATE_TRUNC('month', order_date) as month, product_category, SUM(amount) as revenue FROM orders GROUP BY month, product_category"}
event: execution_complete
data: {"status": "success", "rows_affected": 24, "execution_time": 2.1}
event: output_ready
data: {"result": [...], "metadata": {...}}
event: done
data: {"task_id": "client_20240115143000", "status": "completed", "total_time": 15.2}
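To consume this stream, a client needs to group `event:` and `data:` lines into events. The following is a minimal parser sketch, assuming the server follows standard Server-Sent Events framing in which a blank line terminates each event and each `data:` payload is JSON. The function name `iter_sse_events` is illustrative.

```python
import json


def iter_sse_events(lines):
    """Yield (event_name, data) pairs from an iterable of decoded SSE lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line ends the current event.
            if data:
                yield event, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
    if data:
        # Flush a final event that was not followed by a blank line.
        yield event, json.loads("\n".join(data))
```

With `urllib`, the lines can come from iterating over the open response and decoding each one as UTF-8. A client would typically stop reading once it sees the `done` event.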
Feedback Submission¶
POST /workflows/feedback¶
Submit feedback on workflow execution quality.
Request:
Response:
Workflow Types¶
reflection¶
Intelligent, self-improving SQL generation:
- Includes reflection for error correction
- Can adapt and retry queries
- Best for complex or uncertain queries
fixed¶
Deterministic SQL generation:
- Predictable execution path
- No adaptive behavior
- Best for well-understood queries
metric_to_sql¶
Generate SQL from business metrics:
- Leverages predefined business metrics
- Includes date parsing for temporal queries
- Best for standardized business intelligence
Configuration¶
Server Configuration¶
Server Parameters¶
Parameter | Description | Default |
---|---|---|
--host | Server host address | 127.0.0.1 |
--port | Server port | 8000 |
--workers | Number of worker processes | 1 |
--reload | Auto-reload on code changes | False |
--debug | Enable debug mode | False |
--daemon | Run in background | False |
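When the server is launched from a script or test harness, the command line can be assembled from these parameters. The sketch below uses a hypothetical helper, `build_server_command`, and assumes that `--reload`, `--debug`, and `--daemon` are valueless switches, as `--daemon` is in the Quick Start examples.

```python
import sys


def build_server_command(host="127.0.0.1", port=8000, workers=1,
                         reload=False, debug=False, daemon=False):
    """Return the argument list for launching the API server module."""
    command = [
        sys.executable, "-m", "datus.api.server",
        "--host", host,
        "--port", str(port),
        "--workers", str(workers),
    ]
    # Assumption: boolean options are passed as bare flags without a value.
    for flag, enabled in (("--reload", reload), ("--debug", debug), ("--daemon", daemon)):
        if enabled:
            command.append(flag)
    return command
```

The resulting list can be handed to `subprocess.Popen` to start the service.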
Best Practices¶
Security¶
- Use strong, unique JWT secret keys in production
- Rotate client credentials regularly
- Implement rate limiting for production deployments
- Use HTTPS in production environments
Performance¶
- Use async mode for long-running queries
- Configure appropriate worker count based on expected load
- Monitor memory usage with multiple workers
- Implement client-side timeouts for sync requests
Error Handling¶
- Always check response status codes
- Implement retry logic for transient failures
- Handle streaming disconnections in async mode
- Log detailed error information for debugging
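The retry advice above can be sketched as a small wrapper with exponential backoff. This is an illustrative client-side helper (`call_with_retries` is a hypothetical name), and the choice to retry only connection problems and 5xx responses is an assumption about which failures are transient.

```python
import time
import urllib.error


def call_with_retries(func, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call func(), retrying transient failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return func()
        except urllib.error.HTTPError as exc:
            # Client errors (4xx) are not retried; server errors (5xx) are.
            if exc.code < 500 or attempt == attempts - 1:
                raise
        except (urllib.error.URLError, TimeoutError, ConnectionError):
            if attempt == attempts - 1:
                raise
        # Wait 0.5 s, 1 s, 2 s, ... before the next attempt.
        sleep(base_delay * 2 ** attempt)
```

When retrying `POST /workflows/run`, sending the same `task_id` on every attempt is a reasonable precaution, since that parameter is described as a custom task ID for idempotency.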
Conclusion¶
The Datus Agent Workflow API Service exposes Datus Agent's natural-language-to-SQL workflows over HTTP. It supports synchronous and asynchronous execution modes, real-time progress streaming through Server-Sent Events, and OAuth2 client-credentials authentication with JWT bearer tokens, so developers can build data analysis applications on top of it.