SparkSQL is a Spark module for working with structured data; its design borrows from Hive.
Previously, data processing relied on the RDD abstraction of the SparkCore module; SparkSQL adds the structured abstractions DataFrame and Dataset.
SparkSQL supports development in SQL and in a DSL (domain-specific language API) across multiple languages (Scala, Java, Python and R); in the end everything is translated into RDD operations at the bottom layer.
SparkSQL supports multiple data sources (Hive, Avro, Parquet, ORC, JSON, JDBC, etc.) as well as HiveQL syntax, Hive SerDes and UDFs, allowing you to access existing Hive warehouses.
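As a taste of the data-source support, a minimal sketch assuming an existing SparkSession named spark (all paths and connection options are placeholders):

val parquetDf = spark.read.parquet("/path/to/data.parquet")   // Parquet file (placeholder path)
val jsonDf    = spark.read.json("/path/to/data.json")         // JSON file (placeholder path)
val jdbcDf    = spark.read.format("jdbc")                     // JDBC source (placeholder connection)
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("dbtable", "user")
  .option("user", "root")
  .option("password", "******")
  .load()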
Zero, SparkSession and integrating Spark with IDEA
SparkSession: this is the new unified entry point, replacing the original SQLContext and HiveContext.
SparkContext can also be obtained through SparkSession.
1,SparkSession
After Spark is started in an interactive environment (spark-shell), it comes with a ready-made variable named spark:
Spark context available as 'sc' (master = local[*], app id = local-1608185209816).
Spark session available as 'spark'.

#read file
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]

scala> text.show()
+--------------------+
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+
In a standalone program, a SparkSession is obtained through the builder() method; the constructor itself is not exposed.
val spark: SparkSession = SparkSession.builder().master("local[3]").getOrCreate()
Through the SparkSession entry point you can load and save data, execute SQL, and obtain the other entry points (sqlContext, SparkContext); a sketch follows.
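A minimal sketch of what this looks like in a standalone program (the app name spark-sql-demo is just a placeholder):

import org.apache.spark.sql.SparkSession

object SessionDemo {
  def main(args: Array[String]): Unit = {
    // The builder is the only public way to create (or reuse) a session
    val spark: SparkSession = SparkSession.builder()
      .appName("spark-sql-demo")   // placeholder app name
      .master("local[3]")
      .getOrCreate()

    // The other entry points are reachable from the session
    val sc  = spark.sparkContext   // SparkContext, for RDD-level work
    val sql = spark.sqlContext     // legacy SQLContext, kept for compatibility

    spark.stop()
  }
}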
2. Using IDEA to develop Spark
To develop Spark locally in IDEA, you first need a Scala SDK whose version matches the Spark build you are running.
Secondly, IDEA must have the Scala plug-in installed.
Next, add the following dependencies to the Maven project's pom.xml.
<!-- SparkCore, basic module -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0-preview2</version>
</dependency>
<!-- SparkSQL, SQL module -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0-preview2</version>
</dependency>
<!-- Spark-Hive integration module, for processing data stored in Hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0-preview2</version>
    <scope>provided</scope>
</dependency>
<!-- MySQL driver, needed for interacting with MySQL -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>
If the logs are too noisy, you can raise the log level: set log4j.rootCategory to ERROR (the first uncommented setting below) and place a log4j.properties file in the resources directory to override the default log configuration.
#hadoop.root.logger=warn,console

# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR
1, RDD, DataFrame, DataSet
RDD, the data structure of SparkCore, can process both structured and unstructured data, but it is comparatively inefficient.
SparkSQL encapsulates two higher-level structures on top of it: DataFrame and Dataset.
An RDD works record by record; it carries the data type of each record but knows nothing about the fields inside it.
DataFrame is modeled on the data structure of the pandas library. Unlike an RDD, a DataFrame carries field information: it has an explicit schema, meaning the column names and column types are known. This reduces the data that must be read and allows better-optimized execution plans, which ensures query efficiency.
Dataset is also a distributed data set, introduced in Spark 1.6; it combines the strengths of RDD and DataFrame.
It also supports lambda functions, but is only available in Scala and Java.
The three can be converted to each other.
Reading a text file returns a Dataset[String]:
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]

scala> text.show()
+--------------------+
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+

#structure
scala> text.printSchema
root
 |-- value: string (nullable = true)
#Return a new session
scala> val nsp = spark.newSession
nsp: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c01b

#Reading JSON returns a DataFrame by default
scala> val user = nsp.read.json("/usr/local/bigdata/file/user.json")
user: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]

#Define a case class as the data type
scala> case class User(id :Long,username:String,email:String){}
defined class User

#With the data type attached, a Dataset[User] is returned
scala> val userds = nsp.read.json("/usr/local/bigdata/file/user.json").as[User]
userds: org.apache.spark.sql.Dataset[User] = [email: string, id: bigint ... 1 more field]
The internal structure:
scala> userds.printSchema
root
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- username: string (nullable = true)
Mutual conversion:
scala> val udf = user.rdd
udf: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at rdd at <console>:25

scala> userds.toDF
res3: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
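For completeness, a small sketch of the remaining conversions, assuming the User case class above and a SparkSession named spark (import spark.implicits._ is required for toDF/toDS; the email value is a placeholder):

import spark.implicits._

// Build an RDD[User] from local data (placeholder values)
val rdd = spark.sparkContext.parallelize(Seq(User(1L, "Bret", "bret@example.com")))

val df        = rdd.toDF()    // RDD       -> DataFrame
val ds        = df.as[User]   // DataFrame -> Dataset[User]
val ds2       = rdd.toDS()    // RDD       -> Dataset[User]
val backToRdd = ds.rdd        // Dataset   -> RDD[User]
val backToDf  = ds.toDF()     // Dataset   -> DataFrame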
2, SQL, DSL
SparkSQL can be programmed either with SQL or by calling the API of a host language (the DSL).
1,SQL
There are two steps to programming with SQL:
1. Create a temporary view. (SQL can only be executed against tables or views; Spark can create temporary views.)
2. Write the SQL statements.
#Create view users
scala> userds.createOrReplaceTempView("users")

#Write statements
scala> nsp.sql("select id,username from users order by id desc").show
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+

/*
createGlobalTempView
createOrReplaceTempView
crossJoin
createOrReplaceGlobalTempView
createTempView
*/
When writing this in a standalone program, you need to import the following implicit conversions and functions.
import spark.implicits._                  // spark is the variable name of the SparkSession
import org.apache.spark.sql.functions._
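Putting the two steps together, a minimal standalone sketch of the same SQL flow (the file path and view name come from the shell example above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SqlDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[3]").getOrCreate()
    import spark.implicits._

    val user = spark.read.json("/usr/local/bigdata/file/user.json")
    user.createOrReplaceTempView("users")                                 // 1. create the view
    spark.sql("select id, username from users order by id desc").show()  // 2. run the SQL

    spark.stop()
  }
}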
2,DSL
DSL here refers to the domain-specific API exposed in a host language such as Scala or Java: instead of writing SQL strings, you call the API directly.
#Scala DSL programming
scala> userds.select('id,'username).orderBy(desc("id")).show()
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+
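A few more DSL calls on the same Dataset, as a sketch (in spark-shell the needed implicits and functions are already imported; in a standalone program add the two imports shown earlier):

userds.filter($"id" > 1)        // keep rows with id > 1
      .groupBy($"username")     // group and count per user
      .count()
      .orderBy(desc("count"))
      .show()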
Programming in Java.
final SparkSession spark = SparkSession.builder().master("local[3]").getOrCreate();
Dataset<Row> dataset = spark.read().json("spark/data/user3.json");
try {
    dataset.createTempView("users");
    // SQL
    spark.sql("select id,username from users order by id desc").show();
} catch (Exception e) {
    e.printStackTrace();
}
// DSL
dataset.filter(dataset.col("id").geq(2)).orderBy(dataset.col("id").desc()).show();
spark.close();
3, UDF
UDF: like Hive, Spark allows users to define custom functions to supplement the built-in ones.
Both SQL mode and DSL mode are supported.
1. SQL mode
#Register function
scala> nsp.udf.register("plus0",(id:String)=>0+id)
res11: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3815/2132917061@2c634771,StringType,List(Some(Schema(StringType,true))),Some(plus0),true,true)

scala> nsp.sql("select plus0(id),username from users order by id desc").show
+---------+--------+
|plus0(id)|username|
+---------+--------+
|       03|     Jim|
|       02|    jack|
|       01|    Bret|
+---------+--------+

#Register function
scala> nsp.udf.register("plus1",(id:Int)=>1+id)
res13: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3857/1612958077@7799dc9a,IntegerType,List(Some(Schema(IntegerType,false))),Some(plus1),false,true)

scala> nsp.sql("select plus1(id),username from users order by id desc").show
+---------+--------+
|plus1(id)|username|
+---------+--------+
|        4|     Jim|
|        3|    jack|
|        2|    Bret|
+---------+--------+
2. DSL mode
#Define the function
scala> val prefix_name = udf(
     |   (name:String) => { "user: " + name })
prefix_name: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3567/441398584@40f06edc,StringType,List(Some(Schema(StringType,true))),None,true,true)

#Wrap the column with the function
scala> userds.select($"id",prefix_name('username).as("newname")).orderBy(desc("id")).show()
+---+----------+
| id|   newname|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+
The Java version:
// Register with a name and a function interface (UDF1)
spark.udf().register("prefix_name", new UDF1<String, String>() {
    @Override
    public String call(String s) throws Exception {
        return "user: " + s;
    }
}, DataTypes.StringType);

try {
    dataset.createTempView("users");
    // Same usage as in SQL
    spark.sql("select id,prefix_name(username) as new_name from users order by id desc").show();
} catch (Exception e) {
    e.printStackTrace();
}

+---+----------+
| id|  new_name|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+
4, Spark on Hive
SparkSQL can integrate with Hive and load Hive table data for analysis; this is called Spark on Hive.
The Hive framework can also swap its underlying execution engine from MapReduce to Spark; that is called Hive on Spark.
Spark on Hive amounts to Spark using Hive's data.
Hive on Spark amounts to Hive switching its computation from MapReduce to Spark RDDs.
Spark on Hive
1, spark-sql
spark-sql is an interactive shell provided by Spark: you work in SQL statements, and by default it operates on the data stored in Hive.
You need to put the MySQL driver into Spark's jars directory, or specify the driver on the command line.
spark-sql --master local[*]

#Specify the driver explicitly
spark-sql --master local[*] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
2. Using Spark on Hive in IDEA
Add the dependency that lets Spark operate on Hive.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.0.0-preview2</version>
</dependency>
Copy core-site.xml, hdfs-site.xml and hive-site.xml into the resources directory.
Then you can use Spark to operate on Hive data.
val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]")
val spark = SparkSession.builder().config(conf)
  .enableHiveSupport() // External Hive is not supported by default; this call enables it
  .getOrCreate()
import spark.implicits._

spark.sql("use spark_sql_hive")
spark.sql(
  """
    |select t.id,t.name,t.email from
    |(select cast('id' as INT) ,id ,name,email from user where id >1001) t
  """.stripMargin).show()
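A sketch of writing a result back to Hive with the same session (the target table user_backup is hypothetical):

spark.sql("select id, name, email from user where id > 1001")
  .write
  .mode("overwrite")                          // replace the table if it already exists
  .saveAsTable("spark_sql_hive.user_backup")  // hypothetical target table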