The exam information comes from a helpful senior: 20 multiple-choice questions (2 points each) and 2~3 big questions: a MapReduce solution, Spark RDD, HDFS (multiple choice), and HBase (data table selection/design problems and operation problems).
1, Some basic concepts
1. Python foundation
# 1). Single-line comments use "#"; multi-line comments wrap the content in a pair of ''' or """.
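A minimal illustration of both comment styles (the variable name is just a placeholder):

# this is a single-line comment
'''
This is a multi-line comment
spanning several lines.
'''
x = 1  # an inline comment after code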
# 2). Input and output in Python:
a = int(input())
b = int(input())
print("%d + %d = %d" % (a, b, a + b))
# 3). Branching and looping:
--------------------------------
# Conditional statement
if x <= 0:
    x = -1
--------------------------------
# Traverse by index
for i in range(len(lst)):
    print(lst[i])
--------------------------------
# Traverse values directly
for x in lst:
    print(x)
--------------------------------
# while loop
n = 5
while n > 0:
    print(" I love xr ! ")
    n = n - 1
--------------------------------
# 4). Functions:
import math

def move(x, y, step, angle=0):
    nx = x + step * math.cos(angle)
    ny = y - step * math.sin(angle)
    return nx, ny

The result of a command-line call is as follows:
>>> x, y = move(100, 100, 60, math.pi / 6)
>>> print(x, y)
151.96152422706632 70.0
# 5). list: keep 8 decimal places when appending: lst.append('{:.8f}'.format(4 * pi))  # with pi imported from math
# 6). tuple:
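The original note leaves this item blank; a minimal sketch of the usual tuple points (defined with parentheses, immutable, supports packing/unpacking):

t = (1, 2, 3)        # a tuple is created with parentheses
print(t[0])          # indexing works like a list
# t[0] = 10          # error: tuples are immutable
single = (1,)        # a one-element tuple needs a trailing comma
a, b = 1, 2          # tuple packing and unpacking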
# 7). pymysql: connecting to a database
# Import the pymysql module
import pymysql

# Connect to the database (note: the charset is 'utf8', it cannot be written as 'utf-8')
conn = pymysql.connect(host='localhost', port=3306, user='root',
                       passwd='root', charset='utf8', db='mydb')

# Get a cursor object that can execute SQL statements
cursor = conn.cursor()

# Define the SQL statement to execute
sql = """
CREATE TABLE USER1 (
    id INT auto_increment PRIMARY KEY,
    name CHAR(10) NOT NULL UNIQUE,
    age TINYINT NOT NULL
) ENGINE=innodb DEFAULT CHARSET=utf8;
"""

# Execute the SQL statement
cursor.execute(sql)

# Close the cursor object
cursor.close()

# Close the database connection
conn.close()
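The block above only creates a table. As a follow-up, a minimal sketch of inserting and querying rows with the same pymysql connection (the table and column names follow the CREATE TABLE above; the sample values are placeholders):

import pymysql

conn = pymysql.connect(host='localhost', port=3306, user='root',
                       passwd='root', charset='utf8', db='mydb')
cursor = conn.cursor()

# Insert a row with a parameterized statement (avoids SQL injection)
cursor.execute("INSERT INTO USER1 (name, age) VALUES (%s, %s)", ('Tom', 20))
conn.commit()  # data-modifying statements must be committed

# Query the rows back
cursor.execute("SELECT id, name, age FROM USER1")
for row in cursor.fetchall():
    print(row)

cursor.close()
conn.close()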
2. distributed file system HDFS
When a data set grows beyond the storage capacity of a single physical machine, it has to be partitioned and stored across several separate machines. A file system that manages storage spread across multiple machines in a network is called a distributed file system.
Hadoop ships with a distributed file system called HDFS (Hadoop Distributed File System). It is sometimes referred to simply as DFS; the two names mean the same thing.
NameNode and DataNode
HDFS has two types of nodes that manage the cluster's data: one namenode (the management node) and multiple datanodes (worker nodes). The namenode manages the file system namespace: it maintains the file system tree and the metadata for all the files and directories in the tree. This information is stored persistently on the local disk in two forms: the namespace image file and the edit log file. The namenode also records the datanodes on which each block of each file is located, but it does not store block locations persistently; this information is reconstructed from datanode reports when the system starts.
The client accesses the whole file system on behalf of the user by interacting with the namenode and datanodes. The client exposes a file system interface similar to POSIX (Portable Operating System Interface), so users can program against it without needing to know about the namenode and datanodes.
Datanodes are the working nodes of the file system. They store and retrieve data blocks as needed, and regularly send a list of their stored data blocks to the namenode.
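As an illustration of the client side described above, here is a minimal Python sketch using the third-party hdfs package (a WebHDFS client; it is not used elsewhere in these notes and is only one option). The host, port (9870 is the Hadoop 3 default namenode web port), user, and file path are assumptions for illustration:

from hdfs import InsecureClient

# Connect to the namenode's WebHDFS endpoint
client = InsecureClient('http://localhost:9870', user='root')

# List the root directory of the file system
print(client.list('/'))

# Read a file; the namenode supplies block locations, the datanodes supply the bytes
with client.read('/test/hello.txt', encoding='utf-8') as reader:
    print(reader.read())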
3. big data computing framework MapReduce
What is MapReduce?
Suppose the task is to mine and analyze the data logs of China's meteorological center for recent years. The logs are 3 TB in size, and you are asked to compute the maximum temperature for each year. If you only have one computer, how would you handle it? You would probably read the records one by one and compare each with the current maximum; after scanning all the data you would have the maximum temperature. But as experience tells us, processing this much data on one machine is extremely time-consuming.
If you were given three machines, what would you do? You should arrive at something like the figure below: the best approach is to split the data into three pieces, compute on each piece separately (Map), send the partial results to one machine to be merged, compute on the merged data (Reduce), and output the result.
This is a relatively complete MapReduce process.
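A minimal Hadoop-streaming style sketch of the "maximum temperature per year" idea described above, in the same mapper/reducer form used later in these notes. The input format (one year and one temperature per line, separated by whitespace) is an assumption for illustration:

#! /usr/bin/python3
# mapper.py: emit "year \t temperature" for every input line
import sys

for line in sys.stdin:
    parts = line.strip().split()
    if len(parts) < 2:
        continue
    year, temp = parts[0], parts[1]
    print("%s\t%s" % (year, temp))
-------------------------------------------------------------
#! /usr/bin/python3
# reducer.py: input is sorted by year; keep the maximum temperature per year
import sys

current_year, max_temp = None, None
for line in sys.stdin:
    year, temp = line.strip().split('\t')
    temp = int(temp)
    if year == current_year:
        if temp > max_temp:
            max_temp = temp
    else:
        if current_year is not None:
            print("%s\t%s" % (current_year, max_temp))
        current_year, max_temp = year, temp
# don't forget the last year
if current_year is not None:
    print("%s\t%s" % (current_year, max_temp))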
4. distributed database (HBase)
# Start hbase
start-hbase.sh
# Enter the hbase shell
hbase shell
# Create a table
create 'test','data'
# Check whether the table was created successfully
list
# Add data
put 'test','row1','data:1','value1'
put 'test','row2','data:2','value2'
# Get data
get 'test','row1'
# View all data
scan 'test'
# Delete data (delete needs a column qualifier; use deleteall to remove a whole row)
delete 'test','row1','data:1'
# Delete a table
disable 'test'    # the table must be disabled first
drop 'test'
1) Overall architecture of the HBase distributed environment
Zookeeper provides coordination services for HBase and is an important component of it. Zookeeper monitors the health status of HBase in real time and takes corresponding action when something goes wrong.
HMaster is the main service of HBase. It is responsible for monitoring all hregionservers in the cluster and managing tables and regions, such as creating, modifying, and removing tables.
HRegionServer is an instance of RegionServer. It is responsible for serving and managing multiple HRegion instances and directly responding to user read-write requests.
HRegion is the basic unit for dividing tables. When a table is just created, there is only one Region. However, with the increase of records, the table will become larger and larger. HRegionServer will track the size of the Region in real time. When the Region increases to a certain value, it will perform a split operation, and one Region will be cut into two regions.
In general, deploying a distributed HBase database requires these components to work together. HBase coordinates its distributed services through Zookeeper, which acts as an administrator, and it stores its data on HDFS (the distributed file system). The overall approach to building a distributed HBase database, therefore, is to integrate these services.
2). Build a happybase environment to prepare for writing Python programs that access data in HBase
Happybase is a Python library for operating HBase. First we need to install the happybase environment, then start HDFS and HBase, and finally test Python happybase.
To connect to the HBase database using happybase:
import happybase
happybase.Connection(host='localhost', port=9090, timeout=None, autoconnect=True, table_prefix=None, table_prefix_separator=b'_', compat='0.98', transport='buffered', protocol='binary')
Get a connection instance:
host: hostname
port: port
timeout: timeout
autoconnect: whether the connection is opened immediately
table_prefix: prefix used to construct table names
table_prefix_separator: separator used with table_prefix
compat: compatibility mode
transport: transport mode
protocol: protocol
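With the parameters above, opening a connection and listing tables looks like the following minimal sketch (the Thrift server is assumed to be running on localhost:9090, as in the signature above; with the default autoconnect=True the connection is opened immediately). The connection object created here is the one used by the create_table example below:

import happybase

# Open a connection to the HBase Thrift server
connection = happybase.Connection(host='localhost', port=9090)

# List all tables, e.g. [b'test'] if the 'test' table from the shell example exists
print(connection.tables())

# Get a table object and read back one of the rows inserted in the shell example above
table = connection.table('test')
print(table.row('row2'))   # a dict of {column: value} in bytes

connection.close()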
# Create a table
connection.create_table(
    'my_table',
    {
        'cf1': dict(max_versions=10),
        'cf2': dict(max_versions=1, block_cache_enabled=False),
        'cf3': dict(),  # use defaults
    }
)

At this point, connection.tables() shows the available tables; the result is ['my_table']. The created table, my_table, contains 3 column families: cf1, cf2, cf3.
5. big data computing framework Spark
1). Installing and configuring the Scala development environment
Why do we need to install and configure Scala?
Because Spark itself is written in Scala, and Scala code is much more concise than Java, installing and configuring the Scala environment is the preparation we should complete before learning Spark.
2). Spark architecture
Before that, we first need to know: what is Spark?
Apache Spark is a fast, general-purpose computing engine designed for large-scale data processing. We already know MapReduce; Spark is a general parallel framework similar to Hadoop MapReduce. Spark has all of Hadoop MapReduce's advantages, and it handles workloads that need iteration, such as data mining and machine learning, even better.
Basic concepts:
Application: Spark application written by the user, including one Driver and multiple executors.
Driver: the driver in Spark runs the main function of the Application above and creates a SparkContext. The purpose of creating a SparkContext is to prepare the running environment of the Spark application: in Spark, the SparkContext is responsible for communicating with the ClusterManager, applying for resources, and allocating and monitoring tasks. When the Executors have finished running, the Driver is also responsible for closing the SparkContext.
Executor: a process running on the WorkerNode, which is responsible for running tasks.
RDD: Resilient Distributed Dataset, an abstraction of distributed memory that provides a highly restricted shared-memory model.
DAG: directed acyclic graph, which reflects the dependencies between RDDs.
Task: the unit of work that runs on an Executor.
Job: a job contains multiple RDDs and the various operations acting on those RDDs.
Stage: the basic scheduling unit of a Job. A Job is divided into multiple groups of Tasks; each group is called a Stage, or a TaskSet, and represents a set of associated tasks with no Shuffle dependencies between them.
Cluster Manager: refers to an external service that obtains resources on a cluster. There are currently three types:
Standalone: Spark's native resource management, where the Master is responsible for resource allocation;
Apache Mesos: a resource scheduling framework with good compatibility with Hadoop MR;
Hadoop Yarn: mainly refers to the ResourceManager in Yarn.
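In code, the choice of cluster manager shows up mainly in the master URL passed to SparkConf. A minimal sketch (the host names and ports are placeholders, not from the original notes):

from pyspark import SparkConf, SparkContext

# "local[2]" runs Spark locally with 2 threads; swap the master URL to change cluster manager:
#   Standalone:  "spark://master-host:7077"
#   Mesos:       "mesos://master-host:5050"
#   YARN:        "yarn"  (the ResourceManager address comes from the Hadoop configuration)
conf = SparkConf().setAppName("cluster-manager-demo").setMaster("local[2]")
sc = SparkContext(conf=conf)

print(sc.master)   # shows which master URL / cluster manager is in use
sc.stop()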
3). Spark operation process
- Build the running environment of Spark Application and start the SparkContext;
- SparkContext applies to the resource manager (which can be Standalone, Mesos, or Yarn) for Executor resources;
- The Executor applies for a Task from SparkContext;
- SparkContext builds DAG diagrams, decomposes the DAG diagrams into stages, encapsulates the stages into tasksets and sends them to the Task Scheduler. Finally, the Task Scheduler sends the tasks to the Executor for running;
- The Task runs on the Executor, and all resources are released after running.
4). Understanding RDDs
In Spark, all operations on data come down to creating RDDs, transforming existing RDDs, and invoking RDD actions to compute results. Simply put, an RDD is a collection whose data is stored across different machines. Is that clear?
If not, let's look at an example.
# How to create a Spark RDD by parallelizing a collection?
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List from 1 to 8
    data = [1, 2, 3, 4, 5, 6, 7, 8]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the contents of the RDD. rdd.collect() is a Spark Action operator,
    #    described in detail later; its main purpose is to collect the RDD's data
    sum = rdd.collect()
    # 5. print the contents of the rdd
    print(sum)
    # 6. stop SparkContext
    sc.stop()
    #********** End **********#
Now that we know what RDD is, let's talk about its five features.
- A list of partitions: the Partition is the basic unit of the data set. For an RDD, each partition is processed by one computing task and determines the granularity of parallel computing. You can specify the number of partitions when creating an RDD; if you don't, a default is used, which is the number of CPU cores allocated to the program (see the sketch after this list).
- A function for computing each partition. RDD computation in Spark is done per partition; each RDD implements the compute function for this purpose. The compute function composes the iterators and does not need to save the result of each computation.
- Dependencies between RDDs. Each transformation of an RDD produces a new RDD, so a pipeline-like dependency forms between RDDs. When some partition's data is lost, Spark can recompute just the lost partition through this dependency instead of recomputing all partitions of the RDD.
- A Partitioner, i.e. the partitioning function of the RDD. Spark currently implements two kinds of partitioning functions: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs can have a Partitioner; for non key-value RDDs the Partitioner is None. The Partitioner determines both the number of partitions of the RDD itself and the number of partitions of the parent RDD's Shuffle output.
- A list storing the preferred locations for accessing each Partition. For an HDFS file, this list holds the locations of the blocks where each Partition resides. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule computing tasks onto the nodes that store the data blocks to be processed.
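A small sketch of the first and fourth points above (partition count and partitioner), using only the pyspark API already shown in these notes; the numbers chosen are arbitrary:

from pyspark import SparkContext

sc = SparkContext("local", "Partition demo")

# Specify the number of partitions explicitly when creating the RDD
rdd = sc.parallelize(range(1, 9), 4)
print(rdd.getNumPartitions())    # 4
print(rdd.glom().collect())      # see how elements are grouped per partition

# Key-value RDDs can carry a partitioner; partitionBy uses a hash partitioner by default
pairs = rdd.map(lambda x: (x % 2, x)).partitionBy(2)
print(pairs.getNumPartitions())  # 2

sc.stop()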
5). SparkSQL
Now we start writing Spark SQL. Where do we begin? The answer is SparkSession.
SparkSession is the entry point to Spark SQL. To create a basic SparkSession, simply use SparkSession.builder:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
With a SparkSession, the next step is to create a DataFrame.
Using SparkSession, you can create a DataFrame from an existing RDD, a Hive table, or a Spark data source (files in json, parquet, jdbc, orc, libsvm, csv, text, and other formats).
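Besides reading files, the "from an existing RDD" path mentioned above looks like the following minimal sketch (the column names and sample rows are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-to-dataframe").getOrCreate()

# Build an RDD of (name, age) tuples and turn it into a DataFrame
rdd = spark.sparkContext.parallelize([("Michael", 29), ("Andy", 30), ("Justin", 19)])
df = spark.createDataFrame(rdd, ["name", "age"])

df.printSchema()
df.show()

spark.stop()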
The following example creates a DataFrame by reading a JSON file.
df =spark.read.json("/people.json")
people.json data is as follows:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
Using DataFrame
# Print schema information
df.printSchema()
# Select the name column
df.select("name").show()

# Via SQL statements:
# First register df as a view
df.createOrReplaceTempView("p")
# Execute a SQL statement through spark.sql("SQL statement")
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()

# Write and save to the specified path
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
# Overwrite the original data and save to the F:\test path
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")
2, Original exercises from the training platform
1. Python foundation
--------------------------------------------------------------
# Temperature conversion: Fahrenheit to Celsius
def Table_For(min, max):
    # Please print the table here with a for loop
    # Please add implementation code here
    # ********** Begin *********#
    print("Fahrenheit degree\t\t centigrade\t\t Approximate centigrade")
    print("****************************************")
    for i in range(min, max + 10, 10):
        b = (i - 32) / 1.8
        c = (i - 30) / 2
        print("%d\t\t%.1f\t\t%.1f" % (i, b, c))
    return 0
--------------------------------------------------------------
# Reading a file and storing its data
def solve(file):
    sum = []
    ans = {}
    with open(file, encoding='utf-8') as file_obj:
        lines = file_obj.readlines()
        for line in lines[2:]:
            k = line.rstrip().split('\t')
            # print(k)
            sum.append(k)
    print(sum)

solve('src/step1/constants.txt')
#********** End **********#
2. HDFS (multiple choice)
Multiple-choice questions? I think they will probably be about command-line statements.
1). Command line under Linux
# Create a folder
mkdir /app
# Create a file
touch hello.txt
# Switch to the /opt directory
cd /opt
# Unpack the compressed file
tar -zxvf jdk-8u171-linux-x64.tar.gz
# Move the file
mv jdk1.8.0_171/ /app
# Edit the profile
vim /etc/profile
# Make the configuration take effect
source /etc/profile
# Format the HDFS file system
hadoop namenode -format
# Start hadoop
start-dfs.sh
# Validate Hadoop
2). Command line under HDFS
# Create a folder in HDFS
hadoop fs -mkdir /test
# Check whether the creation succeeded
hadoop fs -ls /
# Upload a file to HDFS
hadoop fs -put hello.txt /test
# View the file
hadoop fs -cat /test/hello.txt
3. HBase (data table selection, design, and operation)
1) Use Python code to add, delete, and view data in an HBase table
1, Add data

table = connection.table('my_table')    # get the table object first

cloth_data = {'cf1:content': 'jeans', 'cf1:price': '299', 'cf1:rating': '98%'}
hat_data = {'cf1:content': 'cap', 'cf1:price': '88', 'cf1:rating': '99%'}

# put stores only one row of data at a time; if the row key already exists, it becomes an update
table.put(row='www.test1.com', data=cloth_data)
table.put(row='www.test2.com', data=hat_data)

# Use batch to insert multiple rows at once (values must be strings/bytes, not ints)
bat = table.batch()
bat.put('www.test5.com', {'cf1:price': '999', 'cf2:title': 'Hello Python',
                          'cf2:length': '34', 'cf3:code': 'A43'})
bat.put('www.test6.com', {'cf1:content': 'razor', 'cf1:price': '168', 'cf1:rating': '97%'})
bat.send()

# Use a context manager to manage the batch, so there is no need to call bat.send() manually
with table.batch() as bat:
    bat.put('www.test5.com', {'cf1:price': '999', 'cf2:title': 'Hello Python',
                              'cf2:length': '34', 'cf3:code': 'A43'})
    bat.put('www.test6.com', {'cf1:content': u'razor', 'cf1:price': '168', 'cf1:rating': '97%'})
2, Delete data

with table.batch() as bat:
    bat.delete('www.test1.com')
3, Retrieve data

# Scan the whole table
for key, value in table.scan():
    print(key, value)

# Retrieve one row of data
row = table.row('www.test4.com')
print(row)

# Retrieve multiple rows of data
rows = table.rows(['www.test1.com', 'www.test4.com'])
print(rows)
4. MapReduce solution
1) Count the number of occurrences of each word in two text files.
#! /usr/bin/python3
# mapper.py
import sys

def main():
    # Accept lines from the standard input stream and call the mapper function on each line
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

# Each line is split into words, each represented by word.
# Hadoop streaming requires key-value pairs to be output in the form "key \t value".
def mapper(line):
    words = line.split(' ')
    for word in words:
        if len(word.strip()) == 0:
            continue
        print("%s\t%s" % (word, 1))

if __name__ == '__main__':
    main()
-------------------------------------------------------------
#! /usr/bin/python3
# Reduce.py
import sys
from operator import itemgetter

# Sum the values and output in the form "word \t frequency"
def reducer(k, values):
    print("%s\t%s" % (k, sum(values)))

def main():
    current_key = None
    values = []
    _key, _value = '', 0
    for line in sys.stdin:
        line = line.strip()
        _key, _value = line.split('\t', 1)
        _value = eval(_value)
        if current_key == _key:
            values.append(_value)
        else:
            if current_key:
                reducer(current_key, values)
            values = []
            values.append(_value)
            current_key = _key
    # Don't forget the last key-value pair
    if current_key == _key:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
2) Merge the two files and eliminate duplicates
#! /usr/bin/python3
# mapper.py
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    ########## Begin ###############
    num, st = line.split(' ')
    print("%s\t%s" % (num, st))
    ########### End #############

if __name__ == '__main__':
    main()
--------------------------------------------------------------
#! /usr/bin/python3
# Reduce.py
import sys

def reducer(k, values):
    ############ Begin ################
    for value in sorted(list(set(values))):
        print("%s\t%s" % (k, value))
    ############ End ################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    for line in sys.stdin:
        line = line.strip()
        akey, avalue = line.split('\t')
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
            values = []
            values.append(avalue)
            current_key = akey
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
3) Mine the parent-child relationships and produce the table of grandchild-grandparent relationships
#! /usr/bin/python3
# mapper.py
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        if line.startswith('child'):
            pass
        else:
            mapper(line)

def mapper(line):
    ############### Begin ############
    child, parent = line.split(' ')
    print("%s\t-%s" % (child, parent))
    print("%s\t+%s" % (parent, child))
    ############### End #############

if __name__ == '__main__':
    main()
--------------------------------------------------------------
#! /usr/bin/python3
# Reduce.py
import sys

def reducer(k, values):
    ############## Begin ################
    grandparents = []
    grandson = []
    for v in values:
        if v.startswith('-'):
            grandparents.append(v[1:])
        else:
            grandson.append(v[1:])
    for i in grandson:
        for j in grandparents:
            print("%s\t%s" % (i, j))
    ############## End #################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    print("grand_child\tgrand_parent")
    for line in sys.stdin:
        line = line.strip()
        try:
            akey, avalue = line.split('\t')
        except:
            continue
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
            values = []
            values.append(avalue)
            current_key = akey
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
4) Data cleaning
#! /usr/bin/python3
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time

# Get the "province/city code: province/city name" entries and save them in the dictionary regions;
# get the "phone number: name" entries and save them in the dictionary userphones.
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()

def main():
    # Correctly output utf-8 encoded Chinese characters
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    # Output a string like "Deng Er,Zhang Qian,13666666615151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,Heilongjiang Province,Shanghai"
    # The reduce phase is not required for this question; just output the content required by the question.
    # The "key \t value" form is not required.
    ########## begin ##############
    items = line.split(',')
    caller = userphones.get(items[0])
    receiver = userphones.get(items[1])
    start_time = int(items[2])
    end_time = int(items[3])
    region_caller = regions.get(items[4])
    region_receiver = regions.get(items[5])

    print(caller, receiver, sep=',', end=',')
    print(','.join(items[:2]), end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)), end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)), end=',')
    print(str(end_time - start_time), end=',')
    print(region_caller, region_receiver, sep=',')
    ########### End #################

if __name__ == '__main__':
    main()
---------------------------------------------------------------
# dbhelper.py
import pymysql
import sys
import codecs

class DBHelper:
    def get_connection():
        # Establish a connection "conn" to the MySQL server with the credentials given in the question.
        # Note that the character set is specified as "utf8mb4".
        ######## Begin ############
        conn = pymysql.connect(
            host='localhost',
            user='root',
            password='123123',
            db='mydb',
            port=3306,
            charset='utf8mb4'
        )
        ######## End ############
        return conn

    @classmethod
    def get_region(cls):
        conn = cls.get_connection()
        regions = dict()
        with conn.cursor() as cur:
            # Query all provincial and municipal codes and names from the database
            # and save them into the dictionary regions.
            ############ Begin ###################
            sqltxt = 'select CodeNum, Address from allregion;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                regions[row[0]] = row[1].strip()
            ############ End #################
        conn.close()
        return regions

    @classmethod
    def get_userphones(cls):
        conn = cls.get_connection()
        userphones = dict()
        with conn.cursor() as cur:
            # Query all phone numbers and corresponding names from the database
            # and save them into the dictionary userphones.
            ############ Begin ###################
            sqltxt = 'select phone, trueName from userphone;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                userphones[row[0]] = row[1]
            ############ End #################
        conn.close()
        return userphones

def main():
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    region = DBHelper.get_region()
    users = DBHelper.get_userphones()
    '''
    for k, v in region.items():
        print(k, ':', v)
    print('-------------')
    for k, v in users.items():
        print(k, ':', v)
    '''

if __name__ == '__main__':
    main()
5. Spark RDD
Calculating Pi with spark-submit
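The heading above has no code attached in the original notes; here is a minimal Monte Carlo sketch of the idea, to be run with spark-submit (the file name pi.py and the sample count are assumptions, not the platform's original exercise):

# pi.py -- run with: spark-submit pi.py
# -*- coding: UTF-8 -*-
import random
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local", "Pi App")

    n = 1000000  # number of random points to sample

    def inside(_):
        # Throw a random point into the unit square; count it if it falls inside the quarter circle
        x, y = random.random(), random.random()
        return 1 if x * x + y * y <= 1 else 0

    count = sc.parallelize(range(n), 4).map(inside).reduce(lambda a, b: a + b)
    print("Pi is roughly %f" % (4.0 * count / n))

    sc.stop()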
1). Read external dataset create RDD
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == '__main__':
    #********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # A text-file RDD can be created with SparkContext's textFile method. It takes a file URI
    # (a local path on the machine, or a URI such as hdfs://, s3a://) and reads it as a collection of lines
    # 2. read the local file; the URI is /root/wordcount.txt
    distFile = sc.textFile("/root/wordcount.txt")
    # 3. use rdd.collect() to gather the contents of the RDD. rdd.collect() is a Spark Action operator,
    #    described in detail later; its main purpose is to collect the RDD's data
    s = distFile.collect()
    # 4. print the contents of the rdd
    print(s)
    # 5. stop SparkContext
    sc.stop()
    #********** End **********#
2). Use the map operator to convert even numbers to their squares and odd numbers to their cubes.
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List from 1 to 5
    list1 = [1, 2, 3, 4, 5]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(list1)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the map operator to convert the rdd data (1, 2, 3, 4, 5) according to the following rules:
    Requirements:
        an even number is converted to the square of the number
        an odd number is converted to the cube of the number
    """
    # 5. use the map operator to complete the above requirements
    rdd_map = rdd.map(lambda x: x * x if x % 2 == 0 else x * x * x)
    # 6. use rdd.collect() to gather the elements after the map transformation
    print(rdd_map.collect())
    # 7. stop SparkContext
    sc.stop()
    #********** End **********#
3). Use the mapPartitions operator to combine the string and the length of the string into a tuple.
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

#********** Begin **********#
def f(iterator):
    list1 = []
    for x in iterator:
        length = len(x)
        list1.append((x, length))
    return list1
#********** End **********#

if __name__ == "__main__":
    #********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. a List with contents ("dog", "salmon", "salmon", "rat", "elephant")
    data = ["dog", "salmon", "salmon", "rat", "elephant"]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the mapPartitions operator to convert the rdd data ("dog", "salmon", "salmon", "rat", "elephant")
    according to the following rules:
    Requirements:
        combine each string with its length into a tuple, for example:
        dog --> (dog,3)
        salmon --> (salmon,6)
    """
    # 5. use the mapPartitions operator to complete the above requirements
    partitions = rdd.mapPartitions(f)
    # 6. use rdd.collect() to gather the elements after the mapPartitions transformation
    print(partitions.collect())
    # 7. stop SparkContext
    sc.stop()
    #********** End **********#
4). Use the filter operator to filter out all odd numbers in rdd(1, 2, 3, 4, 5, 6, 7, 8).
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List from 1 to 8
    data = [1, 2, 3, 4, 5, 6, 7, 8]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the filter operator to convert the rdd data (1, 2, 3, 4, 5, 6, 7, 8)
    according to the following rules:
    Requirements:
        filter out the odd numbers in the rdd
    """
    # 5. use the filter operator to complete the above requirements
    rdd_filter = rdd.filter(lambda x: x % 2 == 0)
    # 6. use rdd.collect() to gather the elements after the filter transformation
    print(rdd_filter.collect())
    # 7. stop SparkContext
    sc.stop()
    #********** End **********#
5). Use the flatMap operator to merge RDD elements: ([1,2,3], [4,5,6]) --> (1,2,3,4,5,6)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List of [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the flatMap operator to convert the rdd data ([1, 2, 3], [4, 5, 6], [7, 8, 9])
    according to the following rules:
    Requirements:
        merge the RDD elements, for example:
        ([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
        ([2,3],[4,5],[6]) --> (2,3,4,5,6)
    """
    # 5. use the flatMap operator to complete the above requirements
    flat_map = rdd.flatMap(lambda x: x)
    # 6. use rdd.collect() to gather the elements after the flatMap transformation
    print(flat_map.collect())
    # 7. stop SparkContext
    sc.stop()
    #********** End **********#
6). Use the distinct operator to deduplicate the data in the rdd.
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List containing (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)
    data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the distinct operator to convert the rdd data (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)
    according to the following rules:
    Requirements:
        deduplicate the elements, for example:
        1,2,3,3,2,1 --> 1,2,3
        1,1,1,1 --> 1
    """
    # 5. use the distinct operator to complete the above requirements
    distinct = rdd.distinct()
    # 6. use rdd.collect() to gather the elements after the distinct transformation
    print(distinct.collect())
    # 7. stop SparkContext
    sc.stop()
    #********** End **********#
7). Use the sortBy operator to sort (ascending) the data in rdd
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List containing (1, 3, 5, 7, 9, 8, 6, 4, 2)
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the sortBy operator to convert the rdd data (1, 3, 5, 7, 9, 8, 6, 4, 2)
    according to the following rules:
    Requirements:
        sort the elements, for example:
        5,4,3,1,2 --> 1,2,3,4,5
    """
    # 5. use the sortBy operator to complete the above requirements
    by = rdd.sortBy(lambda x: x)
    # 6. use rdd.collect() to gather the elements after the sortBy transformation
    print(by.collect())
    # 7. stop SparkContext
    sc.stop()
    #********** End **********#
8). Use the sortByKey operator to sort (ascending) the data in the rdd.
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List with contents [('B', 1), ('A', 2), ('C', 3)]
    data = [('B', 1), ('A', 2), ('C', 3)]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the sortByKey operator to convert the rdd data ('B', 1), ('A', 2), ('C', 3)
    according to the following rules:
    Requirements:
        sort the elements by key, for example:
        [(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]
    """
    # 5. use the sortByKey operator to complete the above requirements
    key = rdd.sortByKey()
    # 6. use rdd.collect() to gather the elements after the sortByKey transformation
    print(key.collect())
    # 7. stop SparkContext
    sc.stop()
    # ********** End **********#
9). Use mapValues operator to convert the even number to the square of the number and the odd number to the cube of the number
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List with [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
    data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the mapValues operator to convert the rdd data ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)
    according to the following rules:
    Requirements:
        apply the following to the value of each (key, value) element:
        an even number is converted to the square of the number
        an odd number is converted to the cube of the number
    """
    # 5. use the mapValues operator to complete the above requirements
    values = rdd.mapValues(lambda x: x * x if x % 2 == 0 else x * x * x)
    # 6. use rdd.collect() to gather the elements after the mapValues transformation
    print(values.collect())
    # 7. stop SparkContext
    sc.stop()
    # ********** End **********#
10). Use the reduceByKey operator to accumulate the data in RDD (key value type).
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List with [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to gather the elements of the RDD
    print(rdd.collect())
    """
    Use the reduceByKey operator to convert the rdd data
    [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
    according to the following rules:
    Requirements:
        accumulate the values of (key, value) elements per key, for example:
        (1,1),(1,1),(1,2) --> (1,4)
        (1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)
    """
    # 5. use the reduceByKey operator to complete the above requirements
    # reduceBy = rdd.reduceByKey(lambda x, y: x + y).collect()
    # 6. use rdd.collect() to gather the elements after the reduceByKey transformation
    print(rdd.reduceByKey(lambda x, y: x + y).collect())
    # 7. stop SparkContext
    sc.stop()
    # ********** End **********#
11). Actions - Common operators
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#
    # 1. initialize SparkContext, which is the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a List with contents [1, 3, 5, 7, 9, 8, 6, 4, 2]
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
    # 3. create an rdd by parallelizing the collection through SparkContext
    rdd = sc.parallelize(data)
    # 4. collect all elements of the rdd and print them
    print(rdd.collect())
    # 5. count the number of rdd elements and print it
    print(rdd.count())
    # 6. get the first element of the rdd and print it
    print(rdd.first())
    # 7. get the first three elements of the rdd and print them
    print(rdd.take(3))
    # 8. aggregate all elements of the rdd and print the result
    print(rdd.reduce(lambda x, y: x + y))
    # 9. stop SparkContext
    sc.stop()
    # ********** End **********#
6. SparkSQL
1). Statistics of fighter flight performance using Spark SQL
# coding=utf-8
from pyspark.sql import SparkSession

#**********Begin**********#
# Create the SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()

# Read the data in /root/jun.json
df = spark.read.json("/root/jun.json")

# Create a view
df.createOrReplaceTempView("table1")

# Find the top three fighters in the world by maximum flight speed
sqlDF = spark.sql("select cast(regexp_replace(regexp_extract(`Maximum flight speed`,'[\\\d,\\\.]+',0),',','') as float) as SPEED, `name` from table1 order by SPEED desc LIMIT 3")

# Save the results
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#

spark.stop()
2). Use Spark SQL to calculate the proportion of fighters developed by each R & D unit
# coding=utf-8
from pyspark.sql import SparkSession

#**********Begin**********#
# Create the SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()

# Read the data in /root/jun.json
df = spark.read.json("/root/jun.json").coalesce(1)

# Create a view
df.createOrReplaceTempView("table1")

# Calculate the proportion of fighters developed by each R & D unit among all fighters in the world
sqlDF = spark.sql("select concat(cast(round(count(`R & D unit`)*100/(select count(`R & D unit`) from table1 where `R & D unit` is not null and `name` is not null ),2) as float),'%'),`R & D unit` from table1 where `R & D unit` is not null and `name` is not null group by `R & D unit`")

# Save the results
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#

spark.stop()
3). Taxi track data cleaning
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    #**********begin**********#
    df = spark.read.option("header", True).option("delimiter", "\t").csv("/root/data.csv")
    df.createOrReplaceTempView("table1")
    DataFrame = spark.sql(
        """
        select\
        regexp_replace(TRIP_ID,'[$@]','') TRIP_ID,regexp_replace(CALL_TYPE,'[$@]','') CALL_TYPE,regexp_replace(ORIGIN_CALL,'[$@]','') ORIGIN_CALL,regexp_replace(TAXI_ID,'[$@]','') TAXI_ID,regexp_replace(ORIGIN_STAND,'[$@]','') ORIGIN_STAND,regexp_replace(TIMESTAMP,'[$@]','') TIMESTAMP, regexp_replace(POLYLINE,'[$@,-.\\\\[\\\\]]','') POLYLINE\
        from table1
        """)
    DataFrame.show()
    #**********end**********#
    spark.stop()