基于海天4S选股标准,批量遍历全市场个股,自动输出初选合格股票清单。
2026/6/21 13:42:04 网站建设 项目流程

严格聚焦 “基于海天 4S 选股标准,批量遍历全市场个股,自动输出初选合格股票清单” 这一件事,适合直接写进课程讲义或技术博客。

海天 4S 选股标准:全市场批量初筛系统

一、实际应用场景描述

在 A 股实盘或研究中,投资者常面临一个基础但关键的问题:

面对 5000+ 只股票,如何快速、系统化地筛选出符合“海天 4S 标准”的初选池?

典型场景

场景 痛点

私募/公募研究员做初筛 每天手动翻几百只票,效率低、标准不统一

量化策略做选股前置过滤 需要干净、可重复的“股票池生成器”

智能投顾平台 需要自动化、可追溯、可参数化的筛选流程

个人投资者 不懂如何把“海天 4S”翻译成可执行的量化规则

👉 本质需求:

把“海天 4S 选股理念”翻译成明确、可编程、可回测的筛选流程,并批量应用到全市场。

二、引入痛点(问题结构化)

层级 痛点 后果

概念层 “海天 4S”缺乏统一、可执行的定义 不同人理解不同,无法系统化

数据层 财务数据、行情数据来源分散 清洗成本高、出错率高

工程层 全市场 5000+ 只股票逐一计算,性能差 运行慢、无法日常化

风控层 缺乏“通过/拒绝”明细日志 无法审计、无法复盘

教学层 讲义只讲概念,不给代码 学生知其然不知其所以然

核心结论:

“海天 4S”不是玄学,它本质上是 4 个维度的量化过滤漏斗,可以被严谨地工程化。

三、核心逻辑讲解("海天 4S"的工程化翻译)

3.1 海天 4S 是什么?

海天投资体系中的 4S 选股标准,是指从以下 4 个维度 对个股进行初筛:

维度 英文 核心含义 可量化指标

基本面 Soundness 财务健康、盈利能力强 ROE、净利率、资产负债率

成长性 Sustainability 业绩增长可持续 营收/净利润增速、复合增长率

估值合理性 Safety 价格相对内在价值有安全边际 PE、PB、PEG、EV/EBITDA

流动性 Scalability 足够大的成交量和市值 日均成交额、市值、换手率

四层漏斗:基本面 → 成长性 → 估值 → 流动性,层层过滤。

3.2 筛选漏斗设计

┌──────────────────────────────────────────────────────────────┐

│ 海天 4S 筛选漏斗 │

├──────────────────────────────────────────────────────────────┤

│ │

│ 全市场 5000+ 只股票 │

│ │ │

│ ▼ │

│ ┌──────────────────────────────────────┐ │

│ │ S1: 基本面过滤(Soundness) │ │

│ │ ROE ≥ 8% │ │

│ │ 净利率 ≥ 5% │ │

│ │ 资产负债率 ≤ 70% │ │

│ └──────────────────────────────────────┘ │

│ │ 通过(约 40%~60%) │

│ ▼ │

│ ┌──────────────────────────────────────┐ │

│ │ S2: 成长性过滤(Sustainability) │ │

│ │ 营收增速 ≥ 10% │ │

│ │ 净利润增速 ≥ 10% │ │

│ │ 连续 2 年正增长 │ │

│ └──────────────────────────────────────┘ │

│ │ 通过(约 30%~50%) │

│ ▼ │

│ ┌──────────────────────────────────────┐ │

│ │ S3: 估值过滤(Safety) │ │

│ │ PE ≤ 行业中位数 × 1.2 │ │

│ │ PB ≤ 3.0 │ │

│ │ PEG ≤ 1.5 │ │

│ └──────────────────────────────────────┘ │

│ │ 通过(约 40%~60%) │

│ ▼ │

│ ┌──────────────────────────────────────┐ │

