# How to Process 3 Billion Records in Minutes with Java

Scenario description

There is a 10G data file containing integers between 18 and 70, each representing the age of one user in the system. Assume the ages are evenly distributed across that range. Find the number that is repeated most often. The machine has 4G of memory and a 2-core CPU. Write an algorithm to solve this.

```
23,31,42,19,60,30,36,........
```

Simulation data

In Java, an integer occupies 4 bytes, so 10G of simulated data is about 3 billion records. The data is written to the hard disk in append mode, with a line break after every 1 million records. Each line is about 4M, so 10G is about 2500 lines.

```java
package bigdata;

import java.io.*;
import java.util.Random;

/**
* @Desc:
* @Author: bingbing
* @Date: 2022/5/4 0004 19:05
*/
public class GenerateData {
private static Random random = new Random();

public static int generateRandomData(int start, int end) {
return random.nextInt(end - start + 1) + start;
}

/**
* Generate about 10G of random ages (18-70) on the D drive
*/
public void generateData() throws IOException {
File file = new File("D:\\User.dat");
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}

int start = 18;
int end = 70;
long startTime = System.currentTimeMillis();
BufferedWriter bos = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));
for (long i = 1; i < Integer.MAX_VALUE * 1.7; i++) {
String data = generateRandomData(start, end) + ",";
bos.write(data);
//Every 1 million records into a row, 1 million data is about 4M
if (i % 1000000 == 0) {
bos.write("\n");
}
}
System.out.println("write complete! Total time spent:" + (System.currentTimeMillis() - startTime) / 1000 + " s");
bos.close();
}

public static void main(String[] args) {
GenerateData generateData = new GenerateData();
try {
generateData.generateData();
} catch (IOException e) {
e.printStackTrace();
}

}
}
```

Run the code above twice, adjusting the parameters, to collect 10G of data in the User.dat file on the D drive. With the 10G of data prepared, the next step is to process it.

Scenario Analysis

10G of data is much larger than the available memory, so it cannot be loaded into memory in full; doing so would exhaust the memory immediately. The file can only be read line by line, which is what BufferedReader's readLine() does in Java.

First, let's write a method that reads the 3 billion records in a single thread and prints the elapsed time every 100 lines:

```java
private static void readData() throws IOException {
    // Open the data file; FILE_NAME is the path to User.dat
    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
    String line;
    long start = System.currentTimeMillis();
    int count = 1;
    while ((line = br.readLine()) != null) {
        // SplitData.splitLine(line);
        if (count % 100 == 0) {
            System.out.println("read 100 lines,total time: " + (System.currentTimeMillis() - start) / 1000 + " s");
            System.gc();
        }
        count++;
    }
    br.close();
}
```

Reading the 10G of data line by line takes about 20 seconds. Every 100 lines (more than 100 million records) takes about 1 s, which is quite fast.

Data processing

Idea 1: Processing through a single thread

With single-threaded processing, initialize a countMap whose key is the age and whose value is the number of occurrences. Split each line that is read on "," and record every item in the countMap: if the key already exists, its value is incremented by 1.

```java
for (int i = start; i <= end; i++) {
    try {
        File subFile = new File(dir + "\\" + i + ".dat");
        // Create the per-age file only if it does not exist yet
        if (!subFile.exists()) {
            subFile.createNewFile();
        }
        // Pre-register every age with a count of 0
        countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```

```java
public static void splitLine(String lineData) {
String[] arr = lineData.split(",");
for (String str : arr) {
if (StringUtils.isEmpty(str)) {
continue;
}
countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();
}
}
```

Find the age with the highest count by comparing the values, and print it:

```java
private static void findMostAge() {
Integer targetValue = 0;
String targetKey = null;
Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
while (entrySetIterator.hasNext()) {
Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
Integer value = entry.getValue().get();
String key = entry.getKey();
if (value > targetValue) {
targetValue = value;
targetKey = key;
}
}
System.out.println("The age with the largest number is:" + targetKey + "The quantity is:" + targetValue);
}
```
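Because the keys are known to be integers between 18 and 70, the same counting idea can also be sketched with a plain array instead of a map keyed by strings. The following is a minimal sketch under that assumption, not the code used in this article: the class AgeArrayCounter and its methods are hypothetical names, and the line is scanned character by character rather than with split(",").

```java
// Sketch: count ages in a fixed-size array indexed by (age - MIN_AGE).
class AgeArrayCounter {
    static final int MIN_AGE = 18;
    static final int MAX_AGE = 70;

    // Scan one comma-separated line and add its ages to counts.
    static void countLine(String line, long[] counts) {
        int value = 0;
        boolean hasDigits = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c >= '0' && c <= '9') {
                value = value * 10 + (c - '0');
                hasDigits = true;
            } else if (c == ',') {
                record(value, hasDigits, counts);
                value = 0;
                hasDigits = false;
            }
        }
        // Handle a last value that is not followed by a comma
        record(value, hasDigits, counts);
    }

    // Count the value only if digits were seen and it lies in the valid range.
    private static void record(int value, boolean hasDigits, long[] counts) {
        if (hasDigits && value >= MIN_AGE && value <= MAX_AGE) {
            counts[value - MIN_AGE]++;
        }
    }

    // Return the age with the highest count (the lowest age wins ties).
    static int mostFrequentAge(long[] counts) {
        int best = 0;
        for (int i = 1; i < counts.length; i++) {
            if (counts[i] > counts[best]) {
                best = i;
            }
        }
        return best + MIN_AGE;
    }

    public static void main(String[] args) {
        long[] counts = new long[MAX_AGE - MIN_AGE + 1];
        countLine("23,31,42,19,23,30,", counts);
        System.out.println(mostFrequentAge(counts)); // prints 23
    }
}
```

A single array like this is only safe when one thread does the counting; with several threads, each thread would need its own array, merged at the end.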

## Full code:

```java
package bigdata;

import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @Desc:
* @Author: bingbing
* @Date: 2022/5/4 0004 19:19
*/
public class HandleMaxRepeatProblem_v0 {

public static final int start = 18;
public static final int end = 70;

public static final String dir = "D:\\dataDir";

public static final String FILE_NAME = "D:\\User.dat";

/**
* total quantity
*/
private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();

/**
* sign to start consumption
*/
private static volatile boolean startConsumer = false;

/**
* Consumer Runtime Guarantee
*/
private static volatile boolean consumerRunning = true;

/**
* Divide the data according to "," and write it into countMap
*/
static class SplitData {

public static void splitLine(String lineData) {
String[] arr = lineData.split(",");
for (String str : arr) {
if (StringUtils.isEmpty(str)) {
continue;
}
countMap.computeIfAbsent(str, s -> new AtomicInteger(0)).getAndIncrement();
}
}

}

/**
*  init map
*/

static {
File file = new File(dir);
if (!file.exists()) {
file.mkdir();
}

for (int i = start; i <= end; i++) {
try {
File subFile = new File(dir + "\\" + i + ".dat");
if (!subFile.exists()) {
subFile.createNewFile();
}
countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {

    new Thread(() -> {
        try {
            readData();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }).start();

}

private static void readData() throws IOException {

    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
    String line;
    long start = System.currentTimeMillis();
    int count = 1;
    while ((line = br.readLine()) != null) {
        //Read row by row and write data to map
        SplitData.splitLine(line);
        if (count % 100 == 0) {
            System.out.println("read 100 lines,total time: " + (System.currentTimeMillis() - start) / 1000 + " s");
            try {
                // Pause briefly and request a GC; the 1 s pause length is an assumption
                Thread.sleep(1000L);
                System.gc();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        count++;
    }
    findMostAge();

    br.close();
}

private static void findMostAge() {
Integer targetValue = 0;
String targetKey = null;
Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
while (entrySetIterator.hasNext()) {
Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
Integer value = entry.getValue().get();
String key = entry.getKey();
if (value > targetValue) {
targetValue = value;
targetKey = key;
}
}
System.out.println("The age with the largest number is:" + targetKey + "The quantity is:" + targetValue);
}


}
```

## Test result

It took a total of 3 minutes to read and count all the data. Memory consumption was 2G-2.5G, but CPU utilization was low, fluctuating only between 20% and 25%. Multithreading can improve CPU utilization, so next we use multiple threads to address this.

Idea 2: Divide and Conquer

Use multiple threads to consume the data that has been read, following the producer-consumer pattern. Reading is relatively fast, while processing the data in a single thread is relatively slow, so in the first idea the bottleneck is on the side that consumes the data. Because reading and processing are synchronous there, the performance of the whole pipeline suffers.

The divide-and-conquer approach splits the massive data into parts and processes them separately. Initialize n threads according to the capability of the CPU, and let each thread consume its own queue, so that threads do not contend for a queue when consuming.

To ensure thread safety and a complete producer-consumer model, a blocking queue is used. LinkedBlockingQueue, provided by Java, is such a blocking queue.

## ① Initialize the blocking queue

Use linkedList to create a list of blocking queues:

```java
private static List<LinkedBlockingQueue<String>> blockQueueLists = new LinkedList<>();
```

The blocking queues are created in the static block, and each queue has a capacity of 256. As mentioned above, the 3 billion records are about 2500 lines. With 20 queues, each queue receives about 125 lines, so a capacity of 256 is sufficient:

```java
//Each queue capacity is 256
for (int i = 0; i < threadNums; i++) {
    blockQueueLists.add(new LinkedBlockingQueue<>(256));
}
```

## ② Producer

To balance the load across the queues, first define a count counter that records the number of lines:

```java
private static AtomicLong count = new AtomicLong(0);
```

Calculate the index of the queue from the number of lines: long index = count.get() % threadNums.

The following code distributes the lines across the queues in the list in round-robin order:

```java
static class SplitData {

    public static void splitLine(String lineData) {
        String[] arr = lineData.split("\n");
        for (String str : arr) {
            if (StringUtils.isEmpty(str)) {
                continue;
            }
            // Pick the target queue in round-robin order
            long index = count.get() % threadNums;
            try {
                //Block if full
                blockQueueLists.get((int) index).put(str);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            count.getAndIncrement();
        }
    }
}
```
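The round-robin dispatch can also be looked at in isolation. The following is a minimal sketch, not the article's code: the class RoundRobinDispatcher and its method names are hypothetical, and it assumes a single reader thread, so a plain long counter is enough.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: hand each incoming line to the next queue in turn.
class RoundRobinDispatcher {
    private final List<LinkedBlockingQueue<String>> queues = new ArrayList<>();
    // Touched only by the single reader thread in this sketch
    private long lineCount = 0;

    RoundRobinDispatcher(int queueCount, int capacity) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new LinkedBlockingQueue<>(capacity));
        }
    }

    // Put the line into queue (lineCount % queueCount); blocks while that queue is full.
    int dispatch(String line) throws InterruptedException {
        int index = (int) (lineCount % queues.size());
        queues.get(index).put(line);
        lineCount++;
        return index;
    }

    // Each consumer thread would take() from exactly one of these queues.
    LinkedBlockingQueue<String> queue(int index) {
        return queues.get(index);
    }
}
```

With 3 queues, seven consecutive calls to dispatch go to queues 0, 1, 2, 0, 1, 2, 0. The sketch uses an ArrayList for the queue list because get(index) is a constant-time lookup there, whereas on a LinkedList it walks the list.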

## ③ Consumers

### Queue thread privatization

When a consumer thread starts, it obtains the queue at its own index, so each queue is private to one thread.

```java
private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
    //If a queue is shared, the number of threads should not be too many, and preemption is prone to occur.
    System.out.println("start consuming...");
    for (int i = 0; i < threadNums; i++) {
        final int index = i;
        //Each thread is responsible for a queue, so that there will be no thread preempting the queue.
        new Thread(() -> {
            while (consumerRunning) {
                startConsumer = true;
                try {
                    String str = blockQueueLists.get(index).take();
                    countNum(str);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}
```

### Splitting strings with multiple sub-threads

The strings arriving from the queue are very large. If a single thread calls split(",") on the whole string, this step becomes a bottleneck as well.

```java
//Split the string into three parts according to arr, and count each part in its own thread
private static void countNum(String str) {
    // arr[0] is the start index and arr[1] the end index of the next part
    int[] arr = new int[2];
    arr[1] = str.length() / 3;
    for (int i = 0; i < 3; i++) {
        final String innerStr = SplitData.splitStr(str, arr);
        //System.out.println("The split string is start position: " + arr[0] + ", end position is: " + arr[1]);
        new Thread(() -> {
            String[] strArray = innerStr.split(",");
            for (String s : strArray) {
                // A part may begin with ",", which yields an empty token
                if (s.isEmpty()) {
                    continue;
                }
                countMap.computeIfAbsent(s, s1 -> new AtomicInteger(0)).getAndIncrement();
            }
        }).start();
    }
}
```

The string is cut at "," boundaries. If endIndex exceeds the maximum length of the string, arr[1] is set to the index of the last character.

```java
/**
 * Divide the string according to the x coordinates. If a cut character is not ",", move that coordinate forward or backward one place.
 *
 * @param line
 * @param arr  Store x1,x2 coordinates: arr[0] is the start index, arr[1] is the end index
 * @return the part of the line between the two adjusted coordinates
 */
public static String splitStr(String line, int[] arr) {

    int startIndex = arr[0];
    int endIndex = arr[1];
    char start = line.charAt(startIndex);
    char end = line.charAt(endIndex);
    if ((startIndex == 0 || start == ',') && end == ',') {
        // Both cuts fall on ",": advance arr to the next part and return this one
        arr[0] = endIndex + 1;
        arr[1] = arr[0] + line.length() / 3;
        if (arr[1] >= line.length()) {
            arr[1] = line.length() - 1;
        }
        return line.substring(startIndex, endIndex);
    }

    if (startIndex != 0 && start != ',') {
        startIndex = startIndex - 1;
    }

    if (end != ',') {
        endIndex = endIndex + 1;
    }

    arr[0] = startIndex;
    arr[1] = endIndex;
    if (arr[1] >= line.length()) {
        arr[1] = line.length() - 1;
    }
    return splitStr(line, arr);
}
```
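The same boundary rule can be sketched without recursion. This is a minimal sketch rather than the method used in this article: the class CommaChunker and its chunk method are hypothetical names, and each piece is extended forward until it ends right after a "," (or at the end of the line), so no number is ever cut in half.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: cut a comma-separated line into roughly `parts` pieces at "," boundaries.
class CommaChunker {
    static List<String> chunk(String line, int parts) {
        List<String> chunks = new ArrayList<>();
        int step = Math.max(1, line.length() / parts);
        int start = 0;
        while (start < line.length()) {
            int end = Math.min(start + step, line.length());
            // Move the cut forward until the piece ends just after a comma or the line ends
            while (end < line.length() && line.charAt(end - 1) != ',') {
                end++;
            }
            chunks.add(line.substring(start, end));
            start = end;
        }
        return chunks;
    }

    public static void main(String[] args) {
        // prints [23,31,, 42,19,, 60,30,]
        System.out.println(chunk("23,31,42,19,60,30,", 3));
    }
}
```

Since every piece except possibly the last ends with ",", calling split(",") on a piece never produces an empty leading token, and concatenating the pieces gives back the original line.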

### Full code:

```java
package bigdata;

import cn.hutool.core.collection.CollectionUtil;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
* @Desc:
* @Author: bingbing
* @Date: 2022/5/4 0004 19:19
*/
public class HandleMaxRepeatProblem {

public static final int start = 18;
public static final int end = 70;

public static final String dir = "D:\\dataDir";

public static final String FILE_NAME = "D:\\User.dat";

private static final int threadNums = 20;

/**
* key is age, value is a list of all rows, using queue
*/
private static Map<Integer, Vector<String>> valueMap = new ConcurrentHashMap<>();

/**
* queue for storing data
*/
private static List<LinkedBlockingQueue<String>> blockQueueLists = new LinkedList<>();

/**
* total quantity
*/
private static Map<String, AtomicInteger> countMap = new ConcurrentHashMap<>();

private static Map<Integer, ReentrantLock> lockMap = new ConcurrentHashMap<>();

private static AtomicLong count = new AtomicLong(0);

/**
* sign to start consumption
*/
private static volatile boolean startConsumer = false;

/**
* Consumer Runtime Guarantee
*/
private static volatile boolean consumerRunning = true;

/**
* Split the data according to "," and write it to the file
*/
static class SplitData {

public static void splitLine(String lineData) {
//            System.out.println(lineData.length());
String[] arr = lineData.split("\n");
for (String str : arr) {
if (StringUtils.isEmpty(str)) {
continue;
}
long index = count.get() % threadNums;
try {
//Block if full
blockQueueLists.get((int) index).put(str);
} catch (InterruptedException e) {
e.printStackTrace();
}
count.getAndIncrement();

}
}

/**
* Divide the string according to the x coordinate, if the cut character is not ",", then move the coordinate forward or backward one place.
*
* @param line
* @param arr  Store x1,x2 coordinates
* @return
*/
public static String splitStr(String line, int[] arr) {

    // arr[0] is the start index, arr[1] is the end index
    int startIndex = arr[0];
    int endIndex = arr[1];
    char start = line.charAt(startIndex);
    char end = line.charAt(endIndex);
    if ((startIndex == 0 || start == ',') && end == ',') {
        arr[0] = endIndex + 1;
        arr[1] = arr[0] + line.length() / 3;
        if (arr[1] >= line.length()) {
            arr[1] = line.length() - 1;
        }
        return line.substring(startIndex, endIndex);
    }

    if (startIndex != 0 && start != ',') {
        startIndex = startIndex - 1;
    }

    if (end != ',') {
        endIndex = endIndex + 1;
    }

    arr[0] = startIndex;
    arr[1] = endIndex;
    if (arr[1] >= line.length()) {
        arr[1] = line.length() - 1;
    }
    return splitStr(line, arr);
}

public static void splitLine0(String lineData) {
String[] arr = lineData.split(",");
for (String str : arr) {
if (StringUtils.isEmpty(str)) {
continue;
}
int keyIndex = Integer.parseInt(str);
ReentrantLock lock = lockMap.computeIfAbsent(keyIndex, key -> new ReentrantLock());
lock.lock();
try {
valueMap.computeIfAbsent(keyIndex, integer -> new Vector<>()).add(str);
} finally {
lock.unlock();
}

//                boolean wait = true;
//                for (; ; ) {
//                    if (!lockMap.get(Integer.parseInt(str)).isLocked()) {
//                        wait = false;
//                        valueMap.computeIfAbsent(Integer.parseInt(str), integer -> new Vector<>()).add(str);
//                    }
//// currently blocked until the lock is released
//                    if (!wait) {
//                        break;
//                    }
//                }

}
}

}

/**
*  init map
*/

static {
File file = new File(dir);
if (!file.exists()) {
file.mkdir();
}

//Each queue capacity is 256
for (int i = 0; i < threadNums; i++) {
blockQueueLists.add(new LinkedBlockingQueue<>(256));
}

for (int i = start; i <= end; i++) {
try {
File subFile = new File(dir + "\\" + i + ".dat");
if (!subFile.exists()) {
subFile.createNewFile();
}
countMap.computeIfAbsent(i + "", integer -> new AtomicInteger(0));
//                lockMap.computeIfAbsent(i, lock -> new ReentrantLock());
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {

    new Thread(() -> {
        try {
            readData();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }).start();

    new Thread(() -> {
        try {
            //start consuming
            startConsumer();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }).start();

    new Thread(() -> {
        //monitor
        monitor();
    }).start();

}

/**
* Check periodically whether all queues are empty
*/
private static void monitor() {
    AtomicInteger emptyNum = new AtomicInteger(0);
    while (consumerRunning) {
        try {
            // Interval between checks; the 5 s value is an assumption
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (startConsumer) {
            //If the size of all queues is 0, then terminate the process
            AtomicInteger emptyCount = new AtomicInteger(0);
            for (int i = 0; i < threadNums; i++) {
                if (blockQueueLists.get(i).size() == 0) {
                    emptyCount.getAndIncrement();
                }
            }
            if (emptyCount.get() == threadNums) {
                emptyNum.getAndIncrement();
                //If the queues were found empty the specified number of times, then stop consumption
                if (emptyNum.get() > 12) {
                    consumerRunning = false;
                    System.out.println("end of consumption...");
                    try {
                        findMostAge();
                    } catch (Exception e) {
                        System.out.println(e.getCause());
                    } finally {
                        System.exit(-1);
                    }
                }
            }
        }
    }
}

private static void readData() throws IOException {

    BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), "utf-8"));
    String line;
    long start = System.currentTimeMillis();
    int count = 1;
    while ((line = br.readLine()) != null) {
        //Read row by row and write data to the queue
        SplitData.splitLine(line);
        if (count % 100 == 0) {
            System.out.println("read 100 lines,total time: " + (System.currentTimeMillis() - start) / 1000 + " s");
            try {
                // Pause the reader briefly, then request a GC; the 1 s pause length is an assumption
                Thread.sleep(1000L);
                System.gc();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        count++;
    }

    br.close();
}

//Clean up and find the age with the largest count
private static void findMostAge() {
    Integer targetValue = 0;
    String targetKey = null;
    Iterator<Map.Entry<String, AtomicInteger>> entrySetIterator = countMap.entrySet().iterator();
    while (entrySetIterator.hasNext()) {
        Map.Entry<String, AtomicInteger> entry = entrySetIterator.next();
        Integer value = entry.getValue().get();
        String key = entry.getKey();
        if (value > targetValue) {
            targetValue = value;
            targetKey = key;
        }
    }
    System.out.println("The age with the largest number is:" + targetKey + "The quantity is:" + targetValue);
    System.exit(-1);
}

/**
*
* @throws FileNotFoundException
* @throws UnsupportedEncodingException
*/
private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException {
    //If a queue is shared, the number of threads should not be too many, and preemption is prone to occur.
    System.out.println("start consuming...");
    for (int i = 0; i < threadNums; i++) {
        final int index = i;
        //Each thread is responsible for a queue, so that there will be no thread preempting the queue.
        new Thread(() -> {
            while (consumerRunning) {
                startConsumer = true;
                try {
                    String str = blockQueueLists.get(index).take();
                    countNum(str);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}

//Split the string into three parts according to arr, and count each part in its own thread
private static void countNum(String str) {
    // arr[0] is the start index and arr[1] the end index of the next part
    int[] arr = new int[2];
    arr[1] = str.length() / 3;
    for (int i = 0; i < 3; i++) {
        final String innerStr = SplitData.splitStr(str, arr);
        //System.out.println("The split string is start position: " + arr[0] + ", end position is: " + arr[1]);
        new Thread(() -> {
            String[] strArray = innerStr.split(",");
            for (String s : strArray) {
                // A part may begin with ",", which yields an empty token
                if (s.isEmpty()) {
                    continue;
                }
                countMap.computeIfAbsent(s, s1 -> new AtomicInteger(0)).getAndIncrement();
            }
        }).start();
    }
}

/**
* The background thread consumes the data in the map and writes it to each file. If it is not consumed, the memory process will burst.
*/
private static void startConsumer0() throws FileNotFoundException, UnsupportedEncodingException {
    for (int i = start; i <= end; i++) {
        final int index = i;
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(dir + "\\" + i + ".dat", false), "utf-8"));
        new Thread(() -> {
            int miss = 0;
            int countIndex = 0;
            while (true) {
                //Print once for every 1 million counted
                int count = countMap.get(String.valueOf(index)).get();
                if (count > 1000000 * countIndex) {
                    System.out.println("The count for age " + index + " is: " + count);
                    countIndex += 1;
                }
                //After too many empty polls, close the file and stop the thread
                if (miss > 1000) {
                    try {
                        bw.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    break;
                }

                Vector<String> lines = valueMap.computeIfAbsent(index, vector -> new Vector<>());
                //write to file
                try {
                    if (CollectionUtil.isEmpty(lines)) {
                        miss++;
                        // Back off briefly; the 10 ms pause length is an assumption
                        Thread.sleep(10);
                    } else {
                        //Start processing once 1000 items have accumulated
                        if (lines.size() < 1000) {
                            continue;
                        }
                        ReentrantLock lock = lockMap.computeIfAbsent(index, lockIndex -> new ReentrantLock());
                        lock.lock();
                        try {
                            Iterator<String> iterator = lines.iterator();
                            StringBuilder sb = new StringBuilder();
                            while (iterator.hasNext()) {
                                sb.append(iterator.next());
                            }
                            try {
                                bw.write(sb.toString());
                                bw.flush();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            //clear the vector
                            valueMap.put(index, new Vector<>());
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}
}
```

### Test Results:

After startup, memory usage is stable at 11.7 while running, and CPU utilization stays above 90%. The total time drops from 180 s to 103 s, which is about 1.75 times faster (a 75% increase in throughput), and the result is consistent with single-threaded processing.

Problems encountered

If the GC suddenly stops working during the run, there may be too much unreclaimed garbage in the JVM heap, which causes memory usage to spike. Solution: after reading a certain amount of data, pause the main thread for a few seconds and call the GC manually.

Tip: The threads in this demo are created manually. In actual development, use a thread pool.
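As a rough illustration of the thread pool tip, here is a minimal sketch, not the code measured above. The class PooledAgeCounter and its methods are hypothetical names. It assumes each task counts one line into its own private array, and the partial results are merged on the calling thread, so no shared map or lock is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: count lines on a fixed thread pool and merge per-task results.
class PooledAgeCounter {
    static final int MIN_AGE = 18;
    static final int MAX_AGE = 70;

    // Count one comma-separated line into a private array.
    static long[] countLine(String line) {
        long[] counts = new long[MAX_AGE - MIN_AGE + 1];
        for (String s : line.split(",")) {
            String token = s.trim();
            if (token.isEmpty()) {
                continue;
            }
            int age = Integer.parseInt(token);
            if (age >= MIN_AGE && age <= MAX_AGE) {
                counts[age - MIN_AGE]++;
            }
        }
        return counts;
    }

    // Submit one task per line, then merge the partial counts and pick the maximum.
    static int mostFrequentAge(List<String> lines, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<long[]>> futures = new ArrayList<>();
            for (String line : lines) {
                futures.add(pool.submit(() -> countLine(line)));
            }
            long[] total = new long[MAX_AGE - MIN_AGE + 1];
            for (Future<long[]> future : futures) {
                long[] part = future.get();
                for (int i = 0; i < total.length; i++) {
                    total[i] += part[i];
                }
            }
            int best = 0;
            for (int i = 1; i < total.length; i++) {
                if (total[i] > total[best]) {
                    best = i;
                }
            }
            return best + MIN_AGE;
        } finally {
            pool.shutdown();
        }
    }
}
```

This sketch takes the lines as an in-memory list to stay short. For the 10G file, the reader would have to limit how many lines are in flight at once, for example with a bounded blocking queue as in Idea 2, otherwise the submitted lines would pile up in memory.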

Posted by Roddy87 on Wed, 31 Aug 2022 02:09:59 +0530