自从 DeepMind 团队提出 DQN,在 Atari 游戏中表现出超人技巧,已经过去很长一段时间了。在此期间持续有新的方法被提出,不断创造出 Deep RL 领域新 SOTA。然而,目前不论是同策略或异策略强化学习方法(此处仅比较无模型 RL),仍然需要强大的算力予以支撑。即便研究者已将 Atari 游戏的分辨率降低到 84x84,一般情况下仍然需要使用 GPU 进行策略的训练。如今,来自 Ogma Intelligent Systems Corp. 的研究人员突破了这一限制。他们在稀疏预测性阶层机制(Sparse Predictive Hierarchies)的基础上,提出一种不需要反传机制的策略搜索框架,使得实时在树莓派上训练 Atari 游戏的控制策略成为可能。下图展示了使用该算法在树莓派上进行实时训练的情形。可以看到,agent 学会了如何正确调整滑块位置来接住小球,并发动进攻的策略。值得注意的是,观测输入为每一时刻产生的图片。也就是说,该算法做到了在树莓派这样算力较小的边缘设备上,实时学习从像素到策略的映射关系。研究者开源了他们的 SPH 机制实现代码,并提供了相应 Python API。这是一个结合了动态系统应用数学、计算神经科学以及机器学习的扩展库。他们的方法曾经还被 MIT 科技评论列为「Best of the Physics arXiv」。https://github.com/ogmacorp/OgmaNeo2研究者所提出的 SPH 机制不仅在 Pong 中表现良好,在连续策略领域也有不错的表现。下图分别是使用该算法在 OpenAI gym 中 Lunar Lander 环境与 PyBullet 中四足机器人环境的训练结果。在 Lunar Lander 环境中,训练 1000 代之后,每个 episode 下 agent 取得了平均 100 分左右的 reward。如果训练时间更长(3000 代以上),agent 的平均 reward 甚至能达到 200。在 PyBullet 的 Minitaur 环境中,agent 的训练目标是在其自身能量限制条件下,跑得越快越好。从图中可以看到,经过一段时间训练,这个四足机器人学会了保持身体平衡与快速奔跑(虽然它的步态看起来不是那么地自然)。看起来效果还是很棒的,机器之心也上手测试了一番。OgmaNeo2 用来学习 Pong 控制策略的整体框架如下图所示。图像观测值通过图像编码器输入两层 exponential memory 结构中,计算结果输出到之后的 RL 层产生相应动作策略。在安装 PyOgmaNeo2 之前,我们需要先编译安装其对应的 C++库。将 OgmaNeo2 克隆到本地:!git clone https://github.com/ogmacorp/OgmaNeo2.git
之后将工作目录切换到 OgmaNeo2 下,并在其中创建一个名为 build 的文件夹,用于存放编译过程产生的文件。import os
os.chdir('OgmaNeo2')
!mkdir build
os.chdir('build')
接下来我们对 OgmaNeo2 进行编译。这里值得注意的是,我们需要将-DBUILD_SHARED_LIBS=ON 命令传入 cmake 中,这样我们才能在之后的 PyOgmaNeo2 扩展库里使用它。!cmake .. -DBUILD_SHARED_LIBS=ON
!make
!make install
当 OgmaNeo2 安装成功后,安装 SWIG v3 及 OgmaNeo2 的相应 Python 扩展库:!apt-get install swig3.0
os.chdir('/content')
!git clone https://github.com/ogmacorp/PyOgmaNeo2
os.chdir('PyOgmaNeo2')
!python3 setup.py install --user
接下来输入 import pyogmaneo,如果没有错误提示就说明已经成功安装了 PyOgmaNeo2。我们先用一个官方提供的时间序列回归来测试一下,在 notebook 中输入:import numpy as np
import pyogmaneo
import matplotlib.pyplot as plt
# Set the number of threads
pyogmaneo.ComputeSystem.setNumThreads(4)
# Create the compute system
cs = pyogmaneo.ComputeSystem()
# This defines the resolution of the input encoding - we are using a simple single column that represents a bounded scalar through a one-hot encoding. This value is the number of "bins"
inputColumnSize = 64
# The bounds of the scalar we are encoding (low, high)
bounds = (-1.0, 1.0)
# Define layer descriptors: Parameters of each layer upon creation
lds = []
for i in range(5): # Layers with exponential memory
ld = pyogmaneo.LayerDesc()
# Set the hidden (encoder) layer size: width x height x columnSize
ld.hiddenSize = pyogmaneo.Int3(4, 4, 16)
ld.ffRadius = 2 # Sparse coder radius onto visible layers
ld.pRadius = 2 # Predictor radius onto sparse coder hidden layer (and feed back)
ld.ticksPerUpdate = 2 # How many ticks before a layer updates (compared to previous layer) - clock speed for exponential memory
ld.temporalHorizon = 2 # Memory horizon of the layer. Must be greater or equal to ticksPerUpdate, usually equal (minimum required)
lds.append(ld)
# Create the hierarchy: Provided with input layer sizes (a single column in this case), and input types (a single predicted layer)
h = pyogmaneo.Hierarchy(cs, [ pyogmaneo.Int3(1, 1, inputColumnSize) ], [ pyogmaneo.inputTypePrediction ], lds)
# Present the wave sequence for some timesteps
iters = 2000
for t in range(iters):
# The value to encode into the input column
valueToEncode = np.sin(t * 0.02 * 2.0 * np.pi) * np.sin(t * 0.035 * 2.0 * np.pi + 0.45) # Some wavy line
valueToEncodeBinned = int((valueToEncode - bounds[0]) / (bounds[1] - bounds[0]) * (inputColumnSize - 1) + 0.5)
# Step the hierarchy given the inputs (just one here)
h.step(cs, [ [ valueToEncodeBinned ] ], True) # True for enabling learning
# Print progress
if t % 100 == 0:
print(t)
# Recall the sequence
ts = [] # Time step
vs = [] # Predicted value
trgs = [] # True value
for t2 in range(300):
t = t2 + iters # Continue where previous sequence left off
# New, continued value for comparison to what the hierarchy predicts
valueToEncode = np.sin(t * 0.02 * 2.0 * np.pi) * np.sin(t * 0.035 * 2.0 * np.pi + 0.45) # Some wavy line
# Bin the value into the column and write into the input buffer. We are simply rounding to the nearest integer location to "bin" the scalar into the column
valueToEncodeBinned = int((valueToEncode - bounds[0]) / (bounds[1] - bounds[0]) * (inputColumnSize - 1) + 0.5)
# Run off of own predictions with learning disabled
h.step(cs, [ [ valueToEncodeBinned ] ], False) # Learning disabled
predIndex = h.getPredictionCs(0)[0] # First (only in this case) input layer prediction
# Decode value (de-bin)
value = predIndex / float(inputColumnSize - 1) * (bounds[1] - bounds[0]) + bounds[0]
# Append to plot data
ts.append(t2)
vs.append(value)
trgs.append(valueToEncode)
# Show predicted value
print(value)
# Show plot
plt.plot(ts, vs, ts, trgs)
可得到如下结果。图中橙色曲线为真实值,蓝色曲线为预测值。可以看到,该方法以极小的误差拟合了真实曲线。最后是该项目在 CartPole 任务中的表现。运行!python3 ./examples/CartPole.py,得到如下训练结果。可以看到,其仅用 150 个 episode 左右即解决了 CartPole 任务。