Skip to content

插件系统

了解和开发 ai-fixer 的插件系统。

插件概述

ai-fixer 使用插件系统扩展诊断和修复能力。插件分为两类:

  • 诊断插件(Diagnostic):只读操作,用于收集信息
  • 修复插件(Remediation):写操作,会修改系统状态

内置插件

Kubernetes 插件

插件类型说明
list_podsdiagnostic列出 Pod
describe_poddiagnostic查看 Pod 详情
get_pod_logsdiagnostic获取 Pod 日志
get_eventsdiagnostic获取 K8s 事件
top_podsdiagnostic查看 Pod 资源使用
restart_podremediation重启 Pod
scale_deploymentremediation扩缩容
rollback_deploymentremediation回滚部署
cordon_noderemediation节点维护
delete_evicted_podsremediation清理驱逐 Pod

数据库插件

插件类型说明
slow_queriesdiagnostic查询慢查询
lock_waitsdiagnostic查询锁等待
active_connectionsdiagnostic查询活跃连接
replication_lagdiagnostic查询复制延迟
table_bloatdiagnostic查询表膨胀
kill_deadlockremediation终止死锁进程
vacuum_tableremediation清理表
terminate_queryremediation终止查询

监控插件

插件类型说明
prom.querydiagnosticPrometheus 即时查询
prom.query_rangediagnosticPrometheus 范围查询
loki.querydiagnosticLoki 日志查询
sentry.get_issuediagnosticSentry 问题查询

其他插件

插件类型说明
shell.execdiagnostic执行 Shell 命令(只读)
shell.exec_writeremediation执行 Shell 命令(写操作)
runbook.searchdiagnostic搜索 Runbook
llm.kubectl_actiondiagnosticLLM 生成 kubectl 命令

插件开发

1. 创建插件文件

app/plugins/builtin/custom_plugins/ 目录下创建新文件:

python
# app/plugins/builtin/my_plugin.py

from app.plugins.base import Plugin, PluginSpec, register
from app.plugins.registry import global_registry

@register(global_registry)
class MyPlugin(Plugin):
    """我的自定义插件"""

    @property
    def spec(self) -> PluginSpec:
        return PluginSpec(
            name="my_plugin",
            description="查询自定义服务状态",
            category="diagnostic",  # 或 "remediation"
            parameters={
                "type": "object",
                "properties": {
                    "service_name": {
                        "type": "string",
                        "description": "服务名称",
                    },
                    "namespace": {
                        "type": "string",
                        "description": "命名空间",
                        "default": "default",
                    },
                },
                "required": ["service_name"],
            },
        )

    async def execute(self, service_name: str, namespace: str = "default") -> dict:
        """
        执行插件逻辑

        Args:
            service_name: 服务名称
            namespace: 命名空间

        Returns:
            dict: 执行结果
        """
        # 在这里实现你的逻辑
        # 例如:查询服务状态、调用 API 等

        return {
            "status": "running",
            "replicas": 3,
            "ready_replicas": 3,
            "message": f"Service {service_name} in {namespace} is healthy",
        }

2. 插件规范

PluginSpec 字段

python
PluginSpec(
    name="plugin_name",           # 插件名称(唯一)
    description="插件描述",        # 简短描述
    category="diagnostic",        # 类型:diagnostic 或 remediation
    parameters={                  # JSON Schema 定义参数
        "type": "object",
        "properties": {
            "param1": {
                "type": "string",
                "description": "参数说明",
            },
        },
        "required": ["param1"],
    },
)

execute 方法

python
async def execute(self, **kwargs) -> dict:
    """
    执行插件逻辑

    Args:
        **kwargs: 根据 parameters 定义的参数

    Returns:
        dict: 执行结果,建议包含以下字段:
            - success: bool - 是否成功
            - data: Any - 返回数据
            - message: str - 结果描述
            - error: str - 错误信息(如果失败)
    """
    try:
        # 执行逻辑
        result = await some_operation()
        return {
            "success": True,
            "data": result,
            "message": "操作成功",
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "message": f"操作失败: {e}",
        }

