用SparkSQL处理Txt文件数据

- 4 mins

用SparkSQL处理Txt文件数据

ID|BUSSINESS_NO|REPORT_CODE|NAME|DEBIT_KEY|DEBIT_VALUE
361|8a8501c9467fe5ec01468a47adab4a31|1468330281025|2011-12|不良负债余额|0.00
362|8a8501c9467fe5ec01468a47adab4a31|1468330281025|2012-03|全部负债余额|1,210.00
363|8a8501c9467fe5ec01468a47adab4a31|1468330281025|2012-03|不良负债余额|0.00
364|8a8501c9467fe5ec01468a47adab4a31|1468330281025|2012-06|全部负债余额|1,210.00

上面是示例数据:字段以"|"分隔,第一行是表头(schema 说明行),金额字段可能带有千分位逗号(如 1,210.00)。处理代码如下:

package luoli.spark.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import java.io.File

/**
 * Parses a pipe-delimited text file into a typed DataFrame, writes it to Parquet,
 * reads it back, registers it as the temp view `credit_info` and queries it with Spark SQL.
 */
object TxtParseToTable {

  // Data format like below (first line of the file is the header):
  //   ID|BUSSINESS_NO|REPORT_CODE|NAME|DEBIT_KEY|DEBIT_VALUE
  //   356|8a8501c9467fe5ec01468a47adab4a31|1468330281025|2011-06|<DEBIT_KEY label>|0.00
  // DEBIT_VALUE may contain thousands separators, e.g. 1,210.00.
  case class CreditInfo(id: Int,
                        businessNo: String,
                        reportCode: Long,
                        name: String,
                        uncreditKey: String,
                        uncreditValue: Double)

  /**
   * Entry point. Reads the hard-coded input file, shows the parsed rows, round-trips them
   * through `/tmp/creditInfo.parquet` and prints the ID column of the reloaded table.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val dataFile = "file:///Users/luoli/Downloads/credit/PBC_PUBLIC_DEBIT_INFO.txt"

    val spark = SparkSession.builder().appName("TxtParseToTable").getOrCreate()
    import spark.implicits._

    // Function applied through mapPartitionsWithIndex: drops the first `num` lines of
    // partition 0 and passes every other partition through untouched.
    // The iterator returned by `drop` is the one that must be used; the receiver must not
    // be touched again after calling `drop`, so the result is returned instead of `lines`.
    def skipLines(index: Int, lines: Iterator[String], num: Int): Iterator[String] =
      if (index == 0) lines.drop(num) else lines

    try {
      val creditInfoData =
        spark.sparkContext.textFile(dataFile)
          // Use mapPartitionsWithIndex to remove the schema description line (first line of the data).
          .mapPartitionsWithIndex((idx, iter) => skipLines(idx, iter, 1))
          .filter(_.nonEmpty)
          .map(_.split("\\|"))
          .map { r =>
            CreditInfo(
              r(0).trim.toInt,                        // ID
              r(1),                                   // BUSSINESS_NO
              r(2).trim.toLong,                       // REPORT_CODE
              r(3),                                   // NAME
              r(4),                                   // DEBIT_KEY
              r(5).trim.replaceAll(",", "").toDouble  // DEBIT_VALUE, thousands separators removed
            )
          }
          .toDF()

      creditInfoData.show()

      val tmpDirStr = "/tmp/creditInfo.parquet"
      creditInfoData.write.mode(SaveMode.Overwrite).parquet(tmpDirStr)

      val creditInfoDFLoadFromParquet = spark.read.parquet(tmpDirStr)

      creditInfoDFLoadFromParquet.createOrReplaceTempView("credit_info")
      val selectResult = spark.sql("SELECT * FROM credit_info")
      selectResult.map(attr => "ID: " + attr(0)).show()
    } finally {
      // Release the session's resources even when parsing or I/O fails.
      spark.stop()
    }
  }
}


package luoli.scala.sql

import org.apache.spark.sql.Row
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

/**
 * Loads a pipe-delimited text file whose first line names the columns, builds an all-String
 * schema from that line, registers the data as the temp view `table` and shows its content.
 */
object TxtWithSchemaParse2Table {

  /**
   * Entry point.
   *
   * @param args `args(0)` is the path of the text file to load; the program prints a usage
   *             message and exits with status 1 when it is missing
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      println("Usage: TxtWithSchemaParse2Table <txt_file_path>")
      System.exit(1)
    }

    // Drops the first `linesToDrop` lines of partition 0; other partitions pass through.
    // The iterator returned by `drop` is the one that must be used; the receiver must not
    // be touched again after calling `drop`, so the result is returned instead of `lines`.
    def dropLines(index: Int, lines: Iterator[String], linesToDrop: Int): Iterator[String] =
      if (index == 0) lines.drop(linesToDrop) else lines

    val txtFile = args(0)

    val spark = SparkSession.builder().appName("TxtWithSchemaParse2Table").getOrCreate()

    try {
      val initialRDD = spark.sparkContext.textFile(txtFile)

      // Fail with a descriptive message instead of an index error when the file has no lines.
      val schemaString = initialRDD.take(1).headOption.getOrElse(
        sys.error("Input file is empty, no schema line found: " + txtFile))
      println("The Schema Line of the Table : " + schemaString)

      // Every column is declared as a nullable String; names come from the header line.
      val fields = schemaString.split("\\|")
      val structFields = fields.map(fieldName => StructField(fieldName, StringType, nullable = true))
      val schema = StructType(structFields)

      val rowRDD = initialRDD
        .mapPartitionsWithIndex((index, iter) => dropLines(index, iter, 1))
        // Blank lines would yield one-field rows that do not fit the schema.
        .filter(_.nonEmpty)
        // Limit -1 keeps trailing empty fields so each row has as many values as the line has columns.
        .map(_.split("\\|", -1))
        .map(r => Row.fromSeq(r.toSeq))

      val dataDF = spark.createDataFrame(rowRDD, schema)
      dataDF.createOrReplaceTempView("table")

      val result = spark.sql("SELECT * FROM table")
      result.show()
    } finally {
      // Release the session's resources even when loading or querying fails.
      spark.stop()
    }
  }
}
comments powered by Disqus
rss facebook twitter github youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora