首页
/ 秒杀级库存监控:用Scrapy实现电商商品实时追踪

秒杀级库存监控:用Scrapy实现电商商品实时追踪

2026-02-05 04:51:03作者:薛曦旖Francesca

你是否遇到过这样的情况:购物车里的商品突然售罄,心仪的限量款永远抢不到?作为运营人员或普通消费者,错过库存变动可能意味着直接的经济损失。本文将带你用Scrapy(一种快速高级的网页爬取框架)构建一个实时库存监控系统,无需复杂编程知识,15分钟即可完成部署。读完本文你将获得:

  • 零代码实现商品库存监控的完整方案
  • 自动推送库存变动通知的配置方法
  • 避开反爬机制的实用技巧
  • 多平台监控扩展指南

系统工作原理

Scrapy库存监控系统通过定期抓取目标商品页面,提取库存状态信息并与历史数据比对,实现实时变动检测。其核心工作流程如下:

graph LR
    A[启动监控] --> B[配置目标商品URL]
    B --> C[设置抓取间隔]
    C --> D[运行Scrapy爬虫]
    D --> E[提取库存数据]
    E --> F{库存变动?}
    F -->|是| G[触发通知]
    F -->|否| H[等待下一轮抓取]
    G --> H
    H --> D

关键技术组件包括:

  • Spider(爬虫):定义抓取规则和数据提取方式
  • Selector(选择器):精确定位页面中的库存元素
  • Pipeline(管道):处理提取的数据并执行比对
  • Scheduler(调度器):控制抓取频率和并发策略

快速开始:15分钟搭建监控系统

环境准备

首先确保已安装Python和Scrapy。如果尚未安装,执行以下命令:

pip install scrapy

通过官方文档验证安装是否成功:安装指南

创建监控项目

  1. 创建新项目:
scrapy startproject inventory_monitor
cd inventory_monitor

项目结构将自动生成,关键文件包括:

  • inventory_monitor/spiders/:存放爬虫代码
  • inventory_monitor/items.py:定义数据结构
  • inventory_monitor/pipelines.py:实现数据处理逻辑
  • inventory_monitor/settings.py:配置系统参数
  1. 创建商品爬虫:

spiders目录下新建product_spider.py文件:

import scrapy
from datetime import datetime
from inventory_monitor.items import InventoryItem

class ProductSpider(scrapy.Spider):
    name = "product"
    allowed_domains = ["example.com"]  # 替换为实际域名
    start_urls = ["http://example.com/product-page"]  # 替换为目标商品URL
    custom_settings = {
        "DOWNLOAD_DELAY": 60,  # 抓取间隔(秒)
        "CONCURRENT_REQUESTS": 1,  # 避免触发反爬
        "LOG_LEVEL": "INFO"
    }

    def parse(self, response):
        item = InventoryItem()
        # 提取商品信息(根据实际页面结构调整选择器)
        item["url"] = response.url
        item["timestamp"] = datetime.now().isoformat()
        item["product_name"] = response.css("h1.product-title::text").get().strip()
        
        # 提取库存状态(以下为示例选择器,需根据实际页面调整)
        stock_status = response.css("div.stock-status::text").get()
        item["in_stock"] = "有货" in stock_status or "库存" in stock_status
        
        # 提取库存数量(如果页面显示具体数字)
        item["stock_quantity"] = response.xpath("//span[@class='quantity']/text()").re_first(r'\d+')
        
        yield item

定义数据结构

编辑items.py文件,定义存储库存数据的结构:

import scrapy

class InventoryItem(scrapy.Item):
    url = scrapy.Field()
    timestamp = scrapy.Field()
    product_name = scrapy.Field()
    in_stock = scrapy.Field()
    stock_quantity = scrapy.Field()
    price = scrapy.Field()  # 可选:同时监控价格变动

实现库存比对逻辑

编辑pipelines.py文件,添加库存变动检测功能:

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
import json
import os

class InventoryPipeline:
    def __init__(self):
        self.history_file = "inventory_history.json"
        self.load_history()

    def load_history(self):
        if os.path.exists(self.history_file):
            with open(self.history_file, "r") as f:
                self.history = json.load(f)
        else:
            self.history = {}

    def save_history(self):
        with open(self.history_file, "w") as f:
            json.dump(self.history, f, indent=2)

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        url = adapter["url"]
        current_stock = adapter["in_stock"]
        
        # 检查历史记录
        if url in self.history:
            prev_stock = self.history[url]["in_stock"]
            if prev_stock != current_stock:
                # 库存状态变动,触发通知
                self.send_notification(adapter)
        
        # 更新历史记录
        self.history[url] = adapter.asdict()
        self.save_history()
        
        return item

    def send_notification(self, item):
        # 实现通知逻辑(邮件、短信等)
        product_name = item["product_name"]
        status = "有货了!" if item["in_stock"] else "已售罄"
        message = f"【库存变动通知】{product_name} - {status}"
        
        # 这里可以添加邮件发送代码或调用第三方通知API
        self.spider.logger.info(message)

