对巡风vulscan的理解
>
对巡风vulscan的理解

# coding:utf-8

# 漏洞检测引擎

import urllib2

import thread

import time

import pymongo

import sys

import datetime

import hashlib

import json

import re

import uuid

import os

from kunpeng import kunpeng

sys.path.append(sys.path[0] + '/vuldb')

# 加载漏洞插件的目录

sys.path.append(sys.path[0] + "/../")

from config import ProductionConfig

db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT)

na_db = getattr(db_conn, ProductionConfig.DBNAME)

na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD)

na_task = na_db.Task

na_result = na_db.Result

na_plugin = na_db.Plugin

na_config = na_db.Config

na_heart = na_db.Heartbeat

na_update = na_db.Update

lock = thread.allocate()

PASSWORD_DIC = []

THREAD_COUNT = 50

TIMEOUT = 10

PLUGIN_DB = {}

TASK_DATE_DIC = {}

WHITE_LIST = []

kp = kunpeng()

#巡风的漏洞扫描技术要先看初始化的方法,看完初始化的方法,再从main函数开始看,这样容易理解,分析完main函数之后,再从start函数慢慢看

class vulscan():

# 初始化操作

def __init__(self, task_id, task_netloc, task_plugin):

self.task_id = task_id #任务id

self.task_netloc = task_netloc #任务端口

self.task_plugin = task_plugin #任务插件

self.result_info = '' #任务结果

self.start() #任务开始

# start ---> 开始检测

def start(self):

self.get_plugin_info()

#获取插件列表,这个在80多行有个get_plugin_info方法,主要就是info = xxx。

#这个是获取.json格式的漏洞库,主要用来是探测,并不能起到exp的作用

if '.json' in self.plugin_info['filename']: # 标示符检测模式

self.load_json_plugin() # 读取漏洞标示

# 跟踪load_json_plugin()函数,读取的时候,是字符串。所以需要转换成json的形式。

self.set_request() # 标示符转换为请求

# 这个就是发送request的请求了

self.poc_check() # 检测

# 进行poc验证,这个地方需要是check(),里面有py,md5,json三种格式的请求,所以要分别的验证一下

# 如果时kunpeng的脚本开始验证,分为两种一种是web端的http,https,一种是除了web端。没咋懂,就这样吧,有空再看

elif 'KP-' in self.plugin_info['filename']:

self.log(str(self.task_netloc) + 'call kunpeng - ' + self.plugin_info['filename'])

kp.set_config(TIMEOUT, PASSWORD_DIC)

if self.task_netloc[1] != 80:

self.result_info = kp.check('service', '{}:{}'.format(

self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename'])

if not self.result_info:

scheme = 'http'

if self.task_netloc[1] == 443:

scheme = 'https'

self.result_info = kp.check('web', '{}://{}:{}'.format(

scheme, self.task_netloc[0], self.task_netloc[1]), self.plugin_info['filename'])

else: # 脚本检测模式,这个利用的py脚本的poc,还是比较有意思的,可能能说到说到的就是这一个了,这个和我们平常写的跑py脚本差不多。

plugin_filename = self.plugin_info['filename']

# log里面封装的是一个print方法

self.log(str(self.task_netloc) + 'call ' + self.task_plugin)

if task_plugin not in PLUGIN_DB:

plugin_res = __import__(plugin_filename)

setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC) # 给插件声明密码字典

PLUGIN_DB[plugin_filename] = plugin_res

self.result_info = PLUGIN_DB[plugin_filename].check(

str(self.task_netloc[0]), int(self.task_netloc[1]), TIMEOUT)

self.save_request() # 保存结果

def get_plugin_info(self):

info = na_plugin.find_one({"name": self.task_plugin})

self.plugin_info = info

def load_json_plugin(self):

json_plugin = open(sys.path[0] + '/vuldb/' +

self.plugin_info['filename']).read()

self.plugin_info['plugin'] = json.loads(json_plugin)['plugin']

def set_request(self):

url = 'http://' + \

self.task_netloc[0] + ":" + \

str(self.task_netloc[1]) + self.plugin_info['plugin']['url']

if self.plugin_info['plugin']['method'] == 'GET':

request = urllib2.Request(url)

else:

request = urllib2.Request(url, self.plugin_info['plugin']['data'])

self.poc_request = request

def get_code(self, header, html):

try:

m = re.search(r'| |/)', html, flags=re.I)

if m:

return m.group(1).replace('"', '')

except:

pass

try:

if 'Content-Type' in header:

Content_Type = header['Content-Type']

m = re.search(r'.*?charset=(.*?)(;|$)',

Content_Type, flags=re.I)

if m:

return m.group(1)

except:

pass

def poc_check(self):

try:

res = urllib2.urlopen(self.poc_request, timeout=30)

res_html = res.read(204800)

header = res.headers

# res_code = res.code

except urllib2.HTTPError, e:

# res_code = e.code

header = e.headers

res_html = e.read(204800)

except Exception, e:

return

try:

html_code = self.get_code(header, res_html).strip()

if html_code and len(html_code) < 12:

res_html = res_html.decode(html_code).encode('utf-8')

except:

pass

an_type = self.plugin_info['plugin']['analyzing']

vul_tag = self.plugin_info['plugin']['tag']

analyzingdata = self.plugin_info['plugin']['analyzingdata']

if an_type == 'keyword':

# print poc['analyzingdata'].encode("utf-8")

if analyzingdata.encode("utf-8") in res_html:

self.result_info = vul_tag

elif an_type == 'regex':

if re.search(analyzingdata, res_html, re.I):

self.result_info = vul_tag

elif an_type == 'md5':

md5 = hashlib.md5()

md5.update(res_html)

if md5.hexdigest() == analyzingdata:

self.result_info = vul_tag

def save_request(self):

if self.result_info:

time_ = datetime.datetime.now()

self.log(str(self.task_netloc) + " " + self.result_info)

v_count = na_result.find(

{"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count()

if not v_count:

na_plugin.update({"name": self.task_plugin},

{"$inc": {'count': 1}})

vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'],

"vul_type": self.plugin_info['type']}

w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1],

"vul_info": vulinfo, "info": self.result_info, "time": time_,

"task_date": TASK_DATE_DIC[str(self.task_id)]}

