使用get_json_object获取json里面字段的值
object Json_DataFrame {
def main(args: Array[String]): Unit = {
val start = System.currentTimeMillis()
val spark = SparkUtil.getSparkSession(this.getClass.getSimpleName, Constants.SPARK_LOCAL_MODE)
val sc = spark.sparkContext
val tmp = sc.textFile("in/test_Json.log")
import org.apache.spark.sql.functions._
import spark.implicits._
* 拆分数据 实现将 },{ 替换成 }\n timeServer| {
* 再通过\n 扁体化数据
val df = tmp.map(
line => {
val data = line.split("\\|")
val timeServer = data(0)
val dataStr = data(1).replaceAll("\\]|\\[", "")
val result = timeServer + "|" + dataStr
result.replaceAll("\\}\\,\\{", s"\\}\\\n$timeServer\\|\\{")
.flatMap(_.split("\n"))
.map(
line => {
val data = line.split("\\|")
(data(0), data(1))
}).toDF("timeServer", "value")
val jsonDf = df.select(
$"timeServer",
get_json_object($"value", "$.cp_game_id").alias("cp_game_id"),
get_json_object($"value", "$.event").alias("event"),
get_json_object($"value", "$.data").alias("data")
jsonDf.show(false)
val resultDf = jsonDf.select(
$"timeServer",
$"cp_game_id",
get_json_object($"event", "$.event_time").alias("event_time"),
get_json_object($"event", "$.event_name").alias("event_name"),
$"data"
resultDf.show()
sc.stop()
val end = System.currentTimeMillis()
println(s"=================== 耗时: ${(end - start) / 1000} 秒 ===================")
到这来,就完成了特殊JSON数组解析
SparkJava的杰森
在执行以下命令之前,请在sparkjob.conf文件中更改spark.driver.extraClassPath属性。
./bin/spark-submit --class org.sparketl.etljobs.SparkEtl --properties-file sparkjob.conf /sparketl/target/sparketl-0.0.1-SNAPSHOT.jar {spark master url} {使用存在的city_list.json在此项目中} {输出文件(用于配对RDD){单个RDD的输出文件}} {国家(地区):美国}
对于Spark初学者,请转到
1. json数据格式–定义
JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式,易于人阅读和编写。
2.json数据格式解编码(2.1,2.2两种方法)
2.1 json函数实现解编码:json.dumps及json.loads
data_day_df.index.get_loc(current_kline.open_time, method='backfill')
open_time为当前的日线级数据。运算后得到周线数据。
1)直接将数据文件上传到对应的表的目录下 ;hdfs dfs -put 本地文件 hdfs中表对应的目录;
2)使用命令导入本地文件: 如果是本地数据 原理就是将本地数据上传到指定的表目录下
load data local inpath "本地文件" into table 表名;
3) 使用命令导入hdfs 中的文件: 如果是HDFS数据 原理是将HDFS的数据移动到指定的表目录下
load data inpath "hdfs文件的路径" into table ...
本文主要讲述的是标准化与归一化的区别,相同点和联系,重点讲述各自的使用场景,归一化主要是应用于没有距离计算的地方上,标准化则是使用在不关乎权重的地方上,因为各自丢失了距离信息和权重信息,最后还讲述了下归一化的使用场景,主要是针对数据分布差异比较大–标准化和奇异数据(单个数据对结果有影响的话)–归一化的情况下.使用
一.不同点
二.相同点及其联系
三.归一化(广义)的场景
3.1 特征/数据需...
一般出现这个问题都发生在数组的获取中,原因是本身一个只有4个元素的数组,你截取了超过他本身就会出现越界的问题。之前刚开始学JAVA的时候有个问题特别的典型,就是把int a=1000 赋值给 byte b=a 就会出现越界的问题。原因就是byte所承受的是-128到127,1000大于了他所能承受的范围,所以越界。
import play.api.libs.json._val input = sc.parallelize(List( """{"name":"过往记忆","website":"www.iteblog.com"}""","""{"other":"过往记忆"}"""))val parsed = input.map(Json.parse)parsed.collectoutput:{"name":"过往...
根据order值进行跳跃顺序排序,生成seq字段,同时生成索引值。
跳跃顺序排序
如order值相同,则排序是一样的,下一个排序会跳过,例如第1条数据的order值和第 2 条的值不同,第2条的order值与后2条的值相同,第4条数据的order值和第 5 条的值不同,则排序的 seq 值为 1,2,2,2,5。
**内有详情注释
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.r