與大多數 AI 研究部門一樣,Zalando Research 也意識到了對創意進行嘗試和快速原型設計的重要性。隨著數據集變得越來越龐大,了解如何利用我們擁有的共享資源來高效快速地訓練深度學習模型變得大有用處。
TensorFlow 的估算器API 對于在分布式環境中使用多個 GPU 來訓練模型非常有用。本文將主要介紹這一工作流程。我們先使用 Fashion-MNIST 小數據集訓練一個用tf.keras編寫的自定義估算器,然后在文末介紹一個較實際的用例。
請注意:TensorFlow 團隊一直在開發另一項很酷的新功能(在我寫這篇文章時,該功能仍處于 Master 階段),使用這項新功能,您只需多輸入幾行代碼即可訓練tf.keras模型,而無需先將該模型轉化為估算器!其工作流程也很贊。下面我著重講講估算器 API。選擇哪一個由您自己決定!
注:功能鏈接
https://github.com/tensorflow/tensorflow/blob/master/tensorflow/contrib/distribute/python/examples/keras_mnist.py#L106-L114
TL; DR:基本上,我們需要記住,對于 tf.keras. 模型,我們只要通過 tf.keras.estimator.model_to_estimator 方法將其轉化為 tf.estimator.Estimator 對象,即可使用 tf.estimator API 來進行訓練。轉化完成后,我們可以使用估算器提供的機制用不同的硬件配置訓練模型。
您可以從此筆記本下載本文中的代碼并親自運行。
注:筆記本鏈接
https://github.com/kashif/tf-keras-tutorial/blob/master/7-estimators-multi-gpus.ipynb
import osimport time
#!pip install -q -U tensorflow-gpuimport tensorflow as tf
import numpy as np
導入 Fashion-MNIST 數據集
我們用Fashion-MNIST數據集隨手替換一下 MNIST,這里面包含幾千張Zalando時尚文章的灰度圖像。獲取訓練和測試數據非常簡單,如下所示:
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()
我們想把這些圖像的像素值從 0 到 255 之間的一個數字轉換為 0 到 1 之間的一個數字,并將該數據集轉換為 [B, H, W ,C] 格式,其中 B 代表批處理的圖像數,H 和 W 分別是高度和寬度,C 是我們數據集的通道數(灰度為 1):
TRAINING_SIZE = len(train_images)TEST_SIZE = len(test_images)
train_images = np.asarray(train_images, dtype=np.float32) / 255# Convert the train images and add channelstrain_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))
test_images = np.asarray(test_images, dtype=np.float32) / 255# Convert the test images and add channelstest_images = test_images.reshape((TEST_SIZE, 28, 28, 1))
接下來,我們想將標簽從整數編號(例如,2 或套衫)轉換為獨熱編碼(例如,0,0,1,0,0,0,0,0,0,0)。為此,我們要使用 tf.keras.utils.to_categorical函數:
# How many categories we are predicting from (0-9)LABEL_DIMENSIONS = 10
train_labels = tf.keras.utils.to_categorical(train_labels, LABEL_DIMENSIONS)
test_labels = tf.keras.utils.to_categorical(test_labels, LABEL_DIMENSIONS)
# Cast the labels to floats, needed latertrain_labels = train_labels.astype(np.float32)test_labels = test_labels.astype(np.float32)
構建 tf.keras 模型
我們會使用Keras 功能 API來創建神經網絡。Keras 是一個高級 API,可用于構建和訓練深度學習模型,其采用模塊化設計,使用方便,易于擴展。tf.keras 是 TensorFlow 對這個 API 的實現,其支持Eager Execution、tf.data 管道和估算器等。
在架構方面,我們會使用 ConvNet。一個非常籠統的說法是,ConvNet 是卷積層 (Conv2D) 和池化層 (MaxPooling2D) 的堆棧。但最重要的是,ConvNet 將每個訓練示例當作一個 3D 形狀張量(高度、寬度、通道),對于灰度圖像,張量從通道 = 1 開始,然后返回一個 3D 張量。
因此,在 ConvNet 部分之后,我們需要將張量平面化,并添加密集層,其中最后一個返回 LABEL_DIMENSIONS 大小的向量,并附帶 tf.nn.softmax 激活:
inputs = tf.keras.Input(shape=(28,28,1)) # Returns a placeholder
x = tf.keras.layers.Conv2D(filters=32, kernel_size=(3, 3), activation=tf.nn.relu)(inputs)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS, activation=tf.nn.softmax)(x)
現在,我們可以定義學習模型,請選擇優化器(我們從 TensorFlow 中選擇一個,而不使用來自 tf.keras. optimizers 的優化器)并進行編譯:
model = tf.keras.Model(inputs=inputs, outputs=predictions)
optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
創建估算器
使用已編譯的 Keras 模型創建估算器,也就是我們所說的 model_to_estimator 方法。請注意,Keras 模型的初始模型狀態保存在創建的估算器中。
那估算器有哪些優點呢?首先要提以下幾點:
您可以在本地主機或分布式多 GPU 環境中運行基于估算器的模型,而無需更改您的模型;
估算器能夠簡化模型開發者之間的共享實現;
估算器能夠為您構建圖形,所以有點像 Eager Execution,沒有明確的會話。
那么我們要如何訓練簡單的 tf.keras 模型來使用多 GPU?我們可以使用 tf.contrib.distribute.MirroredStrategy 范式,通過同步訓練進行圖形內復制。如需了解更多關于此策略的信息,請觀看分布式 TensorFlow 訓練講座。
注:分布式 TensorFlow 鏈接
https://www.youtube.com/watch?v=bRMGoPqsn20
基本上,每個工作器 GPU 都有一個網絡拷貝,并會獲取一個數據子集,據以計算本地梯度,然后等待所有工作器以同步方式結束。然后,工作器通過 Ring All-reduce 運算互相傳遞其本地梯度,這通常要進行優化,以減少網絡帶寬并增加吞吐量。在所有梯度到達后,每個工作器會計算其平均值并更新參數,然后開始下一步。理想情況下,您在單個節點上有多個高速互聯的 GPU。
要使用此策略,我們首先要用已編譯的 tf.keras 模型創建一個估算器,然后通過 RunConfig config 賦予其 MirroredStrategy 配置。默認情況下,該配置會使用全部 GPU,但您也可以賦予其一個 num_gpus 選項,以使用特定數量的 GPU:
NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model, config=config)
創建估算器輸入函數
要通過管道將數據傳遞到估算器,我們需要定義一個數據導入函數,該函數返回批量數據的 tf.data 數據集(圖像、標簽)。下面的函數接收 numpy 數組,并通過ETL過程返回數據集。
請注意,最后我們還調用了預讀取方法,該方法會在訓練時將數據緩沖到 GPU,以便下一批數據準備就緒并等待 GPU,而不是在每次迭代時讓 GPU 等待數據。GPU 可能仍然沒有得到充分利用,要改善這一點,我們可以使用融合版轉換運算(如 shuffle_and_repeat),而不是兩個單獨的運算。不過,我在這里選用的是簡單用例。
def input_fn(images, labels, epochs, batch_size):
# Convert the inputs to a Dataset. (E)ds = tf.data.Dataset.from_tensor_slices((images, labels))
# Shuffle, repeat, and batch the examples. (T)SHUFFLE_SIZE = 5000ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)ds = ds.prefetch(2)
# Return the dataset. (L)return ds
訓練估算器
首先,我們定義一個 SessionRunHook 類,用于記錄隨機梯度下降法每次迭代的次數:
class TimeHistory(tf.train.SessionRunHook):def begin(self): self.times = []
def before_run(self, run_context): self.iter_time_start = time.time()
def after_run(self, run_context, run_values): self.times.append(time.time() - self.iter_time_start)
亮點在這里!我們可以對估算器調用 train 函數,并通過 hooks 參數,向其賦予我們定義的 input_fn (包含批次大小和我們希望的訓練回合次數)和 TimeHistory 實例:
time_hist = TimeHistory()
BATCH_SIZE = 512EPOCHS = 5
estimator.train(lambda:input_fn(train_images, train_labels, epochs=EPOCHS, batch_size=BATCH_SIZE), hooks=[time_hist])
性能
現在,我們可以使用時間鉤子來計算訓練的總時間和平均每秒訓練的圖像數量(平均吞吐量):
total_time = sum(time_hist.times)print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")
avg_time_per_batch = np.mean(time_hist.times)print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with {NUM_GPUS} GPU(s)")
使用兩塊 K80 GPU 進行訓練時的 Fashion-MNIST 訓練吞吐量和總時間,采用不同 NUM_GPUS,顯示縮放不良
評估估算器
為了檢驗模型的性能,我們要對估算器調用評估方法:
estimator.evaluate(lambda:input_fn(test_images, test_labels, epochs=1, batch_size=BATCH_SIZE))
視網膜 OCT (光學相干斷層成像術)圖像示例
為了測試模型在處理較大數據集時的擴展性能,我們使用視網膜 OCT 圖像數據集,這是Kaggle眾多大型數據集中的一個。該數據集由活人視網膜的橫截面 X 光圖像組成,分為四個類別:NORMAL、CNV、DME和DRUSEN:
光學相干斷層成像術的代表圖像,選自 Kermany 等人所著的《通過基于圖像的深度學習技術確定醫學診斷和可治療疾病》(Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning)
該數據集共有 84,495 張 JPEG 格式的 X 光圖像,尺寸多為 512x496,可以通過 KaggleCLI下載:
注:CLI 鏈接
https://github.com/Kaggle/kaggle-api
#!pip install kaggle#!kaggle datasets download -d paultimothymooney/kermany2018
下載完成后,訓練集和測試集圖像類位于各自的文件夾內,因此我們可以將模式定義為:
labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']
train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')
接下來,我們要編寫估算器的輸入函數,該函數可以提取任何文件模式,并返回已縮放圖像和獨熱編碼標簽作為 tf.data.Dataset。這次,我們遵循輸入管道性能指南中的最佳實踐。請特別注意,如果 prefetch 的 buffer_size 為 None,則 TensorFlow 會自動使用最優的預讀取緩沖區大小:
注:輸入管道性能指南鏈接
https://www.tensorflow.org/performance/datasets_performance
1def input_fn(file_pattern, labels,
2image_size=(224,224),
3shuffle=False,
4batch_size=64,
5num_epochs=None,
6buffer_size=4096,
7prefetch_buffer_size=None):
8
9table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))
10num_classes = len(labels)
11
12def _map_func(filename):
13label = tf.string_split([filename], delimiter=os.sep).values[-2]
14image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
15image = tf.image.convert_image_dtype(image, dtype=tf.float32)
16image = tf.image.resize_images(image, size=image_size)
17return (image, tf.one_hot(table.lookup(label), num_classes))
18
19dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)
20
21if num_epochs is not None and shuffle:
22dataset = dataset.apply(
23tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs))
24elif shuffle:
25dataset = dataset.shuffle(buffer_size)
26elif num_epochs is not None:
27dataset = dataset.repeat(num_epochs)
28
29dataset = dataset.apply(
30tf.contrib.data.map_and_batch(map_func=_map_func,
31 batch_size=batch_size,
32num_parallel_calls=os.cpu_count()))
33dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)
34
35return dataset
這次訓練該模型時,我們將使用一個經過預訓練的 VGG16,并且只重新訓練其最后 5 層:
keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3), include_top=False)
output = keras_vgg16.outputoutput = tf.keras.layers.Flatten()(output)prediction = tf.keras.layers.Dense(len(labels), activation=tf.nn.softmax)(output)
model = tf.keras.Model(inputs=keras_vgg16.input, outputs=prediction)
for layer in keras_vgg16.layers[:-4]: layer.trainable = False
現在,我們萬事皆備,可以按照上述步驟進行,并使用 NUM_GPUS GPU 在幾分鐘內訓練我們的模型:
model.compile(loss='categorical_crossentropy', optimizer=tf.train.AdamOptimizer(), metrics=['accuracy'])
NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model, config=config)
BATCH_SIZE = 64
EPOCHS = 1
estimator.train(input_fn=lambda:input_fn(train_folder, labels, shuffle=True, batch_size=BATCH_SIZE, buffer_size=2048, num_epochs=EPOCHS, prefetch_buffer_size=4), hooks=[time_hist])
訓練結束后,我們可以評估測試集的準確度,應該在 95% 左右(對初始基線來說還不錯):
estimator.evaluate(input_fn=lambda:input_fn(test_folder,
labels, shuffle=False, batch_size=BATCH_SIZE, buffer_size=1024, num_epochs=1))
使用兩塊 K80 GPU 進行訓練時的 Fashion-MNIST 訓練吞吐量和總時間,采用不同 NUM_GPUS,顯示線性縮放
總結
我們在上文中介紹了如何使用估算器 API 在多個 GPU 上輕松訓練 Keras 深度學習模型,如何編寫符合最佳實踐的輸入管道,以充分利用我們的資源(線性縮放),以及如何通過鉤子為我們的訓練吞吐量計時。
請務必注意,最后我們主要關注的是測試集錯誤。您可能會注意到,測試集的準確度會隨著 NUM_GPUS 值的增加而下降。其中一個原因可能是,使用 BATCH_SIZE*NUM_GPUS 的批量大小時,MirroredStrategy 能夠有效地訓練模型,而當我們增加 GPU 數量時,可能需要調整 BATCH_SIZE 或學習率。為便于制圖,文中除 NUM_GPUS 之外的所有其他超參數均保持不變,但實際上我們需要調整這些超參數。
數據集和模型的大小也會影響這些方案的縮放效果。在讀取或寫入小數據時,GPU 的帶寬較差,如果是較為老舊的 GPU(如 K80),則情形尤其如此,而且可能會造成上面 Fashion-MNIST 圖中所示情況。
-
深度學習
+關注
關注
73文章
5507瀏覽量
121299 -
tensorflow
+關注
關注
13文章
329瀏覽量
60545
原文標題:使用估算器、tf.keras 和 tf.data 進行多 GPU 訓練
文章出處:【微信號:tensorflowers,微信公眾號:Tensorflowers】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論