今天將介紹如何創建 TensorFlow Server 集群,及如何在該集群中發布計算圖。在這里,我們假設您熟悉編寫低級 TensorFlow 程序的基本概念 basic concepts。
你好,分布式 TensorFlow !
想要查看正在運行的簡單 TensorFlow 集群,請執行以下操作:
# Start a TensorFlow server as a single-process "cluster".$ python>>> import tensorflow as tf>>> c = tf.constant("Hello, distributed TensorFlow!")>>> server = tf.train.Server.create_local_server()>>> sess = tf.Session(server.target) # Create a session on the server.>>> sess.run(c)'Hello, distributed TensorFlow!'
tf.train.Server.create_local_server方法使用進程內 Server 創建單進程集群。
創建一個集群
TensorFlow “集群” 是一組參與 TensorFlow 圖的分布式執行的 “任務”。每個任務都與 TensorFlow“Server” 相關聯,該 server 包含可用于創建會話的 “master 主干”,以及在圖中執行操作的 “worker”。群集還可以劃分為一個或多個 “作業”,其中每個作業包含一個或多個任務。
要創建群集,請在群集中的每個任務中啟動一個 TensorFlow Server。每個任務通常在不同的計算機上運行,但您可以在同一臺計算機上運行多個任務(例如,控制不同的 GPU 設備)。在每項任務中,執行以下操作:
創建一個描述集群中所有任務的 tf.train.ClusterSpec 每項任務都應該相同
創建一個 tf.train.Server,將 tf.train.ClusterSpec 傳遞給構造函數,并使用作業名稱和任務索引標識本地任務
創建一個 tf.train.ClusterSpec 來描述集群
集群規范字典將作業名稱映射到網絡地址列表。將此字典傳遞給tf.train.ClusterSpec 構造函數。例如:
在每個任務中創建一個tf.train.Server 實例
tf.train.Server 對象包含一組本地設備,一組與 tf.train.ClusterSpec 中其他任務的連接,以及一個可以使用它們執行分布式計算的tf.Session。每個 server 都是特定命名作業的成員,并且在該作業中具有任務索引。一個 server 可以與群集中的任何其他服務器通信。
例如,要啟動在 localhost:2222 和 localhost:2223 上運行的兩個服務器的集群,請在本地計算機上的兩個不同進程中運行以下代碼段:
# In task 0:cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})server = tf.train.Server(cluster, job_name="local", task_index=1)
注意:手動指定這些群集規范可能會很繁瑣,尤其是對于大型群集。我們正在開發以編程方式啟動任務的工具,例如:使用像 Kubernetes 這樣的集群管理器。如果您希望獲得某些特定集群管理器的技術支持,請在 GitHub issue 提出。
在模型中指定分布式設備
要對特定進程執行操作,可以使用相同的 tf.device函數來指定操作是在 CPU 還是 GPU 上運行。例如:
with tf.device("/job:ps/task:0"): weights_1 = tf.Variable(...) biases_1 = tf.Variable(...)with tf.device("/job:ps/task:1"): weights_2 = tf.Variable(...) biases_2 = tf.Variable(...)with tf.device("/job:worker/task:7"): input, labels = ... layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1) logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2) # ... train_op = ...with tf.Session("grpc://worker7.example.com:2222") as sess: for _ in range(10000): sess.run(train_op)
在上面的示例中,變量是在 ps 作業中的兩個任務上創建的,而模型的計算密集型部分是在 worker 作業中創建的。 TensorFlow 將在作業之間插入適當的數據傳輸(從 ps 到 worker 用于正向傳遞,從 worker 到 ps 用于應用漸變)。
復制訓練
一個常見的訓練配置,被稱為 “數據并行”,它涉及在不同的小批量數據上訓練相同的模型額 worker 作業中的多個任務,更新在 ps 作業中的一個或多個任務中托管的共享參數。所有任務通常在不同的機器上運行。在 TensorFlow 中有很多方法可以指定這個結構,我們正在構建庫,這將簡化指定復制模型的工作。可能有效的方法包括:
圖形內復制。在這種方法中,客戶端構建一個包含一組參數的tf.Graph(在tf.Variable 節點固定到 / job:ps);以及模型中計算密集型部分的多個副本,每個副本都固定在 / job:worker 中的不同任務中
圖之間的復制。在這種方法中,每個 / job:worker 任務都有一個單獨的客戶端,通常與 worker 任務在同一個進程中。每個客戶端構建一個包含參數的類似圖形(固定到 / job:psas,然后使用 tf.train.replica_device_setter將它們確定性地映射到相同的任務);以及模型的計算密集型部分的單個副本,固定到 / job:worker 中的本地任務。
異步訓練。在這種方法中,圖的每個副本都有一個獨立的訓練循環,無需協調即可執行。它與上述兩種復制形式兼容。
同步培訓。在此方法中,所有副本都為當前參數讀取相同的值,并行計算梯度,然后一起應用。它與圖形內復制兼容(例如,如在 CIFAR-10 multi-GPU trainer 中使用梯度平均),以及圖之間復制(例如,使用 tf.train.SyncReplicasOptimizer)均兼容。
綜合起來: 示例 trainer 計劃
以下代碼顯示了分布式 trainer 程序的框架,實現了圖形間復制和異步訓練。它包括參數 server 和 worker 任務的代碼。
import argparseimport sysimport tensorflow as tfFLAGS = Nonedef main(_): ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",") # Create a cluster from the parameter server and worker hosts. cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) # Create and start a server for the local task. server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): # Build model... loss = ... global_step = tf.contrib.framework.get_or_create_global_step() train_op = tf.train.AdagradOptimizer(0.01).minimize( loss, global_step=global_step) # The StopAtStepHook handles stopping after running given steps. hooks=[tf.train.StopAtStepHook(last_step=1000000)] # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(FLAGS.task_index == 0), checkpoint_dir="/tmp/train_logs", hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # Run a training step asynchronously. # See tf.train.SyncReplicasOptimizer
for additional details on how to # perform *synchronous* training. # mon_sess.run handles AbortedError in case of preempted PS. mon_sess.run(train_op)if __name__ == "__main__": parser = argparse.ArgumentParser() parser.register("type", "bool", lambda v: v.lower() == "true") # Flags for defining the tf.train.ClusterSpec parser.add_argument( "--ps_hosts", type=str, default="", help="Comma-separated list of hostname:port pairs" ) parser.add_argument( "--worker_hosts", type=str, default="", help="Comma-separated list of hostname:port pairs" ) parser.add_argument( "--job_name", type=str, default="", help="One of 'ps', 'worker'" ) # Flags for defining the tf.train.Server parser.add_argument( "--task_index", type=int, default=0, help="Index of task within the job" ) FLAGS, unparsed = parser.parse_known_args() tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
要使用兩個參數 server 和兩個 worker 程序啟動 trainer,請使用以下命令行(假設該腳本名為 trainer.py):
# On ps0.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=0# On ps1.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=1# On worker0.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=0# On worker1.example.com:$ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=1
詞匯表
客戶端
客戶端通常是一個程序,用于建立一個 TensorFlow 圖,并構造 tensorflow :: Session 以與集群交互。客戶端主要使用 Python 或 C ++ 編寫。一個單客戶端進程可以直接與多個 TensorFlow Server 交互(請參閱上面的 “復制訓練”),一個單 Server 也可以為多個客戶端提供服務。
集群
一個 TensorFlow 集群包括一個或多個 “作業”,每個 “作業” 被劃分為一個或多個 “任務” 列表。一個集群通常效力于一個特定的高級別的目標,例如多機并行訓練一個神經網絡,并行使用多臺機器。集群由 tf.train.ClusterSpec 目標函數定義。
作業
一個作業由一系列 “任務” 組成,通常用于實現共同目的。 例如,名為 ps 的作業(用于 “參數服務器”)通常承載存儲和更新變量參數的節點;而名為 worker 的作業通常托管執行計算密集型任務的無狀態節點。作業中的任務通常在不同的計算機上運行。作業角色集是靈活的:例如,一個 worker 可以維持某種狀態。
核心服務
RPC 服務為分布式設備之間提供遠程訪問,并充當會話目標的角色。核心設備實現 tensorflow :: Session 接口,負責協調跨一個或多個 “worker services” 之間的工作。所有 TensorFlow Servers 都實現核心服務。
任務
任務對應于特定的 TensorFlow Server,通常對應于單個進程。任務屬于特定的 “作業”,并通過該作業的任務列表中的索引標識來區分。
TensorFlowserver運行 tf.train.Server 實例的進程,該實例是集群的成員,并導出 “master service 主服務” 和 “worker service 工作服務”。
工作服務
RPC 服務,使用其本地設備執行 TensorFlow 圖的部分。 Worker service 工作服務執行 worker_service.proto。所有的 TensorFlow Server 都執行 worker service 工作服務。
-
gpu
+關注
關注
28文章
4764瀏覽量
129180 -
函數
+關注
關注
3文章
4344瀏覽量
62847 -
tensorflow
+關注
關注
13文章
329瀏覽量
60578
原文標題:如何創建 TensorFlow Server 集群
文章出處:【微信號:tensorflowers,微信公眾號:Tensorflowers】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論