│ │ S4: 流动性过滤(Scalability) │ │

│ │ 日均成交额 ≥ 2000 万 │ │

│ │ 市值 ≥ 50 亿 │ │

│ │ 换手率 ≥ 0.5% │ │

│ └──────────────────────────────────────┘ │

│ │ 通过(约 60%~80%) │

│ ▼ │

│ ✅ 初选合格股票清单(通常 50~200 只) │

│ │

└──────────────────────────────────────────────────────────────┘

3.3 为什么是这 4 个维度?

维度 解决的问题 不筛的后果

基本面 淘汰"看起来增长但财务不健康"的票 踩雷财务造假

成长性 淘汰"大而不强"的周期顶点票 买在周期顶部

估值 淘汰"好公司但价格太贵"的票 买在估值泡沫

流动性 淘汰"纸面好但买不进/卖不出"的票 实盘无法执行

3.4 关键工程决策

决策 选择 理由

行业分类 申万一级行业 估值中性化需要

行业估值基准 行业中位数 比均值更抗极端值

财务数据频率 近 4 个季度 TTM 平滑季节性

行情数据窗口 近 20 个交易日 流动性是"近期"概念

通过标准 全部 4S 都通过 初筛从严

四、项目结构(工程化)

haitian_4s_screener/

├── README.md

├── requirements.txt

├── config.yaml # 4S 阈值配置

├── data/

│ ├── financials.csv # 财务数据(模拟)

│ ├── daily_prices.csv # 日频行情

│ └── daily_amounts.csv # 日频成交额

├── src/

│ ├── data_loader.py # 数据加载与清洗

│ ├── industry_classifier.py # 行业分类(申万)

│ ├── s1_soundness.py # ★ S1: 基本面过滤

│ ├── s2_sustainability.py # ★ S2: 成长性过滤

│ ├── s3_safety.py # ★ S3: 估值过滤

│ ├── s4_scalability.py # ★ S4: 流动性过滤

│ ├── screener_engine.py # ★ 主引擎:串联 4S

│ └── visualizer.py # 可视化

├── main.py # 主入口

└── output/

└── screened_stocks.csv # 输出:合格股票清单

五、完整代码(模块化 + 清晰注释)

"requirements.txt"

pandas>=1.5

numpy>=1.21

matplotlib>=3.5

seaborn>=0.12

pyyaml>=6.0

"config.yaml"

# 海天 4S 选股标准配置

# S1: 基本面(Soundness)

s1_soundness:

enabled: true

min_roe_pct: 8.0 # ROE ≥ 8%

min_net_margin_pct: 5.0 # 净利率 ≥ 5%

max_debt_ratio_pct: 70.0 # 资产负债率 ≤ 70%

min_current_ratio: 1.0 # 流动比率 ≥ 1.0(可选)

# S2: 成长性(Sustainability)

s2_sustainability:

enabled: true

min_revenue_growth_pct: 10.0 # 营收增速 ≥ 10%

min_profit_growth_pct: 10.0 # 净利润增速 ≥ 10%

min_consecutive_growth_years: 2 # 连续 2 年正增长

# S3: 估值(Safety)

s3_safety:

enabled: true

max_pe_ttm: 50.0 # PE(TTM) ≤ 50

max_pb: 3.0 # PB ≤ 3.0

max_peg: 1.5 # PEG ≤ 1.5

valuation_compared_to_industry: true # 与行业对比

industry_pe_multiplier: 1.2 # 行业中位数 × 1.2

# S4: 流动性(Scalability)

s4_scalability:

enabled: true

min_avg_amount: 20000000 # 日均成交额 ≥ 2000 万

min_market_cap: 5000000000 # 市值 ≥ 50 亿

min_turnover_pct: 0.5 # 换手率 ≥ 0.5%

lookback_days: 20

# 输出

output:

path: "output/screened_stocks.csv"

include_failed: true # 输出中是否包含未通过的股票

