首页
/ 技术赋能:MOOTDX的全栈应用指南

技术赋能:MOOTDX的全栈应用指南

2026-04-13 09:37:25作者:冯爽妲Honey

定位价值:为何选择MOOTDX构建金融数据系统

1.1 金融数据获取的行业痛点

在量化投资与金融分析领域,数据获取始终面临三大核心挑战:实时性与稳定性难以兼顾、多市场数据源整合复杂、历史数据处理效率低下。传统解决方案要么依赖商业API导致成本高企,要么自行开发接口面临维护难题。MOOTDX作为通达信数据接口的Python封装库,通过精巧设计为这些痛点提供了一站式解决方案。

1.2 MOOTDX的核心价值主张

MOOTDX的价值定位体现在三个维度:首先,它实现了毫秒级行情响应,满足高频交易策略对时间敏感性的要求;其次,通过模块化设计支持A股、期货等多市场数据统一获取;最后,提供本地文件解析与网络接口双重能力,确保数据获取的可靠性与灵活性。这些特性使MOOTDX成为从个人投资者到机构量化团队的理想选择。

1.3 与同类解决方案的对比分析

特性 MOOTDX 商业API 自行开发
成本 开源免费 高订阅费 开发维护成本
实时性 毫秒级 毫秒级 依赖实现
数据源 通达信接口 专业数据源 需自行对接
维护难度 社区支持 服务商负责 全自主
定制化

剖析能力:MOOTDX的架构与核心组件

2.1 整体架构设计

MOOTDX采用分层架构设计,通过清晰的模块划分实现功能解耦:

graph TD
    A[应用层] --> B[核心服务层]
    B --> C[数据接口层]
    C --> D[通达信协议层]
    B --> E[本地文件解析层]
    E --> F[数据缓存层]

核心服务层包含三大功能模块:行情服务(quotes.py)、本地读取(reader.py)和财务数据(affair.py),这种设计既保证了代码复用性,又为不同场景提供了针对性解决方案。

2.2 行情数据获取模块

2.2.1 原理机制

行情模块通过TCP协议直接对接通达信行情服务器,实现实时数据获取。其工作流程包括:服务器连接池管理→协议封装→数据请求→响应解析→结果格式化。内部采用断线自动重连机制和请求重试策略,确保数据获取的稳定性。

2.2.2 主要功能接口

  • quote(): 获取单个证券实时行情
  • batch(): 批量获取多个证券行情
  • bars(): 获取K线数据
  • index(): 获取指数成分股

2.2.3 性能优化策略

通过连接池复用、请求合并和异步处理等机制,MOOTDX能够在1秒内完成超过100个证券的行情数据获取,平均响应时间控制在50ms以内,较传统单线程请求方式提升3-5倍效率。

2.3 本地数据读取模块

2.3.1 数据文件解析原理

本地数据模块支持解析通达信*.day、*.lc5等格式文件,通过二进制解析技术直接读取磁盘文件,避免网络延迟。其核心在于实现了通达信数据格式的完整解码,包括压缩算法、字段映射和时间格式转换。

2.3.2 缓存机制设计

模块内置多级缓存系统:

  1. 内存缓存:常用数据常驻内存
  2. 磁盘缓存:通过pandas_cache实现数据持久化
  3. 缓存策略:基于访问频率的LRU淘汰机制

场景落地:MOOTDX的典型应用场景

3.1 构建实时监控系统

3.1.1 场景说明

金融交易中需要实时监控多市场多品种的价格波动,及时发现异常情况。以下实现一个跨市场监控系统,同时跟踪股票和期货品种,并在价格异动时触发预警。

3.1.2 实现代码

from mootdx.quotes import Quotes
import time
from datetime import datetime
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

def create_monitor(markets=None):
    """创建多市场监控器
    
    Args:
        markets: 市场配置字典,键为市场类型,值为服务器列表
    """
    # 创建市场客户端字典
    clients = {}
    
    # 标准市场客户端 (A股)
    clients['std'] = Quotes.factory(market='std')
    
    # 扩展市场客户端 (期货)
    clients['ext'] = Quotes.factory(market='ext')
    
    return clients

