Chapter 13 - Loading and Preprocessing Data with TensorFlow

The meeting will start with a brief summary of the chapter, then we'll walk through Exercise 10: loading and preprocessing the Large Movie Review Dataset, then building and training a binary classification model containing an Embedding layer.

The Data API

  • dataset: iterable sequence of data instances
  • tensor to dataset: dataset = tf.data.Dataset.from_tensor_slices(X)
  • transform datasets with methods such as (copy data, not modify it):
    • .map(...): apply function item by item
    • .take(n): grab n items from dataset
    • .batch(n): split dataset up into batches of length n
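    • e.g. a minimal sketch chaining these methods on toy data:
import tensorflow as tf

X = tf.range(10)                                  # toy data: 0 to 9
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset = dataset.map(lambda x: x * 2)            # apply a function item by item
dataset = dataset.take(6)                         # keep only the first 6 items
dataset = dataset.batch(3)                        # group into batches of length 3
for batch in dataset:
    print(batch)                                  # [0 2 4], then [6 8 10]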
  • Shuffle data:
    • data loaded sequentially into a buffer, then picked out randomly... the buffer must be large enough to provide sufficient randomness
dataset = tf.data.Dataset.range(10).repeat(3)  # 0 to 9, three times
dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
  • Shuffle more:
    • pre-shuffle data
    • split into multiple files and interleave while loading
  • Read files line by line with interleaving:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers)
  • line-by-line parsing from CSV using fields = tf.io.decode_csv(line, record_defaults=defs), where defs gives default values for missing data
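    • e.g. a tiny sketch (the line and defaults here are made up for illustration):
line = b"1.5,2.0,hello"
defs = [0., 0., tf.constant([], dtype=tf.string)]      # per-field defaults; an empty constant marks a required field
fields = tf.io.decode_csv(line, record_defaults=defs)  # -> [1.5, 2.0, b'hello'] as scalar tensors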

  • Concurrency:
    • num_parallel_calls argument
    • tf.data.experimental.AUTOTUNE: chooses the degree of parallelism dynamically (can also be used for other arguments, e.g. the prefetch buffer size)
    • dataset.batch(batch_size).prefetch(1) to stay one step ahead of training
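    • e.g. (a sketch, reusing the preprocess function and batch_size defined in the pipeline below):
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.batch(batch_size).prefetch(1)  # prepare the next batch while training on the current one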

Preprocessing

  • A typical preprocessing pipeline:
    • dataset from list of filepaths
    • interleave lines of data from the filepaths
    • preprocess each line: parse data, transform
    • repeat and shuffle the data
    • split into batches with prefetching
X_mean, X_std = [...] # mean and scale of each feature in the training set
n_inputs = 8
def preprocess(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])
    y = tf.stack(fields[-1:])
    return (x - X_mean) / X_std, y # (X,y) tuple compatible with Keras model
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size)
    return dataset.batch(batch_size).prefetch(1)
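  • the resulting dataset can be passed straight to Keras; a sketch, assuming train_filepaths/valid_filepaths and the training-set size n_train are defined elsewhere, with a toy model for the n_inputs features:
from tensorflow import keras

batch_size = 32
train_set = csv_reader_dataset(train_filepaths, repeat=None)  # repeat indefinitely
valid_set = csv_reader_dataset(valid_filepaths)

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=[n_inputs]),
    keras.layers.Dense(1),
])
model.compile(loss="mse", optimizer="sgd")
model.fit(train_set, steps_per_epoch=n_train // batch_size, epochs=10,
          validation_data=valid_set)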
  • other tf.data.Dataset functions:
    • concatenate(): creates a Dataset by concatenating the given dataset with this dataset
    • filter(): filters this dataset according to a predicate
    • flat_map(): maps map_func across this dataset and flattens the result
    • from_generator(): creates a Dataset whose elements are generated by a generator
    • from_tensors(): creates a Dataset with a single element, comprising the given tensors
    • padded_batch(): combines consecutive elements of this dataset into padded batches
    • shard(): creates a Dataset that includes only 1/num_shards of this dataset
    • window(): combines input elements into a dataset of windows
    • zip(): creates a Dataset by zipping together the given datasets
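    • e.g. a quick sketch of zip() and window() (with flat_map() to flatten the nested windows):
a = tf.data.Dataset.range(3)                       # 0, 1, 2
b = tf.data.Dataset.range(3, 6)                    # 3, 4, 5
zipped = tf.data.Dataset.zip((a, b))               # pairs (0, 3), (1, 4), (2, 5)

windows = tf.data.Dataset.range(6).window(3, shift=3, drop_remainder=True)
flat = windows.flat_map(lambda w: w.batch(3))      # tensors [0 1 2] and [3 4 5]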

TFRecord Format

  • if loading/parsing from e.g. CSV files is the bottleneck, consider TFRecord
  • efficient serialized data format
  • can be used in TF Functions (with TF protocol buffer data structures)
  • protocol buffer ("protobuf"): defines a format for serialized data
  • use TF Protobufs (Example, SequenceExample) so data can be serialized/parsed with TF functions such as tf.io.parse_example()
  • typical workflow for writing to TFRecord:
    • read data from CSV to dataset
    • create Example protobuf for each instance
    • serialize each Example
    • save to several TFRecord files with shuffling
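    • a minimal sketch of this workflow for a single file, with made-up contact records matching the feature_description in the parsing example below (the shuffling/multiple-files part is omitted):
from tensorflow.train import BytesList, Int64List
from tensorflow.train import Feature, Features, Example

contacts = [("alice", 123, [b"alice@example.com"]), ("bob", 456, [])]  # hypothetical data

with tf.io.TFRecordWriter("my_contacts.tfrecord") as writer:
    for name, contact_id, emails in contacts:
        example = Example(features=Features(feature={
            "name": Feature(bytes_list=BytesList(value=[name.encode("utf-8")])),
            "id": Feature(int64_list=Int64List(value=[contact_id])),
            "emails": Feature(bytes_list=BytesList(value=emails)),
        }))
        writer.write(example.SerializeToString())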
  • typical workflow for loading and parsing from TFRecord:
    • define description dictionary of serialized data in the TFRecord
    • read data in batches from TFRecord using TFRecordDataset class
    • parse the examples in each batch
feature_description = {
    "name": tf.io.FixedLenFeature([], tf.string, default_value=""),
    "id": tf.io.FixedLenFeature([], tf.int64, default_value=0),
    "emails": tf.io.VarLenFeature(tf.string),
}

dataset = tf.data.TFRecordDataset(["my_contacts.tfrecord"]).batch(10)
for serialized_examples in dataset:
    parsed_examples = tf.io.parse_example(serialized_examples,
                                          feature_description)
  • any tensor can be stored in and parsed directly from a BytesList in an Example protobuf using tf.io.serialize_tensor() and tf.io.parse_tensor()
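    • e.g. (a small sketch):
t = tf.constant([[0., 1.], [2., 3.]])
serialized = tf.io.serialize_tensor(t)                           # scalar tf.string tensor, storable in a BytesList
restored = tf.io.parse_tensor(serialized, out_type=tf.float32)   # recovers the original tensor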

Preprocessing Input Features

  • convert categorical features to numeric features, normalize, discretize, etc.
  • can be done with Data API (e.g. .map()) but better to implement as custom Keras layer or use existing ones (self-contained, flexible, easy to incorporate into a model or preprocessing pipeline)
  • encode categorical features to one-hot vectors
    • usually okay for <10 categories, but doesn't scale well
    • create lookup table from category to index, including extra "out of vocabulary buckets" for categories not encountered when making the lookup table
    • convert to one-hot vectors with tf.one_hot(cat_indices, depth=...)
    • bundle functionality into a custom layer, or use experimental keras.layers.TextVectorization:
      • .adapt(data_sample): extract data categories and create lookup table
      • .call(train_data): use the lookup table to map categories to indices
      • use tf.one_hot() in a keras Lambda layer to vectorize indices
vocab = ["<1H OCEAN", "INLAND", "NEAR OCEAN", "NEAR BAY", "ISLAND"]
indices = tf.range(len(vocab), dtype=tf.int64)
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
num_oov_buckets = 2
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)
categories = tf.constant(["NEAR BAY", "DESERT", "INLAND", "INLAND"])
cat_indices = table.lookup(categories)

>>> cat_indices

<tf.Tensor: id=514, shape=(4,), dtype=int64, numpy=array([3, 5, 1, 1])>

cat_one_hot = tf.one_hot(cat_indices,
                         depth=len(vocab) + num_oov_buckets)

>>> cat_one_hot

<tf.Tensor: id=524, shape=(4, 7), dtype=float32, numpy=
array([[0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0.],
       [0., 1., 0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0.]], dtype=float32)>
  • Encode categorical features with Embeddings
    • scales better with number of categories than one-hot
    • represent categories with dense vectors instead of indices
      • randomly initialized
      • trainable: training tends to move similar categories closer together ("representation learning")
      • length of vector becomes model hyperparameter
      • See Ch. 16 on natural language processing
    • same category-index lookup table as before, but now each index is mapped to a trainable vector using keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets, output_dim=2)(cat_indices), as sketched below
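    • continuing the lookup-table example above (a sketch; the 2-dimensional embedding size is arbitrary):
embedding = keras.layers.Embedding(input_dim=len(vocab) + num_oov_buckets,
                                   output_dim=2)
embedding(cat_indices)  # shape (4, 2): one trainable 2D vector per category index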
  • keras preprocessing layers
    • Normalization
    • TextVectorization (experimental)
      • encode word to index (make lookup table)
      • not only integer indices, but also counts for each word index ("bag of words")
    • Discretization: split continuous feature into bins
    • PreprocessingStage: combine multiple preprocessing layers
normalization = keras.layers.Normalization()
discretization = keras.layers.Discretization([...])
pipeline = keras.layers.PreprocessingStage([normalization, discretization])
pipeline.adapt(data_sample)

TF Transform

  • if on-the-fly preprocessing is too slow, it may be better to do it all in advance of training (avoiding repetition every epoch)
  • defining a preprocessing function using TF Transform gives a TF Function that can be deployed across platforms with the TF model
import tensorflow_transform as tft

def preprocess(inputs): # inputs = a batch of input features
    median_age = inputs["housing_median_age"]
    ocean_proximity = inputs["ocean_proximity"]
    standardized_age = tft.scale_to_z_score(median_age)
    ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
    return {
        "standardized_median_age": standardized_age,
        "ocean_proximity_id": ocean_proximity_id
    }
  • Done on the entire dataset, so need to account for limited RAM (look at Apache Beam, Spark to handle large datasets)

TF Datasets

  • lots of common datasets
  • datasets load as dictionaries, so use .map() to convert to (X,y) tuple for Keras, or specify as_supervised=True when loading
import tensorflow_datasets as tfds

datasets = tfds.load(name="mnist", batch_size=32, as_supervised=True)
mnist_train = datasets["train"].repeat().prefetch(1)
model = keras.models.Sequential(...)
model.compile(...)
model.fit(mnist_train, epochs=5)
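  • the .map() alternative (without as_supervised=True the elements are dictionaries), as a sketch:
datasets = tfds.load(name="mnist")
mnist_train = datasets["train"].repeat().shuffle(10000).batch(32)
mnist_train = mnist_train.map(lambda items: (items["image"], items["label"]))
mnist_train = mnist_train.prefetch(1)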