流量分析:基于热加载的历史流量自动化分析
在之前的推文中已经简单介绍了流量分析功能以及工作流程,
在“让历史流量开口说话”之后,我们更希望它能自己“做总结”。
本篇文章将带你深入了解 Yakit 流量分析功能中的两个热加载入口-- analyzeHTTPFlow 与 onAnalyzeHTTPFlowFinish 并通过真实的安全分析场景展示它们的组合威力。
从“单流量分析”到“全局洞察”
Yakit 的流量分析功能,为安全研究者提供了一个强大的“历史流量再利用”平台。我们可以通过筛选条件、规则匹配、热加载代码等方式,对数据库中已存的流量进行二次挖掘。
在早期版本中,用户主要使用:
analyzeHTTPFlow = func(flow, extract) { ... }
它能逐条处理流量,实现染色、内容提取、敏感匹配等操作。
但在新版中,我们迎来了一个新的成员:
onAnalyzeHTTPFlowFinish = func(totalCount, matchedCount) { ... }
这个回调会在所有流量分析完毕后被调用,为“全局统计、报告生成、通知推送”打开了无限可能。
该回调有两个参数:totalCount 和 matchedCount
- totalCount:进行流量分析的流量总数。
- matchedCount:匹配到的流量总数。这里的匹配到的总数可以理解为【执行结果】中所有的个数,如下图为24。
在 MITM 规则匹配流量分析中,一个规则匹配一条流量,可能会有多条匹配数;例如一个匹配流量中手机号的规则,在匹配到一个流量的相应包包含多个手机号的时候,那么会有很多匹配数。在热加载中,每次调用一次 extract 函数,即为一次匹配。
热加载双引擎:协同工作机制
analyzeHTTPFlow:逐条分析,每个流量都会触发。onAnalyzeHTTPFlowFinish:全局回调,仅在分析任务结束时触发。
工作流程如下:
这种“双引擎”模式让你既能精确处理单条流量,又能在任务结束时做全局整合。
流量分析实战:让热加载更有“生命力”
下面展示几个常用、具有代表性的实战案例。
敏感信息提取 + 汇总导出
这个例子展示了如何批量检测历史流量中是否存在手机号、Token 等敏感数据,并输出报告。
m = sync.NewMutex()
phones = []
tokens = []
path = yakit.GetHomeTempDir()
f = file.OpenFile(str.PathJoin(path, "result.json"), file.O_CREATE|file.O_RDWR, 0o777)~
analyzeHTTPFlow = func(flow, extract) {
// analyzeHTTPFlow是并发的,因此需要加锁
m.Lock()
defer m.Unlock()
// 使用GetResponse()方法获得响应体
// 不要直接使用flow.Response,因为这样拿到是存在数据库的被转义后的数据
resp = flow.GetResponse()
req = flow.GetRequest()
phoneRegex = re.MustCompile(`\b1[3-9]\d{9}\b`)
tokenRegex = re.MustCompile(`(?i)Authorization:\s*Bearer\s+([A-Za-z0-9._-]+)`)
phoneMatches = phoneRegex.FindAllString(resp, -1)
if len(phoneMatches) > 0 {
flow.Red()
phones = append(phones, phoneMatches...)
for _,matched = range phoneMatches{
extract("手机号泄露", flow,matched)
}
}
tokenMatches = tokenRegex.FindAllString(req, -1)
if len(tokenMatches) > 0 {
flow.Orange()
tokens = append(tokens, tokenMatches...)
for _,matched = range tokenMatches{
extract("Token 泄露", flow,matched)
}
}
}
onAnalyzeHTTPFlowFinish = func(totalCount, matchedCount) {
// 导出为json数据
result = map[string]any{
"total_flows": totalCount,
"matched_flows": matchedCount,
"phones": phones,
"tokens": tokens,
}
raw = json.Marshal(result)~
f.Write(raw)~
// 关闭文件句柄
f.Close()
yakit.Info(sprintf("[分析完成] 共分析 %d 条流量,提取手机号 %d 个,Token %d 个",
totalCount, len(phones), len(tokens)))
}
解释:
这个热加载代码会搜集历史流量中 token 和手机号信息,并且将其以 json 格式写到文件中。有几个点值得注意:
- 锁的使用。因为 analyzeHTTPFlow 是并发执行的,为了保证手机号、token 数据被正确的收集,因此需要加入锁来维护状态。
- 避免直接拿到转义后的内容。请求体和响应体调用的是 flow 自带的
GetResponse和GetRequest方法,这两个方法才能拿到数据包的原内容。如果直接使用像 flow.Response 拿取响应体内容,那么会拿到被转义后的数据而非原数据。
HTTPFlow 在存入数据库的时候,其 Request、Response 内容会被转义后再存入数据库。
- 需要关闭文件句柄。在流量分析结束以后,需要在 onAnalyzeHTTPFlowFinish 里面调用
f.Close()以关闭文件句柄。
热加载运行后得到的结果如下:
并且在 Yakit 的 temp 目录下生成了 json 文件:
域名访问统计与异常检测
第二个例子是快速统计不同主机的访问次数,并标记出返回 500 的流量。
热加载代码如下:
hostStats = make(map[string]int)
m = sync.NewMutex()
analyzeHTTPFlow = func(flow, extract) {
m.Lock()
defer m.Unlock()
host = flow.Host
if hostStats.Has(host){
count = hostStats[host]
count ++
hostStats.Set(host,count )
}else{
hostStats.Set(host,1)
}
if flow.StatusCode >= 500 {
flow.Red()
extract("服务器异常", flow)
}
}
onAnalyzeHTTPFlowFinish = func(totalCount, matchedCount) {
yakit.Info("=== 主机访问统计 ===")
for host, cnt = range hostStats {
yakit.Info(sprintf("%s: %d 次", host, cnt))
}
}
解释:
这个热加载使用 map 收集主机访问的信息,最后通过 yakit.Info 打印收集到的信息:
智能标签分类分析
这个例子是自动识别并标注不同类型的请求(登录、上传、后台访问等)。
m = sync.NewMutex()
codeMap = make(map[int]int)
analyzeHTTPFlow = func(flow, extract) {
m.Lock()
defer m.Unlock()
request = flow.GetRequest()
if str.Contains(request, "/admin") {
flow.Blue()
// 添加标签
flow.AddTag("后台访问")
extract("后台访问", flow)
} else if str.Contains(request, "upload") {
flow.Purple()
flow.AddTag("上传接口")
extract("上传接口", flow)
} else if str.Contains(request, "login") {
flow.Green()
flow.AddTag("登录行为")
extract("登录行为", flow)
}
}
onAnalyzeHTTPFlowFinish = func(totalCount, matchedCount) {
yakit.Info(sprintf("完成分类标注,共识别 %d 条特殊流量", matchedCount))
}
解释:
这个热加载代码会根据请求体的内容来大致判断这个请求包的类型,然后调用flow.AddTag 方法来添加标签、调用 flow.Purple 等方法给流量染色。
添加标签的好处是后续可以很快筛选出感兴趣的流量,实现流量“行为可视化”。
如下图:
自动生成分析报告文件
热加载代码的价值正在于其能够灵活调用 YAK 代码,实现各种复杂功能,因此也能将流量信息导出为报告。
yakit.AutoInitYakit()
loglevel("info")
m = sync.NewMutex()
reportInstance = report.New()
reportInstance.From("http-anomaly-detector")
reportInstance.Title("HTTP 异常流量分析报告")
abnormalFlows=[]
// 进入分析阶段
analyzeHTTPFlow = func(flow, extract) {
m.Lock()
defer m.Unlock()
code = flow.StatusCode
if code >= 400 {
abnormalFlow = [flow.Url, flow.StatusCode, flow.Method]
abnormalFlows = append(abnormalFlows, abnormalFlow)
extract("异常流量", flow, sprintf("状态码 %v",code))
}
}
// 所有流量结束后生成报告
onAnalyzeHTTPFlowFinish = func(totalCount, matchedCount) {
abnormalPercent = 0.0
abnormalPercent = float64(matchedCount) /float(totalCount)
// 报告使用markdown写概述
reportInstance.Markdown(sprintf(`
# 异常流量检测报告
- 总流量数:%v
- 异常流量数:%v
- 异常比例:%.2f%%
`, totalCount,matchedCount, abnormalPercent))
// 报告使用饼图展示异常流量的比例
reportInstance.PieGraph(
{"name": "正常流量", "value": totalCount-matchedCount, "color": "#43ab42"},
{"name": "异常流量", "value": matchedCount, "color": "#da4943"},
)
reportInstance.Markdown(`
# 异常流量具体信息
`)
// 报告使用表格
reportInstance.Table(["URL","状态码","方法"],abnormalFlows...)
reportInstance.Save()
}
解释:
这里简单将状态码大于等于 400 的流量当作异常流量,并将异常流量信息整理成报告。其中:
- 每次检测 flow 的 statusCode 大于等于 400 的时候,就会认为流量是异常的并调用
extract函数。每次调用extract函数会让热加载 onAnalyzeHTTPFlowFinish 的参数matchedCount加 1。因此后续计算异常流量数目时,直接使用的是matchedCount。 - 使用
report.New()来生成报告,这样后续能在 Yakit 页面中直接查看报告具体内容。 - 报告支持多种格式,如 Markdown、饼图、表格等;其中这个例子使用了饼图来展示正常流量和异常流量的比例;使用表格来记录异常流量的具体数据。
- 最后在 onAnalyzeHTTPFlowFinish 阶段调用
reportInstance.Save()来保持报告。
生成的报告可以直接在【数据库】中的【报告】进行查看:
总结
通过今天的 Yakit 实战分享,我们看到:流量分析不再只是单纯的匹配工具。
通过analyzeHTTPFlow + onAnalyzeHTTPFlowFinish的组合,
我们可以实现以下三点:
- 自动化数据提取
- 全局统计汇总
- 报告生成与系统联动
让每一次分析都能成为一次“数据挖掘实验”。Yakit 的热加载功能,正在让流量分析真正“聪明”起来。
最后,今日份不只有 Yakit 的 “武林秘籍”,重头戏来了!
【叮咚,请查收你的节日祝福!】
请允许 YAK 团队对所有 YAKer 送上节日的祝福
Dear YAKer,程序员节日快乐!
本文首发于 Yak Project 公众号,阅读原文。