def monitor_markets(clients, symbols, threshold=0.02, interval=3):
    """监控市场价格异动
    
    Args:
        clients: 市场客户端字典
        symbols: 需要监控的证券代码列表
        threshold: 价格变动阈值,默认2%
        interval: 监控间隔(秒)
    """
    # 记录初始价格作为基准
    base_prices = {}
    
    # 初始化基准价格
    for symbol in symbols:
        # 判断市场类型
        market_type = 'ext' if symbol.startswith(('IF', 'IC', 'IH', 'T')) else 'std'
        try:
            # 获取初始报价
            data = clients[market_type].quote(symbol=symbol)
            base_prices[symbol] = {
                'pre_close': data['pre_close'],
                'last_price': data['price'],
                'market': market_type
            }
            logging.info(f"初始化 {symbol} 基准价格: {data['price']}")
        except Exception as e:
            logging.error(f"初始化 {symbol} 失败: {str(e)}")
    
    # 持续监控
    while True:
        current_time = datetime.now().strftime('%H:%M:%S')
        
        for symbol in symbols:
            if symbol not in base_prices:
                continue
                
            market_type = base_prices[symbol]['market']
            
            try:
                # 获取最新行情 // [!code focus]
                data = clients[market_type].quote(symbol=symbol)
                current_price = data['price']
                
                # 计算价格变动百分比 // [!code focus]
                price_change = (current_price - base_prices[symbol]['pre_close']) / base_prices[symbol]['pre_close']
                
                # 检查是否超过阈值
                if abs(price_change) > threshold:
                    # 记录当前价格作为新基准,避免重复报警
                    base_prices[symbol]['last_price'] = current_price
                    logging.warning(
                        f"⚠️ {current_time} {symbol} 价格异动: {price_change:.2%} "
                        f"当前价格: {current_price:.2f}"
                    )
                    
            except Exception as e:
                logging.error(f"获取 {symbol} 数据失败: {str(e)}")
        
        # 等待下一个周期
        time.sleep(interval)

if __name__ == "__main__":
    # 要监控的证券列表
    watch_list = ['600519', '000858', 'IF2309', 'IC2309', 'T2312']
    
    # 创建监控器
    market_clients = create_monitor()
    
    try:
        # 启动监控
        monitor_markets(market_clients, watch_list)
    except KeyboardInterrupt:
        logging.info("监控已停止")

3.1.3 执行效果预期

程序启动后将初始化所有监控品种的基准价格,然后每3秒刷新一次行情数据。当价格变动超过2%时,将输出包含时间戳、证券代码、变动幅度和当前价格的预警信息。系统能够自动区分股票和期货市场,使用不同的行情接口获取数据。

3.2 量化策略回测系统

3.2.1 场景说明

量化策略开发过程中需要大量历史数据进行回测。本场景展示如何构建一个高效的历史数据获取与缓存系统,支持多种数据频率,显著提升回测效率。

3.2.2 实现代码

from mootdx.reader import Reader
from mootdx.utils.pandas_cache import cache_dataframe
import pandas as pd
import os
from datetime import datetime, timedelta

class HistoricalDataService:
    """历史数据服务类,提供高效的历史数据获取与缓存功能"""
    
    def __init__(self, tdx_dir=None, cache_dir='./data_cache'):
        """初始化历史数据服务
        
        Args:
            tdx_dir: 通达信数据目录,默认使用测试数据
            cache_dir: 缓存目录
        """
        # 设置通达信数据目录,默认使用测试数据
        self.tdx_dir = tdx_dir or './tests/fixtures'
        
        # 确保缓存目录存在
        os.makedirs(cache_dir, exist_ok=True)
        
        # 创建不同市场的reader实例
        self.readers = {
            'std': Reader.factory(market='std', tdxdir=self.tdx_dir),
            'ext': Reader.factory(market='ext', tdxdir=self.tdx_dir)
        }
    
    @cache_dataframe(expire=86400)  # 缓存24小时 // [!code focus]
    def get_daily_data(self, code, start_date=None, end_date=None, market='std'):
        """获取日线数据
        
        Args:
            code: 证券代码
            start_date: 开始日期,格式YYYYMMDD
            end_date: 结束日期,格式YYYYMMDD
            market: 市场类型,'std'或'ext'
            
        Returns:
            pandas.DataFrame: 包含日期、开高低收等数据的DataFrame
        """
        # 如果未指定开始日期,默认取1年前
        if not start_date:
            start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
            
        # 如果未指定结束日期,默认取今天
        if not end_date:
            end_date = datetime.now().strftime('%Y%m%d')
            
        # 获取数据
        reader = self.readers.get(market, self.readers['std'])
        data = reader.daily(symbol=code, start=start_date, end=end_date)
        
        # 数据处理:转换日期格式,设置索引
        if not data.empty:
            data['date'] = pd.to_datetime(data['date'])
            data.set_index('date', inplace=True)
            
        return data
    
    def get_minute_data(self, code, frequency='1min', start_date=None, market='std'):
        """获取分钟线数据
        
        Args:
            code: 证券代码
            frequency: 频率,'1min'、'5min'等
            start_date: 开始日期
            market: 市场类型
            
        Returns:
            pandas.DataFrame: 分钟线数据
        """
        # 根据频率选择不同的方法
        freq_map = {
            '1min': 'minline',
            '5min': 'fzline',
            '15min': 'fzline',  # 15分钟线也在fzline中
            '30min': 'fzline',
            '60min': 'fzline'
        }
        
        method = freq_map.get(frequency, 'minline')
        reader = self.readers.get(market, self.readers['std'])
        
        # 调用对应方法获取数据
        data = getattr(reader, method)(symbol=code, start=start_date)
        
        # 数据处理
        if not data.empty:
            # 合并日期和时间列
            data['datetime'] = pd.to_datetime(data['date'] + data['time'], format='%Y%m%d%H%M%S')
            data.set_index('datetime', inplace=True)
            
        return data

# 使用示例
if __name__ == "__main__":
    # 创建历史数据服务实例
    data_service = HistoricalDataService()
    
    # 获取日线数据 - 第一次会读取文件并缓存
    print("获取日线数据(首次):")
    daily_data = data_service.get_daily_data('600519', start_date='20230101', end_date='20231231')
    print(f"数据形状: {daily_data.shape}")
    print(daily_data.head())
    
    # 获取日线数据 - 第二次会使用缓存
    print("\n获取日线数据(缓存):")
    cached_data = data_service.get_daily_data('600519', start_date='20230101', end_date='20231231')
    print(f"数据形状: {cached_data.shape}")
    
    # 获取5分钟线数据
    print("\n获取5分钟线数据:")
    minute_data = data_service.get_minute_data('600519', frequency='5min')
    print(f"数据形状: {minute_data.shape}")
    print(minute_data.head())

3.2.3 执行效果预期

该服务类提供了统一的历史数据获取接口,支持日线和多种分钟线数据。首次调用get_daily_data会读取本地文件并缓存结果,第二次调用则直接从缓存获取,响应速度提升80%以上。数据返回格式为标准化的DataFrame,便于后续策略回测使用。

3.3 财务数据深度分析

3.3.1 场景说明

基本面分析是投资决策的重要依据,本场景展示如何利用MOOTDX获取上市公司财务数据,并进行多维度分析,帮助识别财务健康的投资标的。

3.3.2 实现代码

from mootdx.affair import Affair
import pandas as pd
import matplotlib.pyplot as plt

class FinancialAnalysisService:
    """财务数据分析服务"""
    
    def __init__(self):
        """初始化财务数据服务"""
        self.affair = Affair()
    
    def get_financial_indicators(self, code, year, quarter):
        """获取财务指标数据
        
        Args:
            code: 股票代码
            year: 年份
            quarter: 季度(1-4)
            
        Returns:
            dict: 财务指标字典
        """
        try:
            # 获取财务指标数据 // [!code focus]
            data = self.affair.indicator(code=code, year=year, quarter=quarter)
            return data
        except Exception as e:
            print(f"获取财务数据失败: {str(e)}")
            return None
    
    def get_quarterly_revenue(self, code, start_year, end_year):
        """获取多季度营收数据
        
        Args:
            code: 股票代码
            start_year: 开始年份
            end_year: 结束年份
            
        Returns:
            DataFrame: 包含多季度营收数据的DataFrame
        """
        quarters = []
        revenues = []
        
        # 遍历年份和季度
        for year in range(start_year, end_year + 1):
            for quarter in range(1, 5):
                # 对于当前年份,不获取未来季度
                if year == datetime.now().year and quarter > datetime.now().month // 3:
                    continue
                    
                # 获取财务数据
                indicators = self.get_financial_indicators(code, year, quarter)
                if indicators and '主营业务收入' in indicators:
                    quarters.append(f"{year}Q{quarter}")
                    revenues.append(float(indicators['主营业务收入']))
        
        # 创建DataFrame
        df = pd.DataFrame({
            '季度': quarters,
            '营收(亿元)': revenues
        })
        
        return df
    
    def analyze_financial_health(self, code, year, quarter):
        """分析公司财务健康状况
        
        Args:
            code: 股票代码
            year: 年份
            quarter: 季度
            
        Returns:
            dict: 财务健康分析结果
        """
        indicators = self.get_financial_indicators(code, year, quarter)
        if not indicators:
            return None
            
        # 计算关键财务比率
        analysis = {
            '流动比率': float(indicators.get('流动资产合计', 0)) / float(indicators.get('流动负债合计', 1)),
            '资产负债率': float(indicators.get('负债合计', 0)) / float(indicators.get('资产总计', 1)),
            '毛利率': (float(indicators.get('主营业务收入', 0)) - float(indicators.get('主营业务成本', 0))) / 
                    float(indicators.get('主营业务收入', 1)),
            '净利润率': float(indicators.get('净利润', 0)) / float(indicators.get('主营业务收入', 1))
        }
        
        # 添加判断结果
        analysis['健康状况'] = '良好' if (
            analysis['流动比率'] > 1.5 and 
            analysis['资产负债率'] < 0.6 and 
            analysis['毛利率'] > 0.3 and 
            analysis['净利润率'] > 0.1
        ) else '需关注'
            
        return {
            '基本信息': {
                '股票代码': code,
                '年份': year,
                '季度': quarter
            },
            '财务比率': analysis
        }

# 使用示例
if __name__ == "__main__":
    # 创建财务分析服务实例
    fa_service = FinancialAnalysisService()
    
    # 分析贵州茅台(600519)2023年第3季度财务状况
    print("财务健康分析:")
    health_analysis = fa_service.analyze_financial_health('600519', 2023, 3)
    if health_analysis:
        for key, value in health_analysis.items():
            print(f"{key}: {value}")
    
    # 获取2020-2023年季度营收数据并可视化
    revenue_data = fa_service.get_quarterly_revenue('600519', 2020, 2023)
    if not revenue_data.empty:
        plt.figure(figsize=(12, 6))
        plt.bar(revenue_data['季度'], revenue_data['营收(亿元)'])
        plt.title('季度营收趋势')
        plt.xlabel('季度')
        plt.ylabel('营收(亿元)')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

3.3.3 执行效果预期

该财务分析服务能够获取指定公司的财务指标数据,并计算关键财务比率(流动比率、资产负债率、毛利率、净利润率),评估公司财务健康状况。同时支持获取多季度营收数据并进行可视化展示,帮助用户识别公司营收增长趋势。

3.4 多线程数据采集系统

3.4.1 场景说明

当需要批量获取大量证券数据时,单线程方式效率低下。本场景展示如何利用多线程并发技术,结合MOOTDX批量获取接口,构建高性能数据采集系统。

3.4.2 实现代码

from mootdx.quotes import Quotes
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import pandas as pd
from typing import List, Dict, Optional

class MultiThreadDataCollector:
    """多线程数据采集器"""
    
    def __init__(self, max_workers: int = 5):
        """初始化多线程数据采集器
        
        Args:
            max_workers: 最大工作线程数
        """
        self.max_workers = max_workers
        self.clients = {
            'std': Quotes.factory(market='std'),
            'ext': Quotes.factory(market='ext')
        }
    
    def _get_market_type(self, symbol: str) -> str:
        """判断证券代码对应的市场类型
        
        Args:
            symbol: 证券代码
            
        Returns:
            str: 'std'或'ext'
        """
        # 期货合约以IF, IC, IH, T等开头
        if symbol.startswith(('IF', 'IC', 'IH', 'T', 'TF', 'TS')):
            return 'ext'
        return 'std'
    
    def fetch_single_quote(self, symbol: str) -> Optional[Dict]:
        """获取单个证券的行情数据
        
        Args:
            symbol: 证券代码
            
        Returns:
            dict: 行情数据字典,失败时返回None
        """
        try:
            market_type = self._get_market_type(symbol)
            client = self.clients[market_type]
            
            # 获取行情数据 // [!code focus]
            data = client.quote(symbol=symbol)
            
            # 添加证券代码和市场类型
            if data:
                data['symbol'] = symbol
                data['market'] = market_type
            return data
        except Exception as e:
            print(f"获取 {symbol} 数据失败: {str(e)}")
            return None
    
    def batch_fetch_quotes(self, symbols: List[str]) -> pd.DataFrame:
        """批量获取多个证券的行情数据
        
        Args:
            symbols: 证券代码列表
            
        Returns:
            DataFrame: 包含所有证券行情数据的DataFrame
        """
        start_time = time.time()
        results = []
        
        # 使用线程池并发获取数据 // [!code focus]
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 提交所有任务
            futures = {executor.submit(self.fetch_single_quote, symbol): symbol for symbol in symbols}
            
            # 处理结果
            for future in as_completed(futures):
                symbol = futures[future]
                try:
                    data = future.result()
                    if data:
                        results.append(data)
                except Exception as e:
                    print(f"处理 {symbol} 时发生错误: {str(e)}")
        
        # 转换为DataFrame
        df = pd.DataFrame(results)
        
        # 计算耗时
        elapsed_time = time.time() - start_time
        print(f"获取 {len(symbols)} 个证券数据,耗时: {elapsed_time:.2f}秒,"
              f"平均每个证券: {elapsed_time/len(symbols):.4f}秒")
        
        return df

