環境準備:
1.安裝python3.7和相關的依賴
并安裝redis緩存數據庫
pip install aliyun-python-sdk-core
pip install aliyun-python-sdk-slb
pip install IPy
pip install redis
pip install paramiko
2.添加ram訪問控制的編程接口用戶
3.添加slb的訪問控制策略并和需要頻控的slb進行綁定
redis封堵ip的格式
腳本程序目錄
Aliyun_SLB_Manager
├── helpers
│ ├── common.py
│ ├── email.py
│ ├── remote.py
│ └── slb.py
├── logs
│ └── run_20210204.log
└── run.py
# 程序核心就是使用shell命令對nginx的日志中出現的ip地址和訪問的接口進行過濾,找出訪問頻繁的那些ip加入slb黑名單,同時加入redis緩存;因為slb有封堵ip個數限制,redis中存儲的ip需要設置過期時間,對比后刪除slb中封堵的ip
# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200
2454 114.248.45.15
1576 47.115.122.23
1569 47.107.239.148
269 112.32.217.52
grep 04/Feb/2021:14:5 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | awk -F ':' '{print $2}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >15)print $1,$2}'
[root@alisz-edraw-api-server-web01:~]# grep 04/Feb/2021:15:4 /data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -3
2454 114.248.45.15
1576 47.115.122.23
1569 47.107.239.148
python腳本
主入口程序
run.py
import time
from helpers.email import send_mail
from helpers.remote import get_black_ips
from helpers.common import is_white_ip,get_ban_ip_time,set_ban_ip_time,groups
from helpers.slb import slb_add_host,slb_del_host,slb_get_host
if __name__ == "__main__":
    # Entry point: find abusive client IPs in the nginx log, reconcile them
    # with the SLB access-control list (lift expired bans, evict the oldest
    # bans when over the cap, add the new ones) and mail a summary.

    # Alibaba Cloud RAM user that manages the SLB access control.
    # Login name: slb-frequency-user@xxx.onaliyun.com
    accessKeyId = 'id'
    accessSecret = 'pass'
    # Id of the SLB access-control list used as the blacklist.
    acl_id = 'acl-slb'
    # Region id lookup: https://help.aliyun.com/document_detail/40654.html?spm=a2c6h.13066369.0.0.54a17471VmN3kA
    region_id = 'cn-shenzhen'
    # Maximum number of blacklist entries this script keeps in the ACL
    # (the original note mentions an SLB limit of 300; the cap used is 200).
    slb_limit = 200
    # Request-count threshold per 10-minute window.
    threshold = 50
    # Alert recipients.
    mails = ['reblue520@chinasoft.cn']
    # Number of entries sent per add/remove API call.
    batch_size = 50
    # Bans older than this many seconds are lifted.
    max_ban_seconds = 2 * 24 * 3600

    # Grep the nginx access log over SSH for suspicious IPs.
    deny_host_list, hosts_with_count = get_black_ips(threshold)
    hosts_with_count = sorted(hosts_with_count.items(), key=lambda x: x[1], reverse=True)
    print(hosts_with_count)

    # Candidates to ban, minus whitelisted IPs; entries are stored as /32 CIDRs.
    deny_hosts = [host + '/32' for host in deny_host_list if not is_white_ip(host)]

    # Everything currently in the ACL. slb_get_host logs the error and returns
    # a falsy value when the API call fails; abort rather than reconcile
    # against an unknown blacklist (the original crashed here on None).
    response = slb_get_host(accessKeyId, accessSecret, acl_id, region_id)
    if not response:
        raise SystemExit("could not fetch the SLB access-control list; aborting")
    denied_hosts = []
    if 'AclEntrys' in response:
        denied_hosts = [item['AclEntryIP'] for item in response['AclEntrys']['AclEntry']]

    # Lift bans that have no recorded time or are older than max_ban_seconds.
    # NOTE: set_ban_ip_time stores the timestamp with an 86400 s expiry and
    # get_ban_ip_time returns 0 for a missing key, so a ban is lifted as soon
    # as its redis key has expired.
    now = int(round(time.time()))
    must_del_hosts = []
    denied_hosts_clone = []   # entries that stay banned
    ban_times = {}            # entry -> ban timestamp, for the entries that stay
    for host in denied_hosts:
        banned_at = get_ban_ip_time(host)
        if banned_at == 0 or banned_at < now - max_ban_seconds:
            must_del_hosts.append(host)
        else:
            denied_hosts_clone.append(host)
            ban_times[host] = banned_at

    # Skip candidates that are already banned.
    still_banned = set(denied_hosts_clone)
    deny_hosts_new = [item for item in deny_hosts if item not in still_banned]

    # If existing + new entries exceed slb_limit, evict the oldest bans.
    num = len(denied_hosts_clone) + len(deny_hosts_new) - slb_limit
    if num > 0:
        oldest_first = sorted(ban_times.items(), key=lambda x: x[1])
        must_del_hosts.extend(item[0] for item in oldest_first[:num])

    print("denied:", denied_hosts)
    print("delete:", must_del_hosts)
    print("add:", deny_hosts_new)

    # Remove first so the additions fit under the limit.
    if must_del_hosts:
        if len(must_del_hosts) > batch_size:
            for item in groups(must_del_hosts, batch_size):
                slb_del_host(item, accessKeyId, accessSecret, acl_id, region_id)
                time.sleep(1)
        else:
            slb_del_host(must_del_hosts, accessKeyId, accessSecret, acl_id, region_id)

    # Then add the new entries.
    if deny_hosts_new:
        if len(deny_hosts_new) > batch_size:
            for item in groups(deny_hosts_new, batch_size):
                slb_add_host(item, accessKeyId, accessSecret, acl_id, region_id)
                time.sleep(1)
        else:
            slb_add_host(deny_hosts_new, accessKeyId, accessSecret, acl_id, region_id)
        # Record when each entry was banned.
        for host in deny_hosts_new:
            set_ban_ip_time(host)

    # A report is mailed only when something new was banned.
    if deny_hosts_new:
        mail_content = ''
        if must_del_hosts:
            mail_content += "以下黑名單已被解禁("+str(len(must_del_hosts))+"):\n"+"\n".join(must_del_hosts) + "\n"
        mail_content += "\n新增以下ip黑名單("+str(len(deny_hosts_new))+"):\n"+"\n".join(deny_hosts_new)
        mail_content += "\n\n10分鐘訪問超過15次("+str(len(hosts_with_count))+"):\n"
        mail_content += "".join(str(item[1]) + " " + str(item[0]) + "\n" for item in hosts_with_count)
        mail_content += "\n\n黑名單("+str(len(denied_hosts))+"個):\n"
        mail_content += "".join(str(item) + "\n" for item in denied_hosts)
        send_mail(mail_content, mails)
slb操作相關的腳本
slb.py
import logging , json
from aliyunsdkcore.client import AcsClient
from aliyunsdkslb.request.v20140515.AddAccessControlListEntryRequest import AddAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.RemoveAccessControlListEntryRequest import RemoveAccessControlListEntryRequest
from aliyunsdkslb.request.v20140515.DescribeAccessControlListAttributeRequest import DescribeAccessControlListAttributeRequest
# 阿里云slb訪問控制里添加ip
def slb_add_host(hosts, accessKeyId, accessSecret, acl_id, region_id):
    """Add entries to an Alibaba Cloud SLB access-control list.

    Args:
        hosts: list of entry strings to add (the caller passes "a.b.c.d/32").
        accessKeyId: access key id of the RAM user.
        accessSecret: access key secret of the RAM user.
        acl_id: id of the access-control list to modify.
        region_id: region the SLB lives in.

    Failures of the API call are logged, not raised; nothing is returned.
    """
    client = AcsClient(accessKeyId, accessSecret, region_id)
    request = AddAccessControlListEntryRequest()
    request.set_accept_format('json')
    logging.info("正在封印IP:%s", ",".join(hosts))
    try:
        add_hosts = [{"entry": host, "comment": "deny"} for host in hosts]
        # The API documents AclEntrys as a JSON string, so serialize explicitly
        # instead of relying on how the SDK stringifies a Python list.
        request.set_AclEntrys(json.dumps(add_hosts))
        request.set_AclId(acl_id)
        response = client.do_action_with_exception(request)
        print(response)
    except Exception as e:
        # Exception (not BaseException) so Ctrl-C / SystemExit still propagate.
        logging.error("添加黑名單失敗,原因:%s", e)
# slb刪除ip
def slb_del_host(hosts, accessKeyId, accessSecret, acl_id , region_id = 'us-west-1'):
    """Remove entries from an Alibaba Cloud SLB access-control list.

    Args:
        hosts: list of entry strings to remove (the caller passes "a.b.c.d/32").
        accessKeyId: access key id of the RAM user.
        accessSecret: access key secret of the RAM user.
        acl_id: id of the access-control list to modify.
        region_id: region the SLB lives in; defaults to 'us-west-1'.

    Failures are logged, not raised; nothing is returned.
    """
    logging.info("正在解封IP:%s", ",".join(hosts))
    try:
        del_hosts = [{"entry": host, "comment": "deny"} for host in hosts]
        client = AcsClient(accessKeyId, accessSecret, region_id)
        request = RemoveAccessControlListEntryRequest()
        request.set_accept_format('json')
        # The API documents AclEntrys as a JSON string, so serialize explicitly.
        request.set_AclEntrys(json.dumps(del_hosts))
        request.set_AclId(acl_id)
        client.do_action_with_exception(request)
        # Logged once; the original emitted this line twice.
        logging.info("slb刪除IP:%s成功", ",".join(hosts))
    except Exception as e:
        # Exception (not BaseException) so Ctrl-C / SystemExit still propagate.
        logging.error("移出黑名單失敗,原因:%s", e)
