[Tf]数据管道Dataset

数据管道Dataset

如果需要训练的数据大小不大,例如不到1G,那么可以直接全部读入内存中进行训练,这样一般效率最高。

但如果需要训练的数据很大,例如超过10G,无法一次载入内存,那么通常需要在训练的过程中分批逐渐读入。

使用tf.data API可以构建数据输入管道,轻松处理大量的数据,不同的数据格式,以及不同的数据转换。

构建数据管道

可以从 Numpy array, Pandas DataFrame, Python generator, csv文件, 文本文件, 文件路径, tfrecords文件等方式构建数据管道。

其中通过Numpy array, Pandas DataFrame, 文件路径构建数据管道是最常用的方法。

通过tfrecords文件方式构建数据管道较为复杂,需要对样本构建tf.Example后压缩成字符串写到tfrecords文件,读取后再解析成tf.Example。

但tfrecords文件的优点是压缩后文件较小,便于网络传播,加载速度较快。

从numpy array构建数据管道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import tensorflow as tf
import numpy as np 
from sklearn import datasets
iris = datasets.load_iris()

ds1 = tf.data.Dataset.from_tensor_slices((iris["data"],iris["target"]))
for features,label in ds1.take(5):
    print(features,label)

#tf.Tensor([5.1 3.5 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
#tf.Tensor([4.9 3.  1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
#tf.Tensor([4.7 3.2 1.3 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
#tf.Tensor([4.6 3.1 1.5 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
#tf.Tensor([5.  3.6 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)

从Pdandas DataFrame构建数据管道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 从 Pandas DataFrame构建数据管道
import tensorflow as tf
from sklearn import datasets 
import pandas as pd
iris = datasets.load_iris()
dfiris = pd.DataFrame(iris["data"],columns = iris.feature_names)
ds2 = tf.data.Dataset.from_tensor_slices((dfiris.to_dict("list"),iris["target"]))

for features,label in ds2.take(3):
    print(features,label)

#{'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=5.1>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.5>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)
#{'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.9>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.0>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)
#{'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.7>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.2>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.3>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)

从Python generator构建数据管道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 从Python generator构建数据管道
import tensorflow as tf
from matplotlib import pyplot as plt 
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# 定义一个从文件中读取图片的generator
image_generator = ImageDataGenerator(rescale=1.0/255).flow_from_directory(
                    "./data/cifar2/test/",
                    target_size=(32, 32),
                    batch_size=20,
                    class_mode='binary')

classdict = image_generator.class_indices
print(classdict)

def generator():
    for features,label in image_generator:
        yield (features,label)

ds3 = tf.data.Dataset.from_generator(generator,output_types=(tf.float32,tf.int32))

从csv文件构建数据管道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 从csv文件构建数据管道
ds4 = tf.data.experimental.make_csv_dataset(
      file_pattern = ["./data/titanic/train.csv","./data/titanic/test.csv"],
      batch_size=3, 
      label_name="Survived",
      na_value="",
      num_epochs=1,
      ignore_errors=True)

for data,label in ds4.take(2):
    print(data,label)

从文本文件构建数据管道

1
2
3
4
5
6
7
8
# 从文本文件构建数据管道

ds5 = tf.data.TextLineDataset(
    filenames = ["./data/titanic/train.csv","./data/titanic/test.csv"]
    ).skip(1) #略去第一行header

for line in ds5.take(5):
    print(line)

从文件路径构建数据管道

1
2
3
ds6 = tf.data.Dataset.list_files("./data/cifar2/train/*/*.jpg")
for file in ds6.take(5):
    print(file)

从threcords文件构建数据管道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
import numpy as np

# inpath:原始数据路径 outpath:TFRecord文件输出路径
def create_tfrecords(inpath,outpath): 
    writer = tf.io.TFRecordWriter(outpath)
    dirs = os.listdir(inpath)
    for index, name in enumerate(dirs):
        class_path = inpath +"/"+ name+"/"
        for img_name in os.listdir(class_path):
            img_path = class_path + img_name
            img = tf.io.read_file(img_path)
            #img = tf.image.decode_image(img)
            #img = tf.image.encode_jpeg(img) #统一成jpeg格式压缩
            example = tf.train.Example(
               features=tf.train.Features(feature={
                    'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[index])),
                    'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img.numpy()]))
               }))
            writer.write(example.SerializeToString())
    writer.close()
    
