Big data final review

The information comes from a handsome man: 20 multiple-choice questions worth 2 points each, plus 2-3 long-answer questions. Topics: MapReduce solutions, Spark RDD, HDFS (multiple choice), and HBase (table selection and design problems, operation problems).

1, Some basic concepts

1. Python basics

# 1). Single-line comments start with "#"; multi-line comments wrap the contents in a pair of ''' or """.
# 2). Input and output in Python:
	a = int(input())
	b = int(input())
	print("%d + %d = %d" % (a, b, a + b))
# 3). Conditionals and loops:
--------------------------------
	# Conditional statement
	if x <= 0:
		x = -1
--------------------------------
	# Traverse by index (lst is a list; avoid naming it "list", which shadows the built-in)
	for i in range(len(lst)):
		print(lst[i])
--------------------------------
	# Traverse the values directly
	for x in lst:
		print(x)
--------------------------------
	#while Loop 
	n = 5
	while n > 0 :
		print(" I love xr ! ")
		n=n-1
--------------------------------
# 4). Functions:
	import math

	def move(x, y, step, angle=0):
		nx = x + step * math.cos(angle)
		ny = y - step * math.sin(angle)
		return nx, ny

	Calling it from the interactive interpreter gives:
	>>> x, y = move(100, 100, 60, math.pi / 6)
	>>> print(x, y)
	151.96152422706632 70.0
# 5). list:
	Keep 8 decimal places: lst.append('{:.8f}'.format(4 * math.pi))   (note: '{:.8}' without the f keeps 8 significant digits, not 8 decimal places)

# 6). tuple:

# 7). Connecting to a database with pymysql
	# Import the pymysql module
	import pymysql
	# Connect to the database
	conn = pymysql.connect(host='localhost', port=3306,
	                       user='root', password='root',
	                       charset='utf8', database='mydb')
	# Note: the charset must be written 'utf8', not 'utf-8'
	# Get a cursor object that can execute SQL statements
	cursor = conn.cursor()
	# Define the SQL statement to execute
	sql = """
	CREATE TABLE USER1 (
		id INT AUTO_INCREMENT PRIMARY KEY,
		name CHAR(10) NOT NULL UNIQUE,
		age TINYINT NOT NULL
	) ENGINE=InnoDB DEFAULT CHARSET=utf8;
	"""
	# Execute the SQL statement
	cursor.execute(sql)
	# Close the cursor object
	cursor.close()
	# Close the database connection
	conn.close()

2. Distributed file system: HDFS

When a data set grows beyond the storage capacity of a single physical computer, it has to be partitioned and stored on several separate computers. A file system that manages storage across multiple computers in a network is called a distributed file system.

Hadoop ships with a distributed file system called HDFS (Hadoop Distributed File System). It is sometimes called DFS; the two names refer to the same thing.

NameNode and DataNode
HDFS manages the cluster's data with two types of nodes: one NameNode (the management node) and multiple DataNodes (the worker nodes). The NameNode manages the file system namespace: it maintains the file system tree and the metadata for all files and directories in that tree. This information is stored persistently on the NameNode's local disk in two forms: the namespace image file and the edit log file. The NameNode also records which DataNodes hold each block of each file, but it does not store these block locations persistently; they are rebuilt from the DataNodes' reports when the system starts.

The client accesses the file system on behalf of the user by communicating with the NameNode and the DataNodes. The client offers a file system interface similar to POSIX (Portable Operating System Interface), so user programs can work with files without needing to know about the NameNode and DataNodes.
Datanodes are the working nodes of the file system. They store and retrieve data blocks as needed, and regularly send a list of their stored data blocks to the namenode.
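To make the NameNode/DataNode division concrete, here is a toy in-memory model; it is an illustration only, not Hadoop's API. A file is cut into fixed-size blocks (128 MB is the default block size in Hadoop 2 and later), each block is stored on several DataNodes (the default replication factor is 3), and the "NameNode" keeps only the metadata mapping blocks to DataNodes. The function names, DataNode names and round-robin placement are hypothetical simplifications; real HDFS uses a rack-aware placement policy.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # default HDFS block size (Hadoop 2+), in bytes
REPLICATION = 3                 # default HDFS replication factor


def split_into_blocks(file_size, block_size=BLOCK_SIZE):
    """Return the sizes of the blocks a file of file_size bytes is cut into."""
    n_blocks = max(1, math.ceil(file_size / block_size))
    sizes = [block_size] * (n_blocks - 1)
    sizes.append(file_size - block_size * (n_blocks - 1))  # last block may be smaller
    return sizes


def place_blocks(path, file_size, datanodes, replication=REPLICATION):
    """Toy 'NameNode' metadata: block id -> size and the DataNodes holding replicas.

    Placement is plain round-robin (a hypothetical simplification);
    real HDFS uses a rack-aware policy.
    """
    block_map = {}
    for i, size in enumerate(split_into_blocks(file_size)):
        replicas = [datanodes[(i + r) % len(datanodes)] for r in range(replication)]
        block_map[f"{path}#blk_{i}"] = {"size": size, "replicas": replicas}
    return block_map


if __name__ == "__main__":
    nodes = ["dn1", "dn2", "dn3", "dn4"]  # hypothetical DataNode names
    meta = place_blocks("/data/weather.log", 300 * 1024 * 1024, nodes)
    for block_id, info in meta.items():
        print(block_id, info["size"], info["replicas"])
```

A 300 MB file becomes two full 128 MB blocks plus one 44 MB block, and losing any single DataNode still leaves two copies of every block.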

3. big data computing framework MapReduce

What is MapReduce?
Suppose the task is to mine and analyze the weather logs collected by China's Meteorological Center in recent years. The logs total 3 TB, and you must compute the maximum temperature of each year. If you had only one computer, how would you do it? You would read the records one by one and compare each reading with the current maximum for its year; after every record has been compared, you have the maximum temperatures. But experience tells us that processing this much data on one machine is very time-consuming.

Now suppose you had three machines. You would probably arrive at the approach in the following figure: cut the data into three pieces, have each machine process its own piece (Map), send the results to one machine to be merged, then compute over the merged data (Reduce) and output the result.

This is a relatively complete MapReduce process.
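The three-machine process above can be simulated in plain Python on one machine. This is a minimal sketch, not Hadoop code: the records, the "year,temperature" line format and the three-way split are hypothetical. The map step emits (year, temperature) pairs for each chunk, the shuffle groups the pairs by year, and the reduce step keeps the maximum per year.

```python
from collections import defaultdict


def map_chunk(lines):
    """Map: turn each 'year,temperature' line into a (year, temperature) pair."""
    for line in lines:
        year, temp = line.strip().split(",")
        yield year, int(temp)


def shuffle(mapped_pairs):
    """Shuffle: group every value emitted for the same key (year)."""
    groups = defaultdict(list)
    for year, temp in mapped_pairs:
        groups[year].append(temp)
    return groups


def reduce_max(groups):
    """Reduce: keep the maximum temperature of each year."""
    return {year: max(temps) for year, temps in sorted(groups.items())}


def max_temperature_per_year(lines, n_machines=3):
    # Split the input into n_machines chunks, as if each went to a different machine
    chunks = [lines[i::n_machines] for i in range(n_machines)]
    mapped = [pair for chunk in chunks for pair in map_chunk(chunk)]
    return reduce_max(shuffle(mapped))


if __name__ == "__main__":
    records = ["2019,35", "2019,38", "2020,36", "2020,41", "2021,33", "2019,30"]
    print(max_temperature_per_year(records))  # {'2019': 38, '2020': 41, '2021': 33}
```

Because max is associative, each machine could also pre-reduce its own chunk before sending it (Hadoop calls this a combiner), which shrinks the data moved during the shuffle.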

4. Distributed database (HBase)

# Start hbase
start-hbase.sh
# Enter hbase shell window
hbase shell
# Create a table
create 'test','data'
# Check whether the table is created successfully
list
# Add data
put 'test','row1','data:1','value1'
put 'test','row2','data:2','value2'
# get data
get 'test','row1'
# View all data
scan 'test'

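# Delete a single cell (delete needs the column)
delete 'test','row1','data:1'
# Delete a whole row
deleteall 'test','row1'
# Delete table
disable 'test'  # a table must be disabled before it can be dropped
drop 'test'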
1) Overall architecture of a distributed HBase environment

ZooKeeper provides coordination services for HBase and is an important component of it. ZooKeeper monitors the health of HBase in real time and reacts accordingly.

HMaster is the master service of HBase. It monitors all HRegionServers in the cluster and manages tables and regions, for example creating, modifying and deleting tables.

HRegionServer is an instance of RegionServer. It is responsible for serving and managing multiple HRegion instances and directly responding to user read-write requests.

HRegion is the basic unit for dividing tables. When a table is just created, there is only one Region. However, with the increase of records, the table will become larger and larger. HRegionServer will track the size of the Region in real time. When the Region increases to a certain value, it will perform a split operation, and one Region will be cut into two regions.
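Region splitting can be pictured with a toy model: each region covers a contiguous, sorted range of row keys, and once a region holds more than a threshold number of rows it is cut at its middle row key into two daughter regions. This is only an illustration; real HBase triggers splits on region size under a configurable split policy, and the row-count threshold and class names here are hypothetical.

```python
import bisect


class Region:
    """A region serves the row keys in [start_key, end_key); '' means unbounded."""

    def __init__(self, start_key="", end_key=""):
        self.start_key = start_key
        self.end_key = end_key
        self.rows = []  # sorted row keys stored in this region


class Table:
    """Toy table that splits a region once it holds more than max_rows rows."""

    def __init__(self, max_rows_per_region=4):  # hypothetical split threshold
        self.max_rows = max_rows_per_region
        self.regions = [Region()]  # a newly created table has a single region

    def find_region(self, row_key):
        for region in self.regions:
            if region.start_key <= row_key and (region.end_key == "" or row_key < region.end_key):
                return region
        raise KeyError(row_key)

    def put(self, row_key):
        region = self.find_region(row_key)
        if row_key not in region.rows:
            bisect.insort(region.rows, row_key)  # keep row keys sorted
        if len(region.rows) > self.max_rows:
            self.split(region)

    def split(self, region):
        # Cut at the middle row key into two daughter regions
        mid = len(region.rows) // 2
        split_key = region.rows[mid]
        lower = Region(region.start_key, split_key)
        upper = Region(split_key, region.end_key)
        lower.rows, upper.rows = region.rows[:mid], region.rows[mid:]
        i = self.regions.index(region)
        self.regions[i:i + 1] = [lower, upper]


if __name__ == "__main__":
    table = Table(max_rows_per_region=4)
    for key in ["row1", "row2", "row3", "row4", "row5"]:
        table.put(key)
    for r in table.regions:
        print(repr(r.start_key), repr(r.end_key), r.rows)
```

After the fifth put, the single starting region splits into ['row1', 'row2'] and ['row3', 'row4', 'row5'], with 'row3' as the boundary key.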

In general, deploying a distributed HBase database requires several components to cooperate. HBase coordinates its distributed processes through ZooKeeper, which acts like an administrator, and it stores its data in HDFS (the distributed file system). Building a distributed HBase database therefore comes down to integrating these services.

2). Build a happybase environment to prepare for writing python programs to access data in HBase

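HappyBase is a Python library for operating HBase. First install happybase, then start HDFS and HBase, and finally test HappyBase from Python. HappyBase talks to HBase through the HBase Thrift server, so that server must be running too (it listens on port 9090 by default).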

To connect to the HBase database with happybase:
import happybase
connection = happybase.Connection(host='localhost', port=9090, timeout=None, autoconnect=True, table_prefix=None, table_prefix_separator=b'_', compat='0.98', transport='buffered', protocol='binary')
This returns a connection instance. Parameters:
host: host name of the Thrift server
port: port of the Thrift server
timeout: socket timeout in milliseconds
autoconnect: whether to open the connection immediately
table_prefix: prefix used to construct table names
table_prefix_separator: separator placed after table_prefix
compat: compatibility mode (HBase version)
transport: Thrift transport mode ('buffered' or 'framed')
protocol: Thrift protocol ('binary' or 'compact')

# Create a table
connection.create_table(
    'my_table',
    {
        'cf1': dict(max_versions=10),
        'cf2':dict(max_versions=1,block_cache_enabled=False),
        'cf3': dict(),  # use defaults
    }
)
At this point, connection.tables() lists the available tables; the result is ['my_table'].
The created table my_table contains 3 column families: cf1, cf2 and cf3.

5. Big data computing framework: Spark

1). Installing and configuring the Scala development environment

Why do we need to install and configure Scala?
Because the Spark framework itself is written in Scala, and Scala code is much more concise than Java, installing and configuring the Scala environment is preparatory work to complete before learning Spark.

2). Spark architecture

As above, we first need to know: what is Spark?
Apache Spark is a fast, general-purpose computing engine designed for large-scale data processing. Like Hadoop MapReduce, which we already know, Spark is a general parallel computing framework, and it keeps the advantages of Hadoop MapReduce. It is even better suited to algorithms that iterate many times, such as data mining and machine learning, because intermediate results can be kept in memory instead of being written to disk between steps.

Basic concepts:

Application: Spark application written by the user, including one Driver and multiple executors.

Driver: the Driver runs the main function of the Application and creates a SparkContext, which prepares the runtime environment of the Spark Application. The SparkContext communicates with the Cluster Manager to request resources and to assign and monitor tasks. When the Executors have finished running, the Driver closes the SparkContext.

Executor: a process running on a Worker Node that is responsible for running tasks.

RDD: Resilient Distributed Dataset, an abstraction of distributed memory that provides a highly restricted shared-memory model.

DAG: Directed Acyclic Graph, which reflects the dependencies between RDDs.

Task: the unit of work that runs on an Executor.

Job: a job contains multiple RDDs and the operations applied to them.

Stage: the basic scheduling unit of a Job. A Job is divided into multiple groups of tasks; each group is called a stage, or TaskSet, and consists of related tasks with no shuffle dependencies among them.

Cluster Manager: refers to an external service that obtains resources on a cluster. There are currently three types:

Standalone: Spark's native resource manager, in which the Master allocates resources;
Apache Mesos: a resource scheduling framework with good compatibility with Hadoop MR;
Hadoop Yarn: mainly refers to the ResourceManager in Yarn.
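The Job, Stage and Task relationship defined above can be sketched with a toy model: walking the chain of operations, a new stage begins wherever an operation has a shuffle (wide) dependency, while narrow operations such as map and filter are pipelined into the same stage. The operation names and the set of wide operations are illustrative assumptions, and the scheduler here is a simplification, not Spark's DAGScheduler, which works on a full graph of RDDs.

```python
# Operations treated as wide (shuffle) dependencies in this toy model
WIDE_OPS = {"groupByKey", "reduceByKey", "distinct", "sortBy", "join", "repartition"}


def split_into_stages(lineage):
    """Split a linear chain of RDD operations into stages.

    A shuffle dependency starts a new stage; narrow operations such as
    map, filter and flatMap are pipelined into the current stage.
    """
    stages, current = [], []
    for op in lineage:
        if op in WIDE_OPS and current:
            stages.append(current)  # the shuffle is a stage boundary
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages


def task_sets(stages, num_partitions):
    """One TaskSet per stage, one task per partition.

    Simplification: every stage gets num_partitions tasks; in Spark the task
    count equals the number of partitions of the stage's last RDD.
    """
    return [[f"stage{s}-task{p}" for p in range(num_partitions)] for s in range(len(stages))]


if __name__ == "__main__":
    # A word-count job; the action (e.g. collect) that triggers it is not listed
    job = ["textFile", "flatMap", "map", "reduceByKey", "map"]
    stages = split_into_stages(job)
    print(stages)  # [['textFile', 'flatMap', 'map'], ['reduceByKey', 'map']]
    print(task_sets(stages, 2))
```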

3). Spark operation process

  1. Build the running environment of Spark Application and start the SparkContext;
  2. SparkContext requests resources for running Executors from the resource manager (Standalone, Mesos or YARN), which allocates the resources and starts the Executor processes;
  3. The Executor applies for a Task from SparkContext;
  4. SparkContext builds DAG diagrams, decomposes the DAG diagrams into stages, encapsulates the stages into tasksets and sends them to the Task Scheduler. Finally, the Task Scheduler sends the tasks to the Executor for running;
  5. The Task runs on the Executor, and all resources are released after running.

4). Understanding RDDs

In Spark, all work on data comes down to creating RDDs, transforming existing RDDs, and calling actions on RDDs to compute results. Simply put, an RDD is a collection whose data is stored across different machines. Is that clear?
If not, here is an example:

# How do we create a Spark RDD by parallelizing a collection?
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1. initialize SparkContext, which is the entry of Spark Program
    sc = SparkContext("local", "Simple App")

    # 2. create a List from 1 to 8
    data = [1,2,3,4,5,6,7,8]

    # 3. create rdd through SparkContext parallelization
    rdd = sc.parallelize(data)

    # 4. collect the contents of the RDD with rdd.collect(). collect() is a Spark action operator (described in detail later); it returns all elements of the RDD to the driver
    result = rdd.collect()

    # 5. print the contents of rdd
    print(result)

    # 6. stop SparkContext
    sc.stop()
    #********** End **********#
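What parallelize does with the list can be pictured in pure Python: the collection is cut into numSlices contiguous chunks (numSlices is the optional second argument of sc.parallelize), and each chunk becomes one partition handled by one task. The even-split formula and the helper names below are a simplified model for illustration, not PySpark's internal code.

```python
def slice_collection(data, num_slices):
    """Cut data into num_slices contiguous, nearly equal partitions."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


def collect(partitions):
    """Like the collect() action: bring every partition back, concatenated in order."""
    return [x for part in partitions for x in part]


if __name__ == "__main__":
    data = [1, 2, 3, 4, 5, 6, 7, 8]
    parts = slice_collection(data, 3)
    print(parts)           # [[1, 2], [3, 4, 5], [6, 7, 8]]
    print(collect(parts))  # [1, 2, 3, 4, 5, 6, 7, 8]
```

In the real program the three partitions could live on three different executors; collect() is what gathers them back into one list on the driver.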

Now that we know what an RDD is, let's look at its five properties.

  1. A list of partitions. A partition is the basic unit of a data set. Each partition of an RDD is processed by one computing task, so the partitions determine the granularity of parallel computation. You can specify the number of partitions when creating an RDD; otherwise a default is used, which is the number of CPU cores allocated to the program.
  2. A function for computing each partition. Spark computes RDDs partition by partition, and every RDD implements a compute function for this purpose. The compute function composes iterators and does not need to save the result of each intermediate computation.
  3. Dependencies on other RDDs. Every transformation of an RDD produces a new RDD, so RDDs form a pipeline-like chain of dependencies. When some partition data is lost, Spark can use these dependencies to recompute only the lost partitions instead of all partitions of the RDD.
  4. A Partitioner, i.e. the partitioning function of the RDD. Spark currently implements two kinds: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs can have a Partitioner; for other RDDs the Partitioner is None. The Partitioner determines not only the number of partitions of the RDD itself but also the number of partitions of the parent RDD's shuffle output.
  5. A list of preferred locations for each partition. For an HDFS file, this list stores the locations of the blocks each partition reads. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule each task on the node that stores the data block it processes.
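The dependency and Partitioner properties above can be illustrated with a small pure-Python model. The partition function follows the hash-partitioner idea, partition = hash(key) mod numPartitions; it uses zlib.crc32 as a stand-in hash so results are stable across runs (Python's built-in hash of strings is randomized per process), so it does not reproduce Spark's HashPartitioner or PySpark's portable_hash exactly. The recompute function shows the lineage idea: a lost partition is rebuilt by replaying the recorded transformations on that partition's parent data only. All function names are hypothetical.

```python
import zlib


def hash_partition(key, num_partitions):
    """Partition index for key: hash(key) mod num_partitions (hash-partitioner idea)."""
    # zlib.crc32 is a stable stand-in hash; Python's built-in hash() of str
    # is randomized per process, so it would give different results per run
    h = zlib.crc32(str(key).encode("utf-8"))
    return h % num_partitions  # crc32 is non-negative, so the index is in range


def partition_by(pairs, num_partitions):
    """Distribute (key, value) pairs into partitions; equal keys share a partition."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[hash_partition(key, num_partitions)].append((key, value))
    return parts


def recompute(parent_partition, lineage):
    """Rebuild one lost partition by replaying the recorded transformations."""
    data = list(parent_partition)
    for transform in lineage:
        data = transform(data)
    return data


if __name__ == "__main__":
    print(partition_by([("a", 1), ("b", 2), ("a", 3), ("c", 4)], 2))
    # Lineage of a child RDD: map(x * 2) followed by filter(x > 4)
    lineage = [lambda xs: [x * 2 for x in xs], lambda xs: [x for x in xs if x > 4]]
    print(recompute([1, 2, 3, 4], lineage))  # [6, 8]
```

Keeping all values of one key in the same partition is what lets a later reduceByKey work partition by partition without looking elsewhere.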

5). SparkSQL

Where do we start when writing Spark SQL? The answer is SparkSession.
SparkSession is the entry point to Spark SQL. To create a basic SparkSession, just use SparkSession.builder:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

With a SparkSession, the next step is to create a DataFrame.
Using SparkSession, you can create a DataFrame from an existing RDD, from a Hive table, or from a Spark data source (json, parquet, jdbc, orc, libsvm, csv, text and other file formats).
The following example creates a DataFrame by reading a JSON file.

df = spark.read.json("/people.json")

The contents of people.json are:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Using the DataFrame API

# Print the schema
df.printSchema()
# Select the name column
df.select("name").show()

Using SQL statements
# First register df as a temporary view named p
df.createOrReplaceTempView("p")
# Execute a SQL statement with spark.sql("SQL statement")
sqlDF = spark.sql("SELECT name FROM p")
sqlDF.show()

# Write the data as Parquet to the specified path
df.select("name", "age").write.format("parquet").save("F:\\test\\anamesAndAges")
# Save with mode "overwrite" to the F:\test path, replacing any existing data there
df.select("name", "age").write.mode("overwrite").format("parquet").save("F:\\test")

2, Original exercises from Educoder

1. Python basics

--------------------------------------------------------------
# Celsius conversion: Fahrenheit to Celsius
def Table_For(min,max):
    #Please print the list here with a for loop
    #   Please add implementation code here   #
    # ********** Begin *********#
    print("Fahrenheit degree\t\t centigrade\t\t Approximate centigrade")
    print("****************************************")
    for i in range(min, max + 10, 10):
        b = (i - 32)/1.8
        c = (i - 30) / 2
        print("%d\t\t%.1f\t\t%.1f" % (i,b, c))
    return 0
--------------------------------------------------------------
# Read a file and store its data as a list of rows
def solve(file):
    rows = []
    with open(file, encoding='utf-8') as file_obj:
        lines = file_obj.readlines()
    # Skip the first two lines, then split each remaining line on tabs
    for line in lines[2:]:
        fields = line.rstrip().split('\t')
        rows.append(fields)
    print(rows)

solve('src/step1/constants.txt')
#********** End **********#

2. HDFS (multiple choice)

Multiple-choice questions? I expect they will be about commands.

1). Command line under Linux

# create folder
mkdir /app
# create a file
touch hello.txt
# Switch to /opt directory
cd /opt
# Unzip the compressed file
tar -zxvf jdk-8u171-linux-x64.tar.gz
# move file
mv jdk1.8.0_171/ /app
# Edit profile
vim /etc/profile
# Make the configuration effective
source /etc/profile

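# Format the HDFS NameNode (newer Hadoop versions: hdfs namenode -format)
hadoop namenode -format
# Start HDFS
start-dfs.sh
# Verify that Hadoop is running (jps should list NameNode, DataNode and SecondaryNameNode)
jps

2). HDFS command line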

# Create folder in HDFS
hadoop fs -mkdir /test
# Check whether the creation is successful
hadoop fs -ls /
# Upload files to HDFS
hadoop fs -put hello.txt /test
# View the file contents
hadoop fs -cat /test/hello.txt

3. HBase (table selection, design and operations)

1) Use Python code to add, delete and view data in an HBase table
1, Add data
table = connection.table('my_table')       # Get the table object first

cloth_data = {'cf1:content': 'jeans', 'cf1:price': '299', 'cf1:rating': '98%'}
hat_data = {'cf1:content': 'cap', 'cf1:price': '88', 'cf1:rating': '99%'}
# put stores only one row at a time; if the row key already exists, the data is updated
table.put(row='www.test1.com', data=cloth_data)
table.put(row='www.test2.com', data=hat_data)
# Use batch to insert multiple rows at once (values must be strings or bytes, not ints)
bat = table.batch()
bat.put('www.test5.com', {'cf1:price': '999', 'cf2:title': 'Hello Python', 'cf2:length': '34', 'cf3:code': 'A43'})
bat.put('www.test6.com', {'cf1:content': 'razor', 'cf1:price': '168', 'cf1:rating': '97%'})
bat.send()
# Use batch as a context manager so the data is sent automatically, i.e. no need to call bat.send()
with table.batch() as bat:
    bat.put('www.test5.com', {'cf1:price': '999', 'cf2:title': 'Hello Python', 'cf2:length': '34', 'cf3:code': 'A43'})
    bat.put('www.test6.com', {'cf1:content': 'razor', 'cf1:price': '168', 'cf1:rating': '97%'})
2, Delete data
with table.batch() as bat:
    bat.delete('www.test1.com')
3, Retrieve data
# Scan the whole table
for key, value in table.scan():
    print(key, value)
# Retrieve one row
row = table.row('www.test2.com')
print(row)
# Retrieve multiple rows
rows = table.rows(['www.test2.com', 'www.test5.com'])
print(rows)

4. MapReduce solutions

1) Count the number of occurrences of each word in two text files.
#!/usr/bin/python3
# mapper.py
import sys
def main():
    # Accept data rows from the standard input stream, and call the mapper function for each row to process
    for line in sys.stdin:
        line = line.strip()
        mapper(line)
# Split each line into words
# hadoop streaming requires that key value pairs be output in the form of "key \t value"
def mapper(line):
    words = line.split(' ')
    for word in words:
        if len(word.strip()) == 0:
            continue
        print("%s\t%s" % (word, 1))
if __name__ == '__main__':
    main()
-------------------------------------------------------------
#!/usr/bin/python3
# Reduce.py
import sys
# Sum the values and output in the form "word\tfrequency".
def reducer(k, values):
    print("%s\t%s" % (k, sum(values)))
def main():
    current_key = None
    values = []
    _key, _value = '', 0
    for line in sys.stdin:
        line = line.strip()
        if not line:  # skip blank lines
            continue
        _key, _value = line.split('\t', 1)
        _value = int(_value)  # int() instead of eval(): safer for untrusted input
        if current_key == _key:
            values.append(_value)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(_value)
            current_key = _key
    # Don't forget the last key value pair
    if current_key == _key:
        reducer(current_key, values)
if __name__ == '__main__':
    main()
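Before submitting to Hadoop Streaming, the logic can be checked locally, because the framework essentially runs map, then sorts by key, then reduces (the same effect as `cat input | python3 mapper.py | sort | python3 Reduce.py`). The sketch below is an illustrative re-implementation of the same logic in a single file, not the two scripts above; it uses itertools.groupby on the sorted "word\t1" lines to play the reducer's role, and the sample lines are hypothetical.

```python
from itertools import groupby


def map_phase(lines):
    """Emit a 'word<TAB>1' line for every non-empty word, like mapper.py."""
    for line in lines:
        for word in line.strip().split(' '):
            if word.strip():
                yield f"{word}\t1"


def reduce_phase(sorted_lines):
    """Sum the counts of consecutive identical keys, like Reduce.py."""
    pairs = (line.split('\t', 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


def run_streaming_job(lines):
    # Hadoop sorts the mapper output by key before it reaches the reducer
    return list(reduce_phase(sorted(map_phase(lines))))


if __name__ == "__main__":
    file1 = ["hello world", "hello hadoop"]  # hypothetical contents of two input files
    file2 = ["hello spark"]
    for out in run_streaming_job(file1 + file2):
        print(out)
```

The sort step is what makes the reducer's "compare with the previous key" loop correct: all lines for one word arrive next to each other.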
2) Merge the two files and eliminate duplicates
#!/usr/bin/python3
# mapper.py
import sys

def main():
    for line in sys.stdin:
        line = line.strip()
        if line:  # skip blank lines
            mapper(line)

def mapper(line):
    ########## Begin  ###############
    num, st = line.split()
    print("%s\t%s" % (num, st))
    ###########  End    #############
if __name__ == '__main__':
    main()
--------------------------------------------------------------
#!/usr/bin/python3
# Reduce.py
import sys

def reducer(k, values):
    ############  Begin   ################
    for value in sorted(list(set(values))):
        print("%s\t%s" %(k,value))
    ############   End    ################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    for line in sys.stdin:
        line = line.strip()
        if not line:  # skip blank lines
            continue
        akey, avalue = line.split('\t')

        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(avalue)
            current_key = akey

    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
        
3) Mine the child-parent relationships and output a table of grandchild-grandparent relationships
#!/usr/bin/python3
# mapper.py
import sys
def main():
    for line in sys.stdin:
        line = line.strip()
        # Skip blank lines and the header line (it starts with 'child')
        if not line or line.startswith('child'):
            continue
        mapper(line)

def mapper(line):
    ###############  Begin   ############
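    child, parent = line.split(' ')
    # Key = child, value = "-parent": a parent of the key
    print("%s\t-%s" % (child, parent))
    # Key = parent, value = "+child": a child of the key
    print("%s\t+%s" % (parent, child))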
    ###############   End    #############

if __name__ == '__main__':
    main()

#!/usr/bin/python3
# Reduce.py
import sys

def reducer(k, values):
    ##############    Begin    ################
    grandparents = []
    grandson = []
    for v in values:
        if v.startswith('-'):
            grandparents.append(v[1:])
        else:
            grandson.append(v[1:])
    for i in grandson:
        for j in grandparents:
            print("%s\t%s" % (i, j))
    ##############   End      #################

def main():
    current_key = None
    values = []
    akey, avalue = None, None
    print("grand_child\tgrand_parent")
    for line in sys.stdin:
        line = line.strip()
        try:
            akey, avalue = line.split('\t')
        except ValueError:  # skip blank or malformed lines
            continue
        if current_key == akey:
            values.append(avalue)
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(avalue)
            current_key = akey
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()
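The trick in this exercise is a reduce-side self-join: every child-parent line is emitted twice, once keyed by the child (value marked '-', a parent of the key) and once keyed by the parent (value marked '+', a child of the key). The reducer for a person therefore sees both that person's parents and children, and pairing each child with each parent yields grandchild-grandparent pairs. The in-memory version below reproduces that logic on a small hypothetical dataset so it can be checked without Hadoop.

```python
from collections import defaultdict


def self_join(pairs):
    """pairs: (child, parent) tuples. Returns sorted (grandchild, grandparent) tuples."""
    grouped = defaultdict(lambda: {"parents": [], "children": []})
    # Map + shuffle: each relation is sent to both people it mentions
    for child, parent in pairs:
        grouped[child]["parents"].append(parent)   # the '-' records in mapper.py
        grouped[parent]["children"].append(child)  # the '+' records in mapper.py
    # Reduce: for each middle person, join their children with their parents
    result = []
    for rel in grouped.values():
        for grandchild in rel["children"]:
            for grandparent in rel["parents"]:
                result.append((grandchild, grandparent))
    return sorted(result)


if __name__ == "__main__":
    data = [("Tom", "Lucy"), ("Tom", "Jack"), ("Lucy", "Mary"), ("Jack", "Alice")]
    print("grand_child\tgrand_parent")
    for gc, gp in self_join(data):
        print(f"{gc}\t{gp}")
```

Here Lucy and Jack are the "middle" people: Lucy's group joins Tom with Mary, and Jack's group joins Tom with Alice.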

4) Data cleaning
#! /usr/bin/python3
# mapper.py
import sys
from dbhelper import DBHelper
import codecs
import time

# Load the "province/city code: province/city name" mapping into the dictionary regions;
# load the "phone number: name" mapping into the dictionary userphones.
regions = DBHelper.get_region()
userphones = DBHelper.get_userphones()

def main():
    # Correctly output utf-8 encoded Chinese characters
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    for line in sys.stdin:
        line = line.strip()
        mapper(line)

def mapper(line):
    # Output a line like "Deng Er,Zhang Qian,13666666615151889601,2018-03-29 10:58:12,2018-03-29 10:58:42,30,Heilongjiang Province,Shanghai"
    # This question needs no reduce phase: just print the required content; the "key \t value" form is not required.
    ##########  begin      ##############
    items = line.split(',')
    caller = userphones.get(items[0])
    receiver = userphones.get(items[1])
    start_time = int(items[2])
    end_time = int(items[3])
    region_caller = regions.get(items[4])
    region_receiver = regions.get(items[5])
    print(caller,receiver,sep = ',',end = ',')
    print(','.join(items[:2]), end = ',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)),end=',')
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)),end=',')
    print(str(end_time - start_time), end = ',')
    print(region_caller, region_receiver, sep = ',')    
    

    ###########  End  #################

if __name__ == '__main__':
    main()
---------------------------------------------------------------
# dbhelper.py
import pymysql
import sys
import codecs

class DBHelper:
    @staticmethod
    def get_connection():
        # Establish a connection "conn" to the MySQL server with the credentials given in the exercise. Note that the character set must be "utf8mb4"
        ########  Begin   ############
        conn = pymysql.connect(
            host = 'localhost',
            user = 'root',
            password = '123123',
            db = 'mydb',
            port = 3306,
            charset = 'utf8mb4'
        )
        ########  End    ############    
        return conn

    @classmethod
    def get_region(cls):
        conn = cls.get_connection()
        regions = dict()
        with conn.cursor() as cur:
            #Query all provincial and municipal codes and names from the database and save them to the dictionary regions.
            ############  Begin ###################
            sqltxt = 'select CodeNum, Address from allregion;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                regions[row[0]] = row[1].strip()
     
            ############  End    #################
        conn.close()
        return regions

    @classmethod
    def get_userphones(cls):
        conn = cls.get_connection()
        userphones = dict()
        with conn.cursor() as cur:
            # Query all phone numbers and their owners' names from the database and save them in the dictionary userphones.
            ############  Begin ###################
            sqltxt = 'select phone, trueName from userphone;'
            cur.execute(sqltxt)
            for row in cur.fetchall():
                userphones[row[0]] = row[1]    
            ############  End    #################
        conn.close()
        return userphones

def main():
    sys.stdout = codecs.getwriter('utf-8')(sys.stdout.detach())
    region = DBHelper.get_region()
    users = DBHelper.get_userphones() 
    '''
    for k, v in region.items():
        print(k, ':', v)
    print('-------------')

    for k, v in users.items():
        print(k, ':', v)
    '''
if __name__ == '__main__':
    main()

5. Spark RDD

Computing Pi with spark-submit
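Spark's bundled Pi example (the one usually run with spark-submit) estimates pi by Monte Carlo sampling: throw random points into the square [-1, 1] x [-1, 1], count how many land inside the unit circle, and use inside / total ≈ pi / 4. The pure-Python sketch below keeps the same map (test one point) and reduce (add up the hits) structure so the arithmetic can be checked without a cluster; the sample count, seed and function names are illustrative choices, not Spark code.

```python
import random
from functools import reduce
from operator import add


def inside_unit_circle(_, rng):
    """Map step: sample one point in [-1, 1] x [-1, 1]; 1 if it falls in the circle."""
    x = rng.random() * 2 - 1
    y = rng.random() * 2 - 1
    return 1 if x * x + y * y <= 1 else 0


def estimate_pi(n=100_000, seed=42):
    rng = random.Random(seed)  # seeded so the estimate is reproducible
    hits = reduce(add, map(lambda i: inside_unit_circle(i, rng), range(n)))  # reduce step
    # area(circle) / area(square) = pi / 4
    return 4.0 * hits / n


if __name__ == "__main__":
    print("Pi is roughly %f" % estimate_pi())
```

In the Spark version the map runs in parallel over the partitions of a parallelized range and only the per-partition sums travel back to the driver.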

1). Create an RDD by reading an external dataset

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == '__main__':
    #********** Begin **********#

    # 1. initialize SparkContext, which is the entry of Spark Program
    sc = SparkContext("local", "Simple App")
    
    # A text-file RDD is created with SparkContext's textFile method. It takes a file URI (a local path on the machine, or a URI such as hdfs:// or s3a://) and reads the file as a collection of lines
    # 2. read the local file. The URI is: /root/wordcount.txt
    distFile = sc.textFile("/root/wordcount.txt")

    # 3. collect the contents of the RDD with rdd.collect(). collect() is a Spark action operator (described in detail later); it returns all elements of the RDD to the driver
    s = distFile.collect()
    
    # 4. print the contents of rdd
    print(s)

    # 5. stop SparkContext
    sc.stop()

    #********** End **********#

2). Use the map operator to convert each even number to its square and each odd number to its cube.

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1. initialize SparkContext, which is the entry of Spark Program
    sc = SparkContext("local","Simple App")

    # 2. create a List from 1 to 5
    list1 = [1,2,3,4,5] 

    # 3. create rdd through SparkContext parallelization
    rdd = sc.parallelize(list1)

    # 4. collect the elements of the RDD with rdd.collect()
    print(rdd.collect())

    """
    Use the map operator to transform the rdd data (1, 2, 3, 4, 5) according to the following rules:
    Requirements:
        Even numbers are converted to their square
        Odd numbers are converted to their cube
    """
    # 5. use the map operator to complete the above requirements
    rdd_map = rdd.map(lambda x : x*x if x%2 == 0 else x*x*x)
    
    # 6. collect the elements after the map transformation with rdd.collect()
    print(rdd_map.collect())

    # 7. stop SparkContext
    sc.stop()

    #********** End **********#

3). Use the mapPartitions operator to combine the string and the length of the string into a tuple.

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

#********** Begin **********#
def f(iterator):
    list1 = []
    for x in iterator:
        length = len(x)
        list1.append((x,length))
    return list1

#********** End **********#

if __name__ == "__main__":
    #********** Begin **********#
    
    # 1. initialize SparkContext, which is the entry of Spark Program
    sc = SparkContext("local", "Simple App")

    # 2. create a List containing ("dog", "salmon", "salmon", "rat", "elephant")
    data = ["dog", "salmon", "salmon", "rat", "elephant"]

    # 3. create rdd through SparkContext parallelization
    rdd = sc.parallelize(data)

    # 4. collect the elements of the RDD with rdd.collect()
    print(rdd.collect())

    """
    Use the mapPartitions operator to transform the rdd data ("dog", "salmon", "salmon", "rat", "elephant") according to the following rules:
    Requirements:
        Combines a string with its length into a tuple, for example:
        dog  -->  (dog,3)
        salmon   -->  (salmon,6)
    """

    # 5. use mapPartitions operator to complete the above requirements
    partitions = rdd.mapPartitions(f)

    # 6. collect the elements after the mapPartitions transformation with rdd.collect()
    print(partitions.collect())

    # 7. stop SparkContext
    sc.stop()

    #********** End **********#
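The difference between map and mapPartitions is how often the user function is called: map calls it once per element, while mapPartitions calls it once per partition with an iterator over that partition's elements (useful when each call has a setup cost, such as opening a database connection). The pure-Python simulation below makes the call counts visible; it is not PySpark, and the two-partition split and the call counter are hypothetical.

```python
def run_map(partitions, func, calls):
    """Simulate rdd.map: func is applied to every element separately."""
    out = []
    for part in partitions:
        for x in part:
            calls["map"] += 1
            out.append(func(x))
    return out


def run_map_partitions(partitions, func, calls):
    """Simulate rdd.mapPartitions: func receives one iterator per partition."""
    out = []
    for part in partitions:
        calls["mapPartitions"] += 1
        out.extend(func(iter(part)))
    return out


def with_length(iterator):
    # Same logic as f() above: pair every string with its length
    return [(x, len(x)) for x in iterator]


if __name__ == "__main__":
    data = ["dog", "salmon", "salmon", "rat", "elephant"]
    partitions = [data[:2], data[2:]]  # pretend the RDD has 2 partitions
    calls = {"map": 0, "mapPartitions": 0}
    print(run_map(partitions, lambda x: (x, len(x)), calls))
    print(run_map_partitions(partitions, with_length, calls))
    print(calls)  # {'map': 5, 'mapPartitions': 2}
```

Both versions produce the same tuples; only the number of function invocations differs.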

4). Use the filter operator to filter out all odd numbers in rdd(1, 2, 3, 4, 5, 6, 7, 8).

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1. initialize SparkContext, which is the entry of Spark Program
    sc = SparkContext("local", "Simple App")

    # 2. create a List from 1 to 8
    data = [1,2,3,4,5,6,7,8]

    # 3. create rdd through SparkContext parallelization
    rdd = sc.parallelize(data)

    # 4. collect the elements of the RDD with rdd.collect()
    print(rdd.collect())

    """
    Use the filter operator to transform the rdd data (1, 2, 3, 4, 5, 6, 7, 8) according to the following rules:
    Requirements:
        Filter out the odd numbers in rdd
    """
    # 5. use filter operator to complete the above requirements
    rdd_filter = rdd.filter(lambda x:x%2==0)

    # 6. collect the elements after the filter transformation with rdd.collect()
    print(rdd_filter.collect())

    # 7. stop SparkContext
    sc.stop()

    #********** End **********#

5) . Use the flatMap operator to merge RDD elements: ([1,2,3], [4,5,6]) -- > (1,2,3,4,5,6)

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1. initialize SparkContext, the entry point of a Spark program
    sc = SparkContext("local", "Simple App")
    # 2. create a list [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    # 3. create an RDD by parallelizing the list through SparkContext
    rdd = sc.parallelize(data)
    # 4. use rdd.collect() to collect the elements of the RDD
    print(rdd.collect())
    """
    Use the flatMap operator to transform the rdd data ([1, 2, 3], [4, 5, 6], [7, 8, 9]) according to the following rule:
    Requirements:
        Merge (flatten) the RDD elements, for example:
            ([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)
            ([2,3],[4,5],[6])  -->  (2,3,4,5,6)
    """
    # 5. use the flatMap operator to meet the requirement above
    flat_map = rdd.flatMap(lambda x: x)
    # 6. use rdd.collect() to collect the elements after the flatMap transformation
    print(flat_map.collect())
    # 7. stop SparkContext
    sc.stop()
    #********** End **********#
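
The difference between `rdd.map(lambda x: x)` and `rdd.flatMap(lambda x: x)` is easiest to see side by side. The standard-library sketch below reproduces both on a plain list. `itertools.chain.from_iterable` stands in for the flattening step. This is a local illustration, not Spark code.

```python
from itertools import chain

data = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# map: one output per input, so the identity function keeps the nesting
mapped = list(map(lambda x: x, data))

# flatMap: apply the function, then flatten the returned iterables
flat = list(chain.from_iterable(map(lambda x: x, data)))

# a flatMap function may emit several items per input,
# e.g. each number followed by its square
pairs = list(chain.from_iterable(map(lambda x: (x, x * x), [1, 2, 3])))

print(mapped)  # [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(flat)    # [1, 2, 3, 4, 5, 6, 7, 8, 9]
print(pairs)   # [1, 1, 2, 4, 3, 9]
```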

6). Use the distinct operator to deduplicate the data in the rdd.

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1. initialize SparkContext, the entry point of a Spark program
    sc = SparkContext("local", "Simple App")

    # 2. create a list containing (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)
    data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]

    # 3. create an RDD by parallelizing the list through SparkContext
    rdd = sc.parallelize(data)

    # 4. use rdd.collect() to collect the elements of the RDD
    print(rdd.collect())

    """
    Use the distinct operator to transform the rdd data (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) according to the following rule:
    Requirements:
        Remove duplicate elements, for example:
            1,2,3,3,2,1  --> 1,2,3
            1,1,1,1      --> 1
    """
    # 5. use the distinct operator to meet the requirement above
    distinct = rdd.distinct()

    # 6. use rdd.collect() to collect the elements after the distinct transformation
    print(distinct.collect())

    # 7. stop SparkContext
    sc.stop()

    #********** End **********#
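
In Spark, distinct is implemented with a shuffle, so the collected result is not guaranteed to come back in any particular order. If a sorted result is needed, one option is to chain a sort afterwards, for example `rdd.distinct().sortBy(lambda x: x)`. The plain-Python sketch below shows two local ways to deduplicate: a set, which is sorted only to make the result predictable, and `dict.fromkeys`, which keeps the first-seen order. The second sample list is made up for illustration.

```python
data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]

# A set keeps one copy of each value; sorting just makes the result predictable
unique_sorted = sorted(set(data))

# dict.fromkeys keeps the first occurrence of each value, in input order
sample = [3, 1, 3, 2, 1]  # hypothetical sample to show order preservation
unique_in_order = list(dict.fromkeys(sample))

print(unique_sorted)    # [1, 2, 3, 4, 5, 6]
print(unique_in_order)  # [3, 1, 2]
```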

7). Use the sortBy operator to sort the data in the rdd in ascending order.

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1. initialize SparkContext, the entry point of a Spark program
    sc = SparkContext("local", "Simple App")

    # 2. create a list containing (1, 3, 5, 7, 9, 8, 6, 4, 2)
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

    # 3. create an RDD by parallelizing the list through SparkContext
    rdd = sc.parallelize(data)

    # 4. use rdd.collect() to collect the elements of the RDD
    print(rdd.collect())

    """
    Use the sortBy operator to transform the rdd data (1, 3, 5, 7, 9, 8, 6, 4, 2) according to the following rule:
    Requirements:
        Sort the elements (ascending), for example:
            5,4,3,1,2  --> 1,2,3,4,5
    """
    # 5. use the sortBy operator to meet the requirement above (ascending=True is the default)
    by = rdd.sortBy(lambda x: x)

    # 6. use rdd.collect() to collect the elements after the sortBy transformation
    print(by.collect())

    # 7. stop SparkContext
    sc.stop()

    #********** End **********#
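
`sortBy` takes a key function plus an `ascending` flag. Passing `ascending=False` in PySpark gives descending order. The sketch below mirrors those options with Python's `sorted(key=..., reverse=...)` on plain lists, so the expected results can be predicted. The word list is a hypothetical example.

```python
data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

# rdd.sortBy(lambda x: x) ~ sort by the key function, ascending
ascending = sorted(data, key=lambda x: x)

# rdd.sortBy(lambda x: x, ascending=False) ~ reverse=True
descending = sorted(data, key=lambda x: x, reverse=True)

# The key function can be anything, e.g. sort words by length
# (ties keep input order here because Python's sort is stable)
words = ["spark", "rdd", "hbase", "hdfs"]  # hypothetical sample
by_length = sorted(words, key=len)

print(ascending)   # [1, 2, 3, 4, 5, 6, 7, 8, 9]
print(descending)  # [9, 8, 7, 6, 5, 4, 3, 2, 1]
print(by_length)   # ['rdd', 'hdfs', 'spark', 'hbase']
```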

8). Use the sortByKey operator to sort the key-value data in the rdd by key in ascending order.

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1. initialize SparkContext, the entry point of a Spark program
    sc = SparkContext("local", "Simple App")

    # 2. create a list with contents [('B', 1), ('A', 2), ('C', 3)]
    data = [('B', 1), ('A', 2), ('C', 3)]

    # 3. create an RDD by parallelizing the list through SparkContext
    rdd = sc.parallelize(data)

    # 4. use rdd.collect() to collect the elements of the RDD
    print(rdd.collect())

    """
    Use the sortByKey operator to transform the rdd data ('B', 1), ('A', 2), ('C', 3) according to the following rule:
    Requirements:
        Sort the elements by key, for example:
            [(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]
    """
    # 5. use the sortByKey operator to meet the requirement above
    key = rdd.sortByKey()

    # 6. use rdd.collect() to collect the elements after the sortByKey transformation
    print(key.collect())

    # 7. stop SparkContext
    sc.stop()

    # ********** End **********#
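
`sortByKey` looks only at the first element of each pair. PySpark's `sortByKey(ascending=False)` reverses the order. To sort by value instead, use `sortBy` on the second element, for example `rdd.sortBy(lambda kv: kv[1])`. The plain-Python sketch below reproduces the three orderings on the exercise's data.

```python
data = [('B', 1), ('A', 2), ('C', 3)]

# rdd.sortByKey() ~ order pairs by their key (first element) only
by_key = sorted(data, key=lambda kv: kv[0])

# rdd.sortByKey(ascending=False) ~ reverse=True
by_key_desc = sorted(data, key=lambda kv: kv[0], reverse=True)

# rdd.sortBy(lambda kv: kv[1]) ~ order pairs by their value
by_value = sorted(data, key=lambda kv: kv[1])

print(by_key)       # [('A', 2), ('B', 1), ('C', 3)]
print(by_key_desc)  # [('C', 3), ('B', 1), ('A', 2)]
print(by_value)     # [('B', 1), ('A', 2), ('C', 3)]
```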

9). Use the mapValues operator to convert each even value to its square and each odd value to its cube

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1. initialize SparkContext, the entry point of a Spark program
    sc = SparkContext("local", "Simple App")

    # 2. create a list [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
    data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]

    # 3. create an RDD by parallelizing the list through SparkContext
    rdd = sc.parallelize(data)

    # 4. use rdd.collect() to collect the elements of the RDD
    print(rdd.collect())

    """
    Use the mapValues operator to transform the rdd data ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) according to the following rule:
    Requirements:
        For each (key, value) element, transform the value as follows:
            an even number is converted to its square
            an odd number is converted to its cube
    """
    # 5. use the mapValues operator to meet the requirement above (keys are left unchanged)
    values = rdd.mapValues(lambda x: x * x if x % 2 == 0 else x * x * x)

    # 6. use rdd.collect() to collect the elements after the mapValues transformation
    print(values.collect())

    # 7. stop SparkContext
    sc.stop()

    # ********** End **********#
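
`mapValues(f)` gives the same pairs as `map(lambda kv: (kv[0], f(kv[1])))`. The key passes through untouched and only the value changes. One practical reason to prefer mapValues is that Spark keeps the RDD's existing partitioning. The plain-Python sketch below spells out that equivalence. The helper name `square_or_cube` is made up for illustration.

```python
data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]

def square_or_cube(v):
    # hypothetical helper: even -> square, odd -> cube (the exercise's rule)
    return v * v if v % 2 == 0 else v * v * v

# mapValues(f) ~ keep each key, apply f to each value
result = [(k, square_or_cube(v)) for k, v in data]

print(result)  # [('1', 1), ('2', 4), ('3', 27), ('4', 16), ('5', 125)]
```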

10). Use the reduceByKey operator to accumulate (sum) the values for each key in an RDD of key-value pairs.

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1. initialize SparkContext, the entry point of a Spark program
    sc = SparkContext("local", "Simple App")

    # 2. create a list [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]

    # 3. create an RDD by parallelizing the list through SparkContext
    rdd = sc.parallelize(data)

    # 4. use rdd.collect() to collect the elements of the RDD
    print(rdd.collect())

    """
    Use the reduceByKey operator to transform the rdd data [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] according to the following rule:
    Requirements:
        Accumulate the values of (key, value) elements that share a key, for example:
            (1,1),(1,1),(1,2)  --> (1,4)
            (1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)
    """
    # 5. use the reduceByKey operator to meet the requirement above
    reduce_by_key = rdd.reduceByKey(lambda x, y: x + y)

    # 6. use rdd.collect() to collect the elements after the reduceByKey transformation
    print(reduce_by_key.collect())

    # 7. stop SparkContext
    sc.stop()

    # ********** End **********#
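
Conceptually, reduceByKey groups the values by key and then folds each group with the given function. In Spark, values are also combined inside each partition before the shuffle, and the order of the collected pairs is not guaranteed. The plain-Python sketch below shows only the grouping-and-folding semantics. The function name `reduce_by_key` is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict
from functools import reduce

data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]

def reduce_by_key(pairs, func):
    """Hypothetical helper: group values by key, then fold each group with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return [(key, reduce(func, values)) for key, values in groups.items()]

totals = reduce_by_key(data, lambda x, y: x + y)
print(totals)  # [('python', 8), ('scala', 2), ('java', 5)]
```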

11). Actions: common action operators

# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1. initialize SparkContext, the entry point of a Spark program
    sc = SparkContext("local", "Simple App")

    # 2. create a list with contents [1, 3, 5, 7, 9, 8, 6, 4, 2]
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

    # 3. create an RDD by parallelizing the list through SparkContext
    rdd = sc.parallelize(data)

    # 4. collect all elements of the rdd and print them
    print(rdd.collect())

    # 5. count the number of rdd elements and print it
    print(rdd.count())

    # 6. get the first element of the rdd and print it
    print(rdd.first())

    # 7. get the first three elements of the rdd and print them
    print(rdd.take(3))

    # 8. aggregate (sum) all elements of the rdd and print the result
    print(rdd.reduce(lambda x, y: x + y))

    # 9. stop SparkContext
    sc.stop()

    # ********** End **********#
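
Transformations such as filter or map are lazy. Actions such as collect, count, first, take and reduce trigger the computation and return a result to the driver. The sketch below lists the plain-Python counterpart of each action used above, so the printed values can be predicted for this data.

```python
from functools import reduce

data = [1, 3, 5, 7, 9, 8, 6, 4, 2]

collected = list(data)                     # rdd.collect()
count = len(data)                          # rdd.count()
first = data[0]                            # rdd.first()
first_three = data[:3]                     # rdd.take(3)
total = reduce(lambda x, y: x + y, data)   # rdd.reduce(lambda x, y: x + y)

print(collected)    # [1, 3, 5, 7, 9, 8, 6, 4, 2]
print(count)        # 9
print(first)        # 1
print(first_three)  # [1, 3, 5]
print(total)        # 45
```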

6. SparkSQL

1). Use Spark SQL to compute statistics on fighter flight performance

# coding=utf-8


from pyspark.sql import SparkSession

#**********Begin**********#

#Create SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()
#Read data in /root/jun.json
df = spark.read.json("/root/jun.json")
#Create view
df.createOrReplaceTempView("table1")
#Find the world's top three fighter planes by maximum flight speed
#(raw string: Spark SQL receives '[\\d,\\.]+' and unescapes it to the regex [\d,\.]+)
sqlDF = spark.sql(r"select cast(regexp_replace(regexp_extract(`Maximum flight speed`,'[\\d,\\.]+',0),',','') as float) as SPEED, `name` from table1 order by SPEED desc LIMIT 3")
#Save results
sqlDF.write.format("csv").save("/root/airspark")

#**********End**********#
spark.stop()
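
The SPEED column is derived in three steps. `regexp_extract(..., '[\d,\.]+', 0)` takes the first run of digits, commas or dots. `regexp_replace(..., ',', '')` then drops the thousands separators, and `cast(... as float)` converts the result. The sketch below mirrors these steps with Python's `re`. The sample speed strings are hypothetical, because the real format of `Maximum flight speed` in /root/jun.json is not shown in these notes.

```python
import re

# Hypothetical sample values (not taken from /root/jun.json)
samples = {
    "Fighter A": "2,410 km/h",
    "Fighter B": "1,915km/h",
    "Fighter C": "2600 km/h",
    "Fighter D": "unknown",
}

def parse_speed(text):
    # regexp_extract(col, '[\d,\.]+', 0): first run of digits, commas or dots
    match = re.search(r"[\d,.]+", text)
    if match is None:
        # Spark returns '' here, which the cast turns into null in non-ANSI mode
        return None
    # regexp_replace(..., ',', '') then cast(... as float)
    return float(match.group(0).replace(",", ""))

speeds = {name: parse_speed(text) for name, text in samples.items()}

# order by SPEED desc LIMIT 3 (nulls sort last for desc, so they are skipped here)
top3 = sorted(((s, n) for n, s in speeds.items() if s is not None), reverse=True)[:3]
print(top3)  # [(2600.0, 'Fighter C'), (2410.0, 'Fighter A'), (1915.0, 'Fighter B')]
```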

2). Use Spark SQL to calculate the proportion of fighters developed by each R & D unit

# coding=utf-8


from pyspark.sql import SparkSession

#**********Begin**********#

#Create SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.crossJoin.enabled", "true") \
    .master("local") \
    .getOrCreate()
    
#Read data in /root/jun.json
df = spark.read.json("/root/jun.json").coalesce(1)
#Create view
df.createOrReplaceTempView("table1")

#Calculate each R & D unit's share (as a percentage) of all fighter planes in the world
sqlDF = spark.sql("select concat(cast(round(count(`R & D unit`)*100/(select count(`R & D unit`) from table1 where `R & D unit` is not null and `name` is not null ),2) as float),'%'),`R & D unit` from table1 where `R & D unit` is not null and `name` is not null group by `R & D unit`")

#Save results
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#

spark.stop()
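
The query keeps only rows where both `R & D unit` and `name` are present. It counts those rows per unit and divides by the total count from the subquery. The ratio is multiplied by 100, rounded to two decimals, and has '%' appended. The sketch below reproduces that arithmetic in plain Python on hypothetical rows. The unit and aircraft names are made up, and Spark's float-to-string formatting may differ slightly from Python's.

```python
from collections import Counter

# Hypothetical rows of (name, R & D unit); None marks a missing value
rows = [
    ("J-1", "Unit X"),
    ("J-2", "Unit X"),
    ("J-3", "Unit Y"),
    (None, "Unit Y"),   # dropped: name is null
    ("J-5", None),      # dropped: R & D unit is null
]

# where `R & D unit` is not null and `name` is not null
valid = [unit for name, unit in rows if name is not None and unit is not None]
total = len(valid)

# group by `R & D unit`: count * 100 / total, rounded to 2 decimals, plus '%'
shares = {unit: f"{round(n * 100 / total, 2)}%" for unit, n in Counter(valid).items()}
print(shares)  # {'Unit X': '66.67%', 'Unit Y': '33.33%'}
```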

3). Taxi trajectory data cleaning

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.master("local").appName("demo").getOrCreate()
    
    #**********begin**********#
    df = spark.read.option("header", True).option("delimiter", "\t").csv("/root/data.csv")
    df.createOrReplaceTempView("table1")
    DataFrame = spark.sql(
    """
    select\
    regexp_replace(TRIP_ID,'[$@]','')  TRIP_ID,regexp_replace(CALL_TYPE,'[$@]','') CALL_TYPE,regexp_replace(ORIGIN_CALL,'[$@]','') ORIGIN_CALL,regexp_replace(TAXI_ID,'[$@]','') TAXI_ID,regexp_replace(ORIGIN_STAND,'[$@]','') ORIGIN_STAND,regexp_replace(TIMESTAMP,'[$@]','') TIMESTAMP, regexp_replace(POLYLINE,'[$@,-.\\\\[\\\\]]','') POLYLINE\
    from table1
    """)
    DataFrame.show()
    #**********end**********#
    spark.stop()
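
Each `regexp_replace(col, '[$@]', '')` call deletes every '$' and '@' in that column. The POLYLINE pattern's character class also covers ',', '-', '.', '[' and ']'. The sketch below applies the same two patterns with Python's `re.sub` to hypothetical values, because the real contents of /root/data.csv are not shown in these notes.

```python
import re

# Hypothetical dirty values (not taken from /root/data.csv)
trip_id = "$1372636858620000589@"
polyline = "[[-8.618643,41.141412],[-8.618499,41.141376]]"

# regexp_replace(col, '[$@]', ''): drop every '$' and '@'
clean_trip_id = re.sub(r"[$@]", "", trip_id)

# POLYLINE pattern: '$', '@', ',', '-', '.', '[' and ']' are all removed
clean_polyline = re.sub(r"[$@,\-.\[\]]", "", polyline)

print(clean_trip_id)   # 1372636858620000589
print(clean_polyline)  # 861864341141412861849941141376
```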

Posted by awpti on Fri, 03 Jun 2022 16:00:03 +0530