文章

🍎Apple 库存监控/飞书推送

参数说明

  • searchNearby=true (是否查询周边店铺)

  • store=R359 (主直营店)

  • parts.0=MYM43CH/A、parts.1=…、parts.2=… (要查询库存的型号列表,序号从 0 开始依次递增)

  • mts.0=compact (Message字段用紧凑模式)

代码示例

import json
import random
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import requests


def req(timeout=10):
    """Query Apple's fulfillment-messages endpoint and return the decoded JSON.

    The request asks for pickup availability of a fixed list of part numbers
    at store R359 and nearby stores, using the compact message type.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the calling thread
            indefinitely.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: If the response body is not valid JSON.
    """
    url = "https://www.apple.com.cn/shop/fulfillment-messages?searchNearby=true&store=R359&mts.0=compact&parts.0=MYLN3CH/A&parts.1=MYM43CH/A&parts.2=MYM53CH/A&parts.3=MYLT3CH/A&parts.4=MYLR3CH/A&parts.5=MYLQ3CH/A&parts.6=MYM23CH/A&parts.7=MYLP3CH/A&parts.8=MYLX3CH/A&parts.9=MYM83CH/A&parts.10=MYLW3CH/A&parts.11=MYM73CH/A&parts.12=MYLV3CH/A&parts.13=MYM63CH/A&parts.14=MYLU3CH/A&parts.15=MYLY3CH/A&parts.16=MYTR3CH/A&parts.17=MYTQ3CH/A&parts.18=MYTP3CH/A&parts.19=MYTW3CH/A&parts.20=MYTT3CH/A&parts.21=MYTY3CH/A&parts.22=MYTX3CH/A&parts.23=MYW03CH/A&parts.24=MYTN3CH/A&parts.25=MYTM3CH/A&parts.26=MYW23CH/A&parts.27=MYW13CH/A&pl=true"
    # One User-Agent is picked at random per request.
    # NOTE: the first two entries are identical, so that string is chosen
    # twice as often as each of the others.
    user_agent = [
        "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
        "Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
        "Mozilla/5.0 (iPad; U; CPU OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
    ]
    headers = {
        'User-Agent': random.choice(user_agent),
        'Host': "www.apple.com.cn",
        'Accept': "*/*"
    }

    response = requests.get(url, headers=headers, timeout=timeout)
    # Surface HTTP errors here rather than as a confusing JSON/KeyError later.
    response.raise_for_status()
    return response.json()


def parse(arg):
    """Collect every (store, model) pair that is currently available for pickup.

    Args:
        arg: Decoded fulfillment-messages response. The code reads
            ``arg["body"]["content"]["pickupMessage"]["stores"]`` and, for
            each store, its ``storeName`` and ``partsAvailability`` mapping.

    Returns:
        A list of dicts with the keys ``store_name``, ``model``,
        ``phone_title`` and ``store_pickup_quote``, one per part that is
        buyable, has store selection enabled, and whose ``pickupDisplay`` is
        ``'available'``. The list is empty when nothing qualifies.
    """
    pickup_info = arg["body"]["content"]["pickupMessage"]
    hits = []
    for shop in pickup_info["stores"]:
        shop_name = shop['storeName']
        for part_number, detail in shop['partsAvailability'].items():
            compact = detail['messageTypes']['compact']
            title = compact['storePickupProductTitle']
            buyability = detail['buyability']
            in_stock = (
                buyability['isBuyable']
                and compact['storeSelectionEnabled'] == True  # noqa: E712
                and detail['pickupDisplay'] == 'available'
            )
            if not in_stock:
                continue
            hits.append({
                "store_name": shop_name,
                "model": part_number,
                "phone_title": title,
                "store_pickup_quote": compact["storePickupQuote"],
            })
    return hits


# Feishu (Lark) bot webhook URL used to send a text message.
# NOTE: "{id}" is a placeholder; nothing in the visible code substitutes it,
# so replace it with the real hook id before running.
webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/{id}"


def alarm(message_content: str, timeout: float = 10):
    """Post a timestamped text message to the Feishu webhook.

    Args:
        message_content: Text placed under the timestamp header.
        timeout: Seconds to wait for the webhook before giving up. Without a
            timeout a stalled connection would block the calling thread
            indefinitely.

    Raises:
        requests.RequestException: On connection failure or timeout. The
            HTTP status and response body are not inspected.
    """
    # Build the request payload.
    data = {
        "msg_type": "text",
        "content": {
            "text": f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 库存监控]\n\n"
                    f"{message_content}\n"
        }
    }

    # Send the POST request.
    headers = {'Content-Type': 'application/json'}
    requests.post(webhook_url, headers=headers, data=json.dumps(data),
                  timeout=timeout)


def _format_stock_report(available_items):
    """Turn parsed stock items into the alert text plus the non-Max lines.

    Args:
        available_items: List of dicts with ``store_name``, ``phone_title``
            and ``store_pickup_quote`` keys.

    Returns:
        A ``(final_msg, message)`` tuple: the full alert text, and the list
        of lines shown under the "Pro" heading.
    """
    message = []
    max_message = []
    if not available_items:
        message.append("暂无库存")
    else:
        for item in available_items:
            line = f"· 店铺名称:{item['store_name']} 型号:{item['phone_title']} 取货信息:{item['store_pickup_quote']} "
            # Titles containing "Max" go under the Pro Max heading.
            if 'Max' in item['phone_title']:
                max_message.append(line)
            else:
                message.append(line)
        # Mention everyone only when something is in stock.
        message.append("<at user_id=\"all\"></at>")
    max_message_str = '\n'.join(max_message)
    message_str = '\n'.join(message)
    final_msg = f"Pro Max:\n{max_message_str}\n--------------------\nPro:\n{message_str}"
    return final_msg, message


def run():
    """Poll stock forever, pushing each result to Feishu.

    Each iteration fetches availability, parses it, sends the formatted
    report via ``alarm`` and prints the non-Max lines. Any exception is
    printed and the loop continues. The function never returns.
    """
    while True:
        try:
            response = req()
            available_store_and_model = parse(response)
            final_msg, message = _format_stock_report(available_store_and_model)
            alarm(final_msg)
            print(message)
        except Exception:
            traceback.print_exc()
        # Sleep outside the try block so a failing request is still followed
        # by the delay instead of being retried in a tight loop.
        time.sleep(random.randint(5, 10))


# Start two polling workers, waiting 10 seconds before each submission.
# run() never returns, so leaving the `with` block (which waits for the
# workers to finish) blocks indefinitely.
with ThreadPoolExecutor(max_workers=2) as pool:
    for _ in range(2):
        time.sleep(10)
        pool.submit(run)

License:  CC BY 4.0