是啊,世界那么残酷,无论你怎么反抗它,它都沉默无声地运转着
使用的字典为 lijiejie 的大字典
当然为了省事儿,支持自定义导入字典
使用相关库为:aiofiles,aiomultiprocess,asyncio,aiodns,aiohttp
通过查看源码发现好像不支持一个aiodns检测多个host,不像aiohttp那样
本来想尝试一下把aiodns改成向aiohttp支持多个请求,并且可以指定DNS服务器,但是实在太忙了….
注意事项:
支持批量导入子域名爆破,格式如下:
baidu.com
qq.com
jd.com
……..
然后通过检测存活性,判断状态码为200,301,302 保存
# -*- coding:utf-8 -*-
#__author__:langzi
#__blog__:www.langzi.fun
import asyncio
import aiodns
import aiomultiprocess
import aiohttp
from urllib.parse import urlparse
import multiprocessing
import os
import random
import time
Check_Alive_Status = [200,301,302]
# 这里可以进行设置状态码存活,符合状态码将会判断为网页存活
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'}
async def run(url):
# print('Scan:'+url)
async with asyncio.Semaphore(1000):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
try:
async with session.get('http://'+url,timeout=15) as resp:
if resp.status in Check_Alive_Status:
content = await resp.read()
#print(content)
if b'Service Unavailable' not in content and b'The requested URL was not found on' not in content and b'The server encountered an internal error or miscon' not in content:
u = urlparse(str(resp.url))
return u.scheme+'://'+u.netloc
except Exception as e:
#print(e)
pass
try:
async with session.get('https://' + url,timeout=15) as resp:
if resp.status in Check_Alive_Status:
content = await resp.read()
#print(content)
if b'Service Unavailable' not in content and b'The requested URL was not found on' not in content and b'The server encountered an internal error or miscon' not in content:
u = urlparse(str(resp.url))
return u.scheme+'://'+u.netloc
except Exception as e:
#print(e)
pass
async def Aio_Subdomain(subdomain):
resolver = aiodns.DNSResolver(timeout=1)
try:
result = await resolver.query(subdomain, 'A')
return subdomain, result
except Exception as e:
return None, None
async def submain(subhosts):
res = set()
async with aiomultiprocess.Pool() as pool:
results = await pool.map(Aio_Subdomain,subhosts)
for result in results:
subdomain, answers = result
if answers != None and subdomain!=None:
res.add(subdomain)
return list(res)
def get_result(inp,loop,dicdic):
dicts = list(set([subdoma.strip()+'.'+inp for subdoma in open(dicdic, 'r').readlines()]))
result = loop.run_until_complete(submain(dicts))
return result
async def main(urls):
async with aiomultiprocess.Pool() as pool:
result = await pool.map(run, urls)
return result
def Write_Database(domain,domain_lists):
with open(domain+'.txt','a+',encoding='utf-8')as a:
a.writelines([x+'\n' for x in domain_lists])
if __name__ == '__main__':
result = []
if os.path.exists('domain_log'):
os.remove('domain_log')
multiprocessing.freeze_support()
loop = asyncio.get_event_loop()
domains = list(set([x.strip() for x in open('domains.txt', 'r', encoding='utf-8').readlines()]))
# 要扫描的保存在domains.txt
for domain in domains:
print('当前检测域名为:{}'.format(domain))
dicdic = 'Sub_Big_Dict.txt'
t1 = time.time()
result_0 = get_result(inp=domain, loop=loop,dicdic=dicdic)
print(result_0)
print('二级域名获取数量为 : {} '.format(len(result_0)))
print('耗时 : {}'.format(time.time()-t1))
if result_0 != []:
for url in result_0:
with open('domain_log', 'a+', encoding='utf-8')as a:
a.write(url + '\n')
res = loop.run_until_complete(main(result_0))
#http_result = [x for x in res if x != None]
http_result = result_0
http_result = list(set(http_result))
if http_result != []:
print('二级域名存活数量为 : {} '.format(len(http_result)))
Write_Database(domain,http_result)
# 从这开始爆破三级域名,不过我直接注释掉了,感觉有些浪费时间
# for url_1 in result_0:
# print('\n当前爆破二级域名为: {}'.format(url_1))
# dicdic = 'Sub_Sma_Dict.txt'
# resul_start = get_result(url_1, loop,dicdic=dicdic)
# print('三级域名获取数量为 : {} '.format(len(resul_start)))
# if resul_start != []:
# res = loop.run_until_complete(main(resul_start))
# http_result = [x for x in res if x != None]
# http_result = list(set(http_result))
# if http_result != []:
# print('三级域名存活数量为 : {} '.format(len(http_result)))
# if len(http_result) > 3:
# # 三级域名还能爆破超过三个?判断检测为泛解析,那么随机选择三个就凑合了
# Write_Database(domain, random.sample(http_result,3))
或者这么写
# -*- coding:utf-8 -*-
#__author__:langzi
#__blog__:www.langzi.fun
import asyncio
import aiodns
import aiomultiprocess
import aiohttp
from urllib.parse import urlparse
import multiprocessing
import os
import random
import time
Check_Alive_Status = [200,301,302,404,401]
# 这里可以进行设置状态码存活,符合状态码将会判断为网页存活
# 这里可以进行设置状态码存活,符合状态码将会判断为网页存活
# 这里可以进行设置状态码存活,符合状态码将会判断为网页存活
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'}
async def run(url):
# print('Scan:'+url)
async with asyncio.Semaphore(1000):
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
try:
async with session.get('http://'+url,timeout=15) as resp:
if resp.status in Check_Alive_Status:
content = await resp.read()
#print(content)
if b'Service Unavailable' not in content and b'The requested URL was not found on' not in content and b'The server encountered an internal error or miscon' not in content:
u = urlparse(str(resp.url))
return u.scheme+'://'+u.netloc
except Exception as e:
#print(e)
pass
try:
async with session.get('https://' + url,timeout=15) as resp:
if resp.status in Check_Alive_Status:
content = await resp.read()
#print(content)
if b'Service Unavailable' not in content and b'The requested URL was not found on' not in content and b'The server encountered an internal error or miscon' not in content:
u = urlparse(str(resp.url))
return u.scheme+'://'+u.netloc
except Exception as e:
#print(e)
pass
async def Aio_Subdomain(subdomain):
resolver = aiodns.DNSResolver(timeout=1)
try:
result = await resolver.query(subdomain, 'A')
return subdomain, result
except Exception as e:
return None, None
async def submain(subhosts):
res = set()
async with aiomultiprocess.Pool() as pool:
results = await pool.map(Aio_Subdomain,subhosts)
for result in results:
subdomain, answers = result
if answers != None and subdomain!=None:
res.add(subdomain)
return list(res)
async def main(urls):
async with aiomultiprocess.Pool() as pool:
result = await pool.map(run, urls)
return result
def Write_Database(domain_lists):
domain = 'result__01'
with open(domain+'.txt','a+',encoding='utf-8')as a:
a.writelines([x+'\n' for x in domain_lists])
if __name__ == '__main__':
result = []
if os.path.exists('domain_log'):
os.remove('domain_log')
multiprocessing.freeze_support()
loop = asyncio.get_event_loop()
dicdic = 'Sub_Big_Dict.txt'
domains = list(set([x.strip() for x in open('domains.txt', 'r', encoding='utf-8').readlines()]))
# 要扫描的保存在domains.txt
tasks = list(set([subdoma.strip()+'.'+domain for subdoma in open(dicdic, 'r').readlines() for domain in domains]))
print('检测子域名总数 : {} 总检测数 : {}'.format(len(domains),len(tasks)))
t1 = time.time()
sub_domain_result = loop.run_until_complete(submain(tasks))
print('二级域名获取数量为 : {} '.format(len(sub_domain_result)))
print('耗时 : {}'.format(time.time() - t1))
print('*'*20)
print('url存活检测开始')
res = loop.run_until_complete(main(sub_domain_result))
http_result = list(set([x for x in res if x != None]))
t2 = time.time()
#print(http_result)
if len(http_result)>1000:
# 泛解析 浪费本菜感情
http_result = random.sample(http_result,10)
print('二级域名存活数量为 : {} '.format(len(http_result)))
print('耗时 : {}'.format(time.time()-t2))
if http_result != []:
Write_Database(http_result)
运行结果:
检测子域名总数 : 131 总检测数 : 174230
二级域名获取数量为 : 44156
耗时 : 256.4449830055237
运行速度与dns服务器,本机cpu,宽带,运营商dns请求限制有关系
为了好玩,做了一些有意思的功课,比如py3在cmd下播放音乐。
from playsound import playsound
playsound('1.mp3')
将视频转成图片呀,字符串文本呀,需要使用安装
pip3 install opencv-python
使用的库为
import os
import cv2
import subprocess
from cv2 import VideoWriter, VideoWriter_fourcc, imread, resize
from PIL import Image, ImageFont, ImageDraw
第一种方法为:目录下准备一个1.mp4文件,准备一个file文件夹
代码如下:
# -*- coding:utf-8 -*-
# coding:utf-8
import argparse
import os
import cv2
import subprocess
from cv2 import VideoWriter, VideoWriter_fourcc, imread, resize
from PIL import Image, ImageFont, ImageDraw
# 命令行输入参数处理
# aparser = argparse.ArgumentParser()
# aparser.add_argument('file')
# aparser.add_argument('-o','--output')
# aparser.add_argument('-f','--fps',type = float, default = 24)#帧
# aparser.add_argument('-s','--save',type = bool, nargs='?', default = False, const = True)
# 是否保留Cache文件,默认不保存
# 获取参数
# args = parser.parse_args()
# INPUT = args.file
# OUTPUT = args.output
# SAVE = args.save
# FPS = args.fps
# 像素对应ascii码
ascii_char = list("***")
# ascii_char = list("MNHQ$OC67+>!:-. ")
# ascii_char = list("MNHQ$OC67)oa+>!:+. ")
# 将像素转换为ascii码
def get_char(r, g, b, alpha=256):
if alpha == 0:
return ''
length = len(ascii_char)
gray = int(0.2126 * r + 0.7152 * g + 0.0722 * b)
unit = (256.0 + 1) / length
return ascii_char[int(gray / unit)]
# 将txt转换为图片
def txt2image(file_name):
im = Image.open(file_name).convert('RGB')
# gif拆分后的图像,需要转换,否则报错,由于gif分割后保存的是索引颜色
raw_width = im.width
raw_height = im.height
width = int(raw_width / 6)
height = int(raw_height / 15)
im = im.resize((width, height), Image.NEAREST)
txt = ""
colors = []
for i in range(height):
for j in range(width):
pixel = im.getpixel((j, i))
colors.append((pixel[0], pixel[1], pixel[2]))
if (len(pixel) == 4):
txt += get_char(pixel[0], pixel[1], pixel[2], pixel[3])
else:
txt += get_char(pixel[0], pixel[1], pixel[2])
txt += '\n'
colors.append((255, 255, 255))
im_txt = Image.new("RGB", (raw_width, raw_height), (255, 255, 255))
dr = ImageDraw.Draw(im_txt)
# font = ImageFont.truetype(os.path.join("fonts","汉仪楷体简.ttf"),18)
font = ImageFont.load_default().font
x = y = 0
# 获取字体的宽高
font_w, font_h = font.getsize(txt[1])
font_h *= 1.37 # 调整后更佳
# ImageDraw为每个ascii码进行上色
for i in range(len(txt)):
if (txt[i] == '\n'):
x += font_h
y = -font_w
# self, xy, text, fill = None, font = None, anchor = None,
# *args, ** kwargs
dr.text((y, x), txt[i], fill=colors[i])
# dr.text((y, x), txt[i], font=font, fill=colors[i])
y += font_w
name = file_name
# print(name + ' changed')
im_txt.save(name)
# 将视频拆分成图片
def video2txt_jpg(file_name):
vc = cv2.VideoCapture(file_name)
c = 1
if vc.isOpened():
r, frame = vc.read()
if not os.path.exists('Cache'):
os.mkdir('Cache')
os.chdir('Cache')
else:
r = False
while r:
cv2.imwrite(str(c) + '.jpg', frame)
txt2image(str(c) + '.jpg') # 同时转换为ascii图
r, frame = vc.read()
c += 1
os.chdir('..')
return vc
# 将图片合成视频
def jpg2video(outfile_name, fps):
fourcc = VideoWriter_fourcc(*"MJPG")
images = os.listdir('Cache')
im = Image.open('Cache/' + images[0])
vw = cv2.VideoWriter(outfile_name + '.avi', fourcc, fps, im.size)
os.chdir('Cache')
for image in range(len(images)):
# Image.open(str(image)+'.jpg').convert("RGB").save(str(image)+'.jpg')
frame = cv2.imread(str(image + 1) + '.jpg')
vw.write(frame)
# print(str(image + 1) + '.jpg' + ' finished')
os.chdir('..')
vw.release()
# 递归删除目录
def remove_dir(path):
if os.path.exists(path):
if os.path.isdir(path):
dirs = os.listdir(path)
for d in dirs:
if os.path.isdir(path + '/' + d):
remove_dir(path + '/' + d)
elif os.path.isfile(path + '/' + d):
os.remove(path + '/' + d)
os.rmdir(path)
return
elif os.path.isfile(path):
os.remove(path)
return
# 调用ffmpeg获取mp3音频文件
def video2mp3(file_name):
outfile_name = file_name.split('.')[0] + '.mp3'
subprocess.call('ffmpeg -i ' + file_name + ' -f mp3 ' + outfile_name, shell=True)
# 合成音频和视频文件
def video_add_mp3(file_name, mp3_file):
outfile_name = file_name.split('.')[0] + '-txt.mp4'
subprocess.call('ffmpeg -i ' + file_name + ' -i ' + mp3_file + ' -strict -2 -f mp4 ' + outfile_name, shell=True)
if __name__ == '__main__':
INPUT = r"1.mp4"
OUTPUT = r"2.mp4"
SAVE = r"file"
FPS = "24"
print('开始将视频转换成图片和文本')
vc = video2txt_jpg(INPUT)
print('开始获取帧率')
FPS = vc.get(cv2.CAP_PROP_FPS) # 获取帧率
print('视频帧率为 : '+str(FPS))
vc.release()
print('开始将图片转成视频')
jpg2video(INPUT.split('.')[0], FPS)
# print('开始将视频转成mp3')
# print(INPUT, INPUT.split('.')[0] + '.mp3')
# print('开始将视频声音添加到文本')
# video2mp3(INPUT)
# video_add_mp3(INPUT.split('.')[0] + '.avi', INPUT.split('.')[0] + '.mp3')
#
# if (not SAVE):
# remove_dir("Cache")
# os.remove(INPUT.split('.')[0] + '.mp3')
# os.remove(INPUT.split('.')[0] + '.avi')
方法二,使用ffmpeg对视频处理,需要下载ffmpeg,这里打包,然后下载地址为:
添加到系统环境变量
在cmd下使用命令:
#('ffmpeg -i 1.mp4 -r 60 -f image2 files\%05d.png'
# 1.mp4 视频文件 -r 24 代表帧率 files 输出图片文件夹
在目录下新建一个txtfiles文件夹,准备一个1.MP4文件
代码:
# -*- coding:utf-8 -*-
#__author__:langzi
#__blog__:www.langzi.fun
import os
from PIL import Image # 如果没有该库,请 pip install PIL
import numpy # 如果没有该库,请 pip install numpy
#('ffmpeg -i 1.mp4 -r 24 -f image2 files\%05d.png'(video_path, frame, image_path))
#('ffmpeg -i 1.mp4 -r 60 -f image2 files\%05d.png'
# 1.mp4 视频文件 -r 24 代表帧率 files 输出图片文件夹
import random
ascii_char = list("***")
def image_to_txt(image_path, txt_path):
txt_count = 1 # 用于命名txt文件
fileList = os.listdir(image_path) # 返回所有图片名称,是个字符串列表
for file in fileList: # 遍历每一张图片
img = Image.open(image_path + '\\'+ file).convert('L')
# 这里使用到PIL库convert函数,将RGB图片转化为灰度图,参数'L'代表转化为灰度图
charWidth = 140
# 这个是设置你后面在cmd里面显示内容的窗口大小,请根据自己的情况,适当调整值
img = img.resize((charWidth, 40))
target_width, target_height = img.size
data = numpy.array(img)[:target_height, :target_width]
# 使用numpy库,将图像转化为数组
with open(txt_path + '\\' + str(txt_count) + '.txt', 'w', encoding='utf-8') as f:
txt_count += 1 # 一张图对应一个txt文件,所以每遍历一张图,该值加一
for row in data:
for pixel in row:
if pixel < 127: # 如果灰度值小于127,也就是偏黑的,就写一个字符 '*'
f.write(random.choice(ascii_char))
else:
f.write(' ')
f.write('\n')
image_to_txt('files','txtfiles')
import time
def run(txt_path):
fileList = os.listdir(txt_path)
for i in range(1, len(fileList)+1): # 遍历所有的txt文件
try:
os.system('type ' + txt_path + '\\' + str(i) + '.txt')
# 这里type命令是Windows下的命令,相信很多人没怎么用过,你试一下就知道,type+文件名,就可以在cmd里面显示文件内容
os.system('cls')
# 清屏的意思,每显示一次txt文件,就清屏,然后显示下一个txt
# 这里还可以适当的加延时函数,如果显示的太快的话
#time.sleep(0.02)
except:
print('ERROR !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
if __name__ == '__main__':
txt_path = r'txtfiles'
run(txt_path)