Hadoop performance optimization
Small file problem
HDFS and MapReduce are designed for large files and are inefficient at processing large numbers of small files, whose metadata also consumes NameNode memory. Each small file occupies its own block, produces its own InputSplit, and therefore launches its own map task, so the task startup overhead is large while the actual work per task is tiny. The solution is to pack the small files into a container; HDFS provides two container formats: SequenceFile and MapFile.
- SequenceFile
SequenceFile is a binary file format provided by Hadoop that serializes <key, value> pairs directly into a single file.
It is commonly used to merge small files: each file name becomes a key and the file content becomes the corresponding value, producing one large file.
Note: building a SequenceFile requires a merge step, and once the small files are packed into the large result they can no longer be viewed individually; you must traverse the SequenceFile to read each one.
SequenceFile

```java
package org.example.mapreduce;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.nio.charset.StandardCharsets;

public class SmallFileSeq {

    public static void main(String[] args) throws Exception {
        write("D:\\samllFile", "/SequenceFile");
    }

    /**
     * Generate a SequenceFile
     * @param inputDir   local directory containing the small files
     * @param outPutFile HDFS path of the merged output file
     * @throws Exception
     */
    private static void write(String inputDir, String outPutFile) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ip:9000");
        // Delete the output file if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        fileSystem.delete(new Path(outPutFile), true);
        // Construct an option array with three elements:
        // 1. output path
        // 2. key type
        // 3. value type
        SequenceFile.Writer.Option[] option = new SequenceFile.Writer.Option[]{
                SequenceFile.Writer.file(new Path(outPutFile)),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class)
        };
        // Create a Writer instance
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, option);
        // Iterate over the directory of files to pack
        File inputDirPath = new File(inputDir);
        if (inputDirPath.isDirectory()) {
            for (File file : inputDirPath.listFiles()) {
                // Read each small file fully into memory
                String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
                // Use the file name as the key
                String fileName = file.getName();
                writer.append(new Text(fileName), new Text(content));
            }
        }
        writer.close();
    }

    /**
     * Read a SequenceFile
     * @param inputFile path of the SequenceFile to read
     * @throws Exception
     */
    private static void read(String inputFile) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ip:9000");
        // Create a reader
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(new Path(inputFile)));
        // Read the records in a loop
        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println("file name : " + key);
            System.out.println("file content : " + value);
        }
        reader.close();
    }
}
```
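For completeness, a minimal sketch of how the two methods above could be driven together; the `read` call is not in the original `main`, and the paths are simply the ones used in the example:

```java
public static void main(String[] args) throws Exception {
    // Pack the local small files into one SequenceFile on HDFS ...
    write("D:\\samllFile", "/SequenceFile");
    // ... then traverse it to check the stored file names and contents
    read("/SequenceFile");
}
```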
In the MapReduce job you must set the input format class, and the key received by the map function changes accordingly: it is no longer the byte offset of a line but the file name stored in the SequenceFile.
MapReduce over a SequenceFile

```java
public class WordCountJobSeq {
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(1);
            }
            // Create the configuration
            Configuration conf = new Configuration();
            // Create the job
            Job job = Job.getInstance(conf);
            // ... ellipsis
            // TextInputFormat is used by default; when the input is a SequenceFile,
            // the input format class must be set explicitly
            job.setInputFormatClass(SequenceFileInputFormat.class);
            // Submit the job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // With the default input format, the first type parameter (the map input key)
    // is LongWritable, the byte offset of each line. When reading a SequenceFile
    // it is Text, and the key holds the name of the packed file.
    public static class MyMapper extends Mapper<Text, Text, Text, LongWritable> {
        @Override
        protected void map(Text key,   // the file name, not a byte offset
                           Text value, // the file content
                           Mapper<Text, Text, Text, LongWritable>.Context context
        ) throws IOException, InterruptedException {
            // ...
        }
    }

    // The reducer is unchanged
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key,
                              Iterable<LongWritable> values,
                              Reducer<Text, LongWritable, Text, LongWritable>.Context context
        ) throws IOException, InterruptedException {
            // ...
        }
    }
}
```
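The ellipsis in the driver hides the usual job wiring. A sketch of what those settings typically look like for this job; the use of `args` for the paths and the class names are assumptions, not taken from the original:

```java
job.setJarByClass(WordCountJobSeq.class);

// Input: the SequenceFile produced earlier; output: a fresh HDFS directory
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// Wire up the mapper and reducer defined below
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

// Declare the map output and final output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
```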
- MapFile
A MapFile is essentially a sorted SequenceFile. It consists of two parts: an index and the data.
The index acts as the lookup table for the data: it records the key of each indexed Record and that Record's offset within the data file.
When a MapFile is accessed, the index is loaded into memory, and the key-to-offset mapping makes it possible to locate a given Record quickly.
Advantages: fast retrieval.
Disadvantages: extra memory is consumed to hold the index.
MapFile

```java
package org.example.mapreduce;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;

public class SmallFileMap {

    public static void main(String[] args) throws Exception {
        write("D:\\samllFile", "/MapFile");
        read("/MapFile");
    }

    /**
     * Generate a MapFile
     * @param inputDir  local directory containing the small files
     * @param outPutDir HDFS output directory; it will hold two files, an index file and a data file
     * @throws Exception
     */
    private static void write(String inputDir, String outPutDir) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ip:9000");
        // Delete the output directory if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        fileSystem.delete(new Path(outPutDir), true);
        // Construct an option array with two elements:
        // 1. key type
        // 2. value type
        SequenceFile.Writer.Option[] options = new SequenceFile.Writer.Option[]{
                MapFile.Writer.keyClass(Text.class),
                MapFile.Writer.valueClass(Text.class)
        };
        // Create a Writer instance
        MapFile.Writer writer = new MapFile.Writer(conf, new Path(outPutDir), options);
        // Iterate over the directory of files to pack
        File inputDirPath = new File(inputDir);
        if (inputDirPath.isDirectory()) {
            File[] files = inputDirPath.listFiles();
            // A MapFile requires keys to be appended in ascending order,
            // and listFiles() does not guarantee any order, so sort by name first
            Arrays.sort(files, Comparator.comparing(File::getName));
            for (File file : files) {
                // Read each small file fully into memory
                String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
                // Use the file name as the key
                String fileName = file.getName();
                writer.append(new Text(fileName), new Text(content));
            }
        }
        writer.close();
    }

    /**
     * Read a MapFile
     * @param inputFile path of the MapFile directory to read
     * @throws Exception
     */
    private static void read(String inputFile) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ip:9000");
        // Create a reader
        MapFile.Reader reader = new MapFile.Reader(new Path(inputFile), conf);
        // Read the records in a loop
        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println("file name : " + key);
            System.out.println("file content : " + value);
        }
        reader.close();
    }
}
```
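Because of the index, a MapFile also supports direct lookup by key without scanning the whole file, which is where the retrieval-efficiency advantage shows up. A minimal sketch of such a lookup, reusing the configuration above; the method name and the file name passed in are hypothetical:

```java
private static void get(String mapFileDir, String fileName) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://ip:9000");
    MapFile.Reader reader = new MapFile.Reader(new Path(mapFileDir), conf);
    Text value = new Text();
    // get() seeks via the in-memory index and returns null if the key is absent
    if (reader.get(new Text(fileName), value) != null) {
        System.out.println("content of " + fileName + " : " + value);
    }
    reader.close();
}

// Usage, e.g.: get("/MapFile", "a.txt");
```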
Data skew problem
To improve efficiency, multiple Reduce tasks run in parallel, so the map output must be partitioned across them. You can query and set the partitioner class with job.getPartitionerClass() and job.setPartitionerClass(); by default it is HashPartitioner.
```java
job.getPartitionerClass();

// getPartitionerClass source code
public Class<? extends Partitioner<?,?>> getPartitionerClass()
        throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>)
            conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}

// The partitioner used by default
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```
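When the default hashing spreads keys poorly, a custom partitioner can be registered through job.setPartitionerClass(). A minimal sketch; the routing rule (partition by the first character of the key) is purely illustrative:

```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative only: spread keys over the reducers by the hash of their first character
public class FirstCharPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numReduceTasks) {
        String s = key.toString();
        int firstChar = s.isEmpty() ? 0 : s.charAt(0);
        return (firstChar & Integer.MAX_VALUE) % numReduceTasks;
    }
}

// In the driver:
// job.setPartitionerClass(FirstCharPartitioner.class);
```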
When the MapReduce program runs, most Reduce tasks finish quickly, but one or a few run very slowly, so the whole job takes a long time; concretely, the job appears stuck in the reduce phase.
Example: in a word-count job, the data consists of the ten digits 0 to 9, of which 9.1 million records are "5" and the remaining 900,000 are spread over the other nine digits; this data is skewed. To speed things up, multiple reduce tasks are started, but in the reduce phase every "5" is sent to the same reduce task, which therefore runs much more slowly than the others.
Solutions:
- Increase the number of Reduce tasks via job.setNumReduceTasks() (effective when the skew is not too severe)
- Scatter the skewed data (salt the hot key), as in the mapper below
```java
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, Text, LongWritable>.Context context
    ) throws IOException, InterruptedException {
        // Split the line into words
        String[] words = value.toString().split(" ");
        // Iterate over the split words
        for (String word : words) {
            // Emit each word as <k2, v2>
            /*****************************************************************************/
            if ("5".equals(word)) {
                // The data is skewed towards "5", so scatter it here:
                // "5" becomes one of "5_0" ... "5_9" at random
                word = "5" + "_" + new Random().nextInt(10);
                // The keys in the output change as well, so another MapReduce job
                // is needed afterwards to aggregate the partial counts
            }
            /*****************************************************************************/
            Text keyOut = new Text(word);
            LongWritable valueOut = new LongWritable(1L);
            // Write <k2, v2>
            context.write(keyOut, valueOut);
        }
    }
}
```
The output of this job now contains keys such as 5_0 through 5_9, each with a partial count, so a second program is needed to merge them back into the final result.
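A minimal sketch of the mapper for that second job, assuming the first job's output lines look like `5_3<TAB>91021` (the exact format depends on how the first job is configured, and the class name here is hypothetical):

```java
public static class MergeSaltMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, Text, LongWritable>.Context context
    ) throws IOException, InterruptedException {
        // Each input line is "<word>\t<count>" from the first job's output
        String[] fields = value.toString().split("\t");
        if (fields.length != 2) {
            return;
        }
        String word = fields[0];
        // Strip the salt suffix so 5_0 ... 5_9 all map back to the key "5"
        if (word.startsWith("5_")) {
            word = "5";
        }
        context.write(new Text(word), new LongWritable(Long.parseLong(fields[1])));
    }
}

// The reducer simply sums the counts per key, the same as in the original word count.
```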