Hadoop performance optimization

Small file problem

HDFS and MapReduce are designed for large files; they are inefficient at processing small files. Each small file occupies its own block (and its metadata consumes NameNode memory), generates an InputSplit, and therefore a map task, so task startup overhead dominates while the actual processing time per task is very short. The solution is to pack small files into a container format. Hadoop provides two such containers: SequenceFile and MapFile.

  1. SequenceFile
    SequenceFile is a binary file format provided by Hadoop that serializes <key, value> pairs directly into one file.
    It is commonly used to merge small files: the file name becomes the key and the file content becomes the value, so many small files are serialized into one large file.
    Note: building a SequenceFile requires a merge pass, the resulting file is large, and it is not directly browsable; to view an individual small file you have to iterate over the records.
    SequenceFile
    package org.example.mapreduce;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    import java.io.File;
    import java.nio.charset.StandardCharsets;
    
    public class SmallFileSeq {
    
        public static void main(String[] args) throws Exception {
            write("D:\\samllFile", "/SequenceFile");
        }
    
        /**
         * Generate a SequenceFile
         * @param inputDir local directory containing the small files
         * @param outPutFile HDFS path of the output SequenceFile
         * @throws Exception
         */
        private static void write(String inputDir, String outPutFile) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
    
            // Delete if output file exists
            FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.delete(new Path(outPutFile), true);
    
    
            // Construct an option array with three elements
            // 1. output path
            // 2. key type
            // 3. value type
            SequenceFile.Writer.Option[] option = new SequenceFile.Writer.Option[]{
                    SequenceFile.Writer.file(new Path(outPutFile)),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(Text.class)
            };
    
            // Create a Writer instance
            SequenceFile.Writer writer = SequenceFile.createWriter(conf, option);
    
            // Specify the directory of files to compress
            File inputDirPath = new File(inputDir);
            if (inputDirPath.isDirectory()) {
                for (File file : inputDirPath.listFiles()) {
                    // Read the whole small file content into memory as a string
                    String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
                    // Get file name
                    String fileName = file.getName();
                    writer.append(new Text(fileName), new Text(content));
                }
            }
    
            writer.close();
        }
    
        /**
         * Read SequenceFile
         * @param inputFile
         * @throws Exception
         */
        private static void read(String inputFile) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
            // Create reader
            SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
    
            // Read data circularly
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println("file name : " + key);
                System.out.println("Document content : " + value);
            }
    
            reader.close();
        }
    }

    In the MapReduce job, you need to set SequenceFileInputFormat as the input format and adjust the key type of the map function accordingly.

    MapReduce job that reads the SequenceFile
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    
    import java.io.IOException;
    
    public class WordCountJobSeq {
        public static void main(String[] args) {
            try {
                if (args.length != 2) {
                    System.exit(1);
                }
    
                // Create a configuration class
                Configuration conf = new Configuration();
                // Create a task
                Job job = Job.getInstance(conf);

                // ... (other job configuration omitted: mapper/reducer classes, output key/value types, input and output paths)

                // TextInputFormat is used by default; to read a SequenceFile the input format must be set explicitly
                job.setInputFormatClass(SequenceFileInputFormat.class);

                // Submit job
                job.waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        // By default the first generic parameter (and the first map argument) is LongWritable, the byte offset of each line.
        // When reading a SequenceFile it is Text instead: the key is the file name and the value is the file content.
        public static class  MyMapper extends Mapper<Text, Text, Text, LongWritable> {
            @Override
            protected void map(Text key, // By default, the key is the offset bytes. When reading SequenceFile, the key is the Text type and the content is the file name
                               Text value,
                               Mapper<Text, Text, Text, LongWritable>.Context context
            ) throws IOException, InterruptedException {
                // ...
            }
        }
    
        // The reducer is unchanged
        public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(
                    Text key,
                    Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context
            ) throws IOException, InterruptedException {
                 // ...
            }
        }
    }
  2. MapFile
    MapFile is a sorted SequenceFile. A MapFile consists of two parts: an index and the data.
    The index records the key of each Record and the Record's offset within the data file.
    When a MapFile is accessed, the index file is loaded into memory, and the index mapping is used to quickly locate the position of a given Record.
    Advantage: high retrieval efficiency (a random-lookup sketch follows the code below).
    Disadvantage: extra memory is consumed to hold the index.
    MapFile
    package org.example.mapreduce;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Comparator;
    
    public class SmallFileMap {
    
        public static void main(String[] args) throws Exception {
            write("D:\\samllFile", "/MapFile");
            read("/MapFile");
        }
    
        /**
         * Generate a MapFile
         * @param inputDir local directory containing the small files
         * @param outPutDir HDFS output directory; the MapFile is written into it as two files, one index and one data
         * @throws Exception
         */
        private static void write(String inputDir, String outPutDir) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
    
            // Delete if output file exists
            FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.delete(new Path(outPutDir), true);
    
    
            // Construct an option array with two elements
            // 1. key type
            // 2. value type
            SequenceFile.Writer.Option[] options = new SequenceFile.Writer.Option[]{
                    MapFile.Writer.keyClass(Text.class),
                    MapFile.Writer.valueClass(Text.class)
            };
    
            // Create a Writer instance
            MapFile.Writer writer = new MapFile.Writer(conf, new Path(outPutDir), options);
    
            // Specify the directory of files to pack
            File inputDirPath = new File(inputDir);
            if (inputDirPath.isDirectory()) {
                File[] files = inputDirPath.listFiles();
                // MapFile requires keys to be appended in ascending key order, so sort the files by name first
                Arrays.sort(files, Comparator.comparing(File::getName));
                for (File file : files) {
                    // Read the whole small file content into memory as a string
                    String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
                    // Use the file name as the key
                    String fileName = file.getName();
                    writer.append(new Text(fileName), new Text(content));
                }
            }
    
            writer.close();
        }
    
        /**
         * Read MapFile
         * @param inputFile Read MapFile file path
         * @throws Exception
         */
        private static void read(String inputFile) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
            // Create reader
            MapFile.Reader reader = new MapFile.Reader(new Path(inputFile), conf);
    
            // Read data circularly
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println("file name : " + key);
                System.out.println("Document content : " + value);
            }
    
            reader.close();
        }
    }
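
    Because of the index, a MapFile also supports random lookup by key, which is where it improves on a plain SequenceFile. Below is a minimal sketch of such a lookup, written as a method that could be added to the SmallFileMap class above; the example file name is an assumption, not from the original code.

    // Hypothetical helper for SmallFileMap: look up one record by key (the original file name)
    private static void lookup(String mapFileDir, String fileName) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://ip:9000");

        MapFile.Reader reader = new MapFile.Reader(new Path(mapFileDir), conf);
        Text value = new Text();
        // get() uses the in-memory index to seek near the record, then scans forward to it
        if (reader.get(new Text(fileName), value) != null) {
            System.out.println("file content : " + value);
        } else {
            System.out.println("no record found for key " + fileName);
        }
        reader.close();
    }

    // Example call (the file name is illustrative): lookup("/MapFile", "a.txt");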

 

Data skew problem

To improve efficiency, multiple Reduce tasks run in parallel, so the map output has to be partitioned among them. You can query and set the partitioner class with job.getPartitionerClass() and job.setPartitionerClass(); by default the HashPartitioner class is used.

job.getPartitionerClass();

// getPartitionerClass source code
  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

// The default partitioner class
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
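
If the default hashing spreads keys unevenly, a custom partitioner can be plugged in with job.setPartitionerClass(). The sketch below is a hypothetical example (HotKeyPartitioner is not part of the original article): it reserves the last reducer for a known hot key such as "5" and hashes every other key over the remaining reducers.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical custom partitioner: isolate a known hot key on its own reducer
public class HotKeyPartitioner extends Partitioner<Text, LongWritable> {
  @Override
  public int getPartition(Text key, LongWritable value, int numReduceTasks) {
    if (numReduceTasks > 1 && "5".equals(key.toString())) {
      return numReduceTasks - 1; // last reducer is reserved for the hot key
    }
    int buckets = numReduceTasks > 1 ? numReduceTasks - 1 : 1;
    return (key.toString().hashCode() & Integer.MAX_VALUE) % buckets;
  }
}

// In the driver:
// job.setPartitionerClass(HotKeyPartitioner.class);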

When the MapReduce program runs, most Reduce tasks finish quickly, but one or a few Reduce tasks run very slowly, so the whole job takes a long time; concretely, the job appears stuck in the reduce phase.

Example: in a wordcount job the data consists of the ten digits 0 to 9, with 9.1 million records of "5" and 900,000 records spread over the other nine digits, i.e. the data is skewed. To speed things up, several reduce tasks are started, but in the reduce phase every "5" is sent to the same reduce task, which therefore runs much slower than the others.

Solutions:

  1. Increase the number of Reduce tasks with job.setNumReduceTasks() (effective when the skew is not too severe)
  2. Scatter the skewed data, as in the mapper below
        public static class  MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key,
                               Text value,
                               Mapper<LongWritable, Text, Text, LongWritable>.Context context
            ) throws IOException, InterruptedException {
                // Split word
                String[] words = value.toString().split(" ");
                // Iterate over the words
                for (String word : words) {
                    // Encapsulate the iterated words in the form of <k2, v2>
                    
                    /*****************************************************************************/
                    
                    if ("5".equals(word)) { // The data is inclined to the character "5", and the data is scattered here
                        word = "5" + "_" + new Random().nextInt(10); // Change 5 to 5_ 0 ~ 5_ nine
                    }
                    
                    // The key will also change in the obtained results, and then you need to use mapreduce to aggregate the data
    
                    /*****************************************************************************/
                    
                    Text keyOut = new Text(word);
                    LongWritable valueOut = new LongWritable(1L);
                    // Write <k2, v2>
                    context.write(keyOut, valueOut);
                }
            }
        }

    The output now contains keys 5_0 through 5_9 instead of 5, so a second MapReduce job is needed to merge them back into a single count; a sketch of that second pass follows.
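
    A minimal sketch of that second pass, assuming the first job writes its output with the default TextOutputFormat as tab-separated "key<TAB>count" lines (the class name AggregateMapper and the parsing details are assumptions, not from the original article):

        // Hypothetical second-pass mapper: merge the scattered keys 5_0 ... 5_9 back into "5"
        public static class AggregateMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key,
                               Text value,
                               Mapper<LongWritable, Text, Text, LongWritable>.Context context
            ) throws IOException, InterruptedException {
                // Each input line is "word<tab>partialCount", e.g. a key like "5_3" with its count
                String[] fields = value.toString().split("\t");
                if (fields.length != 2) {
                    return; // skip malformed lines
                }
                String word = fields[0];
                // Strip the random suffix added in the first pass, e.g. "5_7" -> "5"
                if (word.startsWith("5_")) {
                    word = "5";
                }
                context.write(new Text(word), new LongWritable(Long.parseLong(fields[1])));
            }
        }

    The reducer of the second job can be the same summing reducer as before, and the driver is a standard job setup that reads the first job's output directory.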