na_result.insert(w_vul)

# self.wx_send(w_vul) # 自行定义漏洞提醒

def log(self, info):

lock.acquire()

try:

time_str = time.strftime('%X', time.localtime(time.time()))

print "[%s] %s" % (time_str, info)

except:

pass

lock.release()

def queue_get():

global TASK_DATE_DIC

task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={

"$set": {"status": 1}}, sort={'time': 1})

if task_req:

TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now()

return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']

else:

task_req_row = na_task.find({"plan": {"$ne": 0}})

if task_req_row:

for task_req in task_req_row:

if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']):

if task_req['isupdate'] == 1:

task_req['target'] = update_target(

json.loads(task_req['query']))

na_task.update({"_id": task_req['_id']}, {

"$set": {"target": task_req['target']}})

na_task.update({"_id": task_req['_id']}, {

"$inc": {"status": 1}})

TASK_DATE_DIC[str(task_req['_id'])

] = datetime.datetime.now()

return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin']

return '', '', '', ''

def update_target(query):

target_list = []

try:

result_list = na_db.Info.find(query)

for result in result_list:

target = [result["ip"], result["port"]]

target_list.append(target)

except:

pass

return target_list

def monitor():

global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST

while True:

queue_count = na_task.find({"status": 0, "plan": 0}).count()

if queue_count:

load = 1

else:

ac_count = thread._count()

load = float(ac_count - 6) / THREAD_COUNT

if load > 1:

load = 1

if load < 0:

load = 0

na_heart.update({"name": "load"}, {

"$set": {"value": load, "up_time": datetime.datetime.now()}})

PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config()

if load > 0:

time.sleep(8)

else:

time.sleep(60)

def get_config():

try:

config_info = na_config.find_one({"type": "vulscan"})

pass_row = config_info['config']['Password_dic']

thread_row = config_info['config']['Thread']

timeout_row = config_info['config']['Timeout']

white_row = config_info['config']['White_list']

password_dic = pass_row['value'].split('\n')

thread_count = int(thread_row['value'])

timeout = int(timeout_row['value'])

white_list = white_row['value'].split('\n')

return password_dic, thread_count, timeout, white_list

except Exception, e:

print e

def install_kunpeng_plugin():

time_ = datetime.datetime.now()

for plugin in kp.get_plugin_list():

level_list = ['紧急','高危','中危','低危','提示']

