1. Preface
Spark SQL is a Spark module for structured data processing.
1.1 The role of Spark SQL
- One use of Spark SQL is to execute SQL queries
- Spark SQL can also be used to read data from existing Hive installations
1.2 The relationship between Hive and Spark SQL
The predecessor of Spark SQL was Shark, a tool that gave engineers who were familiar with RDBMSs but not with MapReduce a quick way to get started.
In the early days, Hive was the only SQL-on-Hadoop tool. However, MapReduce writes a large amount of intermediate data to disk, which costs heavy I/O and reduces execution efficiency. To make SQL on Hadoop faster, a number of SQL-on-Hadoop tools emerged, the most prominent being Drill, Impala, and Shark.
For developers, Spark SQL simplifies RDD development, improves development efficiency, and executes quickly, so in practice it is what is mostly used. To simplify RDD development, Spark SQL provides two programming abstractions, DataFrame and DataSet, which play a role analogous to RDD in Spark Core.
1.3 Spark SQL features
- Easy integration: seamless integration of SQL query and Spark programming
- Unified data access: connect to different data sources in the same way (see the sketch after this list)
- Hive compatible: run SQL or HiveQL directly on the existing warehouse
- Standard data connection: connect through JDBC or ODBC
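As a quick illustration of the last three points, here is a minimal sketch (not part of the original project; the class name, file paths, JDBC URL, and credentials are placeholders) showing how the same DataFrameReader API reads JSON, CSV, and JDBC sources:

```java
package com.sparkwordcount;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UnifiedAccessSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Unified data access sketch")
                .master("local[*]")
                .getOrCreate();

        // The same read() entry point handles JSON, CSV, and JDBC sources
        Dataset<Row> fromJson = spark.read().json("/test/input/people.json");
        Dataset<Row> fromCsv  = spark.read().format("csv").option("header", "true")
                                     .load("/test/input/people.csv");
        Dataset<Row> fromJdbc = spark.read().format("jdbc")
                                     .option("url", "jdbc:mysql://localhost:3306/test") // placeholder URL
                                     .option("dbtable", "people")                       // placeholder table
                                     .option("user", "root")                            // placeholder credentials
                                     .option("password", "secret")
                                     .load();

        fromJson.show();
        fromCsv.show();
        fromJdbc.show(); // requires the JDBC driver on the classpath

        spark.stop();
    }
}
```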
2. Fundamentals
To use Spark SQL, we first need to understand two core classes: DataFrame and DataSet.
2.1 DataFrame
In Spark, a DataFrame is a distributed data set built on top of RDDs, similar to a two-dimensional table in a traditional database. The main difference from an RDD is that a DataFrame carries schema metadata: every column of the two-dimensional dataset it represents has a name and a type. This gives Spark SQL insight into the structure of the data, so it can optimize both the data sources behind the DataFrame and the transformations applied to it, greatly improving runtime efficiency. An RDD, in contrast, exposes no information about the internal structure of its elements, so Spark Core can only perform simple, general pipeline optimizations at the stage level.
Meanwhile, like Hive, DataFrame supports nested data types (struct, array, and map). From the perspective of API usability, the DataFrame API provides a set of high-level relational operations that is friendlier and has a lower barrier to entry than the functional RDD API.
The classic comparison between RDD[Person] and a DataFrame of the same Person data illustrates the difference:
- Although RDD[Person] takes Person as its type parameter, the Spark framework itself knows nothing about the internal structure of the Person class.
- The DataFrame, by contrast, provides detailed structure information, so Spark SQL knows exactly which columns the dataset contains and the name and type of each column.
A DataFrame is a schema view of the data and can be treated as a table in a database. Like an RDD, a DataFrame is lazily evaluated, but it generally performs better, mainly because its query plan is optimized by the Spark Catalyst optimizer.
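To make the schema and Catalyst points concrete, here is a minimal sketch (a hypothetical class, assuming a small JSON file like the people.json used later in this article) that prints the schema and the optimized query plan:

```java
package com.sparkwordcount;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class DataFrameSchemaSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("DataFrame schema sketch")
                .master("local[*]")
                .getOrCreate();

        // Schema metadata (column names and types) is what distinguishes a DataFrame from a plain RDD
        Dataset<Row> df = spark.read().json("/test/input/people.json");
        df.printSchema();

        // explain(true) prints the parsed, analyzed, optimized, and physical plans,
        // showing the work done by the Catalyst optimizer before anything is executed
        df.filter(col("age").gt(21)).select("name").explain(true);

        spark.stop();
    }
}
```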
2.2 DataSet
A DataSet is a distributed data set introduced in Spark 1.6 as an extension of DataFrame. It combines the advantages of RDDs (strong typing and the ability to use powerful lambda functions) with Spark SQL's optimized execution engine. A DataSet also supports functional transformations such as map, flatMap, and filter.
- DataSet is an extension of DataFrame API and the latest data abstraction of SparkSQL
- User-friendly API style, combining type-safety checks with DataFrame query optimization
- Case classes are used to define the structure of the data in a DataSet; the name of each attribute in the case class maps directly to a field name in the DataSet
- DataSet is strongly typed. For example, there can be DataSet[Car], DataSet[Person].
- DataFrame is a special case of DataSet: DataFrame = DataSet[Row], so a DataFrame can be converted to a DataSet with the as method. Row is a type that, like Car or Person, carries the table structure information, but its fields must be accessed by position or by name, as shown in the sketch below.
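Here is a minimal sketch of these points (the Person bean, class name, and file path are illustrative, not from the original project): it reads an untyped DataFrame, accesses a Row by field name, and converts the DataFrame to a typed Dataset<Person> with as and a bean encoder.

```java
package com.sparkwordcount;

import java.io.Serializable;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataSetSketch {
    // Hypothetical bean whose field names match the columns in people.json
    public static class Person implements Serializable {
        private Long age;
        private String name;
        public Long getAge() { return age; }
        public void setAge(Long age) { this.age = age; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Dataset sketch")
                .master("local[*]")
                .getOrCreate();

        // An untyped DataFrame is just Dataset<Row>: fields are read by position or by name
        Dataset<Row> df = spark.read().json("/test/input/people.json");
        Row first = df.first();
        System.out.println(first.getAs("name") + " / " + first.getAs("age"));

        // Converting to a strongly typed Dataset<Person> with the as method and a bean encoder
        Dataset<Person> people = df.as(Encoders.bean(Person.class));
        people.filter((FilterFunction<Person>) p -> p.getAge() != null && p.getAge() > 21)
              .show();

        spark.stop();
    }
}
```

The explicit FilterFunction cast is there because, in Java, a bare lambda passed to filter would be ambiguous between the Scala and Java overloads of the method.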
3. Practical application
3.1 Example code
The Maven project from the Spark example in the previous chapter is reused here; the only addition is the spark-sql_2.12 dependency:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.2.1</version>
    </dependency>
</dependencies>
```
The main class below demonstrates basic Spark SQL operations: reading JSON data, showing data, selecting columns, filtering, grouping, and querying a temporary view with SQL.
```java
package com.sparkwordcount;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class SparkSQLMain {
    public static void main(String[] args) {
        // Initialize the SparkSession, the entry point for everything that follows
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        // Read JSON data; CSV and text sources can be read in the same way
        Dataset<Row> df = spark.read().json("/test/input/people.json");
        // To read from CSV instead, uncomment the line below (a separate CSV file is prepared for this)
        // Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/test/input/people.csv");

        // Show all data
        df.show();
        // Print the table structure as a tree
        df.printSchema();
        // Show only the name column
        df.select("name").show();
        // Show all data with age incremented by 1
        df.select(col("name"), col("age").plus(1)).show();
        // Show everyone older than 21
        df.filter(col("age").gt(21)).show();
        // Count people grouped by age
        df.groupBy("age").count().show();

        // Register the data as a temporary view; it disappears when this SparkSession ends
        df.createOrReplaceTempView("people");
        // Query it with SQL
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
        sqlDF.show();
    }
}
```
The second example reads data from a CSV file, registers it as a global temporary view, and queries it with SQL:
```java
package com.sparkwordcount;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSQLGlobalTempMain {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example global create")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        // Read the data from CSV
        Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/test/input/people.csv");
        // Show all data
        df.show();

        // Register the data as a global temporary view; it is shared across the sessions
        // of this application and survives until the application terminates
        df.createOrReplaceGlobalTempView("people");
        // A global temporary view must be queried through the global_temp database
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM global_temp.people");
        sqlDF.show();
    }
}
```
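To highlight the difference in scope between the two kinds of views, here is a small sketch (a hypothetical class, using the same placeholder input path) that creates both a session-scoped and a global temporary view and queries the global one from a second SparkSession:

```java
package com.sparkwordcount;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TempViewScopeSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Temp view scope sketch")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().json("/test/input/people.json");

        // Session-scoped view: visible only in the session that created it
        df.createOrReplaceTempView("people_session");

        // Application-scoped view: stored under the global_temp database,
        // visible to every session of the same application
        df.createOrReplaceGlobalTempView("people_global");

        SparkSession other = spark.newSession();
        other.sql("SELECT count(*) FROM global_temp.people_global").show(); // works
        // other.sql("SELECT * FROM people_session")  // would fail: table or view not found

        spark.stop();
    }
}
```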
3.2 Project packaging
- Configure the Artifacts packaging settings for the project
- Run Build Artifacts to package the project
- The result is the jar package SparkWordCount.jar
3.3 Upload to Docker and run
Follow the commands below, substituting your own directories.
```bash
# On the host machine (the computer running Docker), copy the jar you just built into the container
docker cp /Users/SparkWordCount/out/SparkWordCount.jar master:/usr/local

# Enter the container
docker exec -it master bash

# Enter the Spark directory
cd /usr/local/spark-3.0.3-bin-hadoop2.7

# Create the JSON data file; note that each record must stay on a single line,
# otherwise reading it will fail
vi people.json
{"age":1,"name":"Jim"}
{"age":30,"name":"Andy"}
{"age":19,"name":"Justin"}

# Create the CSV data file, also one record per line
vi people.csv
age,name
1,Jim
30,Andy
19,Justin

# Upload the JSON and CSV files to HDFS
hadoop fs -put ./people.json /test/input
hadoop fs -put ./people.csv /test/input

# Run SparkSQLMain from the Spark directory
cd /usr/local/spark-3.0.3-bin-hadoop2.7
./bin/spark-submit \
  --class com.sparkwordcount.SparkSQLMain \
  --master local \
  ../SparkWordCount.jar \
  100

# Output results (ignoring the verbose logs)

# df.show() result
+---+------+
|age|  name|
+---+------+
|  1|   Jim|
| 30|  Andy|
| 19|Justin|
+---+------+

# df.printSchema() result
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

# df.select("name").show() result
+------+
|  name|
+------+
|   Jim|
|  Andy|
|Justin|
+------+

# df.select(col("name"), col("age").plus(1)).show() result
+------+---------+
|  name|(age + 1)|
+------+---------+
|   Jim|        2|
|  Andy|       31|
|Justin|       20|
+------+---------+

# df.filter(col("age").gt(21)).show() result
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

# df.groupBy("age").count().show() result
+---+-----+
|age|count|
+---+-----+
| 19|    1|
|  1|    1|
| 30|    1|
+---+-----+

# sqlDF.show() result
+---+------+
|age|  name|
+---+------+
|  1|   Jim|
| 30|  Andy|
| 19|Justin|
+---+------+

# Run SparkSQLGlobalTempMain
./bin/spark-submit \
  --class com.sparkwordcount.SparkSQLGlobalTempMain \
  --master local \
  ../SparkWordCount.jar \
  100

# df.show() result
+---+------+
|age|  name|
+---+------+
|  1|   Jim|
| 30|  Andy|
| 19|Justin|
+---+------+

# sqlDF.show() result
+---+------+
|age|  name|
+---+------+
|  1|   Jim|
| 30|  Andy|
| 19|Justin|
+---+------+
```
4. Summary
In short, Spark SQL offers a SQL-based way to aggregate, sort, and otherwise process data, and it can read directly from data sources such as JSON, CSV, and text files, which makes working with data extremely convenient. For more details, see the official documentation: https://spark.apache.org/docs/latest/quick-start.html