, "SELECT id,aa FROM bbb where ? <= ID AND ID <= ?", lowerBound = 3, upperBound =5, numPartitions = 1, mapRow = extractValues)
参数解释:
1,sparkcontext。
2,一个创建链接的函数。
3,sql。必须有? <= ID AND ID <= ?。
4,要取数据的id最小行。
5,要取数据的id最大行号。
6,分区数。
7,一个将ResultSet转化为需要类型的方法。
2,JdbcRDD的getPartition方法
override def
getPartitions: Array[Partition] = {
// bounds are inclusive, hence the + 1 here and - 1 on end
val
length =
BigInt
(1) + upperBound - lowerBound
(0 until numPartitions).map(i => {
val
start = lowerBound + ((i * length) / numPartitions)
val
end = lowerBound + (((i + 1) * length) / numPartitions) - 1
new
JdbcPartition(i, start.toLong, end.toLong)
}).toArray
}
3,JdbcRDD的compute方法
就是一个通过jdbc获取指定范围数据的过程。
val
part
= thePart.asInstanceOf[JdbcPartition]
val
conn
= getConnection()
val
stmt
=
conn
.prepareStatement(sql, ResultSet.
TYPE_FORWARD_ONLY
, ResultSet.
CONCUR_READ_ONLY
)
stmt
.setLong(1,
part
.lower)
stmt
.setLong(2,
part
.upper)
val
rs
=
stmt
.executeQuery()