小红书数据采集终极指南:如何高效构建Python自动化工具
2026/6/13 1:27:02 网站建设 项目流程

小红书数据采集终极指南:如何高效构建Python自动化工具

【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs

小红书作为国内领先的生活方式分享平台,汇聚了海量用户生成内容,对于数据分析师、营销从业者和开发者来说,小红书数据采集已成为获取市场洞察的重要途径。xhs 是一个基于小红书 Web 端请求封装的 Python SDK,提供了完整的小红书数据采集解决方案,让开发者能够高效、稳定地获取平台公开数据。本文将从项目概述、快速入门、核心功能到实战应用,全面解析如何使用 xhs 构建专业的数据采集系统。🚀

项目概述与价值定位

xhs 项目是一个专门为小红书平台设计的 Python 数据采集工具包,它封装了复杂的网络请求和签名逻辑,提供了简洁易用的 API 接口。通过这个工具,开发者可以轻松获取小红书笔记内容、用户信息、搜索数据等公开信息,为内容分析、竞品研究和市场趋势预测提供数据支持。

核心优势对比

特性xhs SDK传统爬虫官方API
易用性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
稳定性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
功能完整性⭐⭐⭐⭐⭐⭐⭐
维护成本
合规性中等

快速入门指南

安装与配置

安装 xhs 非常简单,可以通过 pip 直接安装:

# 安装最新版本 pip install xhs # 或者从源码安装 git clone https://gitcode.com/gh_mirrors/xh/xhs cd xhs pip install -e .

基础使用示例

让我们从最简单的示例开始,展示如何快速获取小红书笔记数据:

from xhs import XhsClient import datetime # 初始化客户端 cookie = "your_cookie_string" xhs_client = XhsClient(cookie) # 获取指定笔记详情 note_id = "6505318c000000001f03c5a6" note = xhs_client.get_note_by_id(note_id) print(f"笔记标题:{note['title']}") print(f"作者:{note['user']['nickname']}") print(f"点赞数:{note['likes']}") print(f"收藏数:{note['collects']}") print(f"发布时间:{datetime.datetime.fromtimestamp(note['time']/1000)}")

核心功能详解

1. 多维度搜索功能

xhs 提供了强大的搜索功能,支持多种搜索条件和排序方式:

from xhs import SearchSortType, SearchNoteType # 初始化客户端 xhs_client = XhsClient(cookie) # 综合搜索示例 search_results = xhs_client.search( keyword="Python编程", sort=SearchSortType.GENERAL, note_type=SearchNoteType.VIDEO, page=1 ) # 分析搜索结果 for result in search_results['items'][:5]: # 前5条结果 print(f"笔记ID:{result['id']}") print(f"标题:{result['title']}") print(f"互动数据 - 点赞:{result['likes']}, 收藏:{result['collects']}") print(f"用户:{result['user']['nickname']}") print("-" * 50)

2. 内容分类浏览系统

xhs 支持按内容分类获取推荐流,覆盖小红书主要内容领域:

from xhs import FeedType # 获取不同分类的内容 feed_types = { "美食": FeedType.FOOD, "穿搭": FeedType.FASION, "旅行": FeedType.TRAVEL, "健身": FeedType.FITNESS, "游戏": FeedType.GAME } for category_name, feed_type in feed_types.items(): print(f"\n获取{category_name}分类内容...") notes = xhs_client.get_home_feed(feed_type=feed_type) # 分析热门内容 top_notes = sorted(notes['items'], key=lambda x: x['likes'], reverse=True)[:3] for note in top_notes: print(f" - {note['title'][:30]}... (点赞: {note['likes']})")

3. 用户信息获取

除了笔记内容,xhs 还支持获取用户相关信息:

# 获取用户信息 user_info = xhs_client.get_user_info(user_id="用户ID") print(f"用户昵称:{user_info['nickname']}") print(f"粉丝数:{user_info['fans_count']}") print(f"笔记数:{user_info['notes_count']}") print(f"个人简介:{user_info['desc']}")

核心源码:xhs/core.py 包含了所有核心功能的实现

实战应用场景

场景一:内容趋势分析平台