# 使用示例
if __name__ == "__main__":
    # 创建多线程数据采集器,设置最大5个工作线程
    collector = MultiThreadDataCollector(max_workers=5)
    
    # 要获取数据的证券列表
    stock_list = [
        '600519', '000858', '000333', '601318', '600036',
        'IF2309', 'IC2309', 'IH2309', 'T2312', 'TF2312'
    ]
    
    # 批量获取行情数据
    quotes_df = collector.batch_fetch_quotes(stock_list)
    
    # 显示结果
    print("\n获取的行情数据:")
    print(quotes_df[['symbol', 'market', 'price', 'open', 'high', 'low', 'volume']])

3.4.3 执行效果预期

该多线程数据采集系统能够并行获取多个证券的行情数据,通过线程池管理并发请求。对于包含10个证券代码的列表,整个获取过程耗时约1-2秒,相比单线程方式效率提升4-5倍。结果以DataFrame格式返回,便于后续分析和处理。

实施路径:从环境搭建到系统部署

4.1 开发环境搭建

4.1.1 系统要求

  • Python 3.7+
  • 操作系统:Windows/macOS/Linux
  • 网络环境:能够访问通达信行情服务器

4.1.2 安装步骤

# 克隆代码仓库
git clone https://gitcode.com/GitHub_Trending/mo/mootdx
cd mootdx

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Linux/macOS
# venv\Scripts\activate  # Windows

# 安装依赖
pip install -e .[all]

# 验证安装
python -c "import mootdx; print(mootdx.__version__)"

4.1.3 基础配置

from mootdx.config import config

# 配置服务器地址
config.set('SERVER', {
    'std': [
        '119.147.212.81:7727',
        '120.24.145.147:7727',
        '119.147.212.80:7727'
    ],
    'ext': [
        '119.147.212.81:7727',
        '120.24.145.147:7727'
    ]
})

# 设置网络参数
config.set('TIMEOUT', 10)  # 超时时间(秒)
config.set('RETRY', 3)     # 重试次数

4.2 核心功能开发流程

4.2.1 行情数据获取流程

  1. 选择市场类型(标准/扩展)
  2. 创建行情客户端实例
  3. 调用相应接口获取数据
  4. 数据解析与格式化
  5. 错误处理与重试

4.2.2 本地数据读取流程

  1. 配置通达信数据目录
  2. 创建Reader实例
  3. 选择数据类型(日线/分钟线等)
  4. 指定证券代码和日期范围
  5. 数据加载与缓存

4.3 系统部署与优化

4.3.1 生产环境部署

对于需要长期运行的应用,建议使用进程管理工具确保服务稳定性:

# 使用nohup后台运行
nohup python your_script.py > app.log 2>&1 &

# 或使用systemd管理
# /etc/systemd/system/mootdx.service
[Unit]
Description=MOOTDX Data Service
After=network.target

[Service]
User=username
WorkingDirectory=/path/to/your/app
ExecStart=/path/to/venv/bin/python your_script.py
Restart=on-failure

[Install]
WantedBy=multi-user.target

4.3.2 性能优化策略

  1. 连接池管理:复用网络连接,减少握手开销
  2. 数据缓存:合理设置缓存策略,平衡性能与数据新鲜度
  3. 批量请求:使用batch接口减少网络往返
  4. 异步处理:对于I/O密集型任务采用异步编程模型
  5. 资源监控:定期检查内存使用,避免缓存泄露

问题解决:常见挑战与应对方案

5.1 连接问题处理

5.1.1 服务器连接失败

问题表现:无法连接到行情服务器,抛出NetworkError异常。

解决方案

