博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark学习笔记——读写HDFS
阅读量:6406 次
发布时间:2019-06-23

本文共 3320 字,大约阅读时间需要 11 分钟。

使用Spark读写HDFS中的parquet文件

文件夹中的parquet文件

build.sbt文件

name := "spark-hbase"version := "1.0"scalaVersion := "2.11.8"libraryDependencies ++= Seq(  "org.apache.spark" %% "spark-core" % "2.1.0",  "mysql" % "mysql-connector-java" % "5.1.31",  "org.apache.spark" %% "spark-sql" % "2.1.0",  "org.apache.hbase" % "hbase-common" % "1.3.0",  "org.apache.hbase" % "hbase-client" % "1.3.0",  "org.apache.hbase" % "hbase-server" % "1.3.0",  "org.apache.hbase" % "hbase" % "1.2.1")

 

Scala实现方法

import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql._import java.util.Propertiesimport com.google.common.collect.Listsimport org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}import org.apache.hadoop.hbase.HBaseConfigurationimport org.apache.hadoop.hbase.client.{Result, Scan}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableInputFormat/**  * Created by mi on 17-4-11.  */case class resultset(name: String,                     info: String,                     summary: String)case class IntroItem(name: String, value: String)case class BaikeLocation(name: String,                         url: String = "",                         info: Seq[IntroItem] = Seq(),                         summary: Option[String] = None)case class MewBaikeLocation(name: String,                         url: String = "",                         info: Option[String] = None,                         summary: Option[String] = None)object MysqlOpt {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("WordCount").setMaster("local")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)    import sqlContext.implicits._    //定义数据库和表信息    val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"    val table = "baike_pages"    //读取parquetFile,并写入Mysql    val sparkSession = SparkSession.builder()      .master("local")      .appName("spark session example")      .getOrCreate()    val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")//    parquetDF.collect().take(20).foreach(println)    //parquetDF.show()    //BaikeLocation是读取的parquet文件中的case class    val ds = parquetDF.as[BaikeLocation].map { line =>      //把info转换为新的case class中的类型String      val info = line.info.map(item => item.name + ":" + item.value).mkString(",")      //注意需要把字段放在一个case class中,不然会丢失列信息      MewBaikeLocation(name = line.name, url = line.url, info = Some(info), summary = line.summary)    }.cache()    ds.show()//    ds.take(2).foreach(println)    //写入Mysql    //    val prop = new Properties()    //    prop.setProperty("user", "root")    //    prop.setProperty("password", "123456")    //    ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop)    //写入parquetFile    ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1")  }}

 

df.show打印出来的信息,如果没放在一个case class中的话,name,url,info,summary这列信息会变成1,2,3,4

使用spark-shell查看写回去的parquet文件的信息

#进入spark-shellimport org.apache.spark.sql.SQLContextval sqlContext = new SQLContext(sc)val path = "file:///home/mi/coding/coding/baikeshow_data/baikeshow1"val df = sqlContext.parquetFile(path)df.showdf.count

 

如果只想显示某一列的话,可以这么做

df.select("title").take(100).foreach(println)  //只显示title这一列的信息

 

转载地址:http://oxtea.baihongyu.com/

你可能感兴趣的文章
node.js发送邮件email
查看>>
查看nginx配置文件路径的方法
查看>>
接口性能调优方案探索
查看>>
kali安装包或更新时提示“E: Sub-process /usr/bin/dpkg return”
查看>>
网站管理后台模板 Charisma
查看>>
EL:empty的用法
查看>>
Saltstack配置之 nodegroups
查看>>
Servlet和JSP优化经验总结
查看>>
squid使用rotate轮询(分割)日志
查看>>
VS2015安装EF Power Tools
查看>>
MySQL主从复制(笔记)
查看>>
keepalived高可用集群的简单配置
查看>>
Android Java Framework显示Toast(无Activity和Service)
查看>>
通过 SignalR 类库,实现 ASP.NET MVC 的实时通信
查看>>
NavigationController修改状态条颜色
查看>>
16大跨平台游戏引擎
查看>>
NPS如何配置基于mac地址的8021x认证
查看>>
XenServer架构之XAPI的调用流程
查看>>
redhat下搭建LAMP架构
查看>>
GitHub详细教程
查看>>