你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问
https://docs.azure.cn
。
从存储中的事件网格事件运行批处理终结点
项目
06/14/2024
7 个参与者
适用范围:
Azure CLI ml 扩展 v2(最新版)
Python SDK azure-ai-ml v2(最新版)
事件网格是一项全面托管的服务,可用于在许多不同的 Azure 服务和应用程序中轻松管理事件。 该服务简化了生成事件驱动和无服务器应用程序的方式。 本教程介绍如何在存储帐户中创建文件后立即触发批处理终结点的作业来处理这些文件。 此体系结构中使用逻辑应用工作流来订阅这些事件并触发终结点。
下图显示了此解决方案的体系结构:
以下步骤描述了此解决方案中的大致步骤:
在特定存储帐户中创建新的 Blob 时,将触发“已创建文件”事件
。
该事件会发送到事件网格,经过处理后发送给所有订阅者。
逻辑应用工作流订阅并侦听这些事件。
由于存储帐户可以包含多个数据资产,因此将应用事件筛选以仅对存储帐户内特定文件夹中发生的事件做出反应。 如果需要,可进一步筛选(例如,根据文件扩展名)。
逻辑应用工作流触发并执行以下操作:
获得一个授权令牌,使用来自服务主体的凭据调用批处理终结点。
使用新创建的文件作为输入触发批处理终结点(默认部署)。
批处理终结点将返回为处理文件而创建的作业的名称。
使用与事件网格连接的逻辑应用工作流调用批处理终结点时,将为在存储帐户中创建的每个 Blob 文件创建一个作业。
请记住,由于批处理终结点在文件级别分发工作,因此不会发生任何并行化。 相反,可以使用批处理终结点的功能在相同的计算群集上执行多个作业。 如果需要以自动方式在整个文件夹上运行作业,我们建议切换到
从 Azure 数据工厂调用批处理终结点
。
你已将模型正确部署为批处理终结点。 如果需要,可以扩展此体系结构来与
管道组件部署
配合使用。
你的批处理部署在名为
batch-cluster
的计算群集中运行。
你创建的逻辑应用将使用 REST 与 Azure 机器学习批处理终结点进行通信。
有关如何使用批处理终结点的 REST API 的详细信息,请阅读
为批处理终结点创建作业和输入数据
。
对批处理终结点进行身份验证
Azure 逻辑应用可以使用
HTTP
操作调用批处理终结点的 REST API。 Batch 终结点支持 Microsoft Entra ID 进行授权,因此对 API 发出的请求需要适当的身份验证处理。
在这种情况下,本教程使用服务主体进行身份验证以及与批处理终结点交互。
按照
向 Microsoft Entra ID 注册应用程序并创建服务主体
中的步骤创建服务主体。
按照
选项 3:创建新的客户端密码
中的说明创建用于身份验证的机密。
务必保存生成的客户端密码值,该值仅显示一次
。
务必保存应用程序“概述”窗格中的
client ID
和
tenant id
。
按照
授予访问权限
中的说明授予服务主体访问工作区的权限。 在本例中,服务主体需要以下权限:
工作区中读取批处理部署并对其执行操作的权限。
在数据存储中读取/写入的权限。
启用数据访问
为了指示要发送到部署作业的输入数据,本教程使用事件网格提供的云 URI。 批处理终结点使用计算的标识来装载数据,同时保留作业的标识,以便读取装载的作业。 因此,必须将用户分配的托管标识分配给计算群集,以确保群集有权装载基础数据。 若要确保数据访问,请执行以下步骤:
创建
托管标识资源
:
Azure CLI
Python
# Use the Azure CLI to create the managed identity. Then copy the value of the variable IDENTITY into a Python variable
identity="/subscriptions/<subscription>/resourcegroups/<resource-group>/providers/Microsoft.ManagedIdentity/userAssignedIdentities/azureml-cpu-cluster-idn"
from azure.ai.ml import MLClient
from azure.ai.ml.entities import AmlCompute, ManagedIdentityConfiguration
from azure.ai.ml.constants import ManagedServiceIdentityType
compute_name = "batch-cluster"
compute_cluster = ml_client.compute.get(name=compute_name)
compute_cluster.identity.type = ManagedServiceIdentityType.USER_ASSIGNED
compute_cluster.identity.user_assigned_identities = [
ManagedIdentityConfiguration(resource_id=identity)
ml_client.compute.begin_create_or_update(compute_cluster)
在 Azure 门户 中,确保托管标识具有读取数据的正确权限。
若要访问存储服务,必须至少具有存储帐户存储 Blob 数据读取者 访问权限。 只有存储帐户所有者可以通过 Azure 门户更改访问级别 。
创建逻辑应用
在 Azure 门户 中的 Azure 主页上,选择“创建资源”。
在“Azure 市场”菜单上,选择“集成”>“逻辑应用”。
在“创建逻辑应用”窗格中的“基本信息”选项卡上,提供有关你的逻辑应用资源的以下信息。
properties
LA-TravelTime-RG
你在其中创建逻辑应用资源和相关资源的 Azure 资源组 。 此名称在各个区域中必须唯一,并且只能包含字母、数字、连字符 (-
)、下划线 (_
)、括号((
、)
)和句点 (.
)。
LA-TravelTime
逻辑应用资源名称,在各个区域中必须唯一,并且只能包含字母、数字、连字符 (-
)、下划线 (_
)、括号((
、)
)和句点 (.
)。
我们希望每次在存储帐户的特定文件夹(数据资产)中创建新文件时触发逻辑应用工作流。 逻辑应用使用事件的信息来调用批处理终结点并传递要处理的特定文件。
在工作流设计器中,按照以下常规步骤添加 名为“发生资源事件时”的事件网格触发器。
在连接信息框中,选择要使用的身份验证类型,然后选择“登录”。
在触发器框中,请提供以下信息:
从“高级参数”列表中选择“前缀筛选器”,并提供以下值:
/blobServices/default/containers/<container-name>/blobs/<path-to-data-folder>
“前缀筛选器”属性允许事件网格仅在我们指定的特定路径中创建 blob 时通知工作流。 在这种情况下,我们假设文件将由所选存储帐户中 <container-name> 容器内的 <path-to-data-folder> 文件夹中的某个外部进程创建。 配置此参数以匹配数据的位置。 否则,将为在存储帐户的任何位置创建的任何文件触发该事件。 有关详细信息,请参阅事件网格的事件筛选 。
以下示例演示了触发器的显示方式:
concat('https://login.microsoftonline.com/', parameters('tenant_id'), '/oauth2/token')
若要输入此表达式,请在“URI”框中选择。 从显示的选项中选择表达式编辑器(公式图标)。
值为 application/x-www-form-urlencoded
的 Content-Type
concat('grant_type=client_credentials&client_id=', parameters('client_id'), '&client_secret=', parameters('client_secret'), '&resource=https://ml.azure.com')
若要输入此表达式,请在“正文”框中选择。 从显示的选项中选择表达式编辑器(公式图标)。
以下示例演示了一个示例“授权”操作:
在“授权”操作下,添加另一个 HTTP 操作,并将标题重命名为“调用”。
在“调用”操作中提供以下信息:
以前的有效负载对应于模型部署。 如果使用管道组件部署,请根据管道输入的预期调整格式。 有关如何在 REST 调用中构建输入的详细信息,请参阅为批处理终结点创建作业和输入数据 (REST) 。
以下示例演示了一个示例“调用”操作:
“调用”操作会触发批处理作业,但该操作不会等待作业完成。 默认情况下,Azure 逻辑应用不是为长时间运行的应用程序设置的。 如果你需要等待作业完成,建议切换到从 Azure 数据工厂运行批处理终结点 。
完成后,保存工作流。
逻辑应用工作流已准备好执行,每次在指定路径下创建新文件时都会自动触发。
若要确认应用已成功收到事件,请查看应用的“运行历史记录”: