dataset pipeline

本教程旨在:

  • 说明 TensorFlow 输入流水线本质上是一个 ETL 流程。
  • 介绍围绕 tf.data API 的常见性能优化。
  • 讨论转换的应用顺序对性能的影响。
  • 总结设计高性能 TensorFlow 输入流水线的最佳做法。

输入流水线结构

我们可以将典型的 TensorFlow 训练输入流水线视为 ETL 流程:

  • 提取:从永久性存储(可以是 HDD 或 SSD 等本地存储或 GCS 或 HDFS 等远程存储)读取数据。
  • 转换:使用 CPU 核心解析数据并对其执行预处理操作,例如图像解压缩、数据增强转换(例如随机裁剪、翻转和颜色失真)、重排和批处理。
  • 加载:将转换后的数据加载到执行机器学习模型的加速器设备(例如,GPU 或 TPU)上。
    这种模式可高效利用 CPU,同时预留加速器来完成对模型进行训练的繁重工作。此外,将输入流水线视为 ETL 流程可提供便于应用性能优化的结构。

使用 tf.estimator.EstimatorAPI时,前两个阶段(提取和转换)是在 input_fn(传递给 tf.estimator.Estimator.train) 中捕获的。代码可能如下(简单序列)实现所示:

def parse_fn(example):
  "Parse TFExample records and perform simple data augmentation."
  example_fmt = {
    "image": tf.FixedLengthFeature((), tf.string, ""),
    "label": tf.FixedLengthFeature((), tf.int64, -1)
  }
  parsed = tf.parse_single_example(example, example_fmt)
  image = tf.image.decode_image(parsed["image"])
  image = _augment_helper(image)  # augments image using slice, reshape, resize_bilinear
  return image, parsed["label"]

def input_fn():
  files = tf.data.Dataset.list_files("/path/to/dataset/train-*.tfrecord")
  dataset = files.interleave(tf.data.TFRecordDataset)
  dataset = dataset.shuffle(buffer_size=FLAGS.shuffle_buffer_size)
  dataset = dataset.map(map_func=parse_fn)
  dataset = dataset.batch(batch_size=FLAGS.batch_size)
  return dataset

下一部分以此输入流水线为基础,并添加了性能优化。

优化性能

由于新型计算设备(例如 GPU 和 TPU)可以不断提高神经网络的训练速度,因此,CPU 处理很容易成为瓶颈。tf.data API 为用户提供构建块来设计可高效利用 CPU 的输入流水线,并优化 ETL 流程的每个步骤。

流水线

要执行训练步骤,您必须首先提取并转换训练数据,然后将其提供给在加速器上运行的模型。但是,在一个简单的同步实现中,当 CPU 正在准备数据时,加速器处于空闲状态。相反,当加速器正在训练模型时,CPU 处于空闲状态。因此,训练步的用时是 CPU 预处理时间和加速器训练时间的总和。

流水线将训练步骤的预处理和模型执行过程重叠到一起。当加速器正在执行第 N 个训练步时,CPU 正在准备第 N+1 步的数据。这样做不仅可以最大限度地缩短训练的单步用时(而不是总用时),而且可以缩短提取和转换数据所需的时间

如果不使用流水线,CPU 和 GPU/TPU 在大部分时间都处于空闲状态:
dataset_withoutpipeline

使用流水线可以显著减少空闲时间:

datasets_with_pipelining

tf.data API 通过 tf.data.Dataset.prefetch 转换提供了一种软件流水线机制,该机制可用于将生成数据的时间和使用数据的时间分离开。具体而言,该转换使用后台线程和内部缓冲区,以便在请求元素之前从输入数据集中预取这些元素。因此,为了实现上图所示的流水线效果,您可以将 prefetch(1) 作为最终转换添加到数据集流水线中(如果单步训练使用 n 个元素,则添加 prefetch(n))。

要将此项更改应用于我们正在运行的示例,请将:

dataset = dataset.batch(batch_size=FLAGS.batch_size)
return dataset

更改为:

dataset = dataset.batch(batch_size=FLAGS.batch_size)
dataset = dataset.prefetch(buffer_size=FLAGS.prefetch_buffer_size)
return dataset

请注意,只要可以将“提供方”的工作与“使用方”的工作重叠,预取转换就会发挥作用。

并行处理数据提取

在实际设置中,输入数据可能会远程存储(例如,GCS 或 HDFS),这是因为输入数据不适合本地存储,或因为训练是分布式训练,因此在每台机器上复制输入数据没有意义。非常适合在本地读取数据的数据集流水线在远程读取数据时可能会遇到 I/O 瓶颈,这是因为本地存储和远程存储之间存在以下差异:

  • 首字节时间:与本地存储相比,从远程存储读取文件的首字节所用时间可能要多出几个数量级。
  • 读取吞吐量:虽然远程存储可提供较大的聚合宽带,但读取单个文件可能只能利用此宽带的一小部分。

此外,将原始字节读入内存中后,可能还需要对数据进行反序列化或解密(例如,protobuf),这会带来额外的开销。无论数据是在本地还是远程存储,都存在这种开销,但如果未有效预取数据,则在远程存储的情况下可能更糟。

为了降低各种数据提取开销的影响,tf.data API 提供了 tf.contrib.data.parallel_interleave 转换。使用此转换可以并行执行其他数据集(例如数据文件读取器)并交错这些数据集的内容。可以通过 cycle_length 参数指定要重叠的数据集的数量。

下图说明了为 parallel_interleave 转换提供 cycle_length=2 的效果:
parallel_interleave

如将

dataset = files.interleave(tf.data.TFRecordDataset)

改为:

dataset = files.apply(tf.contrib.data.parallel_interleave(tf.data.TFRecordDataset,
cycle_length=FLAGS.num_parallel_readers))

由于负载或网络事件,远程存储系统的吞吐量可能会随时间而变化。鉴于这种差异,parallel_interleave 转换可以选择使用预取(如需了解详情,请参阅 tf.contrib.data.parallel_interleave)。

默认情况下,parallel_interleave 转换可提供元素的确定性排序以帮助实现可再现性。作为预取的替代方案(在某些情况下可能效率低下),parallel_interleave 转换还提供了一个可提升性能但无法保证排序的选项。特别是,如果 sloppy 参数设为 true,则该转换可在系统请求下一个元素时暂时跳过其元素不可用的文件,从而放弃该转换的确定性排序

并行处理数据转换

准备批次数据时,可能需要预处理输入元素。为此,tf.data API 提供了 tf.data.Dataset.map 转换,以将用户定义的函数(例如,正在运行的示例的 parse_fn)应用于输入数据集的每个元素。由于输入元素彼此独立,因此可以跨多个 CPU 核心并行执行预处理。为实现这一点,map 转换提供了 num_parallel_calls 参数来指定并行处理级别。例如,下图说明了将 num_parallel_calls=2 设置为 map 转换的效果:

如何为 num_parallel_calls 参数选择最佳值取决于硬件、训练数据的特征(例如其大小和形状)、映射函数的成本以及同时在 CPU 上进行的其他处理;一个简单的启发法是设为可用 CPU 核心的数量。例如,如果执行以上示例的机器有 4 个核心,则设置 num_parallel_calls=4 会更高效。另一方面,将 num_parallel_calls 设置为远大于可用 CPU 数量的值可能会导致调度效率低下,进而减慢速度。

如将

dataset =  dataset.map(map_func=parse_fn)

改为:

dataset = dataset.map(map_func=parse_fn,num_parallel_calls=FLAGS.num_parallel_calls)

此外,如果批次大小为数百或数千,那么并行处理批次创建过程还可能给流水线带来更大的优势。为此,tf.data API 提供了 tf.contrib.data.map_and_batch转换,它可以将映射和批次转换“混合”在一起。
如将:

dataset = dataset.map(map_func=parse_fn,num_parallel_calls=FLAGS.num_parallel_calls)
dataset = dataset.batch(batch_size=FLAGS.batch_size)

改为:

dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func=parse_fn,batch_size=FLAGS.batch_size))

性能考虑因素

重复和重排

tf.data.Dataset.repeat 转换会将输入数据重复有限(或无限)次;每次数据重复通常称为一个周期。tf.data.Dataset.shuffle 转换会随机化数据集样本的顺序。

如果在 shuffle 转换之前应用 repeat 转换,则系统会对周期边界进行模糊处理。也就是说,某些元素可以在其他元素出现之前重复出现。另一方面,如果在重复转换之前应用 shuffle 转换,那么在每个周期开始时性能可能会降低,因为需要初始化 shuffle 转换的内部状态。换言之,前者(repeatshuffle 之前)可提供更好的性能,而后者(repeatshuffle 之前)可提供更强的排序保证。