include_reasons: true # 是否列出拒绝原因

# 数据

data:

financials_file: "data/financials.csv"

prices_file: "data/daily_prices.csv"

amounts_file: "data/daily_amounts.csv"

industry_file: "data/industry_mapping.csv"

"src/data_loader.py"

"""

data_loader.py

数据加载与清洗模块

"""

import pandas as pd

import numpy as np

from pathlib import Path

from typing import Optional

def load_financials(filepath: str) -> pd.DataFrame:

"""

加载财务数据(TTM 口径)

预期 CSV 格式:

code,name,industry,roe_ttm,pct,net_margin_pct,debt_ratio_pct,

current_ratio,revenue_growth_pct,profit_growth_pct,

pe_ttm,pb,peg,market_cap,revenue_ttm,net_profit_ttm

000001,平安银行,金融,11.5,28.3,91.2,1.15,5.2,6.8,6.2,0.85,0.3,231.5,156.8,42.3

...

"""

df = pd.read_csv(filepath)

df['code'] = df['code'].astype(str).str.zfill(6)

# 数值列转 numeric

numeric_cols = [

'roe_ttm_pct', 'net_margin_pct', 'debt_ratio_pct', 'current_ratio',

'revenue_growth_pct', 'profit_growth_pct',

'pe_ttm', 'pb', 'peg',

'market_cap_billion', 'revenue_ttm_billion', 'net_profit_ttm_billion'

]

for col in numeric_cols:

if col in df.columns:

df[col] = pd.to_numeric(df[col], errors='coerce')

return df

def load_industry_mapping(filepath: str) -> pd.DataFrame:

"""

加载行业映射表(用于行业中性化估值对比)

格式:

code,name,sw_industry_l1,sw_industry_l2

"""

df = pd.read_csv(filepath)

df['code'] = df['code'].astype(str).str.zfill(6)

return df

def load_price_data(filepath: str) -> pd.DataFrame:

"""加载日频行情数据"""

df = pd.read_csv(filepath, parse_dates=['date'])

df['code'] = df['code'].astype(str).str.zfill(6)

return df

def load_amount_data(filepath: str) -> pd.DataFrame:

"""加载日频成交额数据"""

df = pd.read_csv(filepath, parse_dates=['date'])

df['code'] = df['code'].astype(str).str.zfill(6)

return df

def generate_mock_financials(

n_stocks: int = 100,

seed: int = 42

) -> pd.DataFrame:

"""

生成模拟财务数据(覆盖 4S 各维度)

分布设计:

- 约 50% 通过 S1(基本面)

- 约 40% 通过 S2(成长性)

- 约 60% 通过 S3(估值)

- 约 70% 通过 S4(流动性)

→ 最终约 8%~15% 全部通过

"""

np.random.seed(seed)

industries = [

'金融', '消费', '科技', '制造', '医药',

'能源', '材料', '工业', '电信', '公用事业'

]

records = []

for i in range(n_stocks):

code = f'{i:06d}'

industry = np.random.choice(industries)

# === S1: 基本面 ===

# ROE: 均值 10%,部分 < 8%

roe = np.random.normal(10.0, 6.0)

if np.random.random() < 0.35:

roe = np.random.uniform(-5.0, 7.0) # 劣质公司

# 净利率

net_margin = np.random.normal(12.0, 8.0)

if np.random.random() < 0.30:

net_margin = np.random.uniform(-2.0, 4.0)

# 资产负债率

debt_ratio = np.random.normal(55.0, 15.0)

if np.random.random() < 0.20:

debt_ratio = np.random.uniform(75.0, 95.0) # 高负债

# 流动比率

current_ratio = np.random.normal(1.5, 0.6)

# === S2: 成长性 ===

revenue_growth = np.random.normal(12.0, 10.0)

if np.random.random() < 0.35:

revenue_growth = np.random.uniform(-15.0, 8.0)

profit_growth = np.random.normal(10.0, 12.0)