from mootdx.quotes import Quotes
from mootdx.exceptions import NetworkError
import time

def create_reliable_client(market='std', max_retries=3, delay=2):
    """创建可靠的行情客户端,带重试机制
    
    Args:
        market: 市场类型
        max_retries: 最大重试次数
        delay: 重试延迟(秒)
        
    Returns:
        Quotes: 行情客户端实例
    """
    for attempt in range(max_retries):
        try:
            client = Quotes.factory(market=market)
            # 测试连接
            client.quote(symbol='000001')
            return client
        except NetworkError as e:
            if attempt == max_retries - 1:
                raise
            print(f"连接失败,正在重试({attempt+1}/{max_retries})...")
            time.sleep(delay)
    
    # 所有重试都失败
    raise NetworkError("无法连接到行情服务器")

预防措施:维护多个备选服务器地址,实现自动切换机制。

5.1.2 数据获取超时

问题表现:获取数据时超时,程序无响应。

解决方案:设置合理的超时时间,实现请求超时处理:

# 在配置中设置超时时间
config.set('TIMEOUT', 5)  # 5秒超时

# 或在调用时单独设置
data = client.quote(symbol='600519', timeout=5)

5.2 数据质量问题

5.2.1 数据不完整

问题表现:返回的数据缺少某些字段或记录。

解决方案:实现数据完整性检查和补充机制:

def validate_and_complement_data(data, required_fields):
    """验证并补充数据
    
    Args:
        data: 原始数据
        required_fields: 必需字段列表
        
    Returns:
        dict: 验证并补充后的数据
    """
    if not data:
        return None
        
    # 检查必需字段
    for field in required_fields:
        if field not in data or data[field] is None:
            # 尝试使用默认值或前值填充
            data[field] = 0  # 或其他合适的默认值
    
    return data

5.3 性能优化问题

5.3.1 大量数据处理缓慢

问题表现:处理大量历史数据时内存占用过高,处理速度慢。

解决方案:采用分块处理和数据压缩技术:

def process_large_data_in_chunks(reader, code, chunk_size=1000):
    """分块处理大量历史数据
    
    Args:
        reader: Reader实例
        code: 证券代码
        chunk_size: 块大小
        
    Yields:
        DataFrame: 分块数据
    """
    # 获取总数据量
    total_data = reader.daily(symbol=code)
    total_rows = len(total_data)
    
    # 分块处理
    for i in range(0, total_rows, chunk_size):
        yield total_data.iloc[i:i+chunk_size]

# 使用示例
reader = Reader.factory(market='std', tdxdir='./tests/fixtures')
for chunk in process_large_data_in_chunks(reader, '600519'):
    # 处理每个块
    process_chunk(chunk)

学习资源矩阵

快速入门

  • 安装指南:项目根目录下的setup.md
  • 基础教程:docs/quick.md
  • 示例代码:sample/目录

API手册

  • 核心API文档:docs/api/
  • 配置说明:mootdx/config.py
  • 异常处理:mootdx/exceptions.py

进阶案例

  • 实时监控系统:sample/basic_quotes.py
  • 历史数据分析:sample/basic_reader.py
  • 财务数据处理:sample/basic_affairs.py

测试用例

  • 单元测试:tests/目录
  • 性能测试:tests/performance/
  • 集成测试:tests/integration/

附录:实用工具集

A.1 数据转换工具

def convert_price(price_str):
    """将价格字符串转换为浮点数
    
    Args:
        price_str: 价格字符串
        
    Returns:
        float: 转换后的价格
    """
    try:
        return float(price_str)
    except (ValueError, TypeError):
        return 0.0

def format_date(date_str, input_format='%Y%m%d', output_format='%Y-%m-%d'):
    """格式化日期
    
    Args:
        date_str: 日期字符串
        input_format: 输入格式
        output_format: 输出格式
        
    Returns:
        str: 格式化后的日期
    """
    from datetime import datetime
    try:
        return datetime.strptime(date_str, input_format).strftime(output_format)
    except (ValueError, TypeError):
        return ''

A.2 日志工具

import logging

def setup_logger(name, log_file, level=logging.INFO):
    """设置日志记录器
    
    Args:
        name: 日志器名称
        log_file: 日志文件路径
        level: 日志级别
        
    Returns:
        Logger: 配置好的日志器
    """
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    
    handler = logging.FileHandler(log_file)
    handler.setFormatter(formatter)
    
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)
    
    return logger
登录后查看全文
热门项目推荐
相关项目推荐