記錄敗者,勝者參加下一輪比賽,當新的元素到達的時候,log2(K)調(diào)整就可以選出勝者,下面的log2(K)代進1式,就可以抵消掉k的影響,從而與k無關,那么我們就可以,通過增大k,減少IO次數(shù),并且不會降低內(nèi)部歸并的效率。
算法實現(xiàn)
1、一共k路,b[i]對應第i路,比如一共5路 b[0],b[1],b[2],b[3],b[4]對應5路的首元素
2、ls[i],是一棵敗者樹ls[0],ls[1],ls[2],ls[3],ls[4],分別存儲上面的b[i]的下標i,這樣就利用元素的下標,組織了成它們的樹關系
初始化時,ls[i]=i;
然后從最后一個結點開始調(diào)整,t=(s+k)/2表示的是s的父結點, 然后逐漸向上向父結點比較t=t/2,最后到達ls[0],它是冠軍,經(jīng)過所有的結點調(diào)整,就成了一棵敗者樹,最后輸出b[ls[0]],
k路合并,根據(jù)ls[0],可以知道是從哪路輸出,然后再從哪路輸入
一個java 的模仿實現(xiàn),可運行
點擊(此處)折疊或打開
package my.sort;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
/**
* 基于大數(shù)據(jù)量的外排序算法,分為二路歸并和多路歸并
* @author java2king
* @link http://blog.csdn.net/Java2King
*
*/
public class ExternalSort {
public static int ITEM_COUNT = 10000000; //總數(shù)
public static int BUFFER_SIZE = 1024*4*1000;// 一次緩沖讀取
public static int FILE_COUNT = 1024*1000*1*4;// 每個文件的記錄數(shù)1
public static File MAIN_FILE = new File("mainset");//要排序的文件
/**
* 二路歸并
* @param file
* @return
* @throws IOException
*/
public File sort(File file) throws IOException {
ArrayList<File> files = split(file);
return process(files);
}
/**
* 多路歸并
* @param file
* @throws IOException
*/
public void mSort(File file) throws IOException{
ArrayList<File> files = split(file);
multipleMerge(files);
}
// recursive method to merge the lists until we are left with a
// single merged list
private File process(ArrayList<File> list) throws IOException {
if (list.size() == 1) {
return list.get(0);
}
ArrayList<File> inter = new ArrayList<File>();
for (Iterator<File> itr = list.iterator(); itr.hasNext();) {
File one = itr.next();
if (itr.hasNext()) {
File two = itr.next();
inter.add(merge(one, two));
} else {
// return one;
inter.add(one);
}
}
return process(inter);
}
/**
* Splits the original file into a number of sub files.
*/
private ArrayList<File> split(File file) throws IOException {
ArrayList<File> files = new ArrayList<File>();
int[] buffer = new int[FILE_COUNT];//FILE_COUNT每個文件的記錄數(shù)
FileInputStream fr = new FileInputStream(file);
BufferedInputStream bin = new BufferedInputStream(fr,BUFFER_SIZE);
DataInputStream din=new DataInputStream(bin);
boolean fileComplete = false;
while (!fileComplete) {
int index = buffer.length;
for (int i = 0; i < buffer.length && !fileComplete; i++) {
try {
buffer[i] = din.readInt();
} catch (Exception e) {
fileComplete = true;
index = i;
}
}
if (index != 0 && buffer[0] > -1) {
Arrays.sort(buffer, 0, index);
File f = new File("set" + new Random().nextInt());
// File temp = File.createTempFile("josp", ".tmp", f);
FileOutputStream writer = new FileOutputStream(f);
BufferedOutputStream bOutputStream = new BufferedOutputStream(writer);
DataOutputStream dout=new DataOutputStream(bOutputStream);
for (int j = 0; j < index; j++) {
dout.writeInt(buffer[j]);
}
dout.close();
bOutputStream.close();
writer.close();
files.add(f);
}
}
din.close();
bin.close();
fr.close();
return files;
}
/**
* 多路歸并
* @param list
* @throws IOException
*/
private void multipleMerge(ArrayList<File> list) throws IOException
{
int fileSize = list.size();
if(fileSize == 1)
{
return;
}
ArrayList<DataInputStream> dinlist = new ArrayList<DataInputStream>();
int[] ext = new int[fileSize];//比較數(shù)組
// File output = new File("multipleMerged");
FileOutputStream os = new FileOutputStream(MAIN_FILE);
BufferedOutputStream bout = new BufferedOutputStream(os);
DataOutputStream dout = new DataOutputStream(bout);
for (int i = 0; i < fileSize; i++)
{
try {
dinlist.add(i, new DataInputStream(new BufferedInputStream(
new FileInputStream(list.get(i)), BUFFER_SIZE)));
} catch (Exception e) {
e.printStackTrace();
}
}
int index = 0;
for (int i = 0; i < fileSize; i++) {
try {
ext[i] = dinlist.get(i).readInt();
} catch (Exception e) {
System.err.println("file_" + i + "為空");
ext[i] = -1;
}
}
int count = fileSize;
int[] sum = new int[fileSize];
while (count > 1)
{
index = getMinIndex(ext);
dout.writeInt(ext[index]);
sum[index]++;
try {
ext[index] = dinlist.get(index).readInt();
} catch (Exception e) {
ext[index] = -1;
count--;
dinlist.get(index).close();
// System.err.println(index + "空,寫進:" +sum[index]);
}
}
int sIndex = getSIndex(ext);
dout.writeInt(ext[sIndex]);
while (true) {
try {
dout.writeInt(dinlist.get(sIndex).readInt());
} catch (Exception e) {
dinlist.get(sIndex).close();
break;
}
}
dout.close();
}
//找到剩下的最后一個文件輸入流
public int getSIndex(int[] ext){
int result = 0;
for (int i = 0; i < ext.length; i++) {
if(ext[i]!= -1){
result = i;
break;
}
}
return result;
}
//找到數(shù)據(jù)中最小的一個
public int getMinIndex(int[] ext){
int min = 2147483647;
int index = -1;
for (int i = 0; i < ext.length; i++) {
if(ext[i] != -1 && ext[i] < min){
min = ext[i];
index = i;
}
}
return index;
}
/**
* 二路歸并
*
* @param one
* @param two
* @return
* @throws IOException
*/
private File merge(File one, File two) throws IOException {
FileInputStream fis1 = new FileInputStream(one);
FileInputStream fis2 = new FileInputStream(two);
BufferedInputStream bin1 = new BufferedInputStream(fis1,BUFFER_SIZE);
BufferedInputStream bin2 = new BufferedInputStream(fis2,BUFFER_SIZE);
DataInputStream din1=new DataInputStream(bin1);
DataInputStream din2=new DataInputStream(bin2);
File output = new File("merged" + new Random().nextInt());
FileOutputStream os = new FileOutputStream(output);
BufferedOutputStream bout = new BufferedOutputStream(os);
DataOutputStream dout=new DataOutputStream(bout);
int a = -1;//= din1.readInt();
int b = -1;//= din2.readInt();
boolean finished = false;
boolean emptyA = false;//
int flag = 0;
while (!finished) {
if (flag != 1) {
try {
a = din1.readInt();
} catch (Exception e) {
emptyA = true;
break;
}
}
if (flag != 2) {
try {
b = din2.readInt();
} catch (Exception e) {
emptyA = false;
break;
}
}
if(a > b){
dout.writeInt(b);
flag = 1;
}else if( a < b){
dout.writeInt(a);
flag = 2;
}else if(a == b){
dout.write(a);
dout.write(b);
flag = 0;
}
}
finished = false;
if(emptyA){
dout.writeInt(b);
while(!finished){
try {
b = din2.readInt();
} catch (Exception e) {
break;
}
dout.writeInt(b);
}
}else{
dout.writeInt(a);
while(!finished){
try {
a = din1.readInt();
} catch (Exception e) {
break;
}
dout.writeInt(a);
}
}
dout.close();
os.close();
bin1.close();
bin2.close();
bout.close();
return output;
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
Random random = new Random(System.currentTimeMillis());
FileOutputStream fw = new FileOutputStream(MAIN_FILE);
BufferedOutputStream bout = new BufferedOutputStream(fw);
DataOutputStream dout=new DataOutputStream(bout);
//ITEM_COUNT = 10000000; //總數(shù)
for (int i = 0; i < ITEM_COUNT; i++) {
int ger = random.nextInt();
ger = ger < 0 ? -ger : ger;
dout.writeInt(ger);
}
dout.close();
bout.close();
fw.close();
ExternalSort sort = new ExternalSort();
System.out.println("Original:");
long start = System.currentTimeMillis();
sort.mSort(MAIN_FILE);
long end = System.currentTimeMillis();
System.out.println((end - start)/1000 + "s");
recordFile((end - start)/1000 ,true);
}
private static void recordFile(long time,boolean isBuffer)
throws FileNotFoundException, IOException {
BufferedWriter bw = new BufferedWriter(new FileWriter("log",true));
bw.write("FILE_COUNT = "+FILE_COUNT+";對"+ ITEM_COUNT + "條數(shù)據(jù) "+ ITEM_COUNT*4/(1024*1204) +"MB排序耗時:" + time + "s ");
if(isBuffer){
bw.write(" 使用緩沖:"+BUFFER_SIZE*4/(1024*1204) +"MB");
}
bw.newLine();
bw.close();
}
}