if np.random.random() < 0.30:

profit_growth = np.random.uniform(-20.0, 8.0)

# === S3: 估值 ===

pe = np.random.normal(25.0, 15.0)

if np.random.random() < 0.25:

pe = np.random.uniform(55.0, 200.0) # 高估值

pe = max(pe, -50) # 亏损公司 PE 为负

pb = np.random.normal(2.0, 1.2)

pb = max(pb, 0.3)

peg = np.random.normal(1.2, 0.8)

peg = max(peg, -2.0)

# === 其他 ===

market_cap = np.random.uniform(2.0, 500.0) # 亿

revenue_ttm = market_cap * np.random.uniform(0.2, 1.5)

net_profit_ttm = revenue_ttm * net_margin / 100

records.append({

'code': code,

'name': f'股票{i:03d}',

'industry': industry,

'roe_ttm_pct': round(roe, 2),

'net_margin_pct': round(net_margin, 2),

'debt_ratio_pct': round(debt_ratio, 2),

'current_ratio': round(current_ratio, 2),

'revenue_growth_pct': round(revenue_growth, 2),

'profit_growth_pct': round(profit_growth, 2),

'pe_ttm': round(pe, 2),

'pb': round(pb, 2),

'peg': round(peg, 2),

'market_cap_billion': round(market_cap, 2),

'revenue_ttm_billion': round(revenue_ttm, 2),

'net_profit_ttm_billion': round(net_profit_ttm, 2),

})

return pd.DataFrame(records)

def generate_mock_amounts(

codes: list,

n_days: int = 120,

seed: int = 42

) -> pd.DataFrame:

"""生成模拟成交额数据"""

np.random.seed(seed)

dates = pd.date_range('2024-07-01', periods=n_days, freq='B')

records = []

for code in codes:

base_amount = np.random.uniform(5e5, 2e8) # 50 万 ~ 2 亿

for d in dates:

# 20% 概率生成"低流动性"数据

if np.random.random() < 0.2:

amt = np.random.uniform(1e4, 5e5)

else:

amt = base_amount * np.random.uniform(0.5, 1.5)

records.append({'date': d, 'code': code, 'amount': round(amt, 2)})

return pd.DataFrame(records)

"src/industry_classifier.py"

"""

industry_classifier.py

行业分类模块(申万一级行业)

用于行业中性化估值对比

"""

import pandas as pd

import numpy as np

class IndustryClassifier:

"""

行业分类器

功能:

1. 为每只股票映射申万一级行业

2. 计算行业估值中枢(中位数)

3. 判断个股估值是否"合理"(相对行业)

"""

# 申万一级行业列表

SW_INDUSTRIES_L1 = [

'农林牧渔', '采掘', '化工', '钢铁', '有色金属',

'电子', '家用电器', '食品饮料', '纺织服装', '轻工制造',

'医药生物', '公用事业', '交通运输', '房地产', '商业贸易',

'休闲服务', '综合', '建筑材料', '建筑装饰', '电气设备',

'国防军工', '计算机', '传媒', '通信', '银行',

'非银金融', '汽车', '机械设备', '煤炭', '石油石化',

'环保', '美容护理'

]

def __init__(self, industry_mapping: Optional[pd.DataFrame] = None):

"""

参数:

industry_mapping: 股票-行业映射表(含 code, sw_industry_l1 列)

"""

self.mapping = industry_mapping

self._industry_stats: pd.DataFrame = None

def build_industry_valuation_stats(

self,

financials: pd.DataFrame

) -> pd.DataFrame:

"""

计算各行业估值指标的中位数(用于行业中性化)

返回:

DataFrame: index=行业, columns=[pe_median, pb_median, ...]

"""

if 'industry' not in financials.columns:

# 没有行业信息 → 全部视为同一行业

stats = financials[['pe_ttm', 'pb', 'peg']].median()

