> 文档中心 > 【 Smalltools 】监控域名证书到期告警

【 Smalltools 】监控域名证书到期告警

【 Smalltools 】监控域名证书到期告警

简单小脚本,分享给大家!!!

#!/usr/bin/python3# pip install pyopenssl cryptography idnaimport datetime#获取系统当前时间Startime=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")from OpenSSL import SSLfrom cryptography import x509from cryptography.x509.oid import NameOIDimport idnaimport timeimport jsonimport urllibimport requestsfrom socket import socketfrom collections import namedtupleimport smtplibfrom email.mime.text import MIMETextimport requests, sys, jsonimport urllib3urllib3.disable_warnings()# ------------------------------ 微信相关配置 ---------------------Corpid = "xxxx"Secret = "yyyy"# 应用IDAgentid = "ssss"Token_config = r'/tmp/zabbix_wechat_config.json'def GetTokenFromServer(Corpid, Secret):    """获取access_token"""    Url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"    Data = { "corpid": Corpid, "corpsecret": Secret    }    r = requests.get(url=Url, params=Data, verify=False)    print(r.json())    if r.json()['errcode'] != 0: return False    else: Token = r.json()['access_token'] file = open(Token_config, 'w') file.write(r.text) file.close() return Tokendef SendMessage(Partyid, Subject, Content):    """发送消息"""    try: file = open(Token_config, 'r') Token = json.load(file)['access_token'] file.close()    except: Token = GetTokenFromServer(Corpid, Secret)    # 发送消息    Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token    Data = { "toparty": Partyid, "msgtype": "text", "agentid": Agentid, "text": {"content": Subject + '\n' + Content}, "safe": "0"    }    r = requests.post(url=Url, data=json.dumps(Data), verify=False)    n = 1    while r.json()['errcode'] != 0 and n < 4: n = n + 1 Token = GetTokenFromServer(Corpid, Secret) if Token:     Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token     r = requests.post(url=Url, data=json.dumps(Data), verify=False)     print(r.json())    return r.json()# --------------------------------- 证书相关配置 ----------------------------HostInfo = namedtuple(field_names='cert hostname peername', typename='HostInfo')#要监控的地址,可以填写多个HOSTS = [    ('toptops.top', 443),    ('harbor.toptops.top', 443),    ('k8s.toptops.top', 443),]def verify_cert(cert, hostname):    cert.has_expired()def get_certificate(hostname, port):    hostname_idna = idna.encode(hostname)    sock = socket()    sock.connect((hostname, port))    peername = sock.getpeername()    ctx = SSL.Context(SSL.SSLv23_METHOD) # most compatible    ctx.check_hostname = False    ctx.verify_mode = SSL.VERIFY_NONE    sock_ssl = SSL.Connection(ctx, sock)    sock_ssl.set_connect_state()    sock_ssl.set_tlsext_host_name(hostname_idna)    sock_ssl.do_handshake()    cert = sock_ssl.get_peer_certificate()    crypto_cert = cert.to_cryptography()    sock_ssl.close()    sock.close()    return HostInfo(cert=crypto_cert, peername=peername, hostname=hostname)def print_basic_info(hostinfo):    start = time.mktime(time.strptime(str(Startime), '%Y-%m-%d %H:%M:%S'))    end = time.mktime(time.strptime(str(hostinfo.cert.not_valid_after), '%Y-%m-%d %H:%M:%S'))    count_days = int((end - start) / (24 * 60 * 60))    s = '''{hostname}    \t起始时间: {startime}    \t到期时间:  {endtime}    \t剩余过期天数:{count}    '''.format(     hostname=hostinfo.hostname,     startime=hostinfo.cert.not_valid_before,     endtime=hostinfo.cert.not_valid_after,     count=count_days    )    print(s)    text='SSL证书有效天数数量小于7天,请及时更换:' \  '%s' % (s)    if count_days <= 7: Partyid = '1' Subject = str("") Status = SendMessage(Partyid, Subject, Content=text)import concurrent.futuresif __name__ == '__main__':    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as e: for hostinfo in e.map(lambda x: get_certificate(x[0], x[1]), HOSTS):     print_basic_info(hostinfo)
# 执行脚本toptops.top    起始时间: 2020-07-18 03:21:18    到期时间:  2022-08-02 10:57:01    剩余过期天数:174harbor.toptops.top    起始时间: 2021-07-14 09:45:08    到期时间:  2022-08-15 09:45:08    剩余过期天数:187k8s.toptops.top    起始时间: 2020-07-18 03:21:18    到期时间:  2022-08-02 10:57:01    剩余过期天数:174

51mike麦克疯