如果可能,建议您使用 tf.contrib.data.shuffle_and_repeat 混合转换,这样可以达到两全其美的效果(良好的性能和强大的排序保证)。否则,我们建议在重复之前进行重排。

最佳做法摘要

使用 prefetch 转换可将提供方和使用方的工作重叠。我们特别建议将 prefetch(n)(其中 n 是单步训练使用的元素数/批次数)添加到输入流水线的末尾,以便将在 CPU 上执行的转换与在加速器上执行的训练重叠。

通过设置 num_parallel_calls 参数并行处理 map 转换。建议您将其值设为可用 CPU 核心的数量.

  • 如果您使用 batch 转换将预处理元素组合到一个批次中,建议您使用 map_and_batch 混合转换;特别是在您使用的批次较大时。

  • 如果您要处理远程存储的数据并/或需要反序列化,建议您使用 parallel_interleave 转换来重叠从不同文件读取(和反序列化)数据的操作。

  • 向量化传递给 map 转换的低开销用户定义函数,以分摊与调度和执行相应函数相关的开销。

  • 如果内存可以容纳您的数据,请使用 cache 转换在第一个周期中将数据缓存在内存中,以便后续周期可以避免与读取、解析和转换该数据相关的开销。

  • 如果预处理操作会增加数据大小,建议您首先应用 interleaveprefetchshuffle(如果可以)以减少内存使用量。

  • 建议您在应用 repeat 转换之前先应用 shuffle 转换,最好使用 shuffle_and_repeat 混合转换。

One text example

首先我们需要加载一个名为file.txt的数据集:

I use TensorFlow
You use PyTorch
Both are great
dataset = tf.data.TextLineDataset("file.txt")

我们可以创造一个迭代器对象 iterator object 在数据集上,

iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

one_shot_iterator方法创建一个迭代器,该迭代器能够在数据集上迭代一次。换句话说,一旦到达数据集的末尾,它将停止生成元素并引发异常。

现在,next_element是一个图的节点(tf.Variable),它将在每次执行时包含数据集上iterator的下一个元素。现在,让我们运行它:

with tf.Session() as sess:
  for i in range(3):
    print(sess.run(next_element))
>'I use Tensorflow'
>'You use PyTorch'
>'Both are great'

我们可以轻松地将map应用于数据集。例如,按空格分割单词就像添加一行一样简单:

dataset = dataset.map(lambda string: tf.string_split([string]).values)

shuffle the dataset 也是直接了当的:

dataset = dataset.shuffle(buffer_size=3)

它将加载$3\times3$的元素,并在每次迭代中shuffle它们。

您还可以创建批:

dataset = dataset.batch(2)

并预取数据(换句话说,它总是有一个批准备加载)。

dataset = dataset.prefetch(1)

完整的代码为:

import tensorflow as tf
dataset =  tf.data.TextLineDataset("./file.txt")
dataset = dataset.map(lambda string: tf.string_split([string]).values)
dataset = dataset.shuffle(buffer_size=5)
dataset = dataset.batch(2)
dataset = dataset.prefetch(1)
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
        print(sess.run(next_element))

tf.shuffle 使用指南

tf.shuffle(
    buffer_size,
    seed=None,
    reshuffle_each_iteration=None
)

随机打乱此数据集的元素。

该数据集使用buffer_size元素填充缓冲区,然后从该缓冲区随机抽取元素,用新元素替换所选元素。对于完美的洗牌,需要大于或等于数据集的完整大小的缓冲区大小

例如,如果数据集包含10,000个元素,但是buffer_size被设置为1,000,那么shuffle将首先从缓冲区中的前1,000个元素中随机选择一个元素。一旦选择了一个元素,它在缓冲区中的空间将被下一个(即1,001-st)元素替换,以维护1,000个元素缓冲区。

参数:
buffer_size:一个tf.int64标量 tf.Tensor,表示新数据集将从中采样的数据集中元素的数量。
seed:(可选)一个tf.int64标量 tf.Tensor,表示用来创建分布的随机种子。
reshuffle_each_iteration:(可选)。一个布尔值,如果为真,则表示每次遍历数据集时,数据集都应该被伪随机地重新shuffle。(default to True)。
Returns:
Dataset: A Dataset.

它的行为与上面的类似,但是由于init_op,我们可以选择从头“restart”。当我们想要执行多个epochs时,这将变得非常方便!

dataset = tf.data.TextLineDataset("file.txt")
iterator = dataset.make_initial_iterator()
next_element = iterator.get_next()
init_op = iterator.initializer
with tf.Session() as sess:
     # Initialize the iterators
     sess.run(init_op)
     print(sess.run(next_element))
     print(sess.run(next_element))
     # Move the iterator back to the begining
     sess.run(init)
     print(sess.run(next_element))
> 'I use Tensorflow'
  'You use PyTorch'
  'I use Tensorflow' # Iterator moved back at the beginning

由于我们在不同的epoch中只使用一个session,因此需要能够重新启动迭代器。其他一些方法(如tf.Estimator)通过在每个epoch中创建一个新会话来减少使用可初始化迭代器的需求。但是这是有代价的:每次调用estimator.train()或estimator.evaluate()时,都必须重新加载权重和图并重新初始化。

Building an image data pipeline

下面是图像数据集的样子。这里我们已经有了jpeg图像的文件名列表和相应的标签列表。我们采用以下步骤进行训练:

  • 从文件名和标签的切片创建数据集
  • 使用与数据集长度相等的缓冲区大小来shuffle数据。
  • 解析从文件名到像素值的图像。使用多线程来提高预处理的速度(可选用于训练)图像数据增强。使用多线程来提高预处理的速度
  • 批处理图像
  • 预取一批,以确保随时可以提供一批
dataset = tf.data.Dataset.from_tensor_slices((filenames,labels))
dataset = dataset.shuffle(len(filenames))
dataset = dataset.map(parse_function,num_parallel_calls=4)
dataset = dataset.map(train_preprocess,num_parallel_calls=4)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(1)

parse_function的作用:

  • read the content of the file
  • decode using jpeg format
  • convert to float values in [0, 1]
  • resize to size (64, 64)
def parse_function(filename, label):
    image_string = tf.read_file(filename)

    # Don't use tf.image.decode_image, or the output shape will be undefined
    image = tf.image.decode_jpeg(image_string, channels=3)

    # This will convert to float values in [0, 1]
    image = tf.image.convert_image_dtype(image, tf.float32)

    image = tf.image.resize_images(image, [64, 64])
    return resized_image, label

最后,train_preprocess可以在训练过程中是可选的,进行数据扩充:

  • 水平翻转图像的概率为1/2
  • 应用随机亮度和饱和度
def train_preprocess(image,label):
    image = tf.image.random_flip_left_right(image)

    image = tf.image.random_brightness(image,max_delta=32.0/255.0)
    image = tf.image.random_saturation(image,lower=0.5,upper = 1.5)

    # Make sure image is still in [0,1]
    image = tf.clip_by_value(image,0.0,1.0)
    return image,label

Building a text data pipeline

假设我们的任务是命名实体识别。换句话说,输入是一个句子,输出是每个单词的标签,比如:

John   lives in New   York
B-PER  O     O  B-LOC I-LOC

因此,我们的数据集需要同时加载句子和标签。我们将把它们存储在两个不同的文件中,一个句子.txt文件(每行一个)和一个标签.txt文件(包含标签)。例如:

# sentences.txt
John lives in New York
Where is John ?
# labels.txt
B-PER O O B-LOC I-LOC
O O B-PER O

构造 tf.data object。遍历这些文件的数据对象很简单,

sentences = tf.data.TextLineDataset("sentence.txt")
labels = tf.data.TextLineDataset("labels.txt")

Zip datasets together

在这个阶段,我们可能需要同时遍历这两个文件。这种操作通常称为“zip”。幸运的是,tf.data中带有这样一个函数:

# zip the sentence and the labels togehters
dataset = tf.data.Dataset.zip((sentences, labels))

# Create a one shot iterator over the zipped dataset
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
#  Actually run in a Session
with tf.Session() as sess:
   for i in range(2):
       print(sess.run(dataset))
> ('John lives in New York', 'B-PER O O B-LOC I-LOC')
  ('Where is John ?', 'O O B-PER O')

  转载规则: DorMin dataset pipeline

 上一篇
multimodal-neural-translation  3 multimodal-neural-translation 3
本文主要介绍多模态神经翻译的几个任务:Image-pivoted Zero-resource Translation (Bilingual lexicon induction)Unsupervised Bilingual Lexicon
下一篇 
Tensorflow指南(1) Tensorflow指南(1)
变量我们使用 tf.Variable 类操作变量。tf.Variable 表示可通过对其运行操作来改变其值的张量。与 tf.Tensor 对象不同,tf.Variable 存在于单个 session.run 调用的上下文之外,变量只存在与一
2019-09-17
  目录