构建一个自动化内容趋势分析系统,帮助品牌发现热门话题:

import pandas as pd from collections import Counter from datetime import datetime, timedelta class ContentTrendAnalyzer: def __init__(self, xhs_client): self.xhs_client = xhs_client self.trend_data = [] def collect_trending_data(self, keyword, days=7): """收集指定关键词的趋势数据""" for day in range(days): target_date = datetime.now() - timedelta(days=day) # 搜索相关笔记 results = self.xhs_client.search( keyword=keyword, sort=SearchSortType.GENERAL, page=1 ) for note in results['items']: self.trend_data.append({ 'date': target_date.date(), 'keyword': keyword, 'title': note['title'], 'likes': note['likes'], 'collects': note['collects'], 'comments': note['comments'], 'user': note['user']['nickname'] }) return pd.DataFrame(self.trend_data) def analyze_trend_patterns(self, df): """分析趋势模式""" # 按日期统计 daily_stats = df.groupby('date').agg({ 'likes': 'mean', 'collects': 'mean', 'comments': 'mean' }).reset_index() # 热门关键词提取 all_titles = ' '.join(df['title'].tolist()) word_counts = Counter(all_titles.split()) trending_keywords = word_counts.most_common(10) return { 'daily_stats': daily_stats, 'trending_keywords': trending_keywords, 'total_notes': len(df), 'avg_engagement': df['likes'].mean() }

场景二:竞品监控系统

为市场营销团队构建竞品内容监控系统:

import schedule import time from datetime import datetime class CompetitorMonitor: def __init__(self, competitors, xhs_client): self.competitors = competitors self.xhs_client = xhs_client self.monitoring_data = {} def setup_monitoring_schedule(self): """设置监控计划""" # 每小时监控一次 schedule.every(1).hours.do(self.monitor_all_competitors) # 每天生成报告 schedule.every().day.at("09:00").do(self.generate_daily_report) def monitor_all_competitors(self): """监控所有竞品""" print(f"开始竞品监控 - {datetime.now()}") for competitor in self.competitors: try: self.monitor_competitor(competitor) time.sleep(2) # 避免请求过快 except Exception as e: print(f"监控 {competitor} 时出错: {e}") def monitor_competitor(self, competitor_name): """监控单个竞品""" # 搜索竞品相关内容 search_results = self.xhs_client.search( keyword=competitor_name, sort=SearchSortType.TIME_DESC ) # 分析最新内容 latest_posts = search_results['items'][:10] for post in latest_posts: post_id = post['id'] # 获取详细数据 note_detail = self.xhs_client.get_note_by_id(post_id) # 计算互动率 engagement_rate = self.calculate_engagement_rate(note_detail) # 存储数据 if competitor_name not in self.monitoring_data: self.monitoring_data[competitor_name] = [] self.monitoring_data[competitor_name].append({ 'post_id': post_id, 'title': post['title'], 'engagement_rate': engagement_rate, 'timestamp': datetime.now(), 'likes': note_detail['likes'], 'collects': note_detail['collects'], 'comments': note_detail['comments'] }) print(f"✓ 监控到 {competitor_name} 的新内容: {post['title'][:50]}...")

场景三:内容质量评估系统

构建内容质量评估系统,帮助内容创作者优化发布策略:

class ContentQualityAnalyzer: def __init__(self, xhs_client): self.xhs_client = xhs_client def analyze_content_quality(self, note_id): """分析内容质量""" note = self.xhs_client.get_note_by_id(note_id) quality_score = 0 quality_factors = [] # 1. 互动率评估 engagement_score = self.calculate_engagement_score(note) quality_score += engagement_score quality_factors.append(f"互动得分: {engagement_score}") # 2. 内容完整性评估 completeness_score = self.evaluate_content_completeness(note) quality_score += completeness_score quality_factors.append(f"完整度得分: {completeness_score}") # 3. 时效性评估 timeliness_score = self.evaluate_timeliness(note) quality_score += timeliness_score quality_factors.append(f"时效性得分: {timeliness_score}") # 4. 多媒体质量评估 media_score = self.evaluate_media_quality(note) quality_score += media_score quality_factors.append(f"媒体质量得分: {media_score}") return { 'total_score': quality_score, 'factors': quality_factors, 'recommendations': self.generate_recommendations(quality_score, quality_factors) } def calculate_engagement_score(self, note): """计算互动得分""" likes = note.get('likes', 0) collects = note.get('collects', 0) comments = note.get('comments', 0) # 加权计算互动得分 engagement_score = (likes * 0.4 + collects * 0.3 + comments * 0.3) / 100 return min(engagement_score, 25) # 最高25分 def generate_recommendations(self, score, factors): """生成优化建议""" recommendations = [] if score < 60: recommendations.append("建议增加互动引导,如提问或投票") recommendations.append("优化标题和封面图,提高点击率") recommendations.append("增加内容深度,提供更多实用信息") elif score < 80: recommendations.append("继续保持内容质量") recommendations.append("尝试不同内容形式,如视频或图文结合") recommendations.append("增加发布时间频率") else: recommendations.append("内容质量优秀,继续保持") recommendations.append("可以考虑系列化内容创作") recommendations.append("与粉丝互动,建立社区") return recommendations

性能优化与最佳实践

1. 请求频率控制策略

为了避免被平台限制,需要合理控制请求频率:

import time from functools import wraps import random class RateLimiter: def __init__(self, max_calls=5, period=60): self.max_calls = max_calls self.period = period self.calls = [] def __call__(self, func): @wraps(func) def wrapper(*args, **kwargs): now = time.time() # 清理过期记录 self.calls[:] = [t for t in self.calls if t > now - self.period] # 检查是否超过限制 if len(self.calls) >= self.max_calls: sleep_time = self.period - (now - self.calls[0]) print(f"⚠️ 达到频率限制,等待 {sleep_time:.1f} 秒") time.sleep(sleep_time + random.uniform(0.5, 1.5)) self.calls.clear() # 记录本次调用 self.calls.append(time.time()) # 添加随机延迟,模拟人类行为 time.sleep(random.uniform(0.5, 2)) return func(*args, **kwargs) return wrapper # 使用装饰器 @RateLimiter(max_calls=3, period=60) def safe_search(keyword, page=1): """安全的搜索函数""" return xhs_client.search(keyword=keyword, page=page)

2. 错误处理与重试机制

健壮的错误处理是数据采集系统的关键:

import logging from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class RobustXhsClient: def __init__(self, cookie): self.xhs_client = XhsClient(cookie) self.session = self._create_session() def _create_session(self): """创建带重试机制的会话""" session = requests.Session() # 配置重试策略 retry_strategy = requests.adapters.Retry( total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET", "POST"] ) adapter = requests.adapters.HTTPAdapter( max_retries=retry_strategy, pool_connections=10, pool_maxsize=10 ) session.mount('http://', adapter) session.mount('https://', adapter) return session @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((requests.exceptions.RequestException, DataFetchError)) ) def get_note_with_retry(self, note_id): """带重试的笔记获取""" try: logger.info(f"尝试获取笔记: {note_id}") note = self.xhs_client.get_note_by_id(note_id) logger.info(f"成功获取笔记: {note_id}") return note except Exception as e: logger.error(f"获取笔记失败: {note_id}, 错误: {e}") raise

3. 数据存储优化方案