result = pd.DataFrame({

'pe_median': [stats['pe_ttm']],

'pb_median': [stats['pb']],

'peg_median': [stats['peg']],

'stock_count': [len(financials)]

}, index=['全市场'])

self._industry_stats = result

return result

# 按行业计算

grouped = financials.groupby('industry')[['pe_ttm', 'pb', 'peg']].median()

counts = financials.groupby('industry').size().rename('stock_count')

result = grouped.join(counts)

self._industry_stats = result

return result

def get_industry_benchmark(

self,

code: str,

metric: str = 'pe_ttm'

) -> float:

"""

获取某股票所在行业的估值基准

参数:

code: 股票代码

metric: 估值指标(pe_ttm / pb / peg)

返回:

行业中位数,若无行业信息则返回全市场中位数

"""

if self._industry_stats is None:

raise ValueError("请先调用 build_industry_valuation_stats()")

if self.mapping is not None:

row = self.mapping[self.mapping['code'] == code]

if len(row) > 0:

industry = row.iloc[0].get('sw_industry_l1', '全市场')

if industry in self._industry_stats.index:

col = metric + '_median' if metric != 'pe_ttm' else 'pe_median'

if col in self._industry_stats.columns:

return self._industry_stats.loc[industry, col]

# Fallback: 全市场

col = metric + '_median' if metric != 'pe_ttm' else 'pe_median'

if col in self._industry_stats.columns:

return self._industry_stats.iloc[0][col]

return np.nan

def print_industry_summary(self):

"""打印行业估值概览"""

if self._industry_stats is None:

print("请先调用 build_industry_valuation_stats()")

return

print(f"\n{'='*70}")

print(f"{'行业估值中枢':^70}")

print(f"{'='*70}")

print(f"{'行业':<15} {'PE中位数':<12} {'PB中位数':<12} {'PEG中位数':<12} {'股票数':<8}")

print(f"{'─'*70}")

for idx, row in self._industry_stats.iterrows():

name = idx if isinstance(idx, str) else '全市场'

print(f" {name:<13} {row.get('pe_median', 0):>10.1f} "

f"{row.get('pb_median', 0):>10.1f} "

f"{row.get('peg_median', 0):>10.2f} "

f"{int(row.get('stock_count', 0)):<8}")

print(f"{'='*70}\n")

"src/s1_soundness.py"(★ S1 基本面)

"""

s1_soundness.py

★ S1: 基本面过滤(Soundness)

核心指标:

- ROE(净资产收益率)≥ 阈值

- 净利率 ≥ 阈值

- 资产负债率 ≤ 阈值

- 流动比率 ≥ 阈值(可选)

"""

import pandas as pd

import numpy as np

from typing import Dict, List

import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')

logger = logging.getLogger(__name__)

class SoundnessFilter:

"""

基本面健康度过滤器

通过条件(全部满足才通过):

1. ROE(TTM) ≥ min_roe

2. 净利率 ≥ min_net_margin

3. 资产负债率 ≤ max_debt_ratio

4. 流动比率 ≥ min_current_ratio(可选)

"""

def __init__(

self,

min_roe_pct: float = 8.0,

min_net_margin_pct: float = 5.0,

max_debt_ratio_pct: float = 70.0,

min_current_ratio: float = 1.0,

check_current_ratio: bool = False

):

self.min_roe = min_roe_pct

self.min_margin = min_net_margin_pct

self.max_debt = max_debt_ratio_pct

self.min_current = min_current_ratio

self.check_current = check_current_ratio

logger.info(f"S1 基本面过滤: ROE≥{min_roe_pct}%, 净利率≥{min_net_margin_pct}%, "

f"负债率≤{max_debt_ratio_pct}%, 流动比率≥{min_current_ratio}({'开' if check_current_ratio else '关'})")

def filter(self, financials: pd.DataFrame) -> Dict:

