流量轉發 之 Cloudflare更新IP記錄

Python · 04-19 · 283 人浏览
流量轉發 之 Cloudflare更新IP記錄

該脚本需要安裝 requests、ping3 模塊。

pip3 install requests
pip3 install ping3

这个脚本的功能包括:

  1. 检查指定的 URL 是否可达,如果全部可达则等待 5 分钟后重新检查,否则执行其他代码。
  2. 如果有 URL 不可达,则执行其他代码,其中包括对指定 IP 地址进行 ping 测试,并找到延迟最小的 IP。
  3. 使用 Cloudflare API 更新指定域名的 DNS 记录,将延迟最小的 IP 设置为域名的 A 记录。

假設你有一個大陸VPS,通過該脚本可以測試中轉域名的IP是否可用,若不可用,則更新IP(自動設置為延遲最低的IP)。

import requests
import ping3
import time

api_key = ''
email = ""
domain = ''
prefix = ''

def check_urls(target_urls):
    for target_url in target_urls:
        try:
            response = requests.get(target_url, timeout=2)
            if response.status_code == 200:
                print(f"Request to {target_url} successful. Status code: {response.status_code}")
            else:
                print(f"Request to {target_url} failed with status code: {response.status_code}")
                return False
        except Exception as e:
            print(f"Error while requesting {target_url}: {e}")
            return False
    return True

def get_zone_id(domain):
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': api_key,
        'Content-Type': 'application/json'
    }
    url = f'https://api.cloudflare.com/client/v4/zones?name={domain}'
    response = requests.get(url, headers=headers)
    return response.json()['result'][0]['id']

def get_record_id(domain, prefix):
    zone_id = get_zone_id(domain)  # 获取区域ID
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': api_key,
        'Content-Type': 'application/json'
    }
    url = f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records'
    response = requests.get(url, headers=headers)
    records = response.json()['result']
    for record in records:
        if record['name'] == f"{prefix}.{domain}":
            return record['id']
    return None

def DNS_Update(domain, record_type, prefix, record_content):
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': api_key,
        'Content-Type': 'application/json'
    }
    zone_id = get_zone_id(domain)
    config_domain = prefix + '.' + domain
    record_id = get_record_id(domain, prefix)  # 获取记录ID

    url = f'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records'

    # 构建数据
    data = {
        'type': record_type,
        'name': config_domain,
        'content': record_content,
        'ttl': 120,
        'proxied': False
    }

    # 如果记录已存在,则使用PUT请求更新记录
    if record_id:
        url += f'/{record_id}'
        response = requests.put(url, headers=headers, json=data)
    else:
        # 否则,使用POST请求添加新记录
        response = requests.post(url, headers=headers, json=data)

    # 检查响应状态码并打印结果
    if response.status_code == 200:
        print('Successfully updated/added:', config_domain, record_type, record_content)
    else:
        print('Failed to update/add:', config_domain, record_type, record_content)

def test_ping(target_ip):
    try:
        # 发送 ICMP ping 请求并获取延迟
        delay = ping3.ping(target_ip)

        if delay is not None:
            print(f"Ping to {target_ip} successful. Delay: {delay} ms")
            return delay
        else:
            print(f"No response from {target_ip}.")
            return 999
    except Exception as e:
        print(f"Error while pinging {target_ip}: {e}")
        return 999

if __name__ == "__main__":
    target_ips = ["8.8.8.8", '1.1.1.1']
    target_urls = ['https://gosogle.com', 'https://yosutube.com']

    # 检查 URL 是否正常,如果全部正常则等待 5 分钟后重复运行,否则执行其他代码
    while True:
        if check_urls(target_urls):
            print("All URLs are reachable. Waiting for 5 minutes before rechecking.")
            time.sleep(300)  # 5 minutes
        else:
            print("Some URLs are not reachable. Running other code.")

            delays = []
            min_delay_ip = None  # 用于存储当前最小延迟的IP
            min_delay = float('inf')  # 初始设定为无穷大

            for target_ip in target_ips:
                # 执行 ping 测试
                delay = test_ping(target_ip)
                delays.append(delay)

                # 更新最小延迟的IP
                if delay < min_delay:
                    min_delay = delay
                    min_delay_ip = target_ip

            print(f"The IP with the minimum delay is: {min_delay_ip} with delay: {min_delay} ms")
            DNS_Update(domain, 'A', prefix, min_delay_ip)
本站立足于美利堅合衆國,請讀者自覺遵守當地法律!如有違規,本站不承擔任何法律責任! This site is based in the United States of America, readers are requested to abide by local laws! If there are any violations, this site does not bear any legal responsibility! Theme Jasmine by Kent Liao