import sqlite3 import json from datetime import datetime from contextlib import contextmanager class XhsDataStorage: def __init__(self, db_path="xhs_data.db"): self.db_path = db_path self._init_database() @contextmanager def get_connection(self): """获取数据库连接上下文管理器""" conn = sqlite3.connect(self.db_path) try: yield conn finally: conn.close() def _init_database(self): """初始化数据库结构""" with self.get_connection() as conn: cursor = conn.cursor() # 创建笔记表 cursor.execute(''' CREATE TABLE IF NOT EXISTS notes ( id TEXT PRIMARY KEY, title TEXT, content TEXT, user_id TEXT, likes INTEGER DEFAULT 0, collects INTEGER DEFAULT 0, comments INTEGER DEFAULT 0, publish_time DATETIME, category TEXT, tags TEXT, raw_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建用户表 cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( user_id TEXT PRIMARY KEY, nickname TEXT, avatar TEXT, notes_count INTEGER DEFAULT 0, fans_count INTEGER DEFAULT 0, following_count INTEGER DEFAULT 0, desc TEXT, raw_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_notes_publish_time ON notes(publish_time)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_notes_likes ON notes(likes)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_notes_category ON notes(category)') conn.commit() def save_note(self, note_data): """保存笔记数据""" with self.get_connection() as conn: cursor = conn.cursor() # 提取标签 tags = note_data.get('tags', []) tags_str = ','.join(tags) if tags else '' cursor.execute(''' INSERT OR REPLACE INTO notes (id, title, content, user_id, likes, collects, comments, publish_time, category, tags, raw_data, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ''', ( note_data['id'], note_data.get('title', ''), note_data.get('desc', ''), note_data['user']['user_id'], note_data.get('likes', 0), note_data.get('collects', 0), note_data.get('comments', 0), datetime.fromtimestamp(note_data['time']/1000) if 'time' in note_data else None, note_data.get('category', ''), tags_str, json.dumps(note_data, ensure_ascii=False) )) conn.commit()

常见问题排错

问题一:签名验证失败

症状:请求返回签名错误或验证失败

解决方案

  1. 检查 cookie 是否过期或无效
  2. 验证签名函数逻辑是否正确
  3. 适当增加延迟避免频繁请求
def enhanced_sign(uri, data=None, a1="", web_session=""): """增强版签名函数""" import time from playwright.sync_api import sync_playwright for retry in range(3): try: with sync_playwright() as playwright: browser = playwright.chromium.launch(headless=True) context = browser.new_context() page = context.new_page() # 设置更长的页面加载时间 page.set_default_timeout(10000) # 添加必要的初始化脚本 page.goto("https://www.xiaohongshu.com", wait_until="networkidle") # 设置cookie context.add_cookies([ {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"} ]) page.reload(wait_until="networkidle") time.sleep(3) # 增加等待时间确保页面完全加载 # 检查签名函数是否存在 signature_available = page.evaluate("typeof window._webmsxyw === 'function'") if not signature_available: raise Exception("签名函数未加载成功") # 执行签名 encrypt_params = page.evaluate( "([url, data]) => window._webmsxyw(url, data)", [uri, data] ) browser.close() return { "x-s": encrypt_params["X-s"], "x-t": str(encrypt_params["X-t"]) } except Exception as e: if retry == 2: raise Exception(f"签名失败,重试3次后仍然失败: {str(e)}") # 指数退避重试 wait_time = (retry + 1) * 2 print(f"第{retry+1}次签名失败,等待{wait_time}秒后重试") time.sleep(wait_time)

问题二:IP被封禁处理

症状:请求返回403错误或连接被拒绝

解决方案

  1. 使用代理IP池轮换
  2. 降低请求频率
  3. 实现智能重试机制
class ProxyManager: def __init__(self, proxy_list): self.proxy_list = proxy_list self.current_index = 0 self.failed_proxies = set() def get_current_proxy(self): """获取当前可用的代理""" if not self.proxy_list: return None # 跳过失效的代理 while (self.proxy_list[self.current_index] in self.failed_proxies and len(self.failed_proxies) < len(self.proxy_list)): self.current_index = (self.current_index + 1) % len(self.proxy_list) return self.proxy_list[self.current_index] def rotate_proxy(self): """切换到下一个代理""" self.current_index = (self.current_index + 1) % len(self.proxy_list) print(f"切换到代理: {self.get_current_proxy()}") def mark_proxy_failed(self, proxy): """标记代理失效""" self.failed_proxies.add(proxy) print(f"标记代理失效: {proxy}") # 如果所有代理都失效,清空失效列表重新开始 if len(self.failed_proxies) >= len(self.proxy_list): print("所有代理都失效,清空失效列表重新开始") self.failed_proxies.clear() def is_proxy_available(self, proxy): """检查代理是否可用""" return proxy not in self.failed_proxies

问题三:数据格式变化处理

症状:API返回数据格式变化导致解析失败

解决方案

  1. 实现兼容性解析
  2. 添加数据验证
  3. 建立格式监控