plugin_info = {

'_id': plugin['references']['kpid'],

'name': 'Kunpeng -' + plugin['name'],

'info': plugin['remarks'] + ' ' + plugin['references']['cve'],

'level': level_list[int(plugin['level'])],

'type': plugin['type'],

'author': plugin['author'],

'url': plugin['references']['url'],

'source': 1,

'keyword': '',

'add_time': time_,

'filename': plugin['references']['kpid'],

'count': 0

}

na_plugin.insert(plugin_info)

def init():

time_ = datetime.datetime.now()

if na_plugin.find().count() >= 1:

return

script_plugin = []

json_plugin = []

print 'init plugins'

file_list = os.listdir(sys.path[0] + '/vuldb')

for filename in file_list:

try:

if filename.split('.')[1] == 'py':

script_plugin.append(filename.split('.')[0])

if filename.split('.')[1] == 'json':

json_plugin.append(filename)

except:

pass

for plugin_name in script_plugin:

try:

res_tmp = __import__(plugin_name)

plugin_info = res_tmp.get_plugin_info()

plugin_info['add_time'] = time_

plugin_info['filename'] = plugin_name

plugin_info['count'] = 0

na_plugin.insert(plugin_info)

except:

pass

for plugin_name in json_plugin:

try:

json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read()

plugin_info = json.loads(json_text)

plugin_info['add_time'] = time_

plugin_info['filename'] = plugin_name

plugin_info['count'] = 0

del plugin_info['plugin']

na_plugin.insert(plugin_info)

except:

pass

install_kunpeng_plugin()

def kp_check():

while True:

try:

new_release = kp.check_version()

print new_release

if new_release:

info = new_release['body']

if '###' in new_release['body']:

info = new_release['body'].split('###')[1]

row = {

'info': info,

'isInstall': 0,

'name': new_release['name'],

'author': new_release['author']['login'],

'pushtime': new_release['published_at'],

'location': "",

'unicode': new_release['tag_name'],

'coverage': 0,

'source': 'kunpeng'

}

na_update.insert(row)

time.sleep(60 * 60 * 48)

except Exception as e:

print e

time.sleep(60 * 30)

def kp_update():

while True:

try:

row = na_update.find_one_and_delete(

{'source': 'kunpeng', 'isInstall': 1})

if row:

kp.update_version(row['unicode'])

na_plugin.delete_many({'_id':re.compile('^KP')})

install_kunpeng_plugin()

except Exception as e:

print e

time.sleep(10)

if __name__ == '__main__':

init()

PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() #从数据库里面获得配置参数,比如白名单,引擎的线程

thread.start_new_thread(monitor, ()) #巡风的心跳线程

thread.start_new_thread(kp_check, ()) #巡风的Kunpeng库,检查更新去情况,跟踪kp_check-->check_version-->_get_release_lates,这个函数发现的是与云上的库进行对比,如果有更新的话,就提示更新

# 如果没有更新的话,就不提示了

thread.start_new_thread(kp_update, ()) #kunpeng库更新,上面那个只是推送出来,但是还没有更新,这个是巡风更新的线程,就是如果你把kunpeng库确定更新了之后,然后才能是执行这个线程

while True:

try:

task_id, task_plan, task_target, task_plugin = queue_get() #获取队列,简单来说就是获得任务的参数

#如果status : 0 ,这个是未执行的状态,如果是status : 1就是正在执行的状态

if task_id == '':

time.sleep(10)

# 每个间隔10秒钟

continue

if PLUGIN_DB:

del sys.modules[PLUGIN_DB.keys()[0]] # 清理插件缓存,这个我也没太懂学长讲的怎么回事...

PLUGIN_DB.clear()

for task_netloc in task_target:

while True:

if int(thread._count()) < THREAD_COUNT:

#如果任务的线程数<50

if task_netloc[0] in WHITE_LIST:

#如果探测的资产是在白名单里面,那就break,结束 (^-^)

break

try:

thread.start_new_thread(

vulscan, (task_id, task_netloc, task_plugin))

except Exception as e:

print e

break

else:

time.sleep(2)

if task_plan == 0:

na_task.update({"_id": task_id}, {"$set": {"status": 2}})

except Exception as e:

print e

巡风如果说作为一个检测内网的手段,真的是非常好用。另外分析下巡风的扫描,也能学到不少东西,比如我之前以为扫描摄像头的时候,以为是扫描的数据流,其实扫描的路径,拼接路径就可以了。还有巡风打poc的地方,也可以学习学习。不过自我感觉,还是巡风的资产探测比较好用,主要的还是要把巡风的资产探测学好,巡风的资产探测还是比较好的

Shopping Cart