配置系统参数

修改settings.py文件,启用管道并配置关键参数:

# 启用库存管道
ITEM_PIPELINES = {
    "inventory_monitor.pipelines.InventoryPipeline": 300,
}

# 下载延迟(秒),控制抓取频率
DOWNLOAD_DELAY = 60

# 并发请求数,避免给服务器造成压力
CONCURRENT_REQUESTS = 1

# 启用自动限速
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0

# 启用Cookie支持,某些网站需要登录才能查看库存
COOKIES_ENABLED = True

详细的设置说明可参考:设置文档

启动监控系统

执行以下命令启动监控:

scrapy crawl product -o inventory_log.jsonl

系统将开始定期抓取目标页面并记录库存状态。日志文件inventory_log.jsonl将保存所有抓取记录。

高级配置:精准定位库存元素

选择器使用指南

要准确提取库存信息,需要使用Scrapy的选择器功能精确定位页面元素。Scrapy支持CSS和XPath两种选择器语法。

例如,对于以下HTML结构:

<div class="product-info">
    <h1>限量版运动鞋</h1>
    <div class="price">¥999</div>
    <div class="stock-status">
        <span class="availability in-stock">有货</span>
        <span class="quantity">剩余:3件</span>
    </div>
</div>

使用CSS选择器提取库存状态:

# 提取库存状态文本
stock_status = response.css("div.stock-status span.availability::text").get()

# 提取剩余数量
quantity = response.css("div.stock-status span.quantity::text").re_first(r'剩余:(\d+)件')

使用XPath选择器实现同样功能:

# 提取库存状态文本
stock_status = response.xpath("//div[@class='stock-status']/span[@class='availability']/text()").get()

# 提取剩余数量
quantity = response.xpath("//div[@class='stock-status']/span[@class='quantity']/text()").re_first(r'剩余:(\d+)件')

更多选择器使用技巧请参考:选择器文档

处理动态内容

对于使用JavaScript动态加载库存的页面,需要启用JavaScript渲染。推荐使用Splash服务,配置方法如下:

  1. 安装Splash:
docker run -p 8050:8050 scrapinghub/splash
  1. 安装Scrapy-Splash插件:
pip install scrapy-splash
  1. settings.py中添加配置:
SPLASH_URL = 'http://localhost:8050'
DOWNLOADER_MIDDLEWARES = {
    'scrapy_splash.SplashCookiesMiddleware': 723,
    'scrapy_splash.SplashMiddleware': 725,
    'scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware': 810,
}
SPIDER_MIDDLEWARES = {
    'scrapy_splash.SplashDeduplicateArgsMiddleware': 100,
}
DUPEFILTER_CLASS = 'scrapy_splash.SplashAwareDupeFilter'

详细配置说明参见:SplashPipeline文档

反爬策略:避免被目标网站屏蔽

为确保监控系统长期稳定运行,需要采取适当的反爬措施:

合理设置抓取频率

通过调整DOWNLOAD_DELAYAUTOTHROTTLE_*参数,模拟人类浏览行为:

# settings.py
DOWNLOAD_DELAY = 60  # 每次请求间隔60秒
AUTOTHROTTLE_ENABLED = True  # 启用自动限速
AUTOTHROTTLE_START_DELAY = 5  # 初始延迟
AUTOTHROTTLE_MAX_DELAY = 60  # 最大延迟
AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0  # 目标并发数

自动限速功能详细说明:自动限速

随机User-Agent

配置随机User-Agent中间件,避免被识别为爬虫:

  1. 安装插件:
pip install scrapy-fake-useragent
  1. 修改配置:
# settings.py
DOWNLOADER_MIDDLEWARES = {
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'scrapy_fake_useragent.middleware.RandomUserAgentMiddleware': 400,
}
FAKEUSERAGENT_PROVIDERS = [
    'scrapy_fake_useragent.providers.FakeUserAgentProvider',
    'scrapy_fake_useragent.providers.FakerProvider',
    'scrapy_fake_useragent.providers.FixedUserAgentProvider',
]
# 可选:设置一个固定的备用User-Agent
FAKEUSERAGENT_FALLBACK = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'

使用代理IP

如果监控多个网站或频繁抓取,考虑使用代理IP池:

# 在Spider中设置代理
def start_requests(self):
    for url in self.start_urls:
        yield scrapy.Request(url, meta={
            'proxy': 'http://user:pass@proxy_ip:port'
        })

代理中间件详细说明:HTTP代理中间件

数据持久化与可视化

存储到数据库

通过修改Pipeline,可将库存数据存储到MySQL、MongoDB等数据库中。以下是MongoDB存储示例:

# pipelines.py
import pymongo
from itemadapter import ItemAdapter

