{ "status":"200", "data":[ "id":1, "userid":2, "service":{ "3":{"a":1,"b":2}, "2":{"a":3,"b":2}, ..... }]}
{id:1, userid:2, service:3, a:1, b:2}
{id:1, userid:2, service:2, a:3, b:2}
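Once the response is available as a DataFrame (which is exactly what the data source described below provides), one way to get from the nested form to the flattened records above is to project out each per-key struct and union the results. This is only a minimal sketch, not part of the original article; it assumes the loaded DataFrame is called df and that the service keys ("2" and "3") are known in advance:

import org.apache.spark.sql.functions._

// df is assumed to be the DataFrame loaded from the REST response above;
// each known service key becomes the value of the "service" column
val flattened = Seq("2", "3").map { key =>
  df.select(
    col("id"),
    col("userid"),
    lit(key).as("service"),
    col(s"service.$key.a").as("a"),
    col(s"service.$key.b").as("b"))
}.reduce(_ unionAll _)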
val df = SQLContext.getOrCreate(sc).read.
  format("driver class").   // the driver, similar to a JDBC driver class
  options(Map(....)).       // extra parameters you need to pass to the driver
  load("url")               // the resource path
{ "name": "streaming.core.compositor.spark.source.SQLSourceCompositor", "params": [ { "format": "org.apache.spark.sql.execution.datasources.rest.json", "url": "http://[your dns]/path", "xPath": "$.data" } ] }
The DefaultSource implementation
org.apache.spark.sql.execution.datasources.rest.json.DefaultSource extends RelationProvider with DataSourceRegister
org.apache.spark.sql.execution.datasources.rest.json ==> restJSON
override def shortName(): String = "restJSON"
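With DataSourceRegister in place (and the class listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister so Spark's ServiceLoader can find it), callers can use the short name instead of the full package name. A minimal usage sketch, assuming the data source jar is on the classpath:

// hypothetical usage of the short name registered above
val df = SQLContext.getOrCreate(sc).read.
  format("restJSON").
  options(Map("url" -> "http://[your dns]/path", "xPath" -> "$.data")).
  load()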
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
SchemaRelationProvider lets you pass a schema directly to the BaseRelation implementation. HadoopFsRelationProvider additionally handles parameters such as path for you, and its return type is constrained to HadoopFsRelation; HadoopFsRelation in turn provides most of the machinery for interacting with HDFS. In our implementation, the basic RelationProvider is all we need.
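For comparison, SchemaRelationProvider's createRelation also receives the user-supplied schema; this is the Spark 1.x interface, not code from this data source:

def createRelation(
    sqlContext: SQLContext,
    parameters: Map[String, String],
    schema: StructType): BaseRelation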
override def createRelation(
    sqlContext: SQLContext,
    // remember the options method of the reader? parameters is
    // whatever the user passed in through options
    parameters: Map[String, String]
  ): BaseRelation = {
  // we do not require the user to supply a schema; instead we infer it
  // from the JSON data itself, hence the notion of a sampling ratio
  val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
  // remember the path argument of load()? In theory the URL should arrive
  // through that, but here it is passed directly via options
  val url = parameters.getOrElse("url", "")
  // we need an XPath-like expression to extract the part we want,
  // e.g. the data array in the earlier example
  val xPath = parameters.getOrElse("xPath", "$")
  // this is the core
  new RestJSONRelation(None, url, xPath, samplingRatio, None)(sqlContext)
}
RestJSONRelation

Let's first look at the signature of RestJSONRelation:

private[sql] class RestJSONRelation(
    val inputRDD: Option[RDD[String]],
    val url: String,
    val xPath: String,
    val samplingRatio: Double,
    val maybeDataSchema: Option[StructType]
  )(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {
override def schema: StructType = dataSchema

lazy val dataSchema = .....
private def createBaseRdd(inputPaths: Array[String]): RDD[String]
// a retry mechanism would make this even better
private def createBaseRdd(inputPaths: Array[String]): RDD[String] = {
  val url = inputPaths.head
  val res = Request.Get(new URL(url).toURI).execute()
  val response = res.returnResponse()
  if (response != null && response.getStatusLine.getStatusCode == 200) {
    val content = EntityUtils.toString(response.getEntity)
    // this is where the extraction happens: pull out the data array
    import scala.collection.JavaConversions._
    val extractContent = JSONArray.fromObject(JSONPath.read(content, xPath)).
      map(f => JSONObject.fromObject(f).toString).toSeq
    sqlContext.sparkContext.makeRDD(extractContent)
  } else {
    sqlContext.sparkContext.makeRDD(Seq())
  }
}
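As the comment says, a retry would make this more robust. Below is a minimal sketch of how the HTTP call could be wrapped; withRetry is a hypothetical helper, not part of the original code:

import scala.util.{Failure, Success, Try}

// hypothetical helper: re-run the block up to maxRetries times before giving up
private def withRetry[T](maxRetries: Int, delayMs: Long = 1000)(body: => T): T = {
  Try(body) match {
    case Success(v) => v
    case Failure(_) if maxRetries > 0 =>
      Thread.sleep(delayMs)
      withRetry(maxRetries - 1, delayMs)(body)
    case Failure(e) => throw e
  }
}

// possible usage inside createBaseRdd:
// val response = withRetry(3) { Request.Get(new URL(url).toURI).execute().returnResponse() }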
lazy val dataSchema = {
  // the user may also hand us a schema; if not, we infer one ourselves
  val jsonSchema = maybeDataSchema.getOrElse {
    InferSchema(
      // fetch the data
      inputRDD.getOrElse(createBaseRdd(Array(url))),
      // the sampling ratio, which essentially relies on RDD.sample
      samplingRatio,
      sqlContext.conf.columnNameOfCorruptRecord)
  }
  checkConstraints(jsonSchema)
  jsonSchema
}
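Inside InferSchema the sampling ratio boils down to an ordinary RDD.sample call. Roughly something like the following sketch (not the exact Spark source), where json is the RDD[String] produced by createBaseRdd:

// rough sketch of the sampling step used for schema inference
val schemaData =
  if (samplingRatio > 0.99) json
  else json.sample(withReplacement = false, fraction = samplingRatio, seed = 1)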
def buildScan(): RDD[Row] = {
  JacksonParser(
    inputRDD.getOrElse(createBaseRdd(Array(url))),
    dataSchema,
    sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
}
// this operates on the RDD[String] returned by createBaseRdd;
// each String is a JSON document
// process each partition separately
json.mapPartitions { iter =>
  val factory = new JsonFactory()
  iter.flatMap { record =>
    try {
      // the JSON parser
      val parser = factory.createParser(record)
      parser.nextToken()
      // type conversion starts here
      convertField(factory, parser, schema) match {
        case null => failedRecord(record)
        case row: InternalRow => row :: Nil
        case array: ArrayData =>
          if (array.numElements() == 0) {
            Nil
          } else {
            array.toArray[InternalRow](schema)
          }
        case _ =>
          sys.error(
            s"Failed to parse record $record. Please make sure that each line of the file " +
              "(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
      }
    } catch {
      case _: JsonProcessingException => failedRecord(record)
    }
  }
}
private[sql] def convertField(
    factory: JsonFactory,
    parser: JsonParser,
    schema: DataType): Any = {
  import com.fasterxml.jackson.core.JsonToken._
  (parser.getCurrentToken, schema) match {
    case (null | VALUE_NULL, _) =>
      null

    case (FIELD_NAME, _) =>
      parser.nextToken()
      convertField(factory, parser, schema)

    .....

    case (START_OBJECT, st: StructType) =>
      convertObject(factory, parser, st)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
  schema.getFieldIndex(parser.getCurrentName) match {
    case Some(index) =>
      row.update(index, convertField(factory, parser, schema(index).dataType))
    case None =>
      parser.skipChildren()
  }
}
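nextUntil is a small helper from Spark's JSON support: it advances the parser one token and reports whether the stop token has been reached yet. Roughly, its behaviour looks like this sketch (not the verbatim source):

// advance the parser one token; return false once the stop token (or end of input) is hit
private def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean = {
  parser.nextToken() match {
    case null => false
    case token => token != stopOn
  }
}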
val df = SQLContext.getOrCreate(sc).read.
  format("org.apache.spark.sql.execution.datasources.rest.json"). // the driver, similar to a JDBC driver class
  options(Map("url" -> "http://[your dns]/path", "xPath" -> "$.data")). // extra parameters passed to the driver
  load("url") // the resource path

You can then perform arbitrary operations on the resulting DataFrame.
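For example, a quick illustrative check (the table name rest_json is only used for this example):

// illustrative only: register the DataFrame and query it with SQL
df.registerTempTable("rest_json")
SQLContext.getOrCreate(sc).sql("select id, userid from rest_json").show()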