Building a Data Pipeline with TensorFlow

Exploring the tf.data API in TensorFlow

When building machine learning models, we tend to focus so much on the more glamorous aspects, such as finding the best architecture and tuning hyperparameters, that we fail to recognize that our dataset is the lifeblood of the entire pipeline.

Current ML hardware, such as TPUs and GPUs, is built with hundreds or even thousands of cores that massively parallelize machine learning workloads. This is a great development, but to take advantage of that parallelism, we need tools that can keep such hardware fully utilized.

In this article, we’re going to explore the tf.data API, a very powerful tool built into TensorFlow that provides the flexibility needed to build highly-optimized data pipelines.

tf.data API: Overview

The tf.data API was built with three major focal areas. These are:

  1. Performance
  2. Flexibility
  3. Ease of use

In terms of performance, tf.data was built in such a way that our input data pipeline can harness the tremendous processing power of hardware accelerators to keep up with accelerated model execution (forward and backward propagation). tf.data provides us with the tools to squeeze out every bit of performance from these hardware accelerators.

On the flexibility side, tf.data widens the spectrum of options in terms of the kinds of data we want to use, without having to rely on any external tools.

Since TensorFlow was built to democratize AI, most of its tools are built to enable seamless usage by the average programmer, and tf.data is no exception. tf.data has a simplified interface that provides an intuitive way to interact with and manage datasets with minimal effort.

tf.data follows the Extract, Transform, Load (ETL) pattern to move data from its source all the way into your model. We’re going to look at each of these stages in turn, along with the tools tf.data provides for it.
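
As a rough sketch of how the three stages fit together, here is the whole pattern on a toy in-memory dataset. The numbers and the doubling transform are placeholders chosen for illustration; they are not part of the image example used later in this article.

```python
import tensorflow as tf

# Extract: build a Dataset from an in-memory Python list (toy data).
raw_ds = tf.data.Dataset.from_tensor_slices([1, 2, 3, 4, 5, 6])

# Transform: apply a per-element function, then group elements into batches.
transformed_ds = raw_ds.map(lambda x: x * 2).batch(3)

# Load: iterate over the batches, as a training loop would.
batches = [batch.numpy().tolist() for batch in transformed_ds]
print(batches)  # [[2, 4, 6], [8, 10, 12]]
```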

Extract

Usually, we have our dataset stored on disk, in memory, or even sharded across a distributed file system, and we need a way to load these massive datasets into our model.

At this stage of our input pipeline, tf.data offers us two main benefits: flexibility and performance. On the flexibility side, tf.data lets us load datasets of many different data types, which gives us a wide range of options when deciding how our data should be structured.

Some common data sources supported by tf.data are Python lists, TFRecords, CSV files, text files, and image formats such as JPG and PNG. This means that we don’t need extra tools such as Pandas to build our data pipeline; that functionality is built into tf.data. How amazing!
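
To make the file-based sources a little more concrete, here is a minimal sketch that reads a small CSV file line by line with tf.data.TextLineDataset. The file name, its contents, and the parse_line helper are made up for this illustration.

```python
import os
import tempfile

import tensorflow as tf

# Write a tiny CSV file to a temporary directory (hypothetical data).
tmp_dir = tempfile.mkdtemp()
csv_path = os.path.join(tmp_dir, "toy.csv")
with open(csv_path, "w") as f:
    f.write("height,width\n1,2\n3,4\n")

# TextLineDataset yields one string tensor per line; skip(1) drops the header.
lines_ds = tf.data.TextLineDataset(csv_path).skip(1)

def parse_line(line):
    # Split "1,2" into two strings and convert them to int32.
    fields = tf.strings.split(line, ",")
    return tf.strings.to_number(fields, out_type=tf.int32)

rows = [row.numpy().tolist() for row in lines_ds.map(parse_line)]
print(rows)  # [[1, 2], [3, 4]]
```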

Let’s look at a code example of how to use tf.data to extract data from our disk.


import glob
import tensorflow as tf

train_path = '/content/ICLR/train/train/'
train_list = glob.glob(train_path + "*")           # one directory per class
train_images_list = glob.glob(train_path + "*/*")  # every image path
train_ds = tf.data.Dataset.list_files(train_images_list)

The two glob.glob calls in the snippet above return Python lists: train_list holds the class directories, and train_images_list holds the path of every image in our dataset directory. That list of image paths is all we need to create a tf.data.Dataset object and start building our data pipeline.

Let’s say that, instead of a directory of images, we have our dataset represented as a tensor (RaggedTensor, SparseTensor, etc). In cases like this, we can just create a tf.data.Dataset object by using the method from_tensor_slices. This method even grants us the flexibility of creating the Dataset object from Python lists as well as from NumPy arrays.

Let’s look at a snippet to create Dataset objects from tensors and arrays.


import numpy as np
import tensorflow as tf

# creating dataset object from a numpy array
np_array = np.array([[1,2,3,4,5],[1,2,3,4,6]])
ds = tf.data.Dataset.from_tensor_slices(np_array)

# creating dataset object from python list
py_list = [1,2,3,4,4]
ds = tf.data.Dataset.from_tensor_slices(py_list)

# creating dataset object from a TensorFlow tensor (SparseTensor, RaggedTensor...)
sparse_dataset = tf.data.Dataset.from_tensors(
    tf.SparseTensor(indices=[[0, 0], [1, 2]], values=[1, 2], dense_shape=[3, 4]))

tensor_dataset = tf.data.Dataset.from_tensor_slices(
    tf.random.uniform([4, 10], minval=1, maxval=10, dtype=tf.int32))
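
The snippet above uses both from_tensor_slices and from_tensors, and the two behave differently. Here is a small sketch of that difference on a toy array (the array values are arbitrary): from_tensor_slices produces one element per row, while from_tensors wraps the whole input as a single element.

```python
import numpy as np
import tensorflow as tf

array = np.array([[1, 2, 3], [4, 5, 6]])

# from_tensor_slices splits along the first axis: one element per row.
sliced_ds = tf.data.Dataset.from_tensor_slices(array)
# from_tensors keeps the input whole: a single element holding the full array.
whole_ds = tf.data.Dataset.from_tensors(array)

sliced_shapes = [x.shape.as_list() for x in sliced_ds]
whole_shapes = [x.shape.as_list() for x in whole_ds]
print(sliced_shapes)  # [[3], [3]]
print(whole_shapes)   # [[2, 3]]
```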

We can also create Dataset objects from Python generators by passing the generator function itself (the function name with no parentheses) and its arguments as a list. We also need to pass the data type and shape of the generator’s outputs to from_generator. Let’s see how.


def count(stop):
  i = 0
  while i < stop:
    yield i
    i += 1

# newer TensorFlow releases prefer output_signature over output_types/output_shapes
ds_counter = tf.data.Dataset.from_generator(
    count, args=[25], output_types=tf.int32, output_shapes=())

Notice that we pass count as a callable and its arguments (stop) as a list (args). We also pass in the datatype of the output as well as its shape.
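
For reference, here is a sketch of the same idea using the output_signature argument, which newer TensorFlow 2 releases offer as a replacement for output_types and output_shapes. It assumes a TensorFlow version recent enough to support that argument, and uses a smaller stop value so the output is easy to read.

```python
import tensorflow as tf

def count(stop):
    # Plain Python generator: yields 0, 1, ..., stop - 1.
    i = 0
    while i < stop:
        yield i
        i += 1

# output_signature bundles the dtype and shape into a single TensorSpec.
ds_counter = tf.data.Dataset.from_generator(
    count,
    args=[5],
    output_signature=tf.TensorSpec(shape=(), dtype=tf.int32),
)

values = [int(v) for v in ds_counter.as_numpy_iterator()]
print(values)  # [0, 1, 2, 3, 4]
```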

Let’s revisit the example of building a Dataset object for images in a directory. You might be wondering how tf.data is going to load images from a Dataset object that has been given just the paths to the images, and what size those images are going to be. Well, that’s what we’re going to look into in the second stage of the pipeline.

Transform

What happens at the transform stage is exactly what its name indicates. Here, we build a mini pipeline of instructions to transform our data representation (in our case, a list of file paths) into something that our neural network understands, like a tensor.

In some cases, we might not need to do this, such as in the case where we pass a NumPy array or TensorFlow tensor into the dataset object directly. We build this pipeline using a function or a series of functions, and in this function, we describe the behavior of our transformations.

This is also the stage where we perform data augmentation to reduce overfitting in our model. We’re going to use the example of building a pipeline for images in a directory to demonstrate the transform stage. Recall that we created a Dataset object from a Python list of file paths, one for each image. Let’s see how we can build the second stage of our pipeline with this Dataset object.


import numpy as np
import tensorflow as tf

IMG_WIDTH = 224
IMG_HEIGHT = 224
# assumption: class names are the class folder names collected in train_list
class_names = np.array(sorted(p.split("/")[-1] for p in train_list))

# decode, resize and augment a single image
def decode_img(img):
  img = tf.image.decode_jpeg(img, channels=3)
  # convert to floats in the [0, 1] range
  img = tf.image.convert_image_dtype(img, tf.float32)
  # tf.image.resize expects the size as [height, width]
  img = tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH])
  img = tf.image.random_flip_left_right(img)
  img = tf.image.random_flip_up_down(img)
  img = tf.image.random_brightness(img, 0.3)
  return img

def get_label(path):
  part_list = tf.strings.split(path, "/")
  # in the case where each class of images is in one folder
  return part_list[-2] == class_names

def process_path(file_path):
  label = get_label(file_path)
  img = tf.io.read_file(file_path)
  img = decode_img(img)
  return img, label

The input to the decode_img function is a tensor containing encoded, somewhat gibberish data, which is loaded from the image file path using tf.io.read_file in the second line of the process_path function.

We convert this into meaningful numeric data using the tf.image.decode_jpeg function. The lines after the decoding step convert the pixel values to floats, resize the image, apply the data augmentation, and return the transformed tensor.

In the case where we’re performing image classification, we might want to extract the labels from the image path. We do this in the get_label function by comparing the name of the image’s parent folder against the list of class names, which gives us a one-hot encoded (boolean) vector.
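
Here is a small sketch of that label extraction on a single path, run eagerly so we can inspect the result. The class names and the file path are hypothetical, and the final argmax step is an optional extra for when an integer class index is more convenient than a one-hot vector.

```python
import numpy as np
import tensorflow as tf

# Hypothetical class folders; in the article these come from the dataset directory.
class_names = np.array(["bird", "cat", "dog"])

# Hypothetical path laid out as .../<class_name>/<file_name>.
path = tf.constant("/data/train/cat/img_001.jpg")

parts = tf.strings.split(path, "/")
# Comparing the folder name against every class name gives a boolean one-hot vector.
one_hot = parts[-2] == class_names
# Cast to integers before argmax to recover the class index.
class_index = int(tf.argmax(tf.cast(one_hot, tf.int32)))

print(one_hot.numpy().tolist())  # [False, True, False]
print(class_index)               # 1
```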

Now that we have our functions to load and transform our dataset, we just pass process_path into the tf.data.Dataset.map function as a callable and let it run this mini-pipeline across multiple threads. Let’s see how:


num_threads = 5 
train_ds = train_ds.map(process_path, num_parallel_calls=num_threads)
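
Instead of a fixed number of threads, num_parallel_calls also accepts tf.data.experimental.AUTOTUNE, which leaves the choice to tf.data at runtime. Here is a minimal sketch on a toy dataset; the square function is just a stand-in for a more expensive transformation such as process_path.

```python
import tensorflow as tf

def square(x):
    # Stand-in for an expensive per-element transformation.
    return x * x

ds = tf.data.Dataset.range(6)
# AUTOTUNE lets tf.data pick the level of parallelism at runtime.
ds = ds.map(square, num_parallel_calls=tf.data.experimental.AUTOTUNE)

# By default the output order still matches the input order.
squares = [int(v) for v in ds.as_numpy_iterator()]
print(squares)  # [0, 1, 4, 9, 16, 25]
```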

We might also want to batch our dataset and enable shuffle functionality. We batch the dataset using the tf.data.Dataset.batch function and pass into it the batch size. To enable shuffle functionality, we call the tf.data.Dataset.shuffle function and pass into it a buffer size, which should be at least equal to the size of our dataset to ensure proper shuffling.


train_ds = train_ds.cache()
train_ds = train_ds.shuffle(10000)
train_ds = train_ds.repeat(num_epochs)
train_ds = train_ds.batch(128)
train_ds = train_ds.prefetch(tf.data.experimental.AUTOTUNE)
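
To see why the shuffle buffer size matters, here is a small sketch on a toy dataset of ten integers. The buffer sizes are picked purely for illustration; the shuffled orders are random, so only the unshuffled case has a predictable output.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)

# A buffer of 1 holds a single element at a time, so the order cannot change.
no_shuffle = [int(v) for v in ds.shuffle(buffer_size=1).as_numpy_iterator()]

# With a buffer of 3, the first output is drawn from the first 3 elements only.
small_buffer = [int(v) for v in ds.shuffle(buffer_size=3).as_numpy_iterator()]

# A buffer covering the whole dataset allows any element to come first.
full_buffer = [int(v) for v in ds.shuffle(buffer_size=10).as_numpy_iterator()]

print(no_shuffle)    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(small_buffer)  # random order, but the first value is 0, 1 or 2
print(full_buffer)   # random order
```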

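tf.data.Dataset.cache isn’t really recommended when we have a really huge dataset, since it keeps the whole dataset in memory after the first pass for the entire duration of model training. Note also that cache stores whatever comes out of the preceding steps, so random augmentations applied before it are computed once and then reused in every epoch.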
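
One consequence of caching is worth a quick sketch: any random transformation placed before cache is frozen after the first pass. The add_noise function below is a made-up stand-in for a random augmentation; moving such a step after cache would keep it random in every epoch.

```python
import tensorflow as tf

def add_noise(x):
    # Stand-in for a random augmentation such as a random flip.
    return tf.cast(x, tf.float32) + tf.random.uniform([])

# Random transform placed BEFORE cache: the noisy values get stored and reused.
cached_ds = tf.data.Dataset.range(5).map(add_noise).cache()

first_pass = [float(v) for v in cached_ds.as_numpy_iterator()]
second_pass = [float(v) for v in cached_ds.as_numpy_iterator()]

# The second pass is served from the cache, so both passes match exactly.
print(first_pass == second_pass)
```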

One feature of the tf.data API that personally fascinates me is the prefetch functionality. What it does is, during model execution (forward and back propagation), it drives the CPU to prepare the next batch of our dataset to be fed into our model immediately after training on the previous batch. This is really exciting, since it allows us to fully utilize our hardware resources and rescue our GPU from data starvation.

Formerly, the CPU and the GPU took turns: the CPU prepared a batch while the GPU sat idle, and then the GPU trained on that batch while the CPU sat idle.

With tf.data.Dataset.prefetch, the CPU prepares the next batch while the GPU/TPU is still training on the current one. Overlapping model execution and data preparation in this way reduces the overhead of the non-overlapping sequence.
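
A back-of-the-envelope calculation shows how much the overlap can save. This is a simplified model in plain Python, not a measurement: the function names and the timings (30 ms to prepare a batch, 50 ms to train on it) are assumptions made up for the example.

```python
def sequential_time_ms(steps, prep_ms, train_ms):
    # Without overlap, every step pays for preparation and training in turn.
    return steps * (prep_ms + train_ms)

def overlapped_time_ms(steps, prep_ms, train_ms):
    # With overlap, only the first preparation and the last training step are
    # exposed; every step in between costs the slower of the two stages.
    return prep_ms + (steps - 1) * max(prep_ms, train_ms) + train_ms

# Hypothetical timings: 30 ms to prepare a batch, 50 ms to train on it.
steps, prep_ms, train_ms = 100, 30, 50
print(sequential_time_ms(steps, prep_ms, train_ms))  # 8000
print(overlapped_time_ms(steps, prep_ms, train_ms))  # 5030
```

In this toy setting the pipeline becomes bound by the slower stage (training), which is exactly the situation we want: the accelerator never waits for data.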

We’re almost done with the tedious parts of our data pipeline. The last stage, where we load our dataset to feed it into our neural network, is the easiest part.

Load

There are a couple of ways to load our dataset into memory, where our model sits. We’re going to look at only two methods, which are recommended due to their simplicity and efficiency.

The first method is to use a ‘for loop’. It’s as simple as that. We iterate through batches of our dataset using a ‘for loop’, and with each batch, we execute our model. Let’s see how this works with our image dataset example:


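batch_size = 128  # same batch size used when batching train_ds
# integer division, since take() needs a whole number of steps
steps_per_epc = len(train_images_list) // batch_size
for epoch in range(num_epochs):
  for images, labels in train_ds.take(steps_per_epc):
    # execute_model stands for our own training step
    execute_model(images, labels)
    # ....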
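
The number of steps per epoch depends on how the last, partial batch is handled. Here is a small sketch with toy numbers (10 examples, batch size 4) showing the batches a for loop would actually see.

```python
import tensorflow as tf

num_examples = 10
batch_size = 4

ds = tf.data.Dataset.range(num_examples)

# Default batching keeps the final, smaller batch.
batch_sizes = [len(b) for b in ds.batch(batch_size).as_numpy_iterator()]

# drop_remainder=True discards it so every batch has the same size.
full_batch_sizes = [
    len(b) for b in ds.batch(batch_size, drop_remainder=True).as_numpy_iterator()
]

# Integer division matches the number of full batches.
steps_per_epoch = num_examples // batch_size

print(batch_sizes)       # [4, 4, 2]
print(full_batch_sizes)  # [4, 4]
print(steps_per_epoch)   # 2
```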

If you think this is the easiest approach to implement, then you’d unfortunately be mistaken. The second method is even easier and requires only one line of code. All we need to do is pass our dataset object into the model.fit() function alongside other arguments, such as the number of epochs and the steps per epoch.

And that’s it, the tf.keras API takes care of the entire pipeline. What’s even more appealing about this method is that tf.keras logs metrics about the training progress on our display and also displays a progress bar as we iterate through our dataset.

model.fit(train_ds, epochs=num_epochs, steps_per_epoch=steps_per_epc)
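
For a version that runs end to end, here is a minimal sketch with random toy data and a tiny model. The data, the layer sizes, and the loss are all assumptions made for illustration; only the way the dataset is passed to model.fit mirrors the line above.

```python
import numpy as np
import tensorflow as tf

# Hypothetical toy data: 64 examples with 4 features and a binary label.
features = np.random.rand(64, 4).astype("float32")
labels = np.random.randint(0, 2, size=(64, 1)).astype("float32")

batch_size = 16
train_ds = (
    tf.data.Dataset.from_tensor_slices((features, labels))
    .shuffle(64)
    .repeat()  # repeat indefinitely; steps_per_epoch bounds each epoch
    .batch(batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

model = tf.keras.Sequential([
    tf.keras.Input(shape=(4,)),
    tf.keras.layers.Dense(8, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer="adam", loss="binary_crossentropy")

history = model.fit(
    train_ds, epochs=2, steps_per_epoch=len(features) // batch_size, verbose=0
)
# One loss value is recorded per epoch.
print(len(history.history["loss"]))  # 2
```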

Conclusion

That’s a wrap for this introductory tutorial on building data input pipelines with TensorFlow’s powerful data API. I can assure you that there are many more functionalities built into the tf.data API that we didn’t cover, but hopefully this tutorial will serve as a stepping stone for you to build even more powerful data pipelines with more advanced tf.data features.

As a next step, I challenge you to build data pipelines for more complex models, such as natural language processing models and generative adversarial networks, and to explore the other functionalities tf.data provides.

You can find more about the tf.data API on the official TensorFlow website.