# 阿里云slb獲取IP黑名單列表
def slb_get_host(accessKeyId, accessSecret, acl_id, region_id):
    """Fetch the attributes of an SLB access-control list.

    Args:
        accessKeyId: access key id of the RAM user.
        accessSecret: access key secret of the RAM user.
        acl_id: id of the access-control list to describe.
        region_id: region the SLB lives in.

    Returns:
        The decoded JSON response as a dict, or None when the request or the
        decoding fails (the error is logged). Callers must handle None.
    """
    client = AcsClient(accessKeyId, accessSecret, region_id)
    request = DescribeAccessControlListAttributeRequest()
    request.set_accept_format('json')
    request.set_AclId(acl_id)
    try:
        response = client.do_action_with_exception(request)
        return json.loads(response.decode("utf-8"))
    except Exception as e:
        # Exception (not BaseException) so Ctrl-C / SystemExit still propagate.
        logging.error("獲取黑名單失敗,原因:%s", e)
        return None
遠程操作日志的腳本
remote.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import re
import paramiko
def get_black_ips(threshold = 100):
    """Collect per-IP request counts from remote nginx logs over SSH.

    Runs a grep/awk pipeline on every host in ``ssh_hosts`` against the
    access log, sums the counts per IP across hosts and selects the IPs whose
    total exceeds ``threshold``.

    Args:
        threshold: an IP is reported when its summed count is strictly
            greater than this value.

    Returns:
        A two-element list ``[deny_host_list, uniq_host]``: the IPs over the
        threshold, and a dict mapping every returned IP to its summed count.

    Raises:
        Whatever paramiko raises on connection or command failure; the SSH
        client is closed in every case.
    """
    log_path = '/data/www/logs/nginx_log/access/masterapi.chinasoft.cn_access.log'
    # Credentials of a user allowed to read the nginx log over SSH.
    username = 'apache'
    passwd = 'pass'
    # Timestamp of "10 minutes ago" with the last minute digit dropped, e.g.
    # "04/Feb/2021:15:4": grepping for it matches that whole 10-minute bucket.
    ten_min_time = (datetime.datetime.now() - datetime.timedelta(minutes=10)).strftime("%d/%b/%Y:%H:%M")
    ten_min_time = ten_min_time[:-1]
    # Servers whose logs are scanned: normally intranet addresses, a public
    # address can be used when debugging locally.
    ssh_hosts = ['1.1.1.1']

    # The pipeline prints "<count> <ip>" lines, most frequent first, e.g.
    #   2998 116.248.89.2
    #   2381 114.248.45.15
    # Restrict the second grep to the specific API path being abused,
    # otherwise the false-positive rate is very high.
    # NOTE(review): the trailing awk already drops IPs with 2000 hits or
    # fewer, so a smaller ``threshold`` has no effect - confirm this is intended.
    shell = (
        "grep %s %s | grep '/api' | awk '{print $1}' | sort | uniq -c | sort -r -n | head -200 | awk '{if ($1 >2000)print $1,$2}'") % (
        ten_min_time, log_path)
    line_re = re.compile(r'\d{1,99} \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')

    uniq_host = {}
    for host in ssh_hosts:
        print(shell)
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(host, port=2020, username=username, password=passwd)
            stdin, stdout, stderr = ssh.exec_command(shell)
            result = stdout.read().decode(encoding="utf-8")
        finally:
            # The original never closed the connection.
            ssh.close()
        # Sum the counts per IP across all scanned hosts.
        for host_str in line_re.findall(result):
            count, ip = host_str.split(' ')
            uniq_host[ip] = uniq_host.get(ip, 0) + int(count)

    deny_host_list = [ip for ip, total in uniq_host.items() if total > threshold]
    return [deny_host_list , uniq_host]
