WeNet 更新:超大规模数据 UIO,支持千万小时语音训练任务(转载自语音杂谈)
作者:互联网
转载自语音杂谈 https://mp.weixin.qq.com/s/C6vVQ455g_fVNtRBb9pa7A
WeNet 更新:超大规模数据 UIO,支持千万小时语音训练任务
近日,WeNet 中更新了超大规模数据 UIO (Unified IO) 支持,可以支持工业级千万小时级别的语音数据训练,支持云存储 OSS/S3/HDFS 等,并且训练速度更快,接口更简单,更容易使用和调试。
旧有 IO 方案的问题
WeNet 中旧有的 IO 方案基于 Pytorch 原生的 Dataset,在训练时,需要一次性把所有训练音频路径和对应标注的索引信息加载到内存,训练时对所有数据进行随机读取。在工业级的超大规模数据(5万小时以上,大约5000万条以上音频)时,该方案会导致两个严重的问题:
- 内存溢出(OOM)问题:在超大规模数据时,一般机器的物理内存已经难以一次性加载训练数据的索引信息。
- 读取性能慢:旧有方案为随机读取,在超大规模数据内存无法做文件 cache 的情况下,训练数据读取速度大幅度降低,从而导致训练速度慢。
UIO 解决方案
为解决上述问题,受如下工业级的解决方案的启发:
- webdataset: https://github.com/webdataset/webdataset
- tfrecord:Tensorflow 的 IO 解决方案
我们重新设计了 WeNet 的 IO 方案,其核心思想是:将多个小数据(如1000条)的音频和标注打成压缩包(tar),并基于 Pytorch 的 IterableDataset 进行读取。该方案在训练时:
- 内存中仅需维护压缩包的的索引信息,从而大大节省了内存,解决了 OOM 的问题。
- 读取时在内存中进行 on-the-fly 的解压缩,同一个压缩包内的数据顺序读取,解决了随机读取性能慢的问题。不同的压缩包可以进行随机读取,保证了数据的全局随机性。
同时,新 IO 方案在小数据集时支持选择使用原始形式的数据(无需压缩包),以保证在小数据集时的简洁性,也避免了小数据时浪费额外的空间。所以,新的 IO 方案同时兼顾了小数据集和大数据集,我们称之为 Unified IO(UIO)。UIO 的整体设计如下图所示:
其中:
-
Small IO 为小数据集支持,我们称之为 raw 模式,该模式仅支持本地文件读取。所需文件须整理成 Kaldi 风格的语音列表文件 wav.scp 和标注列表文件 text。
-
Big IO 为大数据集支持,我们称之为 shard 模式,该模式既可以支持本地文件读取,也可以支持网络云存储文件的读取。所需文件须整理成压缩包形式,单个压缩包内顺序存储了音频(wav)和其标注(txt)。
除此之外,受 TFRecord 链式 (chain IO) 的启发,UIO 也采用链式实现。在实际使用中,链式 IO 更灵活,更易于扩展,也更易于调试。
TFRecord 中的链式 IO 示例
def read_dataset(filename, batch_size):
dataset = tf.data.TFRecordDataset(filename)
dataset = dataset.map(_parse_image_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.shuffle(500)
dataset = dataset.batch(batch_size, drop_remainder=True)
dataset = dataset.repeat()
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
return dataset
WeNet 中的 UIO 链式数据流如下图所示:
其链式 IO 包括以下模块:
- tokenize 模块: 将标注解析成建模单元 char 或者 BPE。
- filter 模块:过滤掉训练数据中音频过长过短、标注过长过短的句子。
- resample 模块: 对训练数据进行可选的重采样。
- compute_fbank 模块:特征提取
- spec_augmentation 模块:对特征进行 spec_augmentation 增强。
- shuffle 模块:对数据进行局部 shuffle。
- sort 模块:对数据进行局部排序。
- batch 模块:将多条数据组织成 batch。
- padding 模块:对同一 batch 内的数据进行 padding 处理。
实验
目前我们在 aishell(200小时) 和 WenetSpeech(1万小时)数据上分别验证了 UIO 的准确性。
AIshell
从该实验可以看到,UIO 无论是 raw 还是 shard 的模式,均能取得与之前旧有方案相当的识别率。
WenetSpeech
表中 WeNet 的结果直接基于 UIO shard 方案训练。WeNet 和 ESPnet 使用相近的模型结构和参数配置,两者取得相近的识别率,说明了 UIO 方案在大规模数据上的正确性。在训练过程中,我们观测到 UIO 的 GPU 整体的利用率在 80% ~ 90% 以上,说明整体 IO 的读取效率很高。
如何使用?
关于 UIO 的详细使用方式请参考 aishell 数据集示例:
https://github.com/wenet-e2e/wenet/blob/main/examples/aishell/s0/run.sh
目前 wenet 中所有的数据集均已使用 UIO 作为默认数据准备和读取方案。
和 UIO 相关的有三个参数:
- train_data(cv_data/test_data): data.list
- data_type: 可以为raw或者shard
- symbol_table: 训练时的建模单元字典文件
python wenet/bin/train.py --gpu $gpu_id \
--config $train_config \
--data_type $data_type \
--symbol_table $dict \
--train_data $feat_dir/$train_set/data.list \
--cv_data $feat_dir/dev/data.list \
...
对于 data_type 为 raw 的文件,data.list 的形式如下所示,每一行为一行 json 序列化的字符串,该 json 串中含有 key, wav, txt 三个字段。
{"key": "BAC009S0002W0122", "wav": "/export/data/asr-data/OpenSLR/33//data_aishell/wav/train/S0002/BAC009S0002W0122.wav", "txt": "而对楼市成交抑制作用最大的限购"}
{"key": "BAC009S0002W0123", "wav": "/export/data/asr-data/OpenSLR/33//data_aishell/wav/train/S0002/BAC009S0002W0123.wav", "txt": "也成为地方政府的眼中钉"}
{"key": "BAC009S0002W0124", "wav": "/export/data/asr-data/OpenSLR/33//data_aishell/wav/train/S0002/BAC009S0002W0124.wav", "txt": "自六月底呼和浩特市率先宣布取消限购后"}
对于 data_type 为 shard 的文件,data.list 中每一行为一个统一资源标识符 URI 的地址,该地址可以为本地文件,或者网络文件 HTTP/HTTPS/FTP 等形式。通过网络接口形式,我们即可实现对 S3/OSS/HDFS 等分布式存储系统的支持。例如: 对于本地文件 data.list 的形式为:
/export/maryland/binbinzhang/code/wenet/examples/aishell/s3/raw_wav/train/shards/shards_000000000.tar.gz
/export/maryland/binbinzhang/code/wenet/examples/aishell/s3/raw_wav/train/shards/shards_000000001.tar.gz
/export/maryland/binbinzhang/code/wenet/examples/aishell/s3/raw_wav/train/shards/shards_000000002.tar.gz
对于网络文件,例如 OSS,data.list 的形式为:
https://examplebucket.oss-cn-hangzhou.aliyuncs.com/exampledir/1.tar.gz
https://examplebucket.oss-cn-hangzhou.aliyuncs.com/exampledir/2.tar.gz
实现细节
如何做数据的 Distributed Partition? 根据rank 和 num_workers 对数据进行分割即可,如以下代码所示:
class DistributedSampler:
def __init__(self, shuffle=True, partition=True):
self.epoch = -1
self.update()
self.shuffle = shuffle
self.partition = partition
def set_epoch(self, epoch):
self.epoch = epoch
def sample(self, data):
data = list(range(len(data)))
if self.partition:
if self.shuffle:
random.Random(self.epoch).shuffle(data)
data = data[self.rank::self.world_size]
data = data[self.worker_id::self.num_workers]
return data
如何处理不平衡的数据?
在训练时每个 rank 上分配到的数据不一定完全一样多,即使分配到一样多的数据,在后续处理中也会有数据过滤,也会导致每个 rank 上分配的数据不一样多。使用 model.join() 处理每个rank上分配到的数据不均衡, 详请参考 https://pytorch.org/tutorials/advanced/generic_join.html#how-does-join-work
Buffer size, 目前的设置中有两处 buffer。
Shuffle buffer:用来 shuffle 数据,这个 buffer 设置的大小建议大于单个 shards 中含有的数据数量,每次相当于数据在两个 shards 间做 shuffle,增大了数据的随机性。例如每个 shard 中含1000 条语音时,可以设置 shuffle_buffer 为 1500.
Sort buffer: 用来对数据按语音帧数排序,这个操作很重要,能大大提高训练速度。
当然,两个 buffer 均不建议设置的过大,设置过大时,一方面比较占内存,二是程序可能卡在读满一个 buffer 这一步上。
Prefetch
Pytorch Dataloader 中使用 prefetch 来预读数据,prefetch的粒度是最终训练的batch,默认参数为2,也就是默认会预读两个 batch 的数据。在新 IO 的设计中,因为有前置 buffer 的存在,预读的数据可能已经在 buffer 中,从而没有去做真正的预读,等到下次训练时,buffer 中的数据不足,才会 on the fly 的再去填充 buffer,这时训练即 block 在了读数据上。简言之,就是在prefetch 很小的时候训练在部分时间会 block 在读数据上,因为前面某一级还在缓存数据。所以要设置比较大的 prefetch 来避免这个问题,例如:
- shards: 1000 条
- buffer: 1000 条
- batch_size: 32
- pretch: 100
相当于预读了 32 * 100,大约是3个shards的内容。
读者潜在的问题
- 为什么不直接使用 TFRecord?
TFRecord 是为 Tensorflow 专门设计的一个重量级的库,使用 protobuf 进行数据组织,protobuf 格式数据可读性差。Pytorch 中也缺乏相应的 TFRecord 的生态。
- 为什么不直接使用 webdataset?
WeNet 中 UIO 的设计理念和方式和 webdataset 非常相近。但从目前现有的实现来看,webdataset 是基于图像数据设计的,在语音任务中使用的话,仍需做大量的修改。并且,我们希望在小数据集时,依然使用原始的语音文件,webdataset 中也不支持。
- 为什么使用 LMDB, HDF5 等 key/value 的数据库?
这些数据库能够解决大量小文件的问题,但依然不能解决文件随机读取的问题,也不支持 Cloud IO。
转载自语音杂谈:https://mp.weixin.qq.com/s/C6vVQ455g_fVNtRBb9pa7A
标签:语音,数据,WeNet,self,UIO,IO,wav,data 来源: https://www.cnblogs.com/DataBaker/p/15774349.html