Big Data Practice: SparkSQL Module Basics

SparkSQL is a Spark module mainly used to process structured data; its design draws heavily on Hive.

Previously, data processing was done with the RDD abstraction of the SparkCore module; SparkSQL adds the structured abstractions DataFrame and Dataset.

SparkSQL supports development with both SQL and a DSL (domain-specific language) API in multiple languages (Scala, Java, Python and R). In the end, everything is translated into RDD operations underneath.

SparkSQL supports multiple data sources (Hive, Avro, Parquet, ORC, JSON, JDBC, etc.), as well as HiveQL syntax, Hive SerDes and UDFs, allowing you to access existing Hive warehouses.
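As a hedged sketch of what this looks like (the Parquet path and the JDBC connection details are assumptions, not taken from this article), reading a few of these sources through the spark session introduced below could be written as:

// Loading different sources through a SparkSession named `spark`
// (the Parquet path and the JDBC options are placeholder assumptions)
val jsonDF    = spark.read.json("/usr/local/bigdata/file/user.json")
val parquetDF = spark.read.parquet("/usr/local/bigdata/file/user.parquet")
val jdbcDF    = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("dbtable", "user")
  .option("user", "root")
  .option("password", "123456")
  .load()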

Zero, SparkSession and IDEA integration with Spark

SparkSession: this is the new unified entry point, replacing the older SQLContext and HiveContext.

A SparkContext can also be obtained through the SparkSession.

1,SparkSession

When Spark is started in the interactive environment (spark-shell), it comes with a ready-made variable called spark:

Spark context available as 'sc' (master = local[*], app id = local-1608185209816).
Spark session available as 'spark'.

#read file
scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]  

scala> text.show()
+--------------------+                                                          
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+

In a standalone program, a SparkSession is obtained with the builder() method; the constructor itself is not exposed.

val spark: SparkSession = SparkSession.builder().master("local[3]").getOrCreate()

Through the SparkSession entry point you can load and save data, execute SQL, and reach the other entry points (sqlContext, sparkContext).
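A minimal standalone sketch of these capabilities (the JSON path is the one used later in this article; the app name and the output path are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[3]").appName("demo").getOrCreate()

val sc  = spark.sparkContext      // the underlying SparkContext
val sqc = spark.sqlContext        // the legacy SQLContext, kept for compatibility

val df = spark.read.json("/usr/local/bigdata/file/user.json")   // load
df.createOrReplaceTempView("users")
spark.sql("select * from users").show()                         // execute SQL
df.write.mode("overwrite").parquet("/tmp/users_parquet")        // save (output path is an assumption)

spark.stop()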

2. Using IDEA to develop Spark

To develop Spark locally and integrate it into IDEA, you first need a Scala SDK whose version matches the Spark version you run against.

Second, the Scala development plug-in must be installed in IDEA.

Next, add the following dependencies to the Maven project's pom.xml.

        <!-- SparkCore, the basic module -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0-preview2</version>
        </dependency>

        <!-- SparkSQL, the SQL module -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0-preview2</version>
        </dependency>

        <!-- Spark-Hive integration module, for processing data stored in Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0-preview2</version>
            <scope>provided</scope>
        </dependency>

        <!-- MySQL driver, needed for interacting with MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>

If the logs are too verbose, you can raise the log level: change the first line below to ERROR, and place a log4j.properties file in the resources directory to override the default log configuration.

#hadoop.root.logger=warn,console
# Set everything to be logged to the console
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR

1, RDD, DataFrame, Dataset

RDD, the core data structure of SparkCore, can handle both structured and unstructured data, but because it carries no schema information it is less efficient for structured data.

On top of it, SparkSQL provides the DataFrame and Dataset abstractions.

An RDD is row-oriented: it only knows the type of each row as a whole, with no notion of columns.

DataFrame borrows from the data structure of the pandas library. Unlike an RDD, a DataFrame has an explicit schema: the column names and column types are known. This lets Spark read less data and better optimize the execution plan, which improves query efficiency.
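Because the schema is known, the optimizer can prune columns and push filters down. A quick hedged sketch (it assumes a DataFrame named df with an id column, such as the user data loaded below):

df.select("id").filter($"id" > 1).explain(true)   // prints the parsed, analyzed, optimized and physical plans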

Dataset, introduced in Spark 1.6, is also a distributed collection. It combines the advantages of RDD and DataFrame and additionally supports lambda functions, but it is only available in Scala and Java.

The three can be converted to each other.

Reading a text file returns a Dataset[String]:

scala> val text = spark.read.textFile("/usr/local/bigdata/file/wordCount.txt")
text: org.apache.spark.sql.Dataset[String] = [value: string]

scala> text.show()
+--------------------+                                                          
|               value|
+--------------------+
|java hadoop spark...|
|spark scala spark...|
|    scala spark hive|
+--------------------+


#structure
scala> text.printSchema
root
 |-- value: string (nullable = true)
#Return a new session
scala> val nsp = spark.newSession
nsp: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@c01b

#Reading json returns DF by default
scala> val user = nsp.read.json("/usr/local/bigdata/file/user.json")
user: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]

#Define a case class as the data type
scala> case class User(id :Long,username:String,email:String){}
defined class User

#Add data type and return ds
scala> val userds = nsp.read.json("/usr/local/bigdata/file/user.json").as[User]
userds: org.apache.spark.sql.Dataset[User] = [email: string, id: bigint ... 1 more field]

Its internal structure:

scala> userds.printSchema
root
 |-- email: string (nullable = true)
 |-- id: long (nullable = true)
 |-- username: string (nullable = true)

Mutual conversion:

scala> val udf = user.rdd
udf: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at rdd at <console>:25

scala> userds.toDF
res3: org.apache.spark.sql.DataFrame = [email: string, id: bigint ... 1 more field]
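A fuller round trip, as a hedged sketch (it assumes the User case class defined above and the implicits of the nsp session):

// Round-trip conversions between Dataset, DataFrame and RDD
import nsp.implicits._

val df   = userds.toDF()                           // Dataset[User] -> DataFrame
val rows = df.rdd                                  // DataFrame     -> RDD[Row]
val objs = userds.rdd                              // Dataset[User] -> RDD[User]
val df2  = objs.toDF("id", "username", "email")    // RDD[User]     -> DataFrame with named columns
val ds2  = df2.as[User]                            // DataFrame     -> Dataset[User]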

2, SQL, DSL

SparkSQL can be programmed either with SQL statements or through the DSL API of a host language.

1,SQL

There are two steps to programming with SQL:

1. Create a temporary view (SQL can only be executed against a table or view, and Spark can register temporary views).

2. Write sql statements.

#Create view users
scala> userds.createOrReplaceTempView("users")

#Write statements
scala> nsp.sql("select id,username from users order by id desc").show
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+

/*
createGlobalTempView            createOrReplaceTempView   crossJoin   
createOrReplaceGlobalTempView   createTempView 
*/

If you are writing a standalone program, you need to import the following implicit conversions and functions.

    import spark.implicits._                   // spark is the SparkSession variable name
    import org.apache.spark.sql.functions._
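Putting it together, a minimal standalone version of the SQL example might look like this (the object name is an assumption):

import org.apache.spark.sql.SparkSession

object SqlDemo {   // object name is an assumption
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[3]").appName("SqlDemo").getOrCreate()
    import spark.implicits._                  // enables $"col", 'col, .toDF / .toDS
    import org.apache.spark.sql.functions._   // desc(), udf(), ...

    val users = spark.read.json("/usr/local/bigdata/file/user.json")
    users.createOrReplaceTempView("users")
    spark.sql("select id,username from users order by id desc").show()

    spark.stop()
  }
}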
2,DSL

DSL here means the domain-specific API embedded in a host language such as Scala or Java: you call the API methods directly to process the data.

#scala DSL programming
scala> userds.select('id,'username).orderBy(desc("id")).show()
+---+--------+
| id|username|
+---+--------+
|  3|     Jim|
|  2|    jack|
|  1|    Bret|
+---+--------+

Programming in Java.

final SparkSession spark = SparkSession.builder().master("local[3]").getOrCreate();

Dataset<Row> dataset = spark.read().json("spark/data/user3.json");

try {
    dataset.createTempView("users");
    // SQL
    spark.sql("select id,username from users order by id desc").show();
} catch (Exception e) {
    e.printStackTrace();
}

// DSL
dataset.filter(dataset.col("id").geq(2)).orderBy(dataset.col("id").desc()).show();

spark.close();

3, UDF

UDF: like Hive, Spark allows users to define custom functions to supplement the built-in ones.

Both SQL mode and DSL mode are supported.

1. SQL mode

#Register function
scala> nsp.udf.register("plus0",(id:String)=>0+id)
res11: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3815/2132917061@2c634771,StringType,List(Some(Schema(StringType,true))),Some(plus0),true,true)

scala> nsp.sql("select plus0(id),username from users order by id desc").show
+---------+--------+
|plus0(id)|username|
+---------+--------+
|       03|     Jim|
|       02|    jack|
|       01|    Bret|
+---------+--------+

#Register function
scala> nsp.udf.register("plus1",(id:Int)=>1+id)
res13: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3857/1612958077@7799dc9a,IntegerType,List(Some(Schema(IntegerType,false))),Some(plus1),false,true)

scala> nsp.sql("select plus1(id),username from users order by id desc").show
+---------+--------+
|plus1(id)|username|
+---------+--------+
|        4|     Jim|
|        3|    jack|
|        2|    Bret|
+---------+--------+


2. DSL mode

                       
#Define function
scala> val prefix_name =udf(
     | (name:String)=>{ "user: "+name})

prefix_name: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3567/441398584@40f06edc,StringType,List(Some(Schema(StringType,true))),None,true,true)

#Nest the field name inside the function
scala> userds.select($"id",prefix_name('username).as("newname")).orderBy(desc("id")).show()
+---+----------+
| id|   newname|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+

The Java version:

// register() takes the function name, a UDF1 implementation, and the return type
spark.udf().register("prefix_name", new UDF1<String, String>() {

    @Override
    public String call(String s) throws Exception {
        return "user: " + s;
    }
}, DataTypes.StringType);


try {
    dataset.createTempView("users");
    // the UDF is then used in SQL like any built-in function
    spark.sql("select id,prefix_name(username) as new_name from users order by id desc").show();
} catch (Exception e) {
    e.printStackTrace();
}

+---+----------+
| id|  new_name|
+---+----------+
|  3| user: Jim|
|  2|user: jack|
|  1|user: Bret|
+---+----------+

4, Spark on Hive

When SparkSQL integrates with Hive and loads and reads Hive table data for analysis, this is called Spark on Hive;

When the underlying execution engine of the Hive framework is switched from MapReduce to Spark, this is called Hive on Spark;

Spark on Hive is equivalent to Spark using Hive's data.

Hive on Spark is equivalent to Hive switching its computation from MapReduce to Spark RDDs.

Spark on Hive

1, spark-sql

spark-sql is an interactive shell provided by Spark. It processes data with SQL statements and by default works against the data in Hive.

You need to put the MySQL driver under Spark's jars directory, or specify the driver explicitly:

spark-sql --master local[*]

#Specify the driver
spark-sql  --master local[*] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
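Once the shell is up you can issue HiveQL directly; for example (the database and table names match the ones used further below, but are otherwise assumptions):

spark-sql> show databases;
spark-sql> use spark_sql_hive;
spark-sql> select id, name, email from user limit 10;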

2. Using Spark on Hive in IDEA

Add the dependency for Spark's Hive integration.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>3.0.0-preview2</version>
    </dependency>

Copy core-site.xml, hdfs-site.xml and hive-site.xml into the resources directory.

Then you can use Spark to operate on Hive data.

    val conf = new SparkConf().setAppName("JDBCDemo$").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf)
      .enableHiveSupport() // !!! external Hive is not supported by default; this call must be made to enable it
      .getOrCreate()

    import spark.implicits._

    spark.sql("use spark_sql_hive")

    spark.sql(
      """
        |select t.id,t.name,t.email from
        |(select cast('id' as INT), id, name, email from user where id > 1001) t
      """.stripMargin).show()