發送郵件的腳本
email.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import logging
def send_mail(host , receivers):
    """Send the alert e-mail.

    Args:
        host: text appended to the fixed alert sentence in the mail body
            (the caller passes the full report, despite the parameter name).
        receivers: list of recipient addresses.

    Delivery failures are logged, not raised; nothing is returned.
    """
    # SMTP server and account used for sending.
    mail_host = "smtpdm-ap-southeast-1.aliyun.com"
    mail_user = "admin@mail.chinasoft.com"
    mail_pass = "pass"
    sender = 'admin@mail.chinasoft.com'
    message = MIMEText('chinasoft國內接口被刷,單個IP最近10分鐘內訪問超過閾值100次會收到此郵件告警!!!!\n%s' % (host), 'plain', 'utf-8')
    message['From'] = Header("chinasoft國內接口被刷", 'utf-8')
    subject ='[DDOS]購買鏈接接口異常鏈接!!'
    message['Subject'] = Header(subject, 'utf-8')
    try:
        # The context manager sends QUIT and closes the socket even on error;
        # the original left the connection open.
        with smtplib.SMTP(mail_host, 80) as smtpObj:
            smtpObj.login(mail_user, mail_pass)
            smtpObj.sendmail(sender, receivers, message.as_string())
        logging.info("郵件發送成功")
    except (smtplib.SMTPException, OSError) as e:
        # OSError also covers connection-level failures (DNS, refused, timeout).
        logging.error("發送郵件失敗,原因:%s", e)
配置文件
common.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import IPy
from functools import reduce
import redis,time
def groups(L1,len1):
    """Split ``L1`` into consecutive chunks of ``len1`` items.

    Full chunks are returned as lists; a shorter trailing remainder, if any,
    is appended as a slice of ``L1``.
    """
    total = len(L1)
    leftover = total % len1
    full_end = total - leftover
    chunks = [list(L1[start:start + len1]) for start in range(0, full_end, len1)]
    if leftover != 0:
        chunks.append(L1[-leftover:])
    return chunks
def ip_into_int(ip):
    """Convert a dotted-quad IPv4 string such as '10.0.0.1' to its integer value.

    Each octet is folded in from left to right: the accumulated value is
    shifted left by 8 bits and the next octet is added.
    """
    return reduce(lambda x, y: (x << 8) + y, map(int, ip.split('.')))
# 過濾掉內網ip
def is_internal_ip(ip):
    """Return True when ``ip`` lies in 10.0.0.0/8, 172.16.0.0/12 or 192.168.0.0/16.

    The address is converted to an integer and its leading bits are compared
    with those of one representative address per private range.
    """
    addr = ip_into_int(ip)
    # (bits to shift away, representative address of the private range)
    private_ranges = (
        (24, '10.255.255.255'),
        (20, '172.31.255.255'),
        (16, '192.168.255.255'),
    )
    return any(
        addr >> shift == ip_into_int(sample) >> shift
        for shift, sample in private_ranges
    )
# 是否為白名單ip (公司內網+集群內網ip+slb和需要互訪的服務器ip,避免誤殺)
def is_white_ip(ip):
    """Return True when ``ip`` must never be banned.

    An address is whitelisted when ``is_internal_ip`` accepts it or when it
    falls inside one of the entries of ``white_hosts``.
    """
    if is_internal_ip(ip):
        return True
    white_hosts = (
        # web-servers
        '1.1.1.1',
        '1.1.1.2',
    )
    return any(ip in IPy.IP(allowed) for allowed in white_hosts)
def get_ban_ip_time(ip):
    """Return the ban timestamp stored in redis for ``ip``.

    Reads key 'slb_ban_<ip>' from db 1 of the redis server at 127.0.0.1:6379
    and returns it as an int, or 0 when the key does not exist.
    """
    client = redis.Redis(
        connection_pool=redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)
    )
    stored = client.get('slb_ban_' + ip)
    return 0 if stored is None else int(stored)
def set_ban_ip_time(ip):
    """Store the current time as the ban timestamp of ``ip`` in redis.

    Writes the current Unix time, rounded to whole seconds, to key
    'slb_ban_<ip>' in db 1 of the redis server at 127.0.0.1:6379, passing
    86400 as the third argument of ``set`` (the expiry), and returns the
    result of that call.
    """
    banned_at = int(round(time.time()))
    client = redis.Redis(
        connection_pool=redis.ConnectionPool(host='127.0.0.1', port=6379, db=1)
    )
    return client.set('slb_ban_' + ip, banned_at, 86400)
本地可以直接運行run.py進行調試
到此這篇關于python腳本使用阿里云slb對惡意攻擊進行封堵的實現的文章就介紹到這了,更多相關python腳本阿里云slb內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章,希望大家以后多多支持腳本之家!
您可能感興趣的文章:
- 利用Python+阿里云實現DDNS動態域名解析的方法
- 阿里云 CentOS7.4 安裝 Python3.6的方法講解
- Python3編程實現獲取阿里云ECS實例及監控的方法
- 在阿里云服務器上配置CentOS+Nginx+Python+Flask環境