通过多 GPU 并行的方式可以有很好的加速效果,然而一台机器上所支持的 GPU 是有限的,因此本文介绍了分布式 TensorFlow。分布式 TensorFlow 允许我们在多台机器上运行一个模型,所以训练速度或加速效果能显著地提升。本文简要介绍了分布式 TensorFlow 的原理与实践,希望能为准备入坑分布式训练的读者提供简要的介绍。
不幸的是,关于分布式 TensorFlow 的官方文档过于简略。我们需要一个稍微易懂的介绍,即通过 Jupyter 运行一些基本例子。
如果你想交互式地使用 Jupyter,可以在 GitHub 上找到源代码。此外,本文的一些解释是作者自己对实证结果或 TensorFlow 文档的解释,因此可能会有一些小误差。
GitHub 地址:https://github.com/mrahtz/distributed_tensorflow_a_gentle_introduction
简介
import tensorflow as tf
比方说,我们希望多个进程共享一些共同的参数。为了简单起见,假设这只是一个单一的变量:
var = tf.Variable(initial_value=0.0)
第一步,我们需要为每个进程创建自己的会话。(假设 sess1 在一个进程中创建,而 sess2 会在另一个进程中创建)。
sess1 = tf.Session()
sess2 = tf.Session()
sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())
每次调用 tf.Session() 都会创建一个单独的「执行引擎」,然后将会话句柄连接到执行引擎。执行引擎是实际存储变量值并运行操作的东西。且 Python 天生是面向对象的编程,它里面的元素都是类或对象,因此更正式地说,tf.Seesio() 是 TensorFlow 中的一个方法,它会打开一个会话并运行计算图。
通常,不同进程中的执行引擎是不相关的。在一个会话中更改变量(在一个执行引擎上)不会影响其他会话中的变量。
print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))
sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")
print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))
上面代码块的输出结果为:
Initial value of var in session 1: 0.0
Initial value of var in session 2: 0.0
Incremented var in session 1
Value of var in session 1: 1.0
Value of var in session 2: 0.0
对于分布式 TensorFlow,我们首先需要了解它的基本原理。以下代码展示了如何构建一个最简单 TensorFlow 集群,以帮助我们理解它的基本原理。
import tensorflow as tf
c = tf.constant("Hello, Distributed TensorFlow!")
# 创建一个本地TensorFlow集群
server = tf.train.Server.create_local_server()
# 在集群上创建一个会话
sess = tf.Session(server.target)
print(sess.run(c))
在以上代码中,我们先通过 tf.train.Server.create_local_server 在本地创建一个只有一台机器的 TensorFlow 集群。然后在集群上生成一个会话,通过该对话,我们可以将创建的计算图运行在 TensorFlow 集群上。虽然这只是一个单机集群,但它基本上反映了 TensorFlow 集群的工作流程。
TensorFlow 集群会通过一系列任务(task)来执行计算图中的运算,一般来说不同的任务会在不同的机器上运行。TensorFlow 集群中的任务也会被聚集为工作(job)。例如在训练深度模型时,一台运行反向传播的机器是一个任务,而所有运行反向传播的集合是一个工作。上面简单的案例只是一个任务的集群,若一个 TensorFlow 集群有多个任务时,我们需要使用 tf.train.ClusterSpec 来指定每一个任务的机器。
使用分布式 TensorFlow 训练深度学习模型一般有两种方式,即 in-graph replication 和 between-graph replication。第一种计算图内的分布式会令所有任务都使用一个 TensorFlow 计算图中的变量,而只是将计算部分分配到不同的服务器上。而另一种计算图间的分布式会在每一个计算服务器上创建一个独立的 TensorFlow 计算图,但不同计算图中的相同参数需要以一种固定的方式存放到同一个参数服务器中。以上大概就是分布式 TensorFlow 的基本概念,随后我们将通过具体的案例与代码加深这一部分的理解。
分布式 TensorFlow
为了在进程之间共享变量,我们需要将不同的执行引擎连接在一起,并输入分布式张量流。
若使用分布式 TensorFlow,每个进程会运行一个特殊的执行引擎:一个 TensorFlow 服务器。服务器作为集群的一部分链接在一起。(群集中的每个服务器也称为任务。)
第一步是定义集群的规模。我们从最简单的集群开始:即两台服务器(两个任务),它们都在同一台机器上,一个在 2222 端口,一个在 2223 端口。
tasks = ["localhost:2222", "localhost:2223"]
每个任务都与「工作」(job)相关联,该工作是相关任务的集合。我们将这两个任务与一个称为「local」的工作相关联。
jobs = {"local": tasks}
所有这些即定义为一个集群。
cluster = tf.train.ClusterSpec(jobs)
我们现在可以启动服务器,指定每个服务器对应为集群定义中的哪个服务器。立即启动各服务器,监听集群设置中指定的端口。
# "This server corresponds to the the first task (task_index=0)
# of the tasks associated with the 'local' job."
server1 = tf.train.Server(cluster, job_name="local", task_index=0)
server2 = tf.train.Server(cluster, job_name="local", task_index=1)
将服务器连接在同一个集群中,我们现在可以体验到分布式 TensorFlow 的强大功能:任何具有相同名称的变量都将在所有服务器之间共享。
最简单的例子是在所有的服务器上运行同一张静态计算图,且每个图只有一个变量:
tf.reset_default_graph()
var = tf.Variable(initial_value=0.0, name='var')
sess1 = tf.Session(server1.target)
sess2 = tf.Session(server2.target)
现在,在一台服务器上对变量所作的修改将在第二台服务器上作镜像处理。
sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())
print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))
sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")
print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))
Initial value of var in session 1: 0.0
Initial value of var in session 2: 0.0
Incremented var in session 1
Value of var in session 1: 1.0
Value of var in session 2: 1.0
请注意,因为我们只有一个变量且该变量由两个会话共享,第二个会话再调用 global_variables_initializer 就有些多余。
存放
现在我们可能会想:变量究竟存储在哪个服务器上?又是哪个服务器在运行操作?
按经验来说,变量和操作都默认存储在集群的第一个任务上。
def run_with_location_trace(sess, op):
# From https://stackoverflow.com/a/41525764/7832197
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
sess.run(op, options=run_options, run_metadata=run_metadata)
for device in run_metadata.step_stats.dev_stats:
print(device.device)
for node in device.node_stats:
print(" ", node.node_name)
例如,如果我们使用连接到第一个任务的会话来处理变量 var,那么所有操作都会运行在这个任务上:
run_with_location_trace(sess1, var)
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var
run_with_location_trace(sess1, var.assign_add(1.0))
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
AssignAdd_1/value
var
AssignAdd_1
但是,如果我们尝试使用连接到第二个任务的会话处理变量 var,那么图节点仍然会在第一个任务上运行。
run_with_location_trace(sess2, var)
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var
要将一个变量或操作固定到特定任务上,我们可以使用 tf.device:
with tf.device("/job:local/task:0"):
var1 = tf.Variable(0.0, name='var1')
with tf.device("/job:local/task:1"):
var2 = tf.Variable(0.0, name='var2')
# (This will initialize both variables)
sess1.run(tf.global_variables_initializer())
现在,var1 像之前一样运行在第一个任务上。
run_with_location_trace(sess1, var1)
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var1
但是 var2 运行在第二个任务上。即使我们尝试使用连接到第一个任务的会话来评估它,它仍然在第二个任务上运行。
run_with_location_trace(sess1, var2)
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
var2
变量 2 亦是如此。
run_with_location_trace(sess2, var2)
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
var2
run_with_location_trace(sess2, var1)
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var1
计算图
分布式 TensorFlow 处理图的过程有几点需要注意。
谁构建了这个图?首先,尽管在整个集群中共享变量值,但图并不会自动共享。我们用两台服务器创建一个新的集群,然后用显式创建的图设置第一台服务器。
cluster = tf.train.ClusterSpec({"local": ["localhost:2224", "localhost:2225"]})
server1 = tf.train.Server(cluster, job_name="local", task_index=0)
server2 = tf.train.Server(cluster, job_name="local", task_index=1)
graph1 = tf.Graph()
with graph1.as_default():
var1 = tf.Variable(0.0, name='var')
sess1 = tf.Session(target=server1.target, graph=graph1)
print(graph1.get_operations())
[, , , ]
如果我们创建连接到第二台服务器的会话,请注意图不会自动获取镜像。
graph2 = tf.Graph()
sess2 = tf.Session(target=server2.target, graph=graph2)
print(graph2.get_operations())
————————————————————————————
[]
要访问共享变量,我们必须手动添加一个同名的变量到第二个图中。
with graph2.as_default():
var2 = tf.Variable(0.0, name='var')
只有如此我们才可以访问它。
sess1.run(var1.assign(1.0))
sess2.run(var2)
————————————————————————————
1.0
关键是:每个服务器负责创建自己的图。
所有服务器上的图都必须一样吗?
到目前为止,我们所有的例子都是在两台服务器上运行相同的图。这被称为图内复制(in-graph replication)。
例如,假设我们有一个包含三台服务器的集群。服务器 1 保存共享参数,而服务器 2 和服务器 3 是工作站节点,每个都有本地变量。在图内复制中,每台服务器的图如下所示:
图内复制的问题在于每个服务器都必须具有整个图的副本,包括可能只与其他服务器相关的子图。这可能会导致图变得非常大。
另一种方法是图间复制(between-graph replication)。在这里,每个服务器都运行一个只包含共享参数的图,而且任何变量和操作都与单个服务器相关。
这种方法缩减了图的大小,因此我们推荐使用图间复制。
实践细节
在介绍完整示例之前,有几个实践中遇到的细节问题需要讨论一下。
如果在所有服务器互联之前尝试在集群上运行某些程序,会发生什么?我们再次创建一个双任务集群。
cluster = tf.train.ClusterSpec({
"local": ["localhost:2226", "localhost:2227"]
})
这一次,让我们在隔离进程中启动每个服务器。(这允许我们随时关闭服务器,以便再次启动它们进行后续的实验。除了关闭启动服务器的进程之外,目前没有其它办法关闭服务器。)
from multiprocessing import Process
from time import sleep
def s1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
sess1 = tf.Session(server1.target)
print("server 1: running no-op...")
sess1.run(tf.no_op())
print("server 1: no-op run!")
server1.join() # Block
def s2():
for i in range(3):
print("server 2: %d seconds left before connecting..."
% (3 - i))
sleep(1.0)
server2 = tf.train.Server(cluster,
job_name="local",
task_index=1)
print("server 2: connected!")
server2.join() # Block
# daemon=True so that these processes will definitely be killed
# when the parent process restarts
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
服务器 1 即刻加入集群,但服务器 2 在连接之前等待了一会儿。结果如下所示:
p1.start()
p2.start()
server 2: 3 seconds left before connecting...
server 1: running no-op...
server 2: 2 seconds left before connecting...
server 2: 1 seconds left before connecting...
server 2: connected!
server 1: no-op run!
可以看出,每个服务器都试图在集群上运行一个操作,直到所有的服务器都加入。
p1.terminate()
p2.terminate()
当服务器脱离集群会怎样?
我们用两台服务器建立一个集群。服务器 1 只是反复尝试和运行位于服务器 1 上的 no-op 操作。服务器 2 将在两秒钟后宕机。
def s1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
with tf.device("/job:local/task:0"):
no_op = tf.no_op()
sess1 = tf.Session(server1.target)
for _ in range(6):
print("Server 1: about to run no-op...", end="")
sess1.run(no_op)
print("success!")
sleep(1.0)
def s2():
server2 = tf.train.Server(cluster,
job_name="local",
task_index=1)
sleep(2.0)
print("Server 2 dieing...")
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
p1.start()
p2.start()
————————————————————————————————
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 2 dieing...
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
短期内,只要我们试图运行的操作不在脱离的服务器上,似乎不会出现问题(我没有测试过长期运行会发生什么)。如果操作是在脱离的服务器上:
def s1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
# This time, we place the no-op on server 2,
# which is going to leave
with tf.device("/job:local/task:1"):
no_op = tf.no_op()
sess1 = tf.Session(server1.target)
for _ in range(5):
print("Server 1: about to run no-op...", end="")
sess1.run(no_op)
print("success!")
sleep(1.0)
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
p1.start()
p2.start()
——————————————————————————————————
—
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 2 dieing...
然后尝试运行操作代码。
p1.terminate()
p2.terminate()
如果服务器又加入集群会怎样?
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
p1.start()
p2.start()
sleep(3.0)
# At this point, server 1 is blocked, and server 2 is dead.
print("Restarting server 2...")
p2 = Process(target=s2, daemon=True)
p2.start()
————————————————————————————
Server 1: about to run no-op...success!
Server 1: about to run no-op...success!
Server 2 dieing...
Restarting server 2...
Process Process-7:
Traceback (most recent call last):
File "/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1323, in _do_call
return fn(*args)
File "/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/client/session.py", line 1302, in _run_fn
status, run_metadata)
File "/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 473, in __exit__
c_api.TF_GetCode(self.status.status))
tensorflow.python.framework.errors_impl.AbortedError: Graph handle is not found: 0000000000000001
Server 1: about to run no-op...Server 2 dieing...
系统报了一个 Graph handle is not found 的错误。
因此分布式 TensorFlow 不会自动恢复服务器故障。(如果您对容错有兴趣,请查看 https://www.youtube.com/watch?v=la_M6bCV91M。)
谁负责初始化共享变量?
一种方法是让所有工作站运行 tf.global_variables_initializer()。
但是如果我们想保持代码整洁并且只用一个服务器进行初始化,那么如果有其他服务器在初始化之前尝试使用这些变量,可能会遇到问题。一个解决方案就是让其他工作站等待,直到使用 tf.report_uninitialized_variables 的初始化开始。
def s1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
var = tf.Variable(0.0, name='var')
sess1 = tf.Session(server1.target)
print("Server 1: waiting for connection...")
sess1.run(tf.report_uninitialized_variables())
while len(sess1.run(tf.report_uninitialized_variables())) > 0:
print("Server 1: waiting for initialization...")
sleep(1.0)
print("Server 1: variables initialized!")
def s2():
server2 = tf.train.Server(cluster,
job_name="local",
task_index=1)
var = tf.Variable(0.0, name='var')
sess2 = tf.Session(server2.target)
for i in range(3):
print("Server 2: waiting %d seconds before initializing..."
% (3 - i))
sleep(1.0)
sess2.run(tf.global_variables_initializer())
p1 = Process(target=s1, daemon=True)
p2 = Process(target=s2, daemon=True)
p1.start()
p2.start()
—————————————————————————————————
Server 1: waiting for connection...
Server 2: waiting 3 seconds before initializing...
Server 1: waiting for initialization...
Server 2: waiting 2 seconds before initializing...
Server 1: waiting for initialization...
Server 2: waiting 1 seconds before initializing...
Server 1: waiting for initialization...
Server 1: variables initialized!
p1.terminate()
p2.terminate()
示例
让我们把所学的知识融合到最后一个使用多进程的例子中。
我们将创建:
一个存储单个变量 var 的参数服务器。两个工作站任务(worker task),每个工作站将多次增加变量 var 的值。我们将让参数服务器多输出几次 var 的值,以便查看其变化。
import tensorflow as tf
from multiprocessing import Process
from time import sleep
cluster = tf.train.ClusterSpec({
"worker": [
"localhost:3333",
"localhost:3334",
],
"ps": [
"localhost:3335"
]
})
def parameter_server():
with tf.device("/job:ps/task:0"):
var = tf.Variable(0.0, name='var')
server = tf.train.Server(cluster,
job_name="ps",
task_index=0)
sess = tf.Session(target=server.target)
print("Parameter server: waiting for cluster connection...")
sess.run(tf.report_uninitialized_variables())
print("Parameter server: cluster ready!")
print("Parameter server: initializing variables...")
sess.run(tf.global_variables_initializer())
print("Parameter server: variables initialized")
for i in range(5):
val = sess.run(var)
print("Parameter server: var has value %.1f" % val)
sleep(1.0)
print("Parameter server: blocking...")
server.join()
def worker(worker_n):
with tf.device("/job:ps/task:0"):
var = tf.Variable(0.0, name='var')
server = tf.train.Server(cluster,
job_name="worker",
task_index=worker_n)
sess = tf.Session(target=server.target)
print("Worker %d: waiting for cluster connection..." % worker_n)
sess.run(tf.report_uninitialized_variables())
print("Worker %d: cluster ready!" % worker_n)
while sess.run(tf.report_uninitialized_variables()):
print("Worker %d: waiting for variable initialization..." % worker_n)
sleep(1.0)
print("Worker %d: variables initialized" % worker_n)
for i in range(5):
print("Worker %d: incrementing var" % worker_n)
sess.run(var.assign_add(1.0))
sleep(1.0)
print("Worker %d: blocking..." % worker_n)
server.join()
ps_proc = Process(target=parameter_server, daemon=True)
w1_proc = Process(target=worker, args=(0, ), daemon=True)
w2_proc = Process(target=worker, args=(1, ), daemon=True)
ps_proc.start()
————————————————————————————
Parameter server: waiting for cluster connection...
Parameter server: cluster ready!
Parameter server: initializing variables...
Parameter server: variables initialized
Parameter server: var has value 0.0
Parameter server: var has value 2.0
Parameter server: var has value 4.0
Parameter server: var has value 5.0
Parameter server: var has value 7.0
Parameter server: blocking...
w1_proc.start()
————————————————————————————————
Worker 0: waiting for cluster connection...
Worker 0: cluster ready!
Worker 0: waiting for variable initialization...
Worker 0: variables initialized
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: incrementing var
Worker 0: blocking...
w2_proc.start()
———————————————————————————————
Worker 1: waiting for cluster connection...
Worker 1: cluster ready!
Worker 1: waiting for variable initialization...
Worker 1: variables initialized
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: incrementing var
Worker 1: blocking...
for proc in [w1_proc, w2_proc, ps_proc]:
proc.terminate()
总结
通过本文,我们了解了:
- 如何将多个 TensorFlow 执行引擎(运行在不同进程或不同机器上)集成为一个集群,以便共享变量。
- 如何为变量或操作指定服务器。
- 图内复制与图间复制。
- 在所有服务器互联之前或在服务器脱离集群之后在集群上运行操作,会发生什么。
- 如何等待变量被集群中的另一个任务初始化。
更多信息和实例,请查阅官方文档:https://www.tensorflow.org/deploy/distributed。
原文链接:http://amid.fish/distributed-tensorflow-a-gentle-introduction