使用Spark读写HDFS中的parquet文件
文件夹中的parquet文件
build.sbt文件
// sbt build definition for the Spark / HBase / MySQL example project.
name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

// Versions shared by several artifacts are declared once.
val sparkVersion = "2.1.0"
val hbaseVersion = "1.3.0"

libraryDependencies ++= Seq(
  // Spark core and Spark SQL (Scala-version-suffixed artifacts, hence %%).
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // JDBC driver for MySQL.
  "mysql" % "mysql-connector-java" % "5.1.31",
  // HBase modules.
  "org.apache.hbase" % "hbase-common" % hbaseVersion,
  "org.apache.hbase" % "hbase-client" % hbaseVersion,
  "org.apache.hbase" % "hbase-server" % hbaseVersion,
  // NOTE(review): this artifact is pinned to 1.2.1 while the other HBase
  // modules use 1.3.0 — confirm whether it is needed and which version is intended.
  "org.apache.hbase" % "hbase" % "1.2.1"
)
Scala实现方法
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties
import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

/**
 * Created by mi on 17-4-11.
 */

/** Row with three string columns; not referenced by the code visible in this file. */
case class resultset(name: String, info: String, summary: String)

/** A single name/value pair inside [[BaikeLocation.info]]. */
case class IntroItem(name: String, value: String)

/** Shape of the rows read from the input parquet file. */
case class BaikeLocation(name: String, url: String = "", info: Seq[IntroItem] = Seq(), summary: Option[String] = None)

/** Shape of the rows written back out: same as [[BaikeLocation]] but with `info` flattened to one string. */
case class MewBaikeLocation(name: String, url: String = "", info: Option[String] = None, summary: Option[String] = None)

/**
 * Reads a parquet dataset, flattens the `info` column into a single
 * comma-separated string and writes the result to a new parquet directory.
 */
object MysqlOpt {

  /**
   * Entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // A single SparkSession replaces the separately constructed
    // SparkContext + SQLContext of the earlier version; the session provides
    // both the reader/writer API and the encoders needed by `.as[...]`/`.map`.
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("spark session example")
      .getOrCreate()
    import sparkSession.implicits._

    try {
      // Read the source parquet directory.
      val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")
      // parquetDF.collect().take(20).foreach(println)
      // parquetDF.show()

      // BaikeLocation is the case class matching the rows of the parquet file.
      val ds = parquetDF.as[BaikeLocation].map { line =>
        // Convert info to the String type used by the new case class.
        // Guard against a null array coming from a nullable parquet column.
        val info = Option(line.info).getOrElse(Seq.empty)
          .map(item => s"${item.name}:${item.value}")
          .mkString(",")
        // The fields must be wrapped in a case class, otherwise the column
        // names are lost in the resulting Dataset.
        MewBaikeLocation(name = line.name, url = line.url, info = Some(info), summary = line.summary)
      }.cache() // cached because it is consumed twice: show() and write

      ds.show()
      // ds.take(2).foreach(println)

      // Optional: append the result to MySQL (left disabled, as in the original).
      // val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
      // val prop = new Properties()
      // prop.setProperty("user", "root")
      // prop.setProperty("password", "123456")
      // ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop)

      // Write the result as parquet in 10 partitions. No save mode is set, so
      // Spark's default applies and the write fails if the target already exists.
      ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1")
    } finally {
      // Release Spark resources even when reading or writing fails.
      sparkSession.stop()
    }
  }
}
ds.show()打印出来的信息。如果map的结果没有放在一个case class中(例如直接返回元组),name、url、info、summary这几列的列名会丢失,变成_1、_2、_3、_4
使用spark-shell查看写回去的parquet文件的信息
// Run inside spark-shell (Spark 2.x), where the SparkSession `spark` is predefined.
val path = "file:///home/mi/coding/coding/baikeshow_data/baikeshow1"
// SQLContext.parquetFile has been deprecated since Spark 1.4;
// DataFrameReader.parquet is the supported replacement.
val df = spark.read.parquet(path)
// Print the first rows, then the total number of rows.
df.show()
df.count()
如果只想显示某一列的话,可以这么做
// Show only one column. The parquet file written above has the columns
// name, url, info and summary (there is no "title" column), so select "name".
df.select("name").take(100).foreach(println)