添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
温暖的煎饼  ·  Resolve Git merge ...·  1 年前    · 
伤情的刺猬  ·  java - Exception in ...·  1 年前    · 
知识渊博的铁链  ·  Xamarin.Forms Q720p ...·  1 年前    · 
失落的饼干  ·  CompletableFuture in ...·  1 年前    · 

本文介绍如何使用 R 包(如 SparkR sparklyr dplyr )来处理 R data.frame Spark DataFrame 和内存中表。

请注意,使用 SparkR、sparklyr 和 dplyr 时,可能会发现可以使用所有这些包完成特定操作,并且可以使用最熟悉的包。 例如,若要运行查询,可以调用函数,例如 SparkR::sql sparklyr::sdf_sql dplyr::select 。 在其他情况下可能只需使用其中一到两个包来完成操作,你选择的操作取决于自己具体的使用情况。 例如,调用 sparklyr::sdf_quantile 的方式与调用 dplyr::percentile_approx 的方式略有不同,即使这两个函数都计算分位数。

可以将 SQL 用作 SparkR 和 sparklyr 之间的媒介。 例如,可以使用 SparkR::sql 查询使用 sparklyr 创建的表。 可以使用 sparklyr::sdf_sql 查询使用 SparkR 创建的表。 dplyr 代码在运行之前都会在内存中被转换为 SQL。 另请参阅 API 互操作性 SQL 转换

加载 SparkR、sparklyr 和 dplyr

SparkR、sparklyr 和 dplyr 包包含在 Azure Databricks 群集 上安装的 Databricks Runtime 中。 因此,在开始调用这些包之前,无需调用通常会使用的 install.package 。 但仍然必须先使用 library 加载这些包。 例如,在 Azure Databricks 工作区的 R 笔记本 中,在笔记本单元格中运行以下代码以加载 SparkR、sparklyr 和 dplyr:

library(SparkR)
library(sparklyr)
library(dplyr)

将 sparklyr 连接到群集

加载 sparklyr 后,必须调用 sparklyr::spark_connect 来连接到群集,以便指定 databricks 连接方法。 例如,在笔记本单元中运行以下代码以连接到托管笔记本的群集:

sc <- spark_connect(method = "databricks")

相比之下,Azure Databricks 笔记本已在群集上建立用于 SparkR 的 SparkSession 笔记本,因此在开始调用 SparkR 之前无需调用 SparkR::sparkR.session

将 JSON 数据文件上传到工作区

