欢迎光临
我们一直在努力

如何使用python flask包装semgrep构建自定义的代码安全扫描平台?

在现代DevSecOps流程中,将静态应用安全测试(SAST)工具集成到自动化流水线中至关重要。Semgrep是一个高性能、易于配置的SAST工具。本文将指导您如何利用Python Flask框架,将Semgrep命令行工具封装成一个可供内部服务调用的RESTful API,从而构建一个轻量级、自定义的代码安全扫描平台。

1. 为什么需要API封装?

直接在CI/CD管道中调用Semgrep CLI虽然可行,但将其封装为API有以下优势:

  1. 集中控制: 统一管理Semgrep配置和规则集。
  2. 松耦合集成: 任何服务(如Git Hook, Webhook Handler, CI Runner)都可以通过HTTP调用,无需安装Semgrep依赖。
  3. 自定义报告: 在API层可以对原始JSON输出进行解析、过滤和美化,以适应组织特定的报告格式。

2. 环境准备与依赖安装

我们只需要安装Flask和Semgrep CLI。

安装依赖:


1
2
3
pip install Flask gunicorn
# 安装 Semgrep CLI (推荐使用 pipx 或直接 pip)
pip install semgrep

3. Flask API 核心实现

我们的API将提供一个POST请求端点

1
/scan

,接收待扫描的代码内容(字符串格式),并返回Semgrep扫描结果。

为了安全地执行扫描,我们会将接收到的代码写入一个临时文件,然后调用

1
subprocess

运行

1
semgrep

1
app.py

:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import os
import json
import subprocess
import tempfile
from flask import Flask, request, jsonify

app = Flask(__name__)

# 配置 Semgrep 默认规则集
# 使用 'auto' 可以自动检测语言,并使用默认的社区规则集
SEMGREP_CONFIG = "auto"

@app.route('/scan', methods=['POST'])
def scan_code():
    """接收代码内容,写入临时文件,并调用 Semgrep 进行扫描。"""
    data = request.get_json()
    if not data or 'code' not in data:
        return jsonify({"error": "Missing 'code' field in request body"}), 400

    code_content = data['code']

    # 1. 使用临时文件存储代码内容
    # 使用 NamedTemporaryFile 确保文件在关闭后自动清理
    try:
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.py') as tmp_file:
            tmp_file.write(code_content)
            file_path = tmp_file.name

        # 2. 构建 Semgrep CLI 命令
        # --json: 确保输出为 JSON 格式
        # --config: 指定使用的规则集
        command = [
            "semgrep",
            "--config", SEMGREP_CONFIG,
            "--json",
            file_path
        ]

        # 3. 执行 Semgrep 扫描
        process = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True
        )

        # 4. 解析结果
        raw_output = process.stdout
        results = json.loads(raw_output)

        # 5. 提取关键安全发现
        findings = results.get('results', [])

        # 6. 返回自定义格式的发现列表
        response_findings = []
        for finding in findings:
            response_findings.append({
                "check_id": finding['check_id'],
                "severity": finding['extra']['severity'],
                "message": finding['extra']['message'],
                "start_line": finding['start']['line'],
                "end_line": finding['end']['line'],
                "path": finding['path']
            })

        return jsonify({
            "status": "success",
            "findings_count": len(response_findings),
            "findings": response_findings
        })

    except subprocess.CalledProcessError as e:
        # Semgrep 可能会在发现问题时返回非零退出码 (但这不是错误)
        # 捕获其标准输出中的 JSON 结果(如果有)
        try:
            raw_output = e.stdout
            results = json.loads(raw_output)
            findings = results.get('results', [])
            # ... (可以重复上述结果提取逻辑)
            return jsonify({"status": "completed_with_findings", "message": "Scan found security issues.", "findings": findings})
        except Exception:
             # 如果 Semgrep 运行失败(例如配置错误)
            return jsonify({"error": "Semgrep execution failed", "details": e.stderr}), 500

    except Exception as e:
        return jsonify({"error": "Internal server error", "details": str(e)}), 500

    finally:
        # 确保临时文件被清理
        if 'file_path' in locals() and os.path.exists(file_path):
            os.remove(file_path)

if __name__ == '__main__':
    # 生产环境中推荐使用 Gunicorn 或 uWSGI
    app.run(host='0.0.0.0', port=5000)

4. 测试API

假设我们想扫描一段包含潜在SQL注入漏洞的Python代码。

启动服务:


1
python app.py

发起请求 (使用

1
curl

):

我们将发送一段故意引入

1
flask-sql-alchemy

规则能捕捉到的风险代码:


1
2
3
curl -X POST http://127.0.0.1:5000/scan -H "Content-Type: application/json" -d '{
    "code": "from flask_sqlalchemy import SQLAlchemy\n\ndb = SQLAlchemy()\n\n@app.route("/user/<name>")\ndef get_user(name):\n    query = f"SELECT * FROM users WHERE username='{name}'"\n    result = db.engine.execute(query)\n    return result"
}'

预期输出:

API将返回一个结构化的JSON响应,包含Semgrep发现的漏洞信息:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
    "findings": [
        {
            "check_id": "python.lang.security.audit.flask-sql-alchemy.string-interpolation-in-query.string-interpolation-in-query",
            "end_line": 6,
            "message": "Detected string interpolation in SQL query, which can lead to SQL injection.",
            "path": "/tmp/tmpXXXXXX.py",
            "severity": "ERROR",
            "start_line": 6
        }
    ],
    "findings_count": 1,
    "status": "success"
}

5. 生产环境部署建议

在生产环境中,不应直接运行

1
app.py

中的

1
app.run()

  1. 容器化: 使用Docker打包Flask应用和Semgrep依赖。
  2. Web服务器: 使用Gunicorn或uWSGI来管理并发和稳定性。
  3. 安全性: 由于此API执行外部命令,务必确保输入(
    1
    code

    内容)不会被用于路径遍历或命令注入(通过将输入严格限制为内容字符串而非文件路径)。

使用 Gunicorn 启动示例:


1
gunicorn -w 4 'app:app' -b 0.0.0.0:5000
【本站文章皆为原创,未经允许不得转载】:汤不热吧 » 如何使用python flask包装semgrep构建自定义的代码安全扫描平台?
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址