添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

可以调用 Databricks REST API 以使用 Python 代码自动执行 Azure Databricks,而不是使用非 Python 命令行工具(如 curl )或 API 客户端(如 Postman)。 若要使用 Python 调用 Databricks REST API,可以使用 Databricks CLI 包 作为库。 此库以 Python 编写,使你能够通过 Python 类调用 Databricks REST API,这些类密切建模 Databricks REST API 请求和响应有效负载。

另一种方法是直接使用 Python requests 库。 但是,你需要在较低级别进行操作,手动提供必要的标头,处理错误和其他相关的低级别编码任务。 有关详细信息,请参阅 使用 Python 调用 Databricks REST API

我们正积极开发此 Databricks CLI,将以 试验 客户端的形式发布它。 这意味着,相关界面仍可能会变化。

Databricks CLI 支持调用以下 Databricks REST API:

  • 群集策略 API 2.0
  • 群集 API 2.0
  • DBFS API 2.0
  • 组 API 2.0
  • 实例池 API 2.0
  • 作业 API 2.1、2.0
  • 库 API 2.0
  • Delta Live Tables API 2.0
  • 存储库 API 2.0
  • 机密 API 2.0
  • 令牌 API 2.0
  • Unity Catalog API 2.1
  • 工作区 API 2.0
  • Databricks CLI 不支持调用以下 Databricks REST API:

  • Databricks SQL 查询、仪表板和警报 API 2.0
  • Databricks SQL 查询历史记录 API 2.0
  • Databricks SQL 仓库 API 2.0
  • Git 凭据 API 2.0
  • 全局初始化脚本 API 2.0
  • IP 访问列表 API 2.0
  • MLflow API 2.0
  • 权限 API 2.0
  • SCIM API 2.0
  • 令牌管理 API 2.0
  • API 1.2
  • 有关详细信息,请参阅 Databricks REST API 参考

  • Python 3.6 或更高版本。 若要检查是否已安装 Python,并在已安装的情况下检查安装的版本,请从 PowerShell 的终端运行 python --version 安装 Python (如果尚未安装)。

    python --version
    

    Python 的某些安装需要 python3 而不是 python。 如果遇到此情况,请在本文中将 python 替换为 python3

  • Databricks CLI。 若要检查是否已安装 Databricks CLI,并在已安装的情况下检查安装的版本,请运行 databricks --version。 若要安装 Databricks CLI,请运行 pip install databricks-clipython -m pip install databricks-cli

    # Check whether the Databricks CLI is installed, and if so check the installed version.
    databricks --version
    # Install the Databricks CLI.
    pip install databricks-cli
    # Or...
    python -m pip install databricks-cli
    

    pip 的某些安装需要 pip3 而不是 pip。 如果遇到此情况,请在本文中将 pip 替换为 pip3

  • 你的每工作区 URL,例如 https://adb-1234567890123456.7.azuredatabricks.net

  • 用于 Azure Databricks 工作区的 Azure Databricks 个人访问令牌或 Azure Active Directory (Azure AD) 令牌。 若要创建 Azure Databricks 个人访问令牌,请参阅 个人访问令牌;另请参阅管理个人访问令牌。 若要创建 Azure AD 令牌,请参阅 Azure AD 令牌

    步骤 1:设置身份验证

    若要通过 Databricks CLI 包库对 Databricks REST API 进行身份验证,Python 代码至少需要两条信息:

  • 每工作区 URL,例如 https://adb-1234567890123456.7.azuredatabricks.net
  • 用于 Azure Databricks 工作区的 Azure Databricks 个人访问令牌或 Azure Active Directory (Azure AD) 令牌。 若要创建 Azure Databricks 个人访问令牌,请参阅 个人访问令牌;另请参阅管理个人访问令牌。 若要创建 Azure AD 令牌,请参阅 Azure AD 令牌
  • 为提升代码模块化、可移植性和安全性,不应将此信息硬编码到 Python 代码中。 而是应该在运行时从安全位置检索此信息。 例如,本文中的代码使用以下环境变量:

  • DATABRICKS_HOST表示你的 每工作区 URL例如 https://adb-1234567890123456.7.azuredatabricks.net
  • DATABRICKS_TOKEN. 用于 Azure Databricks 工作区的 Azure Databricks 个人访问令牌或 Azure Active Directory (Azure AD) 令牌。 若要创建 Azure Databricks 个人访问令牌,请参阅 Azure Databricks 个人访问令牌;另请参阅管理个人访问令牌。 若要创建 Azure AD 令牌,请参阅 Azure AD 令牌
  • 可以按如下所示设置这些环境变量:

    Unix、linux 和 macos

    如果只为当前终端会话设置环境变量,请运行以下命令。 要为所有终端会话设置环境变量,请在 shell 的启动文件中输入以下命令,然后重启终端。 将此处的示例值替换为你自己的值。

    export DATABRICKS_HOST="https://adb-12345678901234567.8.azuredatabricks.net"
    export DATABRICKS_TOKEN="dapi1234567890b2cd34ef5a67bc8de90fa12b"
    

    Windows

    如果只为当前 PowerShell 会话设置环境变量,请运行以下命令。 将此处的示例值替换为你自己的值。

    set DATABRICKS_HOST="https://adb-12345678901234567.8.azuredatabricks.net"
    set DATABRICKS_TOKEN="dapi1234567890b2cd34ef5a67bc8de90fa12b"
    

    要为所有命令提示符会话设置环境变量,请运行以下命令,然后重启命令提示符。 将此处的示例值替换为你自己的值。

    setx DATABRICKS_HOST "https://adb-12345678901234567.8.azuredatabricks.net"
    setx DATABRICKS_TOKEN "dapi1234567890b2cd34ef5a67bc8de90fa12b"
    

    步骤 2:编写代码

  • 在 Python 代码文件中,导入 os 库,以使代码能够获取环境变量值。

    import os
    
  • databricks_cli.sdk.api_client 模块导入 ApiClient 类,以使代码能够对 Databricks REST API 进行身份验证。

    from databricks_cli.sdk.api_client import ApiClient
    
  • 根据需要导入其他类,使代码能够在进行身份验证后调用 Databricks REST API,如下所示。

    REST API import 语句 DBFS API 2.0 from databricks_cli.dbfs.api import DbfsApi

    from databricks_cli.dbfs.dbfs_path import DbfsPath 组 API 2.0 from databricks_cli.groups.api import GroupsApi 实例池 API 2.0 from databricks_cli.instance_pools.api import InstancePoolsApi 作业 API 2.1 from databricks_cli.jobs.api import JobsApi ( 1 )

    from databricks_cli.runs.api import RunsApi ( 2 ) 库 API 2.0 from databricks_cli.libraries.api import LibrariesApi Delta Live Tables API 2.0 from databricks_cli.pipelines.api import PipelinesApi, LibraryObject 存储库 API 2.0 from databricks_cli.repos.api import ReposApi 机密 API 2.0 from databricks_cli.secrets.api import SecretApi 令牌 API 2.0 from databricks_cli.tokens.api import TokensApi Unity Catalog API 2.1 from databricks_cli.unity_catalog.api import UnityCatalogApi 工作区 API 2.0 from databricks_cli.workspace.api import WorkspaceApi
  • 使用 ApiClient 类对 Databricks REST API 进行身份验证。 使用 os 库的 getenv 函数以获取每工作区 URL(例如 https://adb-1234567890123456.7.azuredatabricks.net 和令牌值)。 以下示例使用变量名称 api_client 来表示 ApiClient 类的实例。

    api_client = ApiClient(
      host  = os.getenv('DATABRICKS_HOST'),
      token = os.getenv('DATABRICKS_TOKEN')
    
  • 根据需要初始化类的实例,以在进行身份验证后调用 Databricks REST API,例如:

    REST API 建议的类初始化语句

    例如,若要使用群集 API 2.0 列出工作区中的可用群集名称及其 ID,请添加以下代码:

    clusters_list = clusters_api.list_clusters()
    print("Cluster name, cluster ID")
    for cluster in clusters_list['clusters']:
      print(f"{cluster['cluster_name']}, {cluster['cluster_id']}")
    

    上述说明的完整代码如下所示:

    import os
    from databricks_cli.sdk.api_client import ApiClient
    from databricks_cli.clusters.api import ClusterApi
    api_client = ApiClient(
      host  = os.getenv('DATABRICKS_HOST'),
      token = os.getenv('DATABRICKS_TOKEN')
    clusters_api   = ClusterApi(api_client)
    clusters_list  = cluster_api.list_clusters()
    print("Cluster name, cluster ID")
    for cluster in clusters_list['clusters']:
      print(f"{cluster['cluster_name']}, {cluster['cluster_id']}")
    

    以下示例演示如何使用 Databricks CLI 和 Python 中的源代码为一些基本使用方案自动执行 Databricks REST API。

    从 DBFS 路径下载文件

    import os
    from databricks_cli.sdk.api_client import ApiClient
    from databricks_cli.dbfs.api import DbfsApi
    from databricks_cli.dbfs.dbfs_path import DbfsPath
    api_client = ApiClient(
      host  = os.getenv('DATABRICKS_HOST'),
      token = os.getenv('DATABRICKS_TOKEN')
    dbfs_source_file_path      = 'dbfs:/tmp/users/someone@example.com//hello-world.txt'
    local_file_download_path   = './hello-world.txt'
    dbfs_api  = DbfsApi(api_client)
    dbfs_path = DbfsPath(dbfs_source_file_path)
    # Download the workspace file locally.
    dbfs_api.get_file(
      dbfs_path,
      local_file_download_path,
      overwrite = True
    # Print the downloaded file's contents.
    print(open(local_file_download_path, 'r').read())
    

    获取有关增量实时表管道的信息

    import os
    from databricks_cli.sdk.api_client import ApiClient
    from databricks_cli.pipelines.api import PipelinesApi
    api_client = ApiClient(
      host  = os.getenv('DATABRICKS_HOST'),
      token = os.getenv('DATABRICKS_TOKEN')
    pipelines_api = PipelinesApi(api_client)
    pipelines_get = pipelines_api.get('1234a56b-c789-0123-d456-78901234e5f6')
    print(f"Name:    {pipelines_get['name']}\n" \
          f"ID:      {pipelines_get['pipeline_id']}\n" \
          f"State:   {pipelines_get['state']}\n" \
          f"Creator: {pipelines_get['creator_user_name']}"
    

    在 Unity Catalog 中创建架构

    import os
    from databricks_cli.sdk.api_client import ApiClient
    from databricks_cli.unity_catalog.api import UnityCatalogApi
    api_client = ApiClient(
      host  = os.getenv('DATABRICKS_HOST'),
      token = os.getenv('DATABRICKS_TOKEN')
    unity_catalog_api = UnityCatalogApi(api_client)
    catalog           = "main"
    schema            = "my_schema"
    # Create the schema (also known as a database) in the specified catalog.
    unity_catalog_create_schema = unity_catalog_api.create_schema(
      catalog_name = catalog,
      schema_name  = schema,
      comment      = "This is my schema"
    print(f"Schema:       {unity_catalog_create_schema['name']}\n" \
          f"Owner:        {unity_catalog_create_schema['owner']}\n" \
          f"Metastore ID: {unity_catalog_create_schema['metastore_id']}"
    # Delete the schema.
    unity_catalog_api.delete_schema(f"{catalog}.{schema}")
    

    列出工作区路径中的对象

    import os
    from databricks_cli.sdk.api_client import ApiClient
    from databricks_cli.workspace.api import WorkspaceApi, WorkspaceFileInfo
    api_client = ApiClient(
      host  = os.getenv('DATABRICKS_HOST'),
      token = os.getenv('DATABRICKS_TOKEN')
    workspace_api          = WorkspaceApi(api_client)
    workspace_list_objects = workspace_api.list_objects('/Users/someone@example.com/')
    for object in workspace_list_objects:
      print(
        object.to_row(
          is_long_form = True,
          is_absolute  = True
    
  •