在现代DevSecOps流程中,将静态应用安全测试(SAST)工具集成到自动化流水线中至关重要。Semgrep是一个高性能、易于配置的SAST工具。本文将指导您如何利用Python Flask框架,将Semgrep命令行工具封装成一个可供内部服务调用的RESTful API,从而构建一个轻量级、自定义的代码安全扫描平台。
1. 为什么需要API封装?
直接在CI/CD管道中调用Semgrep CLI虽然可行,但将其封装为API有以下优势:
- 集中控制: 统一管理Semgrep配置和规则集。
- 松耦合集成: 任何服务(如Git Hook, Webhook Handler, CI Runner)都可以通过HTTP调用,无需安装Semgrep依赖。
- 自定义报告: 在API层可以对原始JSON输出进行解析、过滤和美化,以适应组织特定的报告格式。
2. 环境准备与依赖安装
我们只需要安装Flask和Semgrep CLI。
安装依赖:
1
2
3 pip install Flask gunicorn
# 安装 Semgrep CLI (推荐使用 pipx 或直接 pip)
pip install semgrep
3. Flask API 核心实现
我们的API将提供一个POST请求端点
1 | /scan |
,接收待扫描的代码内容(字符串格式),并返回Semgrep扫描结果。
为了安全地执行扫描,我们会将接收到的代码写入一个临时文件,然后调用
1 | subprocess |
运行
1 | semgrep |
。
1 | app.py |
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 import os
import json
import subprocess
import tempfile
from flask import Flask, request, jsonify
app = Flask(__name__)
# 配置 Semgrep 默认规则集
# 使用 'auto' 可以自动检测语言,并使用默认的社区规则集
SEMGREP_CONFIG = "auto"
@app.route('/scan', methods=['POST'])
def scan_code():
"""接收代码内容,写入临时文件,并调用 Semgrep 进行扫描。"""
data = request.get_json()
if not data or 'code' not in data:
return jsonify({"error": "Missing 'code' field in request body"}), 400
code_content = data['code']
# 1. 使用临时文件存储代码内容
# 使用 NamedTemporaryFile 确保文件在关闭后自动清理
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.py') as tmp_file:
tmp_file.write(code_content)
file_path = tmp_file.name
# 2. 构建 Semgrep CLI 命令
# --json: 确保输出为 JSON 格式
# --config: 指定使用的规则集
command = [
"semgrep",
"--config", SEMGREP_CONFIG,
"--json",
file_path
]
# 3. 执行 Semgrep 扫描
process = subprocess.run(
command,
capture_output=True,
text=True,
check=True
)
# 4. 解析结果
raw_output = process.stdout
results = json.loads(raw_output)
# 5. 提取关键安全发现
findings = results.get('results', [])
# 6. 返回自定义格式的发现列表
response_findings = []
for finding in findings:
response_findings.append({
"check_id": finding['check_id'],
"severity": finding['extra']['severity'],
"message": finding['extra']['message'],
"start_line": finding['start']['line'],
"end_line": finding['end']['line'],
"path": finding['path']
})
return jsonify({
"status": "success",
"findings_count": len(response_findings),
"findings": response_findings
})
except subprocess.CalledProcessError as e:
# Semgrep 可能会在发现问题时返回非零退出码 (但这不是错误)
# 捕获其标准输出中的 JSON 结果(如果有)
try:
raw_output = e.stdout
results = json.loads(raw_output)
findings = results.get('results', [])
# ... (可以重复上述结果提取逻辑)
return jsonify({"status": "completed_with_findings", "message": "Scan found security issues.", "findings": findings})
except Exception:
# 如果 Semgrep 运行失败(例如配置错误)
return jsonify({"error": "Semgrep execution failed", "details": e.stderr}), 500
except Exception as e:
return jsonify({"error": "Internal server error", "details": str(e)}), 500
finally:
# 确保临时文件被清理
if 'file_path' in locals() and os.path.exists(file_path):
os.remove(file_path)
if __name__ == '__main__':
# 生产环境中推荐使用 Gunicorn 或 uWSGI
app.run(host='0.0.0.0', port=5000)
4. 测试API
假设我们想扫描一段包含潜在SQL注入漏洞的Python代码。
启动服务:
1 python app.py
发起请求 (使用
1 | curl |
):
我们将发送一段故意引入
1 | flask-sql-alchemy |
规则能捕捉到的风险代码:
1
2
3 curl -X POST http://127.0.0.1:5000/scan -H "Content-Type: application/json" -d '{
"code": "from flask_sqlalchemy import SQLAlchemy\n\ndb = SQLAlchemy()\n\n@app.route("/user/<name>")\ndef get_user(name):\n query = f"SELECT * FROM users WHERE username='{name}'"\n result = db.engine.execute(query)\n return result"
}'
预期输出:
API将返回一个结构化的JSON响应,包含Semgrep发现的漏洞信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14 {
"findings": [
{
"check_id": "python.lang.security.audit.flask-sql-alchemy.string-interpolation-in-query.string-interpolation-in-query",
"end_line": 6,
"message": "Detected string interpolation in SQL query, which can lead to SQL injection.",
"path": "/tmp/tmpXXXXXX.py",
"severity": "ERROR",
"start_line": 6
}
],
"findings_count": 1,
"status": "success"
}
5. 生产环境部署建议
在生产环境中,不应直接运行
1 | app.py |
中的
1 | app.run() |
。
- 容器化: 使用Docker打包Flask应用和Semgrep依赖。
- Web服务器: 使用Gunicorn或uWSGI来管理并发和稳定性。
- 安全性: 由于此API执行外部命令,务必确保输入(
1code
内容)不会被用于路径遍历或命令注入(通过将输入严格限制为内容字符串而非文件路径)。
使用 Gunicorn 启动示例:
1 gunicorn -w 4 'app:app' -b 0.0.0.0:5000
汤不热吧