class AdaptiveDataParser: def __init__(self): self.field_mappings = { 'title': ['title', 'note_title', 'desc', 'content'], 'likes': ['likes', 'like_count', 'likeCount'], 'collects': ['collects', 'collect_count', 'collectCount'], 'comments': ['comments', 'comment_count', 'commentCount'], 'user_id': ['user.user_id', 'author.user_id', 'user_id'], 'nickname': ['user.nickname', 'author.nickname', 'nickname'] } def parse_note_data(self, raw_data): """自适应解析笔记数据""" result = {} # 尝试多种可能的字段路径 for target_field, possible_paths in self.field_mappings.items(): value = self._extract_field(raw_data, possible_paths) if value is not None: result[target_field] = value # 设置默认值 result.setdefault('likes', 0) result.setdefault('collects', 0) result.setdefault('comments', 0) result.setdefault('title', '') result.setdefault('user', {'user_id': '', 'nickname': '未知用户'}) return result def _extract_field(self, data, paths): """从多个可能路径中提取字段""" for path in paths: try: if '.' in path: # 处理嵌套路径 parts = path.split('.') value = data for part in parts: value = value.get(part, {}) if value != {}: return value else: # 处理直接路径 value = data.get(path) if value is not None: return value except (KeyError, AttributeError, TypeError): continue return None def validate_note_data(self, note_data): """验证笔记数据完整性""" required_fields = ['id', 'title', 'user'] for field in required_fields: if field not in note_data or not note_data[field]: return False, f"缺少必要字段: {field}" # 验证用户信息 if 'user_id' not in note_data.get('user', {}): return False, "用户信息不完整" # 验证数值字段 numeric_fields = ['likes', 'collects', 'comments'] for field in numeric_fields: if field in note_data and not isinstance(note_data[field], (int, float)): return False, f"字段 {field} 类型错误" return True, "数据验证通过"

扩展与进阶

1. 异步请求支持

对于需要处理大量请求的场景,可以扩展异步支持:

import asyncio import aiohttp from typing import List, Dict, Any class AsyncXhsClient: def __init__(self, cookie, max_concurrent=10): self.cookie = cookie self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) async def fetch_multiple_notes(self, note_ids: List[str]) -> Dict[str, Any]: """异步获取多个笔记""" tasks = [] results = {} async with aiohttp.ClientSession() as session: for note_id in note_ids: task = asyncio.create_task( self._fetch_note_with_semaphore(session, note_id) ) tasks.append((note_id, task)) for note_id, task in tasks: try: results[note_id] = await task except Exception as e: results[note_id] = {'error': str(e)} return results async def _fetch_note_with_semaphore(self, session, note_id): """使用信号量控制并发""" async with self.semaphore: return await self._fetch_note(session, note_id) async def _fetch_note(self, session, note_id): """获取单个笔记""" url = f"https://www.xiaohongshu.com/explore/{note_id}" headers = { 'Cookie': self.cookie, 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } async with session.get(url, headers=headers) as response: if response.status == 200: return await response.json() else: raise Exception(f"请求失败: {response.status}")

2. 数据可视化扩展

结合数据可视化库,创建直观的数据分析报告:

import matplotlib.pyplot as plt import seaborn as sns from wordcloud import WordCloud class XhsDataVisualizer: def __init__(self, data_storage): self.storage = data_storage def create_engagement_heatmap(self, category=None): """创建互动热力图""" with self.storage.get_connection() as conn: query = """ SELECT strftime('%H', publish_time) as hour, strftime('%w', publish_time) as weekday, AVG(likes) as avg_likes, COUNT(*) as post_count FROM notes WHERE publish_time IS NOT NULL """ if category: query += f" AND category = '{category}'" query += " GROUP BY hour, weekday ORDER BY hour, weekday" df = pd.read_sql_query(query, conn) # 创建热力图 pivot_table = df.pivot(index='weekday', columns='hour', values='avg_likes') plt.figure(figsize=(12, 8)) sns.heatmap(pivot_table, cmap='YlOrRd', annot=True, fmt='.0f') plt.title('小红书笔记互动热力图') plt.xlabel('发布时间(小时)') plt.ylabel('星期几') plt.tight_layout() return plt def create_word_cloud(self, category=None, max_words=100): """创建词云图""" with self.storage.get_connection() as conn: query = "SELECT title FROM notes WHERE title != ''" if category: query += f" AND category = '{category}'" df = pd.read_sql_query(query, conn) # 合并所有标题 text = ' '.join(df['title'].tolist()) # 生成词云 wordcloud = WordCloud( width=800, height=400, background_color='white', max_words=max_words, font_path='simhei.ttf' # 中文字体 ).generate(text) plt.figure(figsize=(12, 8)) plt.imshow(wordcloud, interpolation='bilinear') plt.axis('off') plt.title('小红书笔记标题词云') return plt

