Hadoop series
Note: If you find this blog useful, don't forget to like and bookmark it. I post content on artificial intelligence and big data every week, most of it original: Python, Java, Scala, and SQL code; CV, NLP, and recommendation systems; Spark, Flink, Kafka, HBase, Hive, Flume, and more. It is all practical material, including walkthroughs of top-conference papers, so let's make progress together.
Today, I will continue sharing the basics of MapReduce, part 7.
Foreword
1. MapReduce splits a large computing task into small tasks, has those small tasks processed on different machines, and finally aggregates their results into an overall answer.
2. MapReduce has two stages: the Map stage is responsible for splitting the work, and the Reduce stage is responsible for aggregating the results.
3. The entire MapReduce workflow can be divided into three stages: map, shuffle, and reduce (a minimal word-count sketch after this list illustrates the three stages).
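To make these three stages concrete, here is a minimal word-count sketch (my own illustration, not part of the original case; the class names are made up): the Mapper covers the splitting side by turning each line into (word, 1) pairs, the shuffle groups identical words together, and the Reducer covers the aggregation side by summing the counts for each word.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map stage: split each line into words and emit (word, 1)
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}

// Reduce stage: the shuffle has already grouped all counts for a word; sum them up
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}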
In this post, I use another simple case to show how MapReduce can join a large table with a small table, an operation also called a Map Join.
1. What is MapJoin?
1. Store the small table in the distributed cache; each MapTask then reads the small-table data from the distributed cache into a local Map collection (a HashMap).
2. Read the large table through the normal MapReduce flow and join each record with the data in that Map collection.
3. A map-side join is generally used to join a small table with a large table, and a map-side join has no Reduce stage.
2. Usage steps
2.1 Data preparation
The first is the product table, itheima_goods.txt (the small table).
The second is the order table, itheima_order_goods.txt (the large table).
The task is simple: both tables contain a product ID field, and the requirement is to join the two tables on that product ID.
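A note on the file layout, inferred from the Mapper code below rather than stated explicitly in the original post: both files appear to be '|'-separated. In itheima_goods.txt the first field is the product ID and the third field is the product name (goodsMap.put(array[0], array[2])), while in itheima_order_goods.txt the second field is the product ID used as the join key (array[1]), with the first and third fields carried through to the output.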
2.2 The Map stage
By overriding the setup method, the small table from the cache is loaded into a Map collection, and the large table is then joined against it in the map method.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;

public class Mapper_demo extends Mapper<LongWritable, Text, Text, NullWritable> {

    HashMap<String, String> goodsMap = new HashMap<>();

    /**
     * The setup method runs once, before the map method, and is mainly used for initialization.
     * Here it reads the small table from the distributed cache into the Map collection.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 1: Get an input stream for the small-table file from the distributed cache
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(new FileInputStream("itheima_goods.txt")));
        String line = null;
        while ((line = bufferedReader.readLine()) != null) {
            String[] array = line.split("\\|");
            goodsMap.put(array[0], array[2]);
        }
        /*
          Resulting entries, for example:
          {100101, 6 Sichuan jelly oranges, about 180g/piece}
          {100102, Xianfeng fruit Zigui navel orange Chinese red}
        */
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1: Get K2 (the product ID of the order record)
        String[] array = value.toString().split("\\|");
        String k2 = array[1];
        String v2 = array[0] + "\t" + array[2];

        // 2: Join K2 with the Map collection
        String mapValue = goodsMap.get(k2);
        context.write(new Text(v2 + "\t" + mapValue), NullWritable.get());
    }
}
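A design note: because the small table is registered with job.addCacheFile(...) in the Driver below, Hadoop localizes it to each task's working directory (normally exposed under its original file name), which is why the Mapper can open it with the plain relative path "itheima_goods.txt". Loading it once in setup() rather than in map() avoids re-reading the file for every input record.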
2.3 The Driver entry point
There is no Reducer stage, so only the Driver is written for testing.
The key points here are putting the small table into the distributed cache, while the large table is read normally and goes through the map stage.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class Driver_demo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "Map_Join");
        job.setJarByClass(Driver_demo.class);

        // Read the large table (order table) as the normal job input
        FileInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/goods/itheima_order_goods.txt"));

        job.setMapperClass(Mapper_demo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // A map-side join has no Reduce stage
        job.setNumReduceTasks(0);

        // Put the small table into the distributed cache
        job.addCacheFile(new URI("hdfs://node1:8020/input/goods/itheima_goods.txt"));

        // 2.5 Specify the output path
        Path outPath = new Path("hdfs://node1:8020/output/goods_map_join");
        FileOutputFormat.setOutputPath(job, outPath);

        // Delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration());
        boolean exists = fileSystem.exists(outPath);
        if (exists) {
            fileSystem.delete(outPath, true);
        }

        // 3. Submit the job to YARN and wait for completion
        boolean bl = job.waitForCompletion(true);

        // Exit
        System.exit(bl ? 0 : 1);
    }
}
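One possible way to run the job (assuming the two classes are packaged into a jar, here hypothetically named map-join-demo.jar, and the input files already exist on HDFS) is:

hadoop jar map-join-demo.jar Driver_demo

Since the job is map-only, the joined records are written directly by the MapTasks to /output/goods_map_join on HDFS.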
Summary
The previous case was a Reduce Join, which is mainly a join between two large tables. This case is a Map Join, which is mainly used to join a large table with a small table and improves efficiency.