通过腾讯云申请的key,使用api接口管理域名,有缘看到这个脚本的小伙伴,如果觉得有点意思,可以帮助博主改改代码,感谢!
# /usr/bin/env python3 # -*-coding:utf-8-*- # 脚本默认运行在命令行下,需要作修改自行修改即可 # 使用脚本前注意填入腾讯云申请到的SecretId、Secret_Key # 对域名进行批量增删改时需要注意在脚本的同一路径下domain_list.txt写入相关信息,脚本会自动生成此文件 # 脚本会记录增删改时的历史记录TencentDNS.log,以便操作失误时方便进行回滚 # 本人学艺不精,难免会有bug,所以请严格按照帮助使用操作 # 脚本完成于2019/11/8 import argparse import base64 import hashlib import hmac import json import os import random import sys import time from urllib.request import quote import requests class ManageDomain(): def __init__(self): # 填入腾讯云的Secretkey、SecretID self.secret_id = '' self.secret_key = '' self.req_api_url = 'https://cns.api.qcloud.com/v2/index.php?' self.host = 'cns.api.qcloud.com/v2/index.php' def get_signature(self, action, **kwargs): dict_args = {} dict_args.update(kwargs) dict_args['Action'] = action dict_args['Nonce'] = random.randint(1000, 10000000) dict_args['SecretId'] = self.secret_id dict_args['SignatureMethod'] = 'HmacSHA256' dict_args['Timestamp'] = int(time.time()) dict_args = {str(k): str(v) for k, v in dict_args.items()} # 生成签名字符串,请求方法 + 请求主机 +请求路径 + ? + 请求字符串 # 这里使用GET方法,注意将字典内的key进行排序 dict_args = {str(k): str(v) for k, v in dict_args.items()} sig_str = 'GET' + self.host + '?' + '&'.join(k + '=' + v for k, v in sorted(dict_args.items())) # 签名加密,并重新加入字典,需要做url编码,否则报4100 signature = quote( base64.b64encode(hmac.new(self.secret_key.encode(), sig_str.encode(), hashlib.sha256).digest()).decode()) dict_args['Signature'] = signature # 拼接请求url, https:// + 请求域名 + 请求路径 + ? + 最终请求参数串 result = '&'.join(k + '=' + v for k, v in sorted(dict_args.items())) req_url = self.req_api_url + result # 使用get请求api,并返回值 reponse = requests.get(req_url.encode('utf-8')) req_info = json.loads(reponse.text) return req_info def all_domain(self): # 查询账号下所有域名及其主机记录 domain_list = self.get_signature(action='DomainList') print('账号下拥有域名总数: ', domain_list['data']['info']['domain_total']) print(' 域名\t%s\t%s' % ('解析条目数量', '主机记录[记录类型]')) # 腾讯云有毒,这个数量有两条默认的@记录,实际数量会加+2,通过api删了之后就会显示负数 for items in domain_list['data']['domains']: print('{0}'.format(items['name']), end='') record_list = self.get_signature(action='RecordList', domain=items['name']) print(' ' * 2, record_list['data']['info']['record_total'], end='') for item in record_list['data']['records']: if item['value'] == 'f1g1ns1.dnspod.net.' or item['value'] == 'f1g1ns2.dnspod.net.': continue print(' {0}[{1}]'.format(item['name'], item['type']), end='') print('\n') def get_recordid_oldip(self, domain, subDomain, get_data='3'): # 获取旧ip或recordid req_data = self.get_signature(action='RecordList', domain=domain, subDomain=subDomain) if not req_data['data']['records']: print('记录值不存在') tmp = req_data['data']['records'][0] if get_data == '1': return tmp['value'] elif get_data == '2': return tmp['id'], tmp['value'] else: return tmp['id'] def check_record(self, domain, subDomain): # 查看解析记录 record_status = self.get_signature(action='RecordList', domain=domain, subDomain=subDomain) if record_status['code'] != 0: # 若域名输入错误,会报错误代码5100,下同 print('域名不存在,请检查域名是否正确') exit() elif not record_status['data']['records']: print('{0}.{1} 域名不存在,请检查域名是否正确'.format(subDomain, domain)) else: for item in record_status['data']['records']: if item['name'] == subDomain: if item['enabled'] == 1: print( '{0}.{1} ---> {2} {4}线路 {3}记录'.format(item['name'], domain, item['value'], item['type'], item['line'])) else: print('解析记录未启用') def add_record(self, domain, subDomain, recordType, recordLine, value): # 添加解析记录 check_exists = self.get_signature(action='RecordList', domain=domain, subDomain=subDomain) if check_exists['code'] != 0: print('域名不存在,检查域名输入是否正确') exit() else: for item in check_exists['data']['records']: if item['value'] == 'f1g1ns1.dnspod.net.' or item['value'] == 'f1g1ns2.dnspod.net.': break elif item['name'] == subDomain and item['value'] == value: print('{1}.{0} 记录已存在,不要重复添加同一条记录'.format(domain, subDomain)) exit() domain_status = self.get_signature(action='RecordCreate', domain=domain, subDomain=subDomain, recordType=recordType, recordLine=recordLine, value=value) if domain_status['code'] != 0: print(domain_status['message']) exit() else: print('增加解析记录\t{0}.{1} ---> {2} {4}线路 {3}记录'.format(subDomain, domain, value, recordType, recordLine)) msg = '增加解析记录\t{0}.{1} ---> {2} {4}线路 {3}记录'.format(subDomain, domain, value, recordType, recordLine, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) with open('TencentDNS.log', 'a+', encoding='utf-8')as f: f.writelines(msg + '\n') def modify_record(self, domain, subDomain, recordType, recordLine, value): # 修改解析记录 record_id, old_ip = self.get_recordid_oldip(domain=domain, subDomain=subDomain, get_data='2') new_record = self.get_signature(domain=domain, subDomain=subDomain, recordType=recordType, recordLine=recordLine, value=value, action='RecordModify', recordId=record_id) if new_record['code'] == 0: print('修改解析记录\t{0}.{1} {2} ---> {3}'.format(subDomain, domain, old_ip, value)) msg = '修改解析记录\t{0}.{1} {2} ---> {3}\t{4}'.format(subDomain, domain, old_ip, value, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) with open('TencentDNS.log', 'a+', encoding='utf-8')as f: f.writelines(msg + '\n') def del_record(self, domain, subDomain): # 删除解析记录 record_id, old_ip = self.get_recordid_oldip(domain=domain, subDomain=subDomain, get_data='2') del_rec = self.get_signature(action='RecordDelete', domain=domain, recordId=record_id) if del_rec['code'] == 0: print('删除解析记录\t{0}.{1} {2}'.format(subDomain, domain, old_ip)) msg = '删除解析记录\t{0}.{1} {2}\t{3}'.format(subDomain, domain, old_ip, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) with open('TencentDNS.log', 'a+', encoding='utf-8')as f: f.writelines(msg + '\n') def AddModifyRemove_domain(obj): # 批量增加、修改域名解析记录 if not os.path.exists('domain_list.txt'): msg = '#文件格式为 域名,子域名,记录类型,线路,记录值 文件从第三行开始读,请不要删除范例\nhello.wang,www,A,默认,192.168.1.1' with open('domain_list.txt', 'w', encoding='utf-8')as f: f.write(msg) print('已创建文件,将域名相关信息写入文件进行批量修改处理') else: with open('domain_list.txt', 'r', encoding='utf-8')as f: lines = f.readlines()[2:] for item in lines: domain = item.strip().split(',')[0] subDomain = item.strip().split(',')[1] recordType = item.strip().split(',')[2] recordLine = item.strip().split(',')[3] value = item.strip().split(',')[4] if deal_action == 'modify': obj.modify_record(domain, subDomain, recordType, recordLine, value) elif deal_action == 'add': obj.add_record(domain, subDomain, recordType, recordLine, value) elif deal_action == 'delete': obj.del_record(domain, subDomain) def script_args(): global deal_action obj = ManageDomain() parse = argparse.ArgumentParser() parse.add_argument('-C', action='store_true', help='查询账号下所有域名及其主机记录') parse.add_argument('-o', action='store_true', help='批量添加domain_list.txt文件中域名信息') parse.add_argument('-O', action='store_true', help='批量修改domain_list.txt文件中域名信息') parse.add_argument('-R', action='store_true', help='批量删除domain_list.txt文件中域名信息') parse.add_argument('-c', action='store_true', help='查询域名解析记录,配合 -d -s 使用') parse.add_argument('-r', action='store_true', help='删除域名解析记录,配合 -d -s 使用') parse.add_argument('-a', action='store_true', help='增加域名解析记录,配合 -d -s -t -l -v 使用') parse.add_argument('-m', action='store_true', help='修改域名解析记录,配合 -d -s -t -l -v 使用') parse.add_argument('-d', nargs=1, type=str, help='域名') parse.add_argument('-s', nargs=1, type=str, help='主机记录') type_list = ['A', 'CNAME', 'TXT', 'MX', 'NS'] parse.add_argument('-t', nargs=1, type=str, choices=type_list, default=['A'], help='记录类型[可选,默认为A类型]') line_list = ['默认', '境内', '境外', '电信', '联通', '移动'] parse.add_argument('-l', nargs=1, type=str, choices=line_list, default=['默认'], help='线路[可选,默认为默认线路]') parse.add_argument('-v', nargs=1, type=str, help='记录值') args = parse.parse_args() if sys.argv == 1: parse.print_help() exit() if args.C: obj.all_domain() exit() elif args.o: deal_action = 'add' AddModifyRemove_domain(obj) exit() elif args.O: deal_action = 'modify' AddModifyRemove_domain(obj) exit() elif args.R: deal_action = 'delete' AddModifyRemove_domain(obj) exit() elif args.r and args.d and args.s: domain, subDomain = args.d[0], args.s[0] obj.del_record(domain, subDomain) elif args.c and args.d and args.s: domain, subDomain = args.d[0], args.s[0] obj.check_record(domain, subDomain) elif args.a and args.d and args.s and args.t and args.l and args.v: domain, subDomain, recordType, recordLine, value = args.d[0], args.s[0], args.t[0], args.l[0], args.v[0] obj.add_record(domain, subDomain, recordType, recordLine, value) elif args.m and args.d and args.s and args.t and args.l and args.v: domain, subDomain, recordType, recordLine, value = args.d[0], args.s[0], args.t[0], args.l[0], args.v[0] obj.modify_record(domain, subDomain, recordType, recordLine, value) else: print('参数错误/缺少必要参数,请查看使用帮助') parse.print_help() if __name__ == '__main__': script_args() # AddModify_domain() # obj.check_record('hello.wang', 'www') # obj.add_record('hello.wang', 'www', 'A', '默认', '192.168.1.5') # obj.del_record('hello.wang', 'www') # obj.modify_record('hello.wang', 'www', 'A', '默认', '192.168.1.1')