3. 监控告警系统

构建监控告警系统,及时发现数据采集异常:

import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from datetime import datetime, timedelta class MonitoringAlertSystem: def __init__(self, email_config): self.email_config = email_config self.alerts = [] def check_data_quality(self, storage, hours=24): """检查数据质量""" with storage.get_connection() as conn: # 检查最近24小时的数据 cutoff_time = datetime.now() - timedelta(hours=hours) # 检查数据完整性 query = """ SELECT COUNT(*) as total_notes, SUM(CASE WHEN title IS NULL OR title = '' THEN 1 ELSE 0 END) as missing_titles, SUM(CASE WHEN likes = 0 AND collects = 0 AND comments = 0 THEN 1 ELSE 0 END) as zero_engagement, AVG(likes) as avg_likes FROM notes WHERE created_at > ? """ cursor = conn.cursor() cursor.execute(query, (cutoff_time,)) result = cursor.fetchone() # 分析数据质量 quality_issues = [] if result['missing_titles'] > 0: quality_issues.append(f"缺失标题的笔记: {result['missing_titles']}条") if result['zero_engagement'] > result['total_notes'] * 0.1: # 超过10% quality_issues.append(f"零互动笔记过多: {result['zero_engagement']}条") if result['avg_likes'] < 10: # 平均点赞低于10 quality_issues.append(f"平均点赞数过低: {result['avg_likes']:.1f}") return { 'total_notes': result['total_notes'], 'quality_issues': quality_issues, 'avg_likes': result['avg_likes'] } def send_alert_email(self, subject, message): """发送告警邮件""" msg = MIMEMultipart() msg['From'] = self.email_config['sender'] msg['To'] = ', '.join(self.email_config['recipients']) msg['Subject'] = subject msg.attach(MIMEText(message, 'plain')) try: with smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port']) as server: server.starttls() server.login(self.email_config['username'], self.email_config['password']) server.send_message(msg) print(f"告警邮件发送成功: {subject}") except Exception as e: print(f"发送告警邮件失败: {e}") def monitor_and_alert(self, storage): """监控并发送告警""" quality_report = self.check_data_quality(storage) if quality_report['quality_issues']: subject = f"小红书数据采集质量告警 - {datetime.now().strftime('%Y-%m-%d %H:%M')}" message = f""" 数据采集质量报告: 总笔记数:{quality_report['total_notes']} 平均点赞数:{quality_report['avg_likes']:.1f} 发现的问题: """ for issue in quality_report['quality_issues']: message += f"- {issue}\n" message += "\n建议检查采集配置和网络连接。" self.send_alert_email(subject, message) self.alerts.append({ 'timestamp': datetime.now(), 'issues': quality_report['quality_issues'] })

通过本文的全面介绍,您已经掌握了使用 xhs 进行小红书数据采集的核心技术和最佳实践。从基础使用到高级功能,从性能优化到实战应用,xhs 为开发者提供了一个强大而灵活的数据采集解决方案。无论您是数据分析师、营销人员还是开发者,都可以基于 xhs 构建符合自己需求的自动化工具。

记住,技术是工具,合规使用是关键。在使用 xhs 进行数据采集时,请务必遵守相关法律法规和平台规则,合理控制请求频率,尊重用户隐私,仅将数据用于合法合规的用途。💡

示例代码:example/ 提供了丰富的使用示例 配置文件:setup.cfg 包含项目配置信息 测试用例:tests/ 确保代码质量

【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询