create_tfrecords("./data/cifar2/test/","./data/cifar2_test.tfrecords/")

应用数据转换

Dataset数据结构应用非常灵活,因为它本质上是一个Sequece序列,其每个元素可以是各种类型,例如可以是张量,列表,字典,也可以是Dataset。

Dataset包含了非常丰富的数据转换功能。

  • map: 将转换函数映射到数据集每一个元素。

  • flat_map: 将转换函数映射到数据集的每一个元素,并将嵌套的Dataset压平。

  • interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。

  • filter: 过滤掉某些元素。

  • zip: 将两个长度相同的Dataset横向铰合。

  • concatenate: 将两个Dataset纵向连接。

  • reduce: 执行归并操作。

  • batch : 构建批次,每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。

  • padded_batch: 构建批次,类似batch, 但可以填充到相同的形状。

  • window :构建滑动窗口,返回Dataset of Dataset.

  • shuffle: 数据顺序洗牌。

  • repeat: 重复数据若干次,不带参数时,重复无数次。

  • shard: 采样,从某个位置开始隔固定距离采样一个元素。

  • take: 采样,从开始位置取前几个元素。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#map:将转换函数映射到数据集每一个元素

ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
ds_map = ds.map(lambda x:tf.strings.split(x," "))
for x in ds_map:
    print(x)

# tf.Tensor([b'hello' b'world'], shape=(2,), dtype=string)
# tf.Tensor([b'hello' b'China'], shape=(2,), dtype=string)
# tf.Tensor([b'hello' b'Beijing'], shape=(2,), dtype=string)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#flat_map:将转换函数映射到数据集的每一个元素,并将嵌套的Dataset压平。

ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
ds_flatmap = ds.flat_map(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x," ")))
for x in ds_flatmap:
    print(x)

#tf.Tensor(b'hello', shape=(), dtype=string)
#tf.Tensor(b'world', shape=(), dtype=string)
#tf.Tensor(b'hello', shape=(), dtype=string)
#tf.Tensor(b'China', shape=(), dtype=string)
#tf.Tensor(b'hello', shape=(), dtype=string)
#tf.Tensor(b'Beijing', shape=(), dtype=string)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。

ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
ds_interleave = ds.interleave(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x," ")))
for x in ds_interleave:
    print(x)

#tf.Tensor(b'hello', shape=(), dtype=string)
#tf.Tensor(b'hello', shape=(), dtype=string)
#tf.Tensor(b'hello', shape=(), dtype=string)
#tf.Tensor(b'world', shape=(), dtype=string)
#tf.Tensor(b'China', shape=(), dtype=string)
#tf.Tensor(b'Beijing', shape=(), dtype=string)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#filter:过滤掉某些元素。

ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
#找出含有字母a或B的元素
ds_filter = ds.filter(lambda x: tf.strings.regex_full_match(x, ".*[a|B].*"))
for x in ds_filter:
    print(x)

# tf.Tensor(b'hello China', shape=(), dtype=string)
# tf.Tensor(b'hello Beijing', shape=(), dtype=string)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#zip:将两个长度相同的Dataset横向铰合。

ds1 = tf.data.Dataset.range(0,3)
ds2 = tf.data.Dataset.range(3,6)
ds3 = tf.data.Dataset.range(6,9)
ds_zip = tf.data.Dataset.zip((ds1,ds2,ds3))
for x,y,z in ds_zip:
    print(x.numpy(),y.numpy(),z.numpy())

# 0 3 6
# 1 4 7
# 2 5 8
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
#condatenate:将两个Dataset纵向连接。

ds1 = tf.data.Dataset.range(0,3)
ds2 = tf.data.Dataset.range(3,6)
ds_concat = tf.data.Dataset.concatenate(ds1,ds2)
for x in ds_concat:
    print(x)

