Java Hive UDTF 将WKT格式的Geomotry转换成GeoJSON
原创背景知识
WKT (Well-known text)是一种文本标记语言,用于表示矢量几何对象、空间参照系统及空间参照系统之间的转换。它的二进制表示方式,亦即WKB(well-known-binary)则胜于在传输和在数据库中存储相同的信息。
GeoJSON 是一种对各种地理数据结构进行编 码的格式,可以表示几何、特征或者特征集合。
WTK
点 POINT(6 10)
线 LINESTRING(44 4,11 44,33 25)
面 POLYGON((1 1,2 2,3 3,4 4,5 5,1 1),(11 11,2 13,34 43,34 42,52 52,11 11))
多点 MULTIPOINT(44 4,11 44,33 25)
多线 MULTILINESTRING((3 4,10 50,20 25),(-5 -8,-10 -8,-15 -4))
多面 MULTIPOLYGON(((1 1,5 1,5 5,1 5,1 1),(2 2,2 3,3 3,3 2,2 2)),((6 3,9 2,9 4,6 3)))
几何集合 GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))
对应geojson
点 {"type":"Point","coordinates":[6,10]}
线 {"type":"LineString","coordinates":[[44, 4],[11, 44],[33, 25]]}
面 {"type":"Polygon","coordinates":[[[1, 1],[2, 2],[3, 3],[4, 4],[5, 5],[1, 1]],[[11, 11],[2, 13],[34, 43],[34, 42],[52, 52],[11, 11]]]}
多点 {"type":"MultiPoint","coordinates":[[44, 4],[11, 44],[33, 25]]}
多线 {"type":"MultiLineString","coordinates":[[[3, 4],[10, 50],[20, 25]],[[-5, -8],[-10, -8],[-15, -4]]}
多面 {"type":"MultiPolygon","coordinates":[[[3, 4],[10, 50],[20, 25]],[[-5, -8],[-10, -8],[-15, -4]]]}
几何集合 {"type":"FeatureCollection","features":[{"type":"Feature","geometry":{"type":"Point","coordinates":[4,6]},},{"type":"Feature","geometry":{"type":"LineString","coordinates":[[[4,6],[7,10]]}]}
具体需求
目前MULTIPOLYGON不能直接转成geojson,主要是业务方不能处理多面,需要对多面进行拆分成多个Polygon ,具体到hive数据就是实现udtf对这种多面类型的数据进行解析处理,拆分成多行一行代表一个Polygon。
解决方案
方法一:调研库函数直接封装进行处理:比如 vividsolutions jts包 进行处理
vividsolutions比较强大,其实现原理如下:
Geometry类作为基础数据类型,单点、单线、单面都继承Geometry去实现,同时定义了一个GeometryCollection来实现Geometry 底层用Geometry类型的数组数据结果进行存储,多点、多线、多面都继承自GeometryCollection ,基本上解析后只需要对类型进行判断,然后根据类型进行层层处理 比如:多面类型里面有多个面,对多面进行循环解析就可以了。
如下代码片段
// 如下传如一个 Geometry 基础类,这里会利用java多态进行判断
public static JSONObject parsePolygon2Geojson(Geometry geom) throws ParseException {
JSONObject jsonObject = new JSONObject();
String type = geom.getGeometryType();
jsonObject.put("type", geom.getGeometryType());
if ("MultiPolygon".equalsIgnoreCase(type)) {
JSONArray coordJson = new JSONArray();
//geom 实际上底层数据结构是数组,可以从中获取单个Polygon
for (int i = 0; i < geom.getNumGeometries(); i++) {
// polygon
coordJson.add(parsePolygon(geom.getGeometryN(i)));
jsonObject.put("coordinates", coordJson);
} else if ("Polygon".equalsIgnoreCase(type)) {
jsonObject.put("coordinates", parsePolygon(geom));
} else if ("MultiLineString".equalsIgnoreCase(type)) {
JSONArray coordJson = new JSONArray();
for (int i = 0; i < geom.getNumGeometries(); i++) {
List<List<Double>> line = new ArrayList<>();
for (Coordinate coordinate : geom.getGeometryN(i).getCoordinates()) {
line.add(getPoint(coordinate));
coordJson.add(line);
jsonObject.put("coordinates", coordJson);
} else if ("Point".equalsIgnoreCase(type)) {
jsonObject.put("coordinates", getPoint(geom.getCoordinate()));
} else if ("MultiPoint".equalsIgnoreCase(type) || "LineString".equalsIgnoreCase(type)) {
List<List<Double>> coordJson = new ArrayList<>();
for (Coordinate coordinate : geom.getCoordinates()) {
coordJson.add(getPoint(coordinate));
jsonObject.put("coordinates", coordJson);
} else {
jsonObject.put("coordinates", new JSONArray());
return jsonObject;
}
方法二 :暴力求解 按照点-多点-线-多线-面-多面 层次关系进行数据解析
测试案例
案例一 : wkt 格式数据转为geojson 给google map使用
数据格式:
wkt: MULTIPOLYGON(((1 1,5 1,5 5,1 5,1 1),(2 2,2 3,3 3,3 2,2 2)),((6 3,9 2,9 4,6 3))) : 这里包含两个Polygon
geoJson: {"type":"MultiPolygon","coordinates":[[[[1,1],[5,1],[5,5],[1,5],[1,1]],[[2,2],[2,3],[3,3],[3,2],[2,2]]],[[[6,3],[9,2],[9,4],[6,3]]]]}
拆分成多个 POLYGON
转换后返回数组UDTF循环处理每个对象
[{"coordinates":[[[1.0,1.0],[5.0,1.0],[5.0,5.0],[1.0,5.0],[1.0,1.0]],[[2.0,2.0],[2.0,3.0],[3.0,3.0],[3.0,2.0],[2.0,2.0]]],"type":"Polygon"},{"coordinates":[[[6.0,3.0],[9.0,2.0],[9.0,4.0],[6.0,3.0]]],"type":"Polygon"}]
一共有【2】个POLYGON转换后的结果第:【1】 个POLYGON 结果 : {"coordinates":[[[1.0,1.0],[5.0,1.0],[5.0,5.0],[1.0,5.0],[1.0,1.0]],[[2.0,2.0],[2.0,3.0],[3.0,3.0],[3.0,2.0],[2.0,2.0]]],"type":"Polygon"}
一共有【2】个POLYGON转换后的结果第:【2】 个POLYGON 结果 : {"coordinates":[[[6.0,3.0],[9.0,2.0],[9.0,4.0],[6.0,3.0]]],"type":"Polygon"}
测试案例二:单独POLYGON类型数据
原始Wkt字符串: POLYGON((1 1,2 2,3 3,4 4,5 5,1 1),(11 11,2 13,34 43,34 42,52 52,11 11))
转换后返回数组UDTF循环处理每个对象:[{"coordinates":[[[1.0,1.0],[2.0,2.0],[3.0,3.0],[4.0,4.0],[5.0,5.0],[1.0,1.0]],[[11.0,11.0],[2.0,13.0],[34.0,43.0],[34.0,42.0],[52.0,52.0],[11.0,11.0]]],"type":"Polygon"}]
一共有【1】个POLYGON转换后的结果第:【1】 个POLYGON 结果 : {"coordinates":[[[1.0,1.0],[2.0,2.0],[3.0,3.0],[4.0,4.0],[5.0,5.0],[1.0,1.0]],[[11.0,11.0],[2.0,13.0],[34.0,43.0],[34.0,42.0],[52.0,52.0],[11.0,11.0]]],"type":"Polygon"}
HIVE UDTF封装
hive支持三种类型的UDF函数:
- 普通UDF函数 :
操作单个数据行,且产生一个数据作为输出。例如(数学函数,字符串函数)
- 聚合udf (UDAF)
接受多个数据行,并产生一个数据行作为输出。例如(COUNT,MAX函数等)
- 表生成UDF(UDTF)
接受一个数据行,然后返回产生多个数据行(一个表作为输出)
这里的需求是MULTIPOLYGON拆成POLYGON所以需要实现 UDTF函数
UDTF自定义函数的实现:
UDTF函数的实现必须通过继承抽象类
GenericUDTF
,并且要实现
initialize, process,close
函数
- Hive 调用 initialize 方法来确定传入参数的类型并确定 UDTF 生成表的每个字段的数据类型(即输入类型和输出类型。initialize 方法必须返回一个生成表的字段的相应的 StructObjectInspector。
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
//1.判断参数个数 参数都是用结构体对象检查器来封装的,注意返回值类型也是
//如果参数的个数不是1个则抛出异常
if (argOIs.getAllStructFieldRefs().size() != 1) {
throw new UDFArgumentLengthException("输出参数个数不对---");
//2.判断参数类型
//如果参数的类型不是String则抛出异常
String typeName = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName();
//注意hive中的字符串是 string
if (!"string".equals(typeName)) {
throw new UDFArgumentTypeException(0, "函数参数类型不对");
//3.返回值
//StructObjectInspector 是<key,value>形式的 key为列名,value 为列的类型,所以要用list封装
List<String> fieldName = new ArrayList<>();
List<ObjectInspector> fieldOIs = new ArrayList<>();
fieldName.add("item"); //设置默认列名
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); //设置列的类型为string
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldName, fieldOIs); //返回值类型封装
}
- 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。
@Override
public void process(Object[] objects) throws HiveException {
String wktStr = objects[0].toString();
try {
JSONArray jsonArray = WKTUtil.parsePolygonArrayGeojsonByStr(wktStr);
if (jsonArray != null && jsonArray.size() > 0) {
for (Object polygy :jsonArray) {
forward(polygy);