"""

★ 核心方法:执行 S1 过滤

参数:

financials: 财务数据 DataFrame

返回:

{

'passed': [code, ...],

'rejected': {

'roe': [code, ...],

'net_margin': [code, ...],

'debt_ratio': [code, ...],

'current_ratio': [code, ...]

},

'reasons': {code: [reason1, reason2, ...]} # 每只被拒股票的详细原因

}

"""

passed = []

rejections = {'roe': [], 'net_margin': [], 'debt_ratio': [], 'current_ratio': []}

reasons = {}

for _, row in financials.iterrows():

code = row['code']

fail_reasons = []

# ★ 检查 ROE

roe = row.get('roe_ttm_pct', np.nan)

if pd.notna(roe) and roe < self.min_roe:

rejections['roe'].append(code)

fail_reasons.append(f'ROE={roe:.1f}%<{self.min_roe}%')

# ★ 检查净利率

margin = row.get('net_margin_pct', np.nan)

if pd.notna(margin) and margin < self.min_margin:

rejections['net_margin'].append(code)

fail_reasons.append(f'净利率={margin:.1f}%<{self.min_margin}%')

# ★ 检查资产负债率

debt = row.get('debt_ratio_pct', np.nan)

if pd.notna(debt) and debt > self.max_debt:

rejections['debt_ratio'].append(code)

fail_reasons.append(f'负债率={debt:.1f}%>{self.max_debt}%')

# ★ 检查流动比率(可选)

if self.check_current:

current = row.get('current_ratio', np.nan)

if pd.notna(current) and current < self.min_current:

rejections['current_ratio'].append(code)

fail_reasons.append(f'流动比率={current:.2f}<{self.min_current}')

if fail_reasons:

reasons[code] = fail_reasons

else:

passed.append(code)

return {

'passed': passed,

'rejected': rejections,

'reasons': reasons,

'pass_rate_pct': len(passed) / max(len(financials), 1) * 100

}

def print_report(self, result: Dict):

"""打印 S1 筛选报告"""

total = sum(len(v) for v in result['rejected'].values()) + len(result['passed'])

print(f"\n{'='*60}")

print(f" S1 基本面过滤报告")

print(f"{'='*60}")

print(f" 筛选前: {total} 只")

print(f" 通过: {len(result['passed'])} 只 ({result['pass_rate_pct']:.1f}%)")

if result['rejected']['roe']:

print(f" ROE 不达标: {len(result['rejected']['roe'])} 只")

if result['rejected']['net_margin']:

print(f" 净利率不达标: {len(result['rejected']['net_margin'])} 只")

if result['rejected']['debt_ratio']:

print(f" 负债率超标: {len(result['rejected']['debt_ratio'])} 只")

if result['rejected']['current_ratio']:

print(f" 流动比率不足: {len(result['rejected']['current_ratio'])} 只")

# 打印前 5 只被拒股票的详细原因

if result['reasons']:

print(f"\n 拒绝明细(前 5 只):")

for i, (code, rs) in enumerate(result['reasons'].items()):

if i >= 5:

break

print(f" {code}: {'; '.join(rs)}")

print(f"{'='*60}\n")

"src/s2_sustainability.py"(★ S2 成长性)

"""

s2_sustainability.py

★ S2: 成长性过滤(Sustainability)

核心指标:

- 营收增速 ≥ 阈值

- 净利润增速 ≥ 阈值

- 连续 N 年正增长(可选,需多年数据)

"""

import pandas as pd

import numpy as np

from typing import Dict, List, Optional

import logging

logger = logging.getLogger(__name__)

class SustainabilityFilter:

"""

成长性过滤器

通过条件(全部满足才通过):

1. 营收同比增速 ≥ min_revenue_growth

2. 净利润同比增速 ≥ min_profit_growth

3. 连续 N 年正增长(可选)

"""

def __init__(

本文代码仅供学习与技术交流,不构成任何投资建议,股市有风险,入市需谨慎!

利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询