3. 使用装饰器注册

python
from app.plugins.base import Plugin, PluginSpec, register
from app.plugins.registry import global_registry

@register(global_registry)
class MyPlugin(Plugin):
    # ...

4. 测试插件

python
import pytest
from app.plugins.registry import global_registry

def test_my_plugin():
    # 获取插件
    plugin = global_registry.get("my_plugin")
    assert plugin is not None

    # 测试执行
    result = await plugin.execute(
        service_name="user-service",
        namespace="production"
    )
    assert result["success"] is True

自定义插件目录

将自定义插件放入 custom_plugins/ 目录即可自动加载:

ai-fixer/
├── app/
│   └── plugins/
│       └── builtin/      # 内置插件
├── custom_plugins/        # 自定义插件(自动加载)
│   ├── __init__.py
│   ├── my_plugin1.py
│   └── my_plugin2.py

命名建议

自定义插件建议使用 custom. 前缀:

python
@register(global_registry)
class CustomMyPlugin(Plugin):
    @property
    def spec(self) -> PluginSpec:
        return PluginSpec(
            name="custom.my_plugin",
            # ...
        )

插件最佳实践

1. 错误处理

python
async def execute(self, **kwargs) -> dict:
    try:
        # 验证参数
        if not kwargs.get("service_name"):
            return {
                "success": False,
                "error": "Missing required parameter: service_name",
            }

        # 执行逻辑
        result = await self._do_something(kwargs)
        return {"success": True, "data": result}

    except ConnectionError as e:
        return {
            "success": False,
            "error": f"Connection failed: {e}",
            "retry": True,  # 可重试
        }
    except Exception as e:
        logger.exception("Plugin execution failed")
        return {
            "success": False,
            "error": str(e),
        }

2. 超时控制

python
import asyncio

async def execute(self, **kwargs) -> dict:
    try:
        result = await asyncio.wait_for(
            self._long_running_operation(),
            timeout=30.0
        )
        return {"success": True, "data": result}
    except asyncio.TimeoutError:
        return {
            "success": False,
            "error": "Operation timed out",
        }

3. 日志记录

python
import structlog

logger = structlog.get_logger()

async def execute(self, **kwargs) -> dict:
    logger.info("plugin.execute.start", plugin="my_plugin", **kwargs)

    try:
        result = await self._do_something(kwargs)
        logger.info("plugin.execute.success", plugin="my_plugin", result=result)
        return {"success": True, "data": result}
    except Exception as e:
        logger.error("plugin.execute.failed", plugin="my_plugin", error=str(e))
        return {"success": False, "error": str(e)}

4. 资源清理

python
async def execute(self, **kwargs) -> dict:
    client = None
    try:
        client = await create_client()
        result = await client.fetch_data()
        return {"success": True, "data": result}
    finally:
        if client:
            await client.close()

插件配置

部分插件支持通过配置调整行为:

环境变量配置

python
import os

async def execute(self, **kwargs) -> dict:
    api_url = os.getenv("MY_PLUGIN_API_URL", "http://default-url")
    api_key = os.getenv("MY_PLUGIN_API_KEY")

    if not api_key:
        return {"success": False, "error": "API key not configured"}

    # ...

动态配置

python
from app.config.dynamic import DynamicConfig

async def execute(self, **kwargs) -> dict:
    config = DynamicConfig()
    timeout = await config.get("my_plugin.timeout", default=30)

    # ...

故障排查

插件未加载

bash
# 检查插件是否注册
curl http://localhost:8080/api/plugins | jq '.[] | select(.name == "my_plugin")'

# 查看日志
docker-compose logs app | grep -i "plugin"

插件执行失败

bash
# 查看 Incident 详情
curl http://localhost:8080/api/incidents/{id} | jq '.events[] | select(.type == "tool_call")'

下一步

基于 MIT 许可发布