首页
/ WebDataset与PyTorch Lightning结合实现大规模数据集高效训练

WebDataset与PyTorch Lightning结合实现大规模数据集高效训练

2025-06-30 12:38:35作者:曹令琨Iris

背景介绍

在处理大规模数据集时,传统的PyTorch数据加载方式往往会遇到内存不足、加载速度慢等问题。WebDataset作为一种高效的解决方案,能够很好地处理TB级别的大规模数据集。本文将详细介绍如何将WebDataset与PyTorch Lightning框架结合,实现高效的大规模数据训练。

核心挑战

在大规模数据集训练中,我们面临几个关键挑战:

  1. 多GPU支持:大规模数据集训练必须支持多GPU并行
  2. 进度可视化:需要准确显示每个epoch的训练步数
  3. 数据分片处理:需要高效处理分布在多个tar文件中的数据

解决方案架构

1. 数据预处理与统计

首先需要统计数据集的基本信息。对于Laion115M这样的超大规模数据集,我们编写并行脚本扫描所有tar文件,记录每个文件包含的样本数,并将结果存储在JSON文件中:

{
  "/data/laion115m/00000.tar": 1147,
  "/data/laion115m/00001.tar": 1203,
  ...
}

2. WebDataset基础使用

WebDataset的基本使用方式如下:

dataset = wds.WebDataset(url)
    .shuffle(1000)
    .decode('pilrgb', handler=wds.warn_and_continue)
    .to_tuple("jpg", "txt", handler=wds.warn_and_continue)
    .map(transforms)

3. 自定义IterableDataset实现

为了实现多GPU支持和进度显示,我们自定义IterableDataset:

class Iter_ds(torch.utils.data.IterableDataset):
    def __init__(self, urls, transforms, n_sample):
        self.urls = urls
        self.transforms = transforms
        self.n_sample = n_sample
        
    def __len__(self):  
        return self.n_sample // get_world_size()
                       
    def __iter__(self):
        process_rank = get_rank()
        world_size = get_world_size()
        for url in self.urls:  
            dataset = wds.WebDataset(url, nodesplitter=wds.split_by_worker)
                .shuffle(1000)
                .decode('pilrgb', handler=wds.warn_and_continue)
                .to_tuple("jpg", "txt", handler=wds.warn_and_continue)
                .map(self.transforms)
            
            for batch_id, sample in enumerate(dataset):
                if batch_id % world_size == process_rank: 
                    yield sample
                else: 
                    continue

4. PyTorch Lightning集成

将上述实现集成到PyTorch Lightning的DataModule中:

class Laion115M(pl.LightningDataModule):
    def __init__(self, data_dir, split_ratio, img_transforms, txt_transforms, 
                 num_workers=4, batch_size=16, num_epoch=1, pin_memory=False):
        super().__init__()
        self.data_dir = Path(data_dir)
        self.split_ratio = split_ratio
        self.img_transforms = img_transforms
        self.txt_transforms = txt_transforms
        self.transforms = lambda tup: (self.img_transforms(tup[0]), self.txt_transforms(tup[1]))
        self.batch_size = batch_size
        self.num_epoch = num_epoch
        self.num_workers = num_workers
        self.pin_memory = pin_memory

    def prepare_data(self):
        with open(self.data_dir, 'r') as f:
            self.tar_dict = json.load(f)
        
        tar_lst = list(self.tar_dict.keys())
        n_shard = len(tar_lst)
        
        tra_ratio, val_ratio, _ = self.split_ratio
        self.tra_lst = tar_lst[:int(n_shard * tra_ratio)]
        self.val_lst = tar_lst[len(self.tra_lst):len(self.tra_lst)+int(n_shard * val_ratio)]
        self.tst_lst = tar_lst[len(self.tra_lst)+len(self.val_lst):]

    def _get_sample_num(self, tar_lst):
        return sum(self.tar_dict[tar_key] for tar_key in tar_lst)

    def setup(self, stage='train'):
        self.prepare_data()
        if stage == 'train':
            n_tra_sample = self._get_sample_num(self.tra_lst)
            self.laion_train = Iter_ds(self.tra_lst, transforms=self.transforms, n_sample=n_tra_sample)
            n_val_sample = self._get_sample_num(self.val_lst)
            self.laion_valid = Iter_ds(self.val_lst, transforms=self.transforms, n_sample=n_val_sample)
        else:
            n_tst_sample = self._get_sample_num(self.tst_lst)
            self.laion_test = Iter_ds(self.tst_lst, transforms=self.transforms, n_sample=n_tst_sample)

    def train_dataloader(self):
        return torch.utils.data.DataLoader(
            self.laion_train, 
            batch_size=self.batch_size, 
            shuffle=False, 
            pin_memory=True, 
            num_workers=self.num_workers,
            prefetch_factor=2,
            drop_last=True
        )

性能优化技巧

  1. 多节点训练注意事项

    • 网络带宽是瓶颈,尽量减少节点间通信
    • 梯度同步会占用大量网络资源
    • 每个GPU对应一个进程,进程间通信也会影响性能
  2. 数据加载优化

    • 使用pin_memory=True将数据预先加载到内存
    • 设置prefetch_factor预取数据
    • 合理设置num_workers数量
  3. 数据打乱策略

    • 对于分类数据集等有序数据必须打乱
    • 对于Laion115M等网络爬取数据可不严格打乱
    • 使用DistributedSampler自动处理多GPU数据分片

版本兼容性

经过测试,以下版本组合工作良好:

  • WebDataset: 0.2.86
  • PyTorch Lightning: 2.2.1
  • PyTorch: 2.2.0+cu118

总结

本文介绍了WebDataset与PyTorch Lightning结合处理大规模数据集的完整方案。通过自定义IterableDataset和合理的数据分片策略,我们实现了多GPU支持、训练进度可视化等关键功能。对于超大规模数据集训练,这种方案能够有效解决内存和性能问题,是处理TB级数据集的理想选择。

登录后查看全文
热门项目推荐

热门内容推荐

最新内容推荐

项目优选

收起
openHiTLS-examplesopenHiTLS-examples
本仓将为广大高校开发者提供开源实践和创新开发平台,收集和展示openHiTLS示例代码及创新应用,欢迎大家投稿,让全世界看到您的精巧密码实现设计,也让更多人通过您的优秀成果,理解、喜爱上密码技术。
C
54
469
kernelkernel
deepin linux kernel
C
22
5
nop-entropynop-entropy
Nop Platform 2.0是基于可逆计算理论实现的采用面向语言编程范式的新一代低代码开发平台,包含基于全新原理从零开始研发的GraphQL引擎、ORM引擎、工作流引擎、报表引擎、规则引擎、批处理引引擎等完整设计。nop-entropy是它的后端部分,采用java语言实现,可选择集成Spring框架或者Quarkus框架。中小企业可以免费商用
Java
7
0
RuoYi-Vue3RuoYi-Vue3
🎉 (RuoYi)官方仓库 基于SpringBoot,Spring Security,JWT,Vue3 & Vite、Element Plus 的前后端分离权限管理系统
Vue
880
519
Cangjie-ExamplesCangjie-Examples
本仓将收集和展示高质量的仓颉示例代码,欢迎大家投稿,让全世界看到您的妙趣设计,也让更多人通过您的编码理解和喜爱仓颉语言。
Cangjie
336
1.1 K
ohos_react_nativeohos_react_native
React Native鸿蒙化仓库
C++
181
264
cjoycjoy
一个高性能、可扩展、轻量、省心的仓颉Web框架。Rest, 宏路由,Json, 中间件,参数绑定与校验,文件上传下载,MCP......
Cangjie
87
14
CangjieCommunityCangjieCommunity
为仓颉编程语言开发者打造活跃、开放、高质量的社区环境
Markdown
1.09 K
0
openHiTLSopenHiTLS
旨在打造算法先进、性能卓越、高效敏捷、安全可靠的密码套件,通过轻量级、可剪裁的软件技术架构满足各行业不同场景的多样化要求,让密码技术应用更简单,同时探索后量子等先进算法创新实践,构建密码前沿技术底座!
C
361
381
cherry-studiocherry-studio
🍒 Cherry Studio 是一款支持多个 LLM 提供商的桌面客户端
TypeScript
613
60