class MongoDBPipeline:
    collection_name = 'inventory'

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mongo_uri=crawler.settings.get('MONGO_URI'),
            mongo_db=crawler.settings.get('MONGO_DATABASE', 'inventory')
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
        return item

启用MongoDB管道:

# settings.py
ITEM_PIPELINES = {
    "inventory_monitor.pipelines.InventoryPipeline": 300,
    "inventory_monitor.pipelines.MongoDBPipeline": 400,
}
MONGO_URI = "mongodb://localhost:27017/"
MONGO_DATABASE = "inventory_monitor"

MongoDB管道详细实现:MongoDB Pipeline

生成库存趋势图表

使用Matplotlib生成库存历史趋势图:

import pandas as pd
import matplotlib.pyplot as plt

# 读取JSON日志文件
df = pd.read_json("inventory_log.jsonl", lines=True)
df['timestamp'] = pd.to_datetime(df['timestamp'])

# 按商品分组绘制库存趋势
for product, group in df.groupby('product_name'):
    plt.figure(figsize=(10, 5))
    plt.plot(group['timestamp'], group['stock_quantity'].astype(int))
    plt.title(f'{product} 库存趋势')
    plt.xlabel('时间')
    plt.ylabel('库存数量')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(f'{product}_trend.png')
    plt.close()

多商品监控与通知系统

批量监控配置

修改爬虫以支持多个商品同时监控:

# product_spider.py
class ProductSpider(scrapy.Spider):
    name = "product"
    allowed_domains = ["example.com"]
    
    # 从外部文件加载商品列表
    def start_requests(self):
        with open("products.txt", "r") as f:
            urls = [line.strip() for line in f if line.strip()]
            
        for url in urls:
            yield scrapy.Request(url, callback=self.parse)

创建products.txt文件,每行一个商品URL:

http://example.com/product1
http://example.com/product2
http://example.com/product3

通知方式扩展

邮件通知

使用Python内置smtplib库实现邮件通知:

# pipelines.py
import smtplib
from email.mime.text import MIMEText

def send_email(self, subject, message):
    # 邮件配置
    smtp_server = "smtp.example.com"
    smtp_port = 587
    smtp_username = "your_email@example.com"
    smtp_password = "your_password"
    recipient = "recipient@example.com"
    
    msg = MIMEText(message)
    msg['Subject'] = subject
    msg['From'] = smtp_username
    msg['To'] = recipient
    
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(smtp_username, smtp_password)
        server.send_message(msg)

微信通知

通过企业微信API发送消息:

# pipelines.py
import requests

def send_wechat_notification(self, message):
    corp_id = "your_corp_id"
    corp_secret = "your_corp_secret"
    agent_id = 1000001
    to_user = "@all"
    
    # 获取access_token
    token_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corp_id}&corpsecret={corp_secret}"
    token = requests.get(token_url).json()["access_token"]
    
    # 发送消息
    msg_url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}"
    data = {
        "touser": to_user,
        "msgtype": "text",
        "agentid": agent_id,
        "text": {"content": message}
    }
    requests.post(msg_url, json=data)

常见问题与解决方案

页面结构变化导致监控失效

问题:目标网站更新页面结构,导致选择器无法定位库存元素。

解决方案:使用更健壮的选择器,结合多个属性进行定位:

# 更健壮的选择器示例
stock_selector = """
    //div[contains(@class, 'product-info')]
    //*[contains(text(), '库存') or contains(text(), '剩余')]
    /following-sibling::span/text()
"""

定期检查和更新选择器,或使用机器学习方法自动识别库存元素。

被目标网站屏蔽

症状:爬虫开始返回403错误或验证码页面。

解决方案

  1. 增加抓取间隔
  2. 启用代理IP轮换
  3. 完善Cookie和Session管理
  4. 使用验证码识别服务(如打码平台)

反爬策略详细指南:反爬最佳实践

库存数据不准确

原因:页面缓存、地区差异或动态加载问题。

解决方案

  1. 禁用缓存:request.meta['dont_cache'] = True
  2. 设置适当的请求头:
headers = {
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
yield scrapy.Request(url, headers=headers)
  1. 使用Splash进行完整页面渲染

总结与扩展

通过本文介绍的方法,你已掌握使用Scrapy构建商品库存监控系统的核心技术。该系统不仅适用于电商商品,还可扩展到任何需要监控网页数据变化的场景。

进阶学习资源

系统扩展方向

  1. 分布式监控:使用Scrapy-Redis实现多节点分布式监控
  2. 智能预测:基于历史数据预测库存变化趋势
  3. 自动下单:结合Selenium实现库存可用时自动下单
  4. 多源数据融合:整合多个平台的价格和库存信息

通过不断优化和扩展,这个基础框架可以发展成为功能强大的电商数据监控平台,为你的购物决策提供有力支持。

祝你抢购成功!如有任何问题,欢迎查阅项目文档或提交Issue。

登录后查看全文
热门项目推荐
相关项目推荐