本文讲的是如何快速而不求完美地部署一个训练好的 机器学习 模型并应用到实际中。如果你已经成功地使用诸如 Tensor flow或Caffe这样的框架训练好了一个 机器学习 模型,现在你正在试图让这个模型能够快速的演示,那么读这篇文章就对了。
阅读时长: 10-15分钟
使用前检查清单
-
检查tensorflow的安装
-
从 stdin 运行在线分类
-
在本地运行分类
-
把分类器放到硬编码(hardcoded)的代理
-
把分类器放到有服务发现(service discovery)的代理
-
用一个伪DNS调用分类器
机器学习 的实际应用
当我们第一次进入Hive的 机器学习 空间时,针对我们的实际应用场景,我们已经拥有了数百万张准确标记的图像, 这些图像使我们能够在一周之内,从头开始训练最先进的深度卷积 神经网络 图像分类模型 (即随机 权重 )。然而,在更典型的应用场景中,图像的数量级通常只有数百幅,这种情况下,我建议微调现有的模型。比如,https://www.tensorflow.org/tutorials/image_retraining有一个关于如何微调Imagenet模型(在1.2M图像上训练1000个类别)以对花进行分类的样本数据集(3647个图像, 5个类别)。
上面的 Tensor flow教程简要而言,是在安装bazel和tensorflow之后,需要运行以下代码,用大约30分钟的来建模,5分钟来训练:
(
cd "$HOME" && \
curl -O http://download.tensorflow.org/example_images/flower_photos.tgz && \
tar xzf flower_photos.tgz ;
bazel build tensorflow/examples/image_retraining:retrain \
tensorflow/examples/image_retraining:label_image \
bazel-bin/tensorflow/examples/image_retraining/retrain \
--image_dir "$HOME"/flower_photos \
--how_many_training_steps=200
bazel-bin/tensorflow/examples/image_retraining/label_image \
--graph=/tmp/output_graph.pb \
--labels=/tmp/output_labels.txt \
--output_layer=final_result:0 \
--image=$HOME/flower_photos/daisy/21652746_cc379e0eea_m.jpg
或者,如果你安装了Docker,则可以使用以下预构建的Docker镜像:
sudo docker run -it --net=host liubowei/simple-ml-serving:latest /bin/bash
>>> cat test.sh && bash test.sh
这将进入容器内部的交互式shell中并运行上述命令; 如果你愿意的话,也可以按照容器内的其余部分进行操作。
现在,tensorflow已经将模型信息保存到/tmp/output_graph.pb和/tmp/output_labels.txt中,这些作为命令行 参数 传递给label_image.py脚本。Google的image_recognition教程也链接到另一个脚本,但是这里我们仍将使用label_image.py。
将本地运行转换为在线运行( Tensor flow)
如果我们只想接受来自标准输入的文件名,每行一个,我们就可以很容易地进行“在线”运行:
while read line ; do
bazel-bin/tensorflow/examples/image_retraining/label_image \
--graph=/tmp/output_graph.pb --labels=/tmp/output_labels.txt \
--output_layer=final_result:0 \
--image="$line" ;
done
然而,从性能的角度来看这样糟糕透了—— 每一个输入都要重新加载 神经网络 , 权重 ,整个 Tensor flow框架和python本身!
当然可以改进。先修改label_image.py 脚本。对我而言,这个脚本的位置在:
in bazel-bin/tensorflow/examples/image_retraining/label_image.runfiles/org_tensorflow/tensorflow/examples/image_retraining/label_image.py.
修改如下:
141: run_graph(image_data, labels, FLAGS.input_layer, FLAGS.output_layer,
142: FLAGS.num_top_predictions)141: for line in sys.stdin:
修改后马上快了很多,但这还不是最好。
141: for line in sys.stdin:
142: run_graph(load_image(line), labels, FLAGS.input_layer, FLAGS.output_layer,
142: FLAGS.num_top_predictions)
原因在于用with tf.Session()构建对话。 Tensor flow本质上是在每次调用run_graph时将所有的计算加载到内存中。一旦开始尝试在GPU上进行运算,这一点就会变得很明显—— 可以看到GPU内存使用随着 Tensor flow加载和卸载GPU的模型 参数 而上下波动。 据我所知,这种结构并不存在于Caffe或Pytorch框架中。
解决方法是把with命令去掉,传递一个sess变量到run_graph:
def run_graph(image_data, labels, input_layer_name, output_layer_name,
num_top_predictions, sess):
# Feed the image_data as input to the graph.
# predictions will contain a two-dimensional array, where one
# dimension represents the input image count, and the other has
# predictions per class
softmax_tensor = sess.graph.get_tensor_by_name(output_layer_name)
predictions, = sess.run(softmax_tensor, {input_layer_name: image_data})
# Sort to show labels in order of confidence
top_k = predictions.argsort()[-num_top_predictions:][::-1]
for node_id in top_k:
human_string = labels[node_id]
score = predictions[node_id]
print('%s (score = %.5f)' % (human_string, score))
return [ (labels[node_id], predictions[node_id].item()) for node_id in top_k ] # numpy floats are not json serializable, have to run item
with tf.Session() as sess:
for line in sys.stdin:
run_graph(load_image(line), labels, FLAGS.input_layer, FLAGS.output_layer,
FLAGS.num_top_predictions, sess)
如果你运行完这一段,你会发现每张图只需要大约0.1秒,对于在线应用来说已经够快了。
将本地运行转换为在线运行(其他ML框架)
Caffe使用net.forward代码,很容易被放入一个可调用的框架中:see http://nbviewer.jupyter.org/github/BVLC/caffe/blob/master/examples/00-classification.ipynb
Mxnet也是非常独特的:它实际上已经准备好了面向大众的服务器代码。
部署
我们的计划是,将这些代码包装到一个Flask应用程序中。如果你没有听说Flask,简单解释一下, Flask是一个非常轻量级的Python Web框架,它允许你以最少的工作启动一个http api服务器。
作为一个快速参考,这里是一个Flask应用程序,它接收包含多部分表单数据的POST请求:
#!/usr/bin/env python
# usage: python echo.py to launch the server ; and then in another session, do
# curl -v -XPOST 127.0.0.1:12480 -F "data=@./image.jpg"
from flask import Flask, request
app = Flask(__name__)
@app.route('/', methods=['POST'])
def classify():
data = request.files.get('data').read()
print repr(data)[:1000]
return data, 200
except Exception as e:
return repr(e), 500
app.run(host='127.0.0.1',port=12480)
这里是如何将相应的FLASK应用程序连接到上面的run_graph:
And here is the corresponding flask app hooked up to run_graph above:
#!/usr/bin/env python
# usage: bash tf_classify_server.sh
from flask import Flask, request
import tensorflow as tf
import label_image as tf_classify
import json
app = Flask(__name__)
FLAGS, unparsed = tf_classify.parser.parse_known_args()
labels = tf_classify.load_labels(FLAGS.labels)
tf_classify.load_graph(FLAGS.graph)
sess = tf.Session()
@app.route('/', methods=['POST'])
def classify():
data = request.files.get('data').read()
result = tf_classify.run_graph(data, labels, FLAGS.input_layer, FLAGS.output_layer, FLAGS.num_top_predictions, sess)
return json.dumps(result), 200
except Exception as e:
return repr(e), 500
app.run(host='127.0.0.1',port=12480)
模型部署至此看起来还是相当不错的。除了一点——需要FlASK和 Tensor flow完全同步——Flask按照接收的顺序一次处理一个请求,并且 Tensor flow在进行图像分类时完全占用线程。
速度瓶颈可能还是在实际的计算工作中,所以升级Flask包装代码没有太多的意义。现在,也许这个代码足以处理你的负载。
有两种显而易见的方法可以扩大请求的通量:通过增加工人数量来横向放大,这在下一节将会介绍,或者通过使用GPU和批处理 逻辑 来纵向扩展。实现后者需要一个能够一次处理多个待处理请求的web服务器,并决定是否继续等待更大的批处理或将其发送到 Tensor flow图形线程进行分类,对于这个Flask应用程序是非常不适合的。有两种可能性:使用Twisted + Klein来保留Python代码,或者如果你更喜欢一流的事件循环支持,并且能够连接到非Python ML框架(如Torch),则可以使用Node.js + ZeroMQ。
扩展:负载平衡和服务发现
那么,假设现在你只有一台服务器来部署模型,由于它太慢了,或者我们的负载变得太高了,此时你想要启动更多服务器——如何在每个服务器上分配请求?
常规的方法是添加一个代理层,也许是haproxy或nginx, 它能够平衡后端服务器之间的负载,同时向客户端呈现一个统一的接口。 为了在本节稍后使用,以下是运行基本Node.js负载均衡器http代理的一些示例代码:
// Usage : node basic_proxy.js WORKER_PORT_0,WORKER_PORT_1,...
const worker_ports = process.argv[2].split(',')
if (worker_ports.length === 0) { console.err('missing worker ports') ; process.exit(1) }
const proxy = require('http-proxy').createProxyServer({})
proxy.on('error', () => console.log('proxy error'))
let i = 0
require('http').createServer((req, res) => {
proxy.web(req,res, {target: 'http://localhost:' + worker_ports[ (i++) % worker_ports.length ]})
}).listen(12480)
console.log(`Proxying localhost:${12480} to [${worker_ports.toString()}]`)
// spin up the ML workers
const { exec } = require('child_process')
worker_ports.map(port => exec(`/bin/bash ./tf_classify_server.sh ${port}`))
为了自动检测后端服务器的数量和位置, 人们通常使用“服务发现”工具,该工具可能与负载平衡器捆绑在一起,或者是分开的。一些知名例子的是Consul和Zookeeper。 设置和学习使用它们不在本文的讨论范围之内,所以我使用了一个非常基本的,通过node.js服务发现包seport实现的代理。
Proxy代码:
// Usage : node seaport_proxy.js
const seaportServer = require('seaport').createServer()
seaportServer.listen(12481)
const proxy = require('http-proxy').createProxyServer({})
proxy.on('error', () => console.log('proxy error'))
let i = 0
require('http').createServer((req, res) => {
seaportServer.get('tf_classify_server', worker_ports => {
const this_port = worker_ports[ (i++) % worker_ports.length ].port
proxy.web(req,res, {target: 'http://localhost:' + this_port })
}).listen(12480)
console.log(`Seaport proxy listening on ${12480} to '${'tf_classify_server'}' servers registered to ${12481}`)
Worker代码:
// Usage : node tf_classify_server.js
const port = require('seaport').connect(12481).register('tf_classify_server')
console.log(`Launching tf classify worker on ${port}`)
require('child_process').exec(`/bin/bash ./tf_classify_server.sh ${port}`)
然而,当应用于 机器学习 时,这个设置遇到了带宽问题。
每秒几十到几百张图像,这个系统就会成为网络带宽的瓶颈。在目前的设置中,所有的数据都必须通过我们的单个seaport 主节点,这也是呈现给客户端的端点。
为了解决这个问题,我们需要我们的客户端不要访问http://127.0.0.1:12480这个端点,而是要在后端服务器之间通过自动轮换来访问。如果你懂网络,一定会想:这不就是DNS干的活嘛!
但是,设置自定义的DNS服务器已经超出了本文的范围。相反,通过更改客户端以遵循两步“手动DNS”协议,我们可以重新使用我们的基础版的seaport 代理来实现客户端直接连接到其服务器的“点对点”协议:
Proxy代码:
// Usage : node p2p_proxy.js
const seaportServer = require('seaport').createServer()
seaportServer.listen(12481)
let i = 0
require('http').createServer((req, res) => {
seaportServer.get('tf_classify_server', worker_ports => {
const this_port = worker_ports[ (i++) % worker_ports.length ].port