【 Smalltools 】监控域名证书到期告警
简单小脚本,分享给大家!!!
#!/usr/bin/python3# pip install pyopenssl cryptography idnaimport datetime#获取系统当前时间Startime=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")from OpenSSL import SSLfrom cryptography import x509from cryptography.x509.oid import NameOIDimport idnaimport timeimport jsonimport urllibimport requestsfrom socket import socketfrom collections import namedtupleimport smtplibfrom email.mime.text import MIMETextimport requests, sys, jsonimport urllib3urllib3.disable_warnings()# ------------------------------ 微信相关配置 ---------------------Corpid = "xxxx"Secret = "yyyy"# 应用IDAgentid = "ssss"Token_config = r'/tmp/zabbix_wechat_config.json'def GetTokenFromServer(Corpid, Secret): """获取access_token""" Url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" Data = { "corpid": Corpid, "corpsecret": Secret } r = requests.get(url=Url, params=Data, verify=False) print(r.json()) if r.json()['errcode'] != 0: return False else: Token = r.json()['access_token'] file = open(Token_config, 'w') file.write(r.text) file.close() return Tokendef SendMessage(Partyid, Subject, Content): """发送消息""" try: file = open(Token_config, 'r') Token = json.load(file)['access_token'] file.close() except: Token = GetTokenFromServer(Corpid, Secret) # 发送消息 Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token Data = { "toparty": Partyid, "msgtype": "text", "agentid": Agentid, "text": {"content": Subject + '\n' + Content}, "safe": "0" } r = requests.post(url=Url, data=json.dumps(Data), verify=False) n = 1 while r.json()['errcode'] != 0 and n < 4: n = n + 1 Token = GetTokenFromServer(Corpid, Secret) if Token: Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token r = requests.post(url=Url, data=json.dumps(Data), verify=False) print(r.json()) return r.json()# --------------------------------- 证书相关配置 ----------------------------HostInfo = namedtuple(field_names='cert hostname peername', typename='HostInfo')#要监控的地址,可以填写多个HOSTS = [ ('toptops.top', 443), ('harbor.toptops.top', 443), ('k8s.toptops.top', 443),]def verify_cert(cert, hostname): cert.has_expired()def get_certificate(hostname, port): hostname_idna = idna.encode(hostname) sock = socket() sock.connect((hostname, port)) peername = sock.getpeername() ctx = SSL.Context(SSL.SSLv23_METHOD) # most compatible ctx.check_hostname = False ctx.verify_mode = SSL.VERIFY_NONE sock_ssl = SSL.Connection(ctx, sock) sock_ssl.set_connect_state() sock_ssl.set_tlsext_host_name(hostname_idna) sock_ssl.do_handshake() cert = sock_ssl.get_peer_certificate() crypto_cert = cert.to_cryptography() sock_ssl.close() sock.close() return HostInfo(cert=crypto_cert, peername=peername, hostname=hostname)def print_basic_info(hostinfo): start = time.mktime(time.strptime(str(Startime), '%Y-%m-%d %H:%M:%S')) end = time.mktime(time.strptime(str(hostinfo.cert.not_valid_after), '%Y-%m-%d %H:%M:%S')) count_days = int((end - start) / (24 * 60 * 60)) s = '''{hostname} \t起始时间: {startime} \t到期时间: {endtime} \t剩余过期天数:{count} '''.format( hostname=hostinfo.hostname, startime=hostinfo.cert.not_valid_before, endtime=hostinfo.cert.not_valid_after, count=count_days ) print(s) text='SSL证书有效天数数量小于7天,请及时更换:' \ '%s' % (s) if count_days <= 7: Partyid = '1' Subject = str("") Status = SendMessage(Partyid, Subject, Content=text)import concurrent.futuresif __name__ == '__main__': with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e: for hostinfo in e.map(lambda x: get_certificate(x[0], x[1]), HOSTS): print_basic_info(hostinfo)
# 执行脚本toptops.top 起始时间: 2020-07-18 03:21:18 到期时间: 2022-08-02 10:57:01 剩余过期天数:174harbor.toptops.top 起始时间: 2021-07-14 09:45:08 到期时间: 2022-08-15 09:45:08 剩余过期天数:187k8s.toptops.top 起始时间: 2020-07-18 03:21:18 到期时间: 2022-08-02 10:57:01 剩余过期天数:174