本文中的许多代码示例基于 Azure Databricks 工作区中特定位置的数据,其中包含特定的列名称和数据类型。 此代码示例的数据来自 GitHub 中的一个名为 book.json 的 JSON 文件。 若要获取此文件并将其上传到工作区:

  • 转到 GitHub 上的 books.json 文件,并使用文本编辑器将其内容复制到名为 books.json 的文件中(该文件位于本地计算机上)。
  • 在 Azure Databricks 工作区的“数据科学”&“工程”或“机器学习”视图中,单击边栏上的“数据”。
  • 单击“创建表”。
  • 在“上传文件”选项卡上,将 books.json 文件从本地计算机拖放到“拖放文件以进行上传”框中。 或者选择“单击以浏览”,然后在本地计算机中浏览到 books.json 文件。
  • 默认情况下,Azure Databricks 通过路径 /FileStore/tables/books.json 将本地 books.json 文件上传到工作区中的 DBFS 位置。

    请勿单击“使用 UI 创建表”或“在笔记本中创建表”。 本文中的代码示例使用 DBFS 位置中上传的 books.json 文件的数据。

    将 JSON 数据读取到 DataFrame

    使用 sparklyr::spark_read_json 将上传的 JSON 文件读取到 DataFrame 中,指定连接、JSON 文件的路径和数据的内部表表示名称。 在此示例中,必须指定包含多行的 book.json 文件。 可在此处选择性地指定列的架构。 如果不指定,则在默认情况下 sparklyr 会推断列的架构。 例如,在笔记本单元格中运行以下代码,将上传的 JSON 文件的数据读取到名为 jsonDF 的 DataFrame 中:

    jsonDF <- spark_read_json(
      sc      = sc,
      name    = "jsonTable",
      path    = "/FileStore/tables/books.json",
      options = list("multiLine" = TRUE),
      columns = c(
        author    = "character",
        country   = "character",
        imageLink = "character",
        language  = "character",
        link      = "character",
        pages     = "integer",
        title     = "character",
        year      = "integer"
    
    

    可以使用 SparkR::headSparkR::showsparklyr::collect 打印 DataFrame 的第一行。 默认情况下,head 会打印前六行。 showcollect 打印前 10 行。 例如,在笔记本单元格中运行以下代码以打印名为 jsonDF 的 DataFrame 的第一行:

    head(jsonDF)
    # Source: spark<?> [?? x 8]
    #   author                  country        image…¹ langu…² link  pages title  year
    #   <chr>                   <chr>          <chr>   <chr>   <chr> <int> <chr> <int>
    # 1 Chinua Achebe           Nigeria        images… English "htt…   209 Thin…  1958
    # 2 Hans Christian Andersen Denmark        images… Danish  "htt…   784 Fair…  1836
    # 3 Dante Alighieri         Italy          images… Italian "htt…   928 The …  1315
    # 4 Unknown                 Sumer and Akk… images… Akkadi… "htt…   160 The … -1700
    # 5 Unknown                 Achaemenid Em… images… Hebrew  "htt…   176 The …  -600
    # 6 Unknown                 India/Iran/Ir… images… Arabic  "htt…   288 One …  1200
    # … with abbreviated variable names ¹​imageLink, ²​language
    show(jsonDF)
    # Source: spark<jsonTable> [?? x 8]
    #    author                  country       image…¹ langu…² link  pages title  year
    #    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
    #  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
    #  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
    #  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
    #  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
    #  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
    #  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
    #  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
    #  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
    #  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
    # 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
    # … with more rows, and abbreviated variable names ¹​imageLink, ²​language
    # ℹ Use `print(n = ...)` to see more rows
    collect(jsonDF)
    # A tibble: 100 × 8
    #    author                  country       image…¹ langu…² link  pages title  year
    #    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
    #  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
    #  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
    #  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
    #  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
    #  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
    #  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
    #  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
    #  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
    #  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
    # 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
    # … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
    # ℹ Use `print(n = ...)` to see more rows
    

    运行 SQL 查询,对表进行写入和读取

    可以使用 dplyr 函数在 DataFrame 上运行 SQL 查询。 例如,在笔记本单元格中运行以下代码,使用 dplyr::group_bydployr::count 从名为 jsonDF 的 DataFrame 中按作者获取计数。 使用 dplyr::arrangedplyr::desc 按计数降序对结果进行排序。 随后打印前 10 行(默认)。

    group_by(jsonDF, author) %>%
      count() %>%
      arrange(desc(n))
    # Source:     spark<?> [?? x 2]
    # Ordered by: desc(n)
    #    author                     n
    #    <chr>                  <dbl>
    #  1 Fyodor Dostoevsky          4
    #  2 Unknown                    4
    #  3 Leo Tolstoy                3
    #  4 Franz Kafka                3
    #  5 William Shakespeare        3
    #  6 William Faulkner           2
    #  7 Gustave Flaubert           2
    #  8 Homer                      2
    #  9 Gabriel García Márquez     2
    # 10 Thomas Mann                2
    # … with more rows
    # ℹ Use `print(n = ...)` to see more rows
    

    然后可以使用 sparklyr::spark_write_table 将结果写入 Azure Databricks 中的表。 例如,在笔记本单元格中运行以下代码以重新运行查询,然后将结果写入名为 json_books_agg 的表:

    group_by(jsonDF, author) %>%
      count() %>%
      arrange(desc(n)) %>%
      spark_write_table(
        name = "json_books_agg",
        mode = "overwrite"
    

    若要验证表是否已创建,可以使用 sparklyr::sdf_sqlSparkR::showDF 来显示表的数据。 例如,在笔记本单元格中运行以下代码查询表并将其汇入至 DataFrame,然后使用 sparklyr::collect 打印 DataFrame 的前 10 行(默认):

    collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))
    # A tibble: 82 × 2
    #    author                     n
    #    <chr>                  <dbl>
    #  1 Fyodor Dostoevsky          4
    #  2 Unknown                    4
    #  3 Leo Tolstoy                3
    #  4 Franz Kafka                3
    #  5 William Shakespeare        3
    #  6 William Faulkner           2
    #  7 Homer                      2
    #  8 Gustave Flaubert           2
    #  9 Gabriel García Márquez     2
    # 10 Thomas Mann                2
    # … with 72 more rows
    # ℹ Use `print(n = ...)` to see more rows
    

    还可以用 sparklyr::spark_read_table 执行类似操作。 例如,在笔记本单元格中运行以下代码,将上文名为 jsonDF 的 DataFrame 查询并汇入到一个 DataFrame 中,然后使用 sparklyr::collect 打印 DataFrame 的前 10 行(默认):

    fromTable <- spark_read_table(
      sc   = sc,
      name = "json_books_agg"
    collect(fromTable)
    # A tibble: 82 × 2
    #    author                     n
    #    <chr>                  <dbl>
    #  1 Fyodor Dostoevsky          4
    #  2 Unknown                    4
    #  3 Leo Tolstoy                3
    #  4 Franz Kafka                3
    #  5 William Shakespeare        3
    #  6 William Faulkner           2
    #  7 Homer                      2
    #  8 Gustave Flaubert           2
    #  9 Gabriel García Márquez     2
    # 10 Thomas Mann                2
    # … with 72 more rows
    # ℹ Use `print(n = ...)` to see more rows
    

    在 DataFrame 中添加列并计算列值

    你可以使用 dplyr 函数来向 DataFrames 添加列,并计算列的值。

    例如,在笔记本单元格中运行以下代码,以获取名为 jsonDF 的 DataFrame 的内容。 使用 dplyr::mutate 添加一个名为 today 的列,并用当前时间戳填充这个新列。 然后将这些内容写入名为 withDate 的新 DataFrame 中,并使用 dplyr::collect 打印新 DataFrame 的前 10 行(默认)。

    dplyr::mutate 仅接受符合 Hive 内置函数(也称为 UDF)和内置聚合函数(也称为 UDAF)的参数。 有关常规信息,请参阅 Hive 函数。 有关本部分中与日期相关的函数的信息,请参阅日期函数

    withDate <- jsonDF %>%
      mutate(today = current_timestamp())
    collect(withDate)
    # A tibble: 100 × 9
    #    author    country image…¹ langu…² link  pages title  year today
    #    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
    #  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
    #  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
    #  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
    #  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
    #  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
    #  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
    #  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
    #  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
    #  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
    # 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
    # … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
    # ℹ Use `print(n = ...)` to see more rows
    

    现在请用 dplyr::mutatewithDate DataFrame 的内容添加另外两列。 新的 monthyear 列包含来自 today 列的月份和年份数值。 然后将这些内容写入一个名为 withMMyyyy 的新 DataFrame 中,并使用 dplyr::selectdplyr::collect 打印新 DataFrame 前十行(默认)中的 authortitlemonthyear 列:

    withMMyyyy <- withDate %>%
      mutate(month = month(today),
             year  = year(today))
    collect(select(withMMyyyy, c("author", "title", "month", "year")))
    # A tibble: 100 × 4
    #    author                  title                                     month  year
    #    <chr>                   <chr>                                     <int> <int>
    #  1 Chinua Achebe           Things Fall Apart                             9  2022
    #  2 Hans Christian Andersen Fairy tales                                   9  2022
    #  3 Dante Alighieri         The Divine Comedy                             9  2022
    #  4 Unknown                 The Epic Of Gilgamesh                         9  2022
    #  5 Unknown                 The Book Of Job                               9  2022
    #  6 Unknown                 One Thousand and One Nights                   9  2022
    #  7 Unknown                 Njál's Saga                                   9  2022
    #  8 Jane Austen             Pride and Prejudice                           9  2022
    #  9 Honoré de Balzac        Le Père Goriot                                9  2022
    # 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
    # … with 90 more rows
    # ℹ Use `print(n = ...)` to see more rows
    

    现在使用 dplyr::mutatewithMMyyyy DataFrame 的内容添加另外两列。 新的 formatted_date 列包含来自 today 列的 yyyy-MM-dd 部分,而新的 day 列包含来自新 formatted_date 列的日期数值。 然后将这些内容写入一个名为 withUnixTimestamp 的新 DataFrame 中,并使用 dplyr::selectdplyr::collect 打印新 DataFrame 前十行(默认)中的 titleformatted_dateday 列:

    withUnixTimestamp <- withMMyyyy %>%
      mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
             day            = dayofmonth(formatted_date))
    collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))
    # A tibble: 100 × 3
    #    title                                           formatted_date   day
    #    <chr>                                           <chr>          <int>
    #  1 Things Fall Apart                               2022-09-27        27
    #  2 Fairy tales                                     2022-09-27        27
    #  3 The Divine Comedy                               2022-09-27        27
    #  4 The Epic Of Gilgamesh                           2022-09-27        27
    #  5 The Book Of Job                                 2022-09-27        27
    #  6 One Thousand and One Nights                     2022-09-27        27
    #  7 Njál's Saga                                     2022-09-27        27
    #  8 Pride and Prejudice                             2022-09-27        27
    #  9 Le Père Goriot                                  2022-09-27        27
    # 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
    # … with 90 more rows
    # ℹ Use `print(n = ...)` to see more rows
    

    创建一个临时视图

    可以在内存中创建基于现有 DataFrame 的命名临时视图。 例如,在笔记本单元格中运行以下代码,使用 SparkR::createOrReplaceTempView 获取上文名为 jsonTable 的 DataFrame 的内容,并从中创建一个名为 timestampTable 的临时视图。 然后使用 sparklyr::spark_read_table 读取临时视图的内容。 使用 sparklyr::collect 打印临时表的前 10 行(默认):

    createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")
    spark_read_table(
      sc = sc,
      name = "timestampTable"
    ) %>% collect()
    # A tibble: 100 × 10
    #    author    country image…¹ langu…² link  pages title  year today
    #    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
    #  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
    #  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
    #  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
    #  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
    #  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
    #  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
    #  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
    #  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
    #  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
    # 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
    # … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
    #   names ¹​imageLink, ²​language
    # ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names
    

    对 DataFrame 执行统计分析

    可以使用 sparklyr 和 dplyr 进行统计分析。

    例如创建一个 DataFrame 来运行统计信息。 为此,请在笔记本单元格中运行以下代码,使用 sparklyr::sdf_copy_to 将内置到 R 中的 iris 数据集的内容写入名为 iris 的 DataFrame 中。 使用 sparklyr::sdf_collect 打印临时表的前 10 行(默认):

    irisDF <- sdf_copy_to(
      sc        = sc,
      x         = iris,
      name      = "iris",
      overwrite = TRUE
    sdf_collect(irisDF, "row-wise")
    # A tibble: 150 × 5
    #    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
    #           <dbl>       <dbl>        <dbl>       <dbl> <chr>
    #  1          5.1         3.5          1.4         0.2 setosa
    #  2          4.9         3            1.4         0.2 setosa
    #  3          4.7         3.2          1.3         0.2 setosa
    #  4          4.6         3.1          1.5         0.2 setosa
    #  5          5           3.6          1.4         0.2 setosa
    #  6          5.4         3.9          1.7         0.4 setosa
    #  7          4.6         3.4          1.4         0.3 setosa
    #  8          5           3.4          1.5         0.2 setosa
    #  9          4.4         2.9          1.4         0.2 setosa
    # 10          4.9         3.1          1.5         0.1 setosa
    # … with 140 more rows
    # ℹ Use `print(n = ...)` to see more rows
    

    现在使用 dplyr::group_bySpecies 列对行进行分组。 使用 dplyr::summarizedplyr::percentile_approx 通过 SpeciesSepal_Length 列的第 25、50、75、100 分位数计算汇总统计信息。 使用 sparklyr::collect 打印结果:

    dplyr::summarize 仅接受符合 Hive 内置函数(也称为 UDF)和内置聚合函数(也称为 UDAF)的参数。 有关常规信息,请参阅 Hive 函数。 有关 percentile_approx 的信息,请参阅内置聚合函数 (UDAF)

    quantileDF <- irisDF %>%
      group_by(Species) %>%
      summarize(
        quantile_25th = percentile_approx(
          Sepal_Length,
        quantile_50th = percentile_approx(
          Sepal_Length,
        quantile_75th = percentile_approx(
          Sepal_Length,
        quantile_100th = percentile_approx(
          Sepal_Length,
    collect(quantileDF)
    # A tibble: 3 × 5
    #   Species    quantile_25th quantile_50th quantile_75th quantile_100th
    #   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
    # 1 virginica            6.2           6.5           6.9            7.9
    # 2 versicolor           5.6           5.9           6.3            7
    # 3 setosa               4.8           5             5.2            5.8
    

    可以使用以下方法计算类似的结果,例如使用 sparklyr::sdf_quantile

    print(sdf_quantile(
      x = irisDF %>%
        filter(Species == "virginica"),
      column = "Sepal_Length",
      probabilities = c(0.25, 0.5, 0.75, 1.0)
    # 25%  50%  75% 100%
    # 6.2  6.5  6.9  7.9