分析Nginx日志并屏蔽高频访问IP
日志分析脚本
创建python脚本 /data/sh/block_ips.py
并设置可执行
#!/usr/bin/env python3
import re
import os
import time
import json
import argparse
from collections import Counter
from datetime import datetime, timedelta
# 配置参数
LOG_FILE = '/data/bt/wwwlogs/www.xxxxx.com.log' # 日志文件路径
BLOCK_FILE = '/data/bt/block_ips/block_ips.conf' # 封禁IP配置文件
TEMP_BLOCK_FILE = '/data/bt/block_ips/block_ips.conf.tmp' # 临时封禁文件
WHITELIST_FILE = '/data/bt/block_ips/whitelist.json' # 白名单文件
LOG_DATE_FORMAT = '%d/%b/%Y' # 日志中的日期格式
THRESHOLD = 5000 # 封禁阈值(请求次数)
BLOCK_DURATION = 864000 # 封禁时长(秒)
ANALYZE_DAYS = 1 # 分析最近多少天的日志
NGINX_RELOAD_CMD = 'systemctl reload nginx' # 重载Nginx命令
def load_whitelist():
"""加载IP白名单"""
try:
if os.path.exists(WHITELIST_FILE):
with open(WHITELIST_FILE, 'r') as f:
return json.load(f)
return []
except Exception as e:
print(f"加载白名单失败: {e}")
return []
def analyze_logs():
"""分析日志文件,找出需要封禁的IP"""
print(f"开始分析日志文件: {LOG_FILE}")
ip_counter = Counter()
whitelist = load_whitelist()
try:
# 计算需要分析的日期范围
today = datetime.now()
date_range = [(today - timedelta(days=i)).strftime(LOG_DATE_FORMAT)
for i in range(ANALYZE_DAYS)]
with open(LOG_FILE, 'r') as f:
line_count = 0
for line in f:
line_count += 1
if line_count % 100000 == 0:
print(f"已处理 {line_count} 行日志")
# 提取IP地址(假设格式为日志的第一个字段)
match = re.search(r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', line)
if not match:
continue
ip = match.group(1)
# 跳过白名单IP
if ip in whitelist:
continue
# 检查日志时间(假设格式为 [day/month/year:hour:minute:second zone])
date_match = re.search(r'\[(\d{2}/\w+/\d{4}):', line)
if date_match and date_match.group(1) in date_range:
ip_counter[ip] += 1
except Exception as e:
print(f"分析日志时出错: {e}")
return []
# 找出超过阈值的IP
blocked_ips = {ip: count for ip, count in ip_counter.items() if count > THRESHOLD}
print(f"分析完成,共找到 {len(blocked_ips)} 个需要封禁的IP")
return blocked_ips
def load_existing_blocks():
"""加载现有的封禁列表和时间"""
existing_blocks = {}
if os.path.exists(BLOCK_FILE):
try:
with open(BLOCK_FILE, 'r') as f:
for line in f:
# 匹配格式: # 封禁IP: 1.2.3.4, 封禁时间: 1620000000, 解封时间: 1620864000
match = re.search(r'# 封禁IP: (\d+\.\d+\.\d+\.\d+), 封禁时间: (\d+), 解封时间: (\d+)', line)
if match:
ip = match.group(1)
block_time = int(match.group(2))
unblock_time = int(match.group(3))
# 只保留未过期的封禁
if unblock_time > time.time():
existing_blocks[ip] = {
'block_time': block_time,
'unblock_time': unblock_time,
'is_new': False # 标记为现有封禁,不是新添加的
}
except Exception as e:
print(f"加载现有封禁列表时出错: {e}")
return existing_blocks
def generate_block_file(new_blocks, existing_blocks):
"""生成Nginx封禁IP配置文件"""
try:
current_time = int(time.time())
# 合并新封禁和现有封禁
all_blocks = existing_blocks.copy()
for ip, count in new_blocks.items():
if ip not in all_blocks:
all_blocks[ip] = {
'block_time': current_time,
'unblock_time': current_time + BLOCK_DURATION,
'is_new': True # 标记为新封禁
}
print(f"总共有 {len(all_blocks)} 个IP被封禁(包括现有封禁)")
# 生成临时封禁文件
with open(TEMP_BLOCK_FILE, 'w') as f:
f.write("# 自动生成的IP封禁列表\n")
f.write(f"# 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("# 格式: # 封禁IP: IP地址, 封禁时间: 时间戳, 解封时间: 时间戳\n")
f.write("# deny IP地址;\n\n")
# 写入封禁IP和时间信息
for ip, info in all_blocks.items():
f.write(f"# 封禁IP: {ip}, 封禁时间: {info['block_time']}, 解封时间: {info['unblock_time']}\n")
f.write(f"deny {ip};\n\n")
# 替换正式封禁文件
os.replace(TEMP_BLOCK_FILE, BLOCK_FILE)
print(f"封禁文件已更新: {BLOCK_FILE}")
# 返回新添加的封禁IP列表
new_ips = [ip for ip, info in all_blocks.items() if info['is_new']]
return new_ips
except Exception as e:
print(f"生成封禁文件时出错: {e}")
return []
def reload_nginx():
"""重新加载Nginx配置"""
try:
# 检查配置文件语法
check_cmd = "nginx -t"
check_result = os.system(check_cmd)
if check_result != 0:
print("Nginx配置检查失败,不会重新加载")
return False
# 重新加载配置
reload_result = os.system(NGINX_RELOAD_CMD)
if reload_result != 0:
print("重新加载Nginx失败")
return False
print("Nginx配置已成功重新加载")
return True
except Exception as e:
print(f"重新加载Nginx时出错: {e}")
return False
def main():
# 解析命令行参数
parser = argparse.ArgumentParser(description='Nginx IP 封禁自动化工具')
parser.add_argument('--threshold', type=int, default=THRESHOLD,
help='封禁阈值(请求次数)')
parser.add_argument('--duration', type=int, default=BLOCK_DURATION,
help='封禁时长(秒)')
parser.add_argument('--days', type=int, default=ANALYZE_DAYS,
help='分析最近多少天的日志')
parser.add_argument('--test', action='store_true',
help='测试模式,不实际修改配置文件')
args = parser.parse_args()
# 更新全局配置
# global THRESHOLD, BLOCK_DURATION, ANALYZE_DAYS
# THRESHOLD = args.threshold
# BLOCK_DURATION = args.duration
# ANALYZE_DAYS = args.days
print(f"开始执行IP封禁分析,阈值: {THRESHOLD},封禁时长: {BLOCK_DURATION}秒")
# 分析日志
new_blocks = analyze_logs()
if not new_blocks:
print("没有找到需要封禁的IP,退出")
return
# 加载现有封禁
existing_blocks = load_existing_blocks()
if args.test:
print("测试模式: 以下IP将被封禁")
for ip, count in new_blocks.items():
print(f" {ip}: {count} 次请求")
print("测试模式,不会修改配置文件")
return
# 生成封禁文件
new_ips = generate_block_file(new_blocks, existing_blocks)
if new_ips:
print(f"新封禁了 {len(new_ips)} 个IP")
# 重新加载Nginx
reload_nginx()
else:
print("没有新增封禁IP")
if __name__ == "__main__":
main()
白名单文件
创建白名单文件 /data/bt/block_ips/whitelist.json
[
"127.0.0.1",
"192.168.1.0/24", # 示例:允许整个网段
"203.0.113.10" # 示例:允许单个IP
]
配置 Nginx
# 在 http 段或 server 段中添加
include /etc/nginx/block_ips.conf;
设置定时任务
crontab -e
30 23 * * * /data/sh/block_ips.py > /var/log/block_ips.log 2>&1
最后更新于 2025-06-23 18:19:51 并被添加「」标签,已有 24 位童鞋阅读过。
本站使用「署名 4.0 国际」创作共享协议,可自由转载、引用,但需署名作者且注明文章出处
此处评论已关闭