添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
难过的春卷  ·  Cannot add merged ...·  2 周前    · 
一直单身的小刀  ·  C++ ...·  1 年前    · 
听话的感冒药  ·  2021 ...·  1 年前    · 

一 数据示例

数据是带有时间戳的json数组
数据格式: xxx|[{},{}]

1610352196000|[{"cp_game_id":1658,"category":"cp_api","event":{"event_time":1610348596000,"event_name":"dungeon_flow"},"data":{"role_name":"xiaohao","role_vip":10,"dungeon_type":"主线关卡","dungeon_id":10916,"dungeon_name":"关卡23-24","chapter_id":33,"chapter_name":"23. 异化之地"}},{"cp_game_id":1658,"category":"cp_api","event":{"event_time":1610350804000,"event_name":"dungeon_flow"},"data":{"role_name":"我们一样酷","role_vip":8,"dungeon_type":"主线关卡","dungeon_id":10911,"dungeon_name":"关卡23-19","chapter_id":33,"chapter_name":"23. 异化之地"}}]
 val tmp = sc.textFile("in/test_Json.log")
 tmp.foreach(println)  //打印数据

二 拆分数据

json数组可根据 },{ 来切分数据。
分为两个步骤:
1 实现将 },{ 替换成 }\n timeServer| {
2 再扁体化数据

object Json_DataFrame {
  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName, Constants.SPARK_LOCAL_MODE) // 初始化spark
    val sc = spark.sparkContext
    //读取数据
    val tmp = sc.textFile("in/test_Json.log")
    tmp.foreach(println)
    //隐形转换
    import org.apache.spark.sql.functions._
    import spark.implicits._
      * 拆分数据  实现将 },{     替换成 }\n timeServer| {
      * 再通过\n 扁体化数据
    val df = tmp.map(
      line => {
        val data = line.split("\\|")
        val timeServer = data(0)
        val dataStr = data(1).replaceAll("\\]|\\[", "")
        val result = timeServer + "|" + dataStr
        result.replaceAll("\\}\\,\\{", s"\\}\\\n$timeServer\\|\\{")    //  },{  替换成  }\n timeServer|{
      .flatMap(_.split("\n")).toDF()
    df.show(false)  //打印数据
    val end = System.currentTimeMillis()
    println(s"=================== 耗时: ${(end - start) / 1000} 秒 ===================")

单条数据被拆分出来
在这里插入图片描述
再通过map算子将时间戳timeServer提取出来就可以

 .map(
        line => {
          val data = line.split("\\|")
          (data(0), data(1))
        }).toDF("timeServer", "value")

三 拆分嵌套子json

使用get_json_object获取json里面字段的值

object Json_DataFrame {
  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName, Constants.SPARK_LOCAL_MODE) // 初始化spark
    val sc = spark.sparkContext
    //读取数据
    val tmp = sc.textFile("in/test_Json.log")
    //隐形转换
    import org.apache.spark.sql.functions._
    import spark.implicits._
      * 拆分数据  实现将 },{     替换成 }\n timeServer| {
      * 再通过\n 扁体化数据
    val df = tmp.map(
      line => {
        val data = line.split("\\|")
        val timeServer = data(0)
        val dataStr = data(1).replaceAll("\\]|\\[", "") // 去掉 []
        val result = timeServer + "|" + dataStr
        result.replaceAll("\\}\\,\\{", s"\\}\\\n$timeServer\\|\\{") //  },{  替换成  }\n timeServer|
      .flatMap(_.split("\n"))
      .map(
        line => {
          val data = line.split("\\|")
          (data(0), data(1))
        }).toDF("timeServer", "value")
    val jsonDf = df.select(
      $"timeServer",
      get_json_object($"value", "$.cp_game_id").alias("cp_game_id"),
      get_json_object($"value", "$.event").alias("event"),
      get_json_object($"value", "$.data").alias("data")
    jsonDf.show(false)
    // 获取子JSON里面字段
    val resultDf = jsonDf.select(
      $"timeServer",
      $"cp_game_id",
      get_json_object($"event", "$.event_time").alias("event_time"),
      get_json_object($"event", "$.event_name").alias("event_name"),
      $"data" // 使用get_json_object获取子字段
    resultDf.show()
    sc.stop()
    val end = System.currentTimeMillis()
    println(s"=================== 耗时: ${(end - start) / 1000} 秒 ===================")

到这来,就完成了特殊JSON数组解析

SparkJava的杰森 在执行以下命令之前,请在sparkjob.conf文件中更改spark.driver.extraClassPath属性。 ./bin/spark-submit --class org.sparketl.etljobs.SparkEtl --properties-file sparkjob.conf /sparketl/target/sparketl-0.0.1-SNAPSHOT.jar {spark master url} {使用存在的city_list.json在此项目中} {输出文件(用于配对RDD){单个RDD的输出文件}} {国家(地区):美国} 对于Spark初学者,请转到 1. json数据格式–定义 JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式,易于人阅读和编写。 2.json数据格式解编码(2.1,2.2两种方法) 2.1 json函数实现解编码:json.dumps及json.loads data_day_df.index.get_loc(current_kline.open_time, method='backfill') open_time为当前的日线级数据。运算后得到周线数据。 1)直接将数据文件上传到对应的表的目录下 ;hdfs dfs -put 本地文件 hdfs中表对应的目录; 2)使用命令导入本地文件: 如果是本地数据 原理就是将本地数据上传到指定的表目录下 load data local inpath "本地文件" into table 表名; 3) 使用命令导入hdfs 中的文件: 如果是HDFS数据 原理是将HDFS的数据移动到指定的表目录下 load data inpath "hdfs文件的路径" into table ... 本文主要讲述的是标准化与归一化的区别,相同点和联系,重点讲述各自的使用场景,归一化主要是应用于没有距离计算的地方上,标准化则是使用在不关乎权重的地方上,因为各自丢失了距离信息和权重信息,最后还讲述了下归一化的使用场景,主要是针对数据分布差异比较大–标准化和奇异数据(单个数据对结果有影响的话)–归一化的情况下.使用 一.不同点 二.相同点及其联系 三.归一化(广义)的场景 3.1 特征/数据需... 一般出现这个问题都发生在数组的获取中,原因是本身一个只有4个元素的数组,你截取了超过他本身就会出现越界的问题。之前刚开始学JAVA的时候有个问题特别的典型,就是把int a=1000 赋值给 byte b=a 就会出现越界的问题。原因就是byte所承受的是-128到127,1000大于了他所能承受的范围,所以越界。 import play.api.libs.json._val input = sc.parallelize(List( """{"name":"过往记忆","website":"www.iteblog.com"}""","""{"other":"过往记忆"}"""))val parsed = input.map(Json.parse)parsed.collectoutput:{"name":"过往... 根据order值进行跳跃顺序排序,生成seq字段,同时生成索引值。 跳跃顺序排序 如order值相同,则排序是一样的,下一个排序会跳过,例如第1条数据的order值和第 2 条的值不同,第2条的order值与后2条的值相同,第4条数据的order值和第 5 条的值不同,则排序的 seq 值为 1,2,2,2,5。 **内有详情注释 import com.alibaba.fastjson.{JSON, JSONObject} import org.apache.spark.r