#tf.Tensor(0, shape=(), dtype=int64)
#tf.Tensor(1, shape=(), dtype=int64)
#tf.Tensor(2, shape=(), dtype=int64)
#tf.Tensor(3, shape=(), dtype=int64)
#tf.Tensor(4, shape=(), dtype=int64)
#tf.Tensor(5, shape=(), dtype=int64)
1
2
3
4
5
6
7
8
#reduce:执行归并操作。

ds = tf.data.Dataset.from_tensor_slices([1,2,3,4,5.0])
result = ds.reduce(0.0,lambda x,y:tf.add(x,y))
result

#<tf.Tensor: shape=(), dtype=float32, numpy=15.0>

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#batch:构建批次,每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。 

ds = tf.data.Dataset.range(12)
ds_batch = ds.batch(4)
for x in ds_batch:
    print(x)

#tf.Tensor([0 1 2 3], shape=(4,), dtype=int64)
#tf.Tensor([4 5 6 7], shape=(4,), dtype=int64)
#tf.Tensor([ 8  9 10 11], shape=(4,), dtype=int64)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
#padded_batch:构建批次,类似batch, 但可以填充到相同的形状。

elements = [[1, 2],[3, 4, 5],[6, 7],[8]]
ds = tf.data.Dataset.from_generator(lambda: iter(elements), tf.int32)

ds_padded_batch = ds.padded_batch(2,padded_shapes = [4,])
for x in ds_padded_batch:
    print(x)    

#tf.Tensor(
#[[1 2 0 0]
# [3 4 5 0]], shape=(2, 4), dtype=int32)
#tf.Tensor(
#[[6 7 0 0]
# [8 0 0 0]], shape=(2, 4), dtype=int32)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#window:构建滑动窗口,返回Dataset of Dataset.

ds = tf.data.Dataset.range(5)
#window返回的是Dataset of Dataset,可以用flat_map压平
ds_window = ds.window(3, shift=1).flat_map(lambda x: x.batch(3,drop_remainder=True)) 
for x in ds_window:
    print(x)

# tf.Tensor([0 1 2], shape=(3,), dtype=int64)
# tf.Tensor([1 2 3], shape=(3,), dtype=int64)
# tf.Tensor([2 3 4], shape=(3,), dtype=int64)
# tf.Tensor([3 4 5], shape=(3,), dtype=int64)
1
2
3
4
5
6
7
8
#shuffle:数据顺序洗牌。

ds = tf.data.Dataset.range(12)
ds_shuffle = ds.shuffle(buffer_size = 5)
for x in ds_shuffle:
    print(x)
    

1
2
3
4
5
6
7
8
#repeat:重复数据若干次,不带参数时,重复无数次。

ds = tf.data.Dataset.range(3)
ds_repeat = ds.repeat(3)
for x in ds_repeat:
    print(x)


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#shard:采样,从某个位置开始隔固定距离采样一个元素。

ds = tf.data.Dataset.range(12)
ds_shard = ds.shard(3,index = 1)

for x in ds_shard:
    print(x)

#tf.Tensor(1, shape=(), dtype=int64)
#tf.Tensor(4, shape=(), dtype=int64)
#tf.Tensor(7, shape=(), dtype=int64)
#tf.Tensor(10, shape=(), dtype=int64)

1
2
3
4
5
6
7
8
#take:采样,从开始位置取前几个元素。

ds = tf.data.Dataset.range(12)
ds_take = ds.take(3)

list(ds_take.as_numpy_iterator())

#[0, 1, 2]

提升管道性能

训练深度学习模型常常会非常耗时。

模型训练的耗时主要来自于两个部分,一部分来自数据准备,另一部分来自参数迭代。

参数迭代过程的耗时通常依赖于GPU来提升。

而数据准备过程的耗时则可以通过构建高效的数据管道进行提升。

以下是一些构建高效数据管道的建议。

  • 1,使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。

  • 2,使用 interleave 方法可以让数据读取过程多进程执行,并将不同来源数据夹在一起。

  • 3,使用 map 时设置num_parallel_calls 让数据转换过程多进程执行。

  • 4,使用 cache 方法让数据在第一个epoch后缓存到内存中,仅限于数据集不大情形。

  • 5,使用 map转换时,先batch, 然后采用向量化的转换方法对每个batch进行转换。

updatedupdated2020-04-202020-04-20