海量数据处理:如何从10亿个数中,找出最大的10000个数?(top K问题)

问题:有 10 亿个不重复的数字,内存中只能放进 1 万个数,怎么找到最大的 1 万个数字?

此题多数互联网公司都有提及,这里简单总结一下。我将在下面介绍一些基本的海量数据处理的方法,供大家参考。需要明确的一点是,现实情况复杂多变,所以对于海量数据处理这样大的主题,是不可能用一篇博客就说清楚的。但方法论可以一通百通,我们通过一些已经被无数次实验证明有效的方法,就能大致理解对此类问题的解决思路,之后在面临新的问题时,至少能找到一个大致的方向。

首先,被问到这题应该先询问数据规模与数据分布。如果数据规模比较小,在千数量级,采用O(nlgn)的快排或大顶堆取前K个即可。如果数据为整形,且分布范围不大,可以考虑计数排序,在线性时间中求解。

其次,如果不是上面讨论的情况,就是大规模一般情况。数据集可能在亿数量级,此时有可能数据都没有办法全部加载到内存中,此时通常比较好的方案是:分治、Trie树/hash、小顶堆,即先将数据集按照Hash方法分解成多个小数据集,然后使用Trie树或Hash统计每个小数据集中的query词频,之后用小顶堆求出每个数据集中出现频率最高的前K个数,最后在所有top K中求出最终的top K。

方法一:将数据全部排序

将数据全部排序,然后在排序后的集合中进行查找,最快的排序算法的时间复杂度一般为O(nlogn),如快速排序。但是在32位的机器上,每个int类型占4个字节,10亿个数就要占用3.7G的存储空间,对于一些可用内存小于3.7G的计算机而言,很显然是不能一次将全部数据读入内存进行排序的。其实即使内存能够满足要求寻找出最大的10000个数即可,而排序却是将所有的元素都排序了,做了很多的无用功。只能锁这是一个方法,但是实际可行性不高。

方法二:局部淘汰法

维护一个10000长度的数组a[],先读取源数据中的前10000个放入数组,对该数组进行升序排序,再依次读取源数据第10000个以后的数据,和数组中最小的元素(a[0])比较,如果小于a[0]直接pass,大于的话,就丢弃最小的元素a[0],利用二分法找到其位置,然后该位置前的数组元素整体向前移位,直到源数据读取结束。最后遍历完这10亿个数,得到的结果容器中保存的数即为最终结果了。此时的时间复杂度为O(n+m^2),其中m为容器的大小,即10000。

这比方法一效率会有很大的提高,但是当K的值较大的时候,长度为K的数据整体移位,也是非常耗时的。

方法三:常规解法,利用最小堆的原理(时间复杂度为O(N*logK))(掌握)

首先读入前10000个数来创建大小为10000的最小堆,建堆的时间复杂度为O(mlogm)(m为数组的大小即为10000),最小堆的堆顶元素就是最大K个数中的最小的一个。之后读取后续数字,每次扫描一个数据X,如果X比堆顶元素小,则不需要改变原来的堆。如果X比堆顶元素大,那么用X替换堆顶元素,在替换之后,X可能破坏了最小堆的结构,需要调整堆来维持堆的性质。调整过程时间复杂度为O(logK)。 全部的时间复杂度为O(N*logK)。

这种方法当数据量比较大的时候,比较方便。因为对所有的数据只会遍历一次。甚至还可以运用多线程技术,把所有10亿个数据分组存放,比如分别放在1000个文件中。这样处理就可以分别在每个文件的10^6个数据中找出最大的10000个数,合并到一起在再找出最终的结果。

下面我使用堆排序的思想写一个寻找数组中最大的K个数的模板,仅供参考:

/**
*  使用小顶堆实现寻找最大的K个元素
*/
public int[] getTopKth(int[] arr,int k){
	if(arr==null||arr.length==0||k>arr.length){
	   return null;
	}
	//1.将原数组中前K个数构成一个小顶堆
	int[] topK=new int[k];
	for(int i=0;i<k;i++){
	   topK[i]=arr[i];
	}
	//2.构建初始堆
	for(int i=topK.length;i>=0;i--){
	   heapify(arr,i,k);
	}
	
	//3.从原数组第k个数开始,如果循环的数比堆顶元素还小,直接pass,如果比堆顶元素要大,就将此数放在堆顶,同时进行一次以它为起始点的树的调整。
	for(int i=k;i<arr.length;i++){
	    int tmp=arr[i];
		//如果堆顶元素比当前元素小,那么以当前元素替换堆顶元素,之后调整堆为小顶堆
		if(tmp>topK[0]){
		  topK[0]=tmp;
		  heapify(arr,0,k);
		}
	}
	//返回最大的K个数
	return topK;
}

/**
*  堆排序核心:调整数组成为小顶堆
* arr: 堆化数组
* start: 调整开始的地方
* len:调整的长度
*/
private void heapify(int[] arr,int start,int len){
   //根据二叉树的规则,定义三个变量root表示根节点,left表示根节点的左孩子,right表示根节点的右孩子
   int root=start,left=2*start+1,right=2*start+2;
   if(left<len&&arr[root]>arr[left]){
        root=left;
   }
   if(right<len&&arr[root]>arr[right]){
        root=right;
   }
   //说明有左孩子或右孩子比根节点小,需要调整,
   if(root!=start){
       arr[start]^=arr[root];
	   arr[root]^=arr[start];
	   arr[start]^=arr[root];
	   heapify(arr,root,len);
   }
}

方法四:分治法+快速排序原理(时间复杂度O(N*logK)(掌握)

假设N个数存储在数组S中,从数组中随机找最大的K个数,将数组分成两部分SMax和SMin。SMax中的元素大于等于X,SMin中的元素小于X。 出现如下两种情况:

  • (1)若SMax组的个数大于或等于K,则继续在SMax分组中找取最大的K个数字 。
  • (2)若SMin组中的数字小于K ,其个数为T,则继续在SMin中找取 K-T个数字 。

一直这样递归下去,不断把问题分解成小问题,平均时间复杂度为O(N*logK)。

上面这个思路具体可以参考剑指 Offer 40. 最小的k个数这道题,这道题是寻找最小的K个数,但是思想都是一样的。这里我给出个模板,仅供参考:

/**
*  快排思想寻找无序数组最大的K个数
*/
pulbic int[] getTopKth(int[] arr,int k){
    if(arr==null||arr.length==0){
	   return null;
	}
	int low=0,high=arr.length-1;
	//分界元素和下标值
	int pivot=partition(arr,low,high);
	while(pivot!=k-1){
	   if(pivot<k-1){
	       low=pivot+1;
	   }
	   if(pivot>k-1){
	      high=pivot-1;
	   }
	   pivot=partition(arr,low,high);
	}
	//将前K个最大的元素返回
	return Arrays.copyOf(arr,k);
}

/**
*  快排核心:主要就是划分数组为两部分:以pivot为界,左边的元素都比他大,右边的元素都比他小(降序排序)
*/
private int partition(int[] arr,int low,int high){
    int pivot=arr[low];
	while(low<high){
	   while(low<high&&arr[high]<=pivot){
	       high--;
	   }
	   arr[low]=arr[high];
	   while(low>high&&arr[high]>=pivot){
	       low++;
	   }
	   arr[high]=arr[low];
	}
	//枢纽元素归位
	arr[low]=pivot;
	return low;
}

上面的思路在数据规模比较小的时候完全可以和方法一相互替换使用,但是当数据量比较大的时候,这个方法不能直接使用,但是方法一可以直接使用,而此方法不能直接使用的原因就是数据没有办法一次性加载到内存中。因此我们可以考虑配合分治法,即:将10亿个数据分成100份,每份1000万个数据,找到每份数据中最大的10000个,最后在剩下的 100 * 10000(100万) 个数据里面找出最大的10000个。如果100万数据选择足够理想,那么可以过滤掉10亿数据里面99%的数据。

一些经常被提及的该类问题

这里再列举一些常见的海量数据处理的问题以及给出对应的解决方案,仅供参考:

(1)有10个文件,每个文件1GB,每个文件的每一行存放的都是用户的query,每个文件的query都可能重复。按照query的频度排序。

这个还是典型的Top k问题,解决方案如下:

方案1:顺序读取10个文件,按照hash(query)%10的结果将query写入到另外10个文件中。这样新生成的文件每个的大小大约也1G(假设hash函数是足够离散的)。

找一台内存在2G左右的机器,依次用hash_map(query, query_count)来统计每个query出现的次数。利用快速/堆/归并排序按照出现次数进行排序。将排序好的query和对应的query_cout输出到文件中。这样得到了10个排好序的文件(记为)。最后对这10个文件进行归并排序即可。

方案2:一般query的总量是有限的,只是重复的次数比较多而已,可能对于所有的query,一次性就可以加入到内存了。这样,我们就可以采用trie树/hash_map等直接来统计每个query出现的次数,然后按出现次数做快速/堆/归并排序就可以了。

方案3:与方案1类似,但在做完hash分成多个文件后,可以采用大数据分布式的架构来处理(比如MapReduce),最后再进行合并。

(2)有一个1GB大小的文件,里面的每一行是一个词,词的大小不超过16个字节,内存限制大小是1MB。返回频数最高的100个词。

(3)给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url? 方案1:分治法。粗略计算得知,每个文件的大小为50*10^8×64=320G,单单一个文件就远远大于内存限制的4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。具体策略如下:

  1. 遍历文件a,对每个url求取hash(url)%1000,然后根据所取得的值将url分别存储到1000个小文件(记为a0,a1,...,a999)中。这样每个小文件的大约为300M(假设hash函数足够离散)。

  2. 遍历文件b,采取和a相同的方式将url分别存储到1000小文件(记为b0,b1,...,b999)。这样处理后,所有可能相同的url都在对应的小文件(a0-b0,a1-b1,...,a999-b999)中,不对应的小文件不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。

  3. 求每对小文件中相同的url时,可以把其中一个小文件的url存储到hash_set中。然后遍历另一个小文件的每个url,看其是否在刚才构建的hash_set中,如果是,那么就是共同的url,存到文件里面就可以了。

方案2:使用布隆过滤器。如果允许有一定的错误率,可以使用Bloom filter,4G内存大概可以表示340亿bit。将其中一个文件中的url使用Bloom filter映射为这340亿bit,然后挨个读取另外一个文件的url,检查是否与Bloom filter,如果是,那么该url应该是共同的url。

(4)现有两个各有20亿行的文件,每一行都只有一个数字,求这两个文件中数字的交集?

这个题的如果换成给定两个整型数组让求两个数组(nums1,nums2)的交集该如何做呢? 这是LeetCode第349题,解决方法有很多,这里我提供两种解法,仅供参考:

解法1:由于给定的数据有限,所以可以不用考虑内存装不装的下的问题,因此这里可以直接先把nums1中的所有元素全部添加到集合set1中,然后遍历nums2,判断在nums2中的元素在集合set1中是否存在,如果存在,说明有交集,就把他添加到结果中。

解法2:对两个数组先升序排序,排序之后设置两个指针分别从nums1和nums2数组首地址开始遍历数组,比那里过程中会有以下3种情况: 1. nums1[i]<nums2[j],此时应该将i继续向后移动,j不动,因为nums1[i]已经比nums2[j]小了,在他们之间不可能找到相等的数 2. nums1[i]>nums2[j],情况正好相反,此时移动j就好了,i不动,道理类似 3. nums1[i]=nums2[j] ,出现了交集,将此值加入结果,并且i,j同时向后移动

下面分别给出代码实现,仅供参考!

/**
*  方法1:利用set集合求两个数组的交集
*/
public int[] intersection(int[] nums1,int[] nums2){
   if(nums1==null||nums1.length==0||
      nums2==null||nums2.length==0){
	     return null;
    }
	//将nums1放到set集合中
	Set<Integer> set=new HashSet<>();
	for(int num:nums1){
	   set.add(num);
	}
	//遍历nums2并与set比对
	List<Integer> ans=new ArrayList<>();
	for(int i=0;i<set.length;i++){
	   if(set.contains(nums2[i])){
	       list.add(nums2[i]);
	   }
	}
	return list.stream().mapToInt(Integer::intValue).toArray();
}

/**
*   方法2:升序排序+双指针
*/
pulbic int[] intersection(int[] nums1,int[] nums2){
   if(nums1==null||nums1.length==0||
      nums2==null||nums2.length==0){
	     return null;
   }
   //分别对两个数组排序
   Arrays.sort(nums1);
   Arrays.sort(nums2);
   int i=0,j=0;
   List<Integer> ans=new ArrayList<>();
   while(i<nums1.length&&j<nums2.length){
       if(nums1[i]<nums2[j]){
	      i++;
	   }else if(nums1[i]>nums2[j]){
	      j++;
	   }else{
	     //nums1[i]==nums2[j]
		 if(ans.size()==0||nums1.get(i-1)!=nums1[i]){
		     ans.add(nums1[i]);
		 }
		 i++;
		 j++;
	   }
   }
   return ans.stream().mapToInt(Integer::intValue).toArray();
}

有了这两种解题思路之后,尤其是方法2的思路之后,现在我们遇到最大的困难就是大文件无法全部加载到内存中,这个问题的解决办法还是分治法,即先使用外排序分别对两个文件进行分隔排序,比如分隔成1000个小文件,总之排序完成会得到两个有序的文件,之后同时遍历这两个文件就可以了,每次遍历加载文件一部分数据,遍历时采取的方法就如方法2所示。

(5)海量日志数据,提取出某日访问网站次数最多的那个IP。

粗略估算一下便知,就拿32位的IPV4来说,最多可以有2^32(42亿)种取值情况,所以不能完全加载到内存中处理;可以考虑采用分而治之的思想:

  1. 按照IP地址的Hash(IP) % 1024值,把海量IP日志分别存储到1024个小文件中,每个小文件最多包含420万个IP地址(hash函数理想的情况下)
  2. 对于每一个小文件,可以构建一个IP为key,出现的次数为value的HashMap,同时记录当前出现次数最多的那个IP地址,通过HashMap可以很容的得到该文件中次数最个IP
  3. 每个文件一个最大次数Ip,汇总起来可以得到1024个IP,再依据常规的排序算法得出总体上出现次数最多的IP。

(6)10亿个整数(有重复),找出重复次数最多的100个整数。

解决思路:

  1. 通过hash取模的方式将大文件分成电脑可以直接完全加载计算的小份文件,比如10亿个数分成1000份;
  2. 在每一小份文件中排序,统计出出现次数最多的100个数;
  3. 将每个文件中出现次数最多的100个数拿出来放到一起使用上文所说的快排思路或小顶堆取出其前100个即可。

(7)海量数据排序问题:一个文件中有9亿条不重复的9位整数,对这个文件中数字进行排序

方法一、数据库排序法。将文本文件导入到数据库中,让文本文件导入到数据库中,让数据库进行索引排序操作后提取数据到文件.该种方法虽然操作简单,方便,但是运算速度较慢,而且对数据库设备要求比较高.

方法二、分治法。通过Hash法将9亿条数据分为20段,每一段大约5000万条,大约需要占用500万*4B = 200MB空间,在文件中依次搜索0~5000万,50000001~1亿 ,,,,,,,将排序的结果存入文件,该方法要装满9位整数,一共需要20次,所以一共要进行20次排序,需要对文件进行20次读操作.该方法虽然缩小了每次使用的内存空间大小,但是编码复杂,速度也慢.

方法三、位图法。考虑到最大的9位整数为999999999,由于9亿条数据是不重复的,如果这些数据都还是正整数的话,那么可以申请一个10亿个元素的位数组bitset表示0~999999999,结点中用0表示没有这个数,1表示存在这个数,判断0或1只用一个bit存储就够了,而声明一个可以包含9位整数的bit数组,一共需要10亿bit/8/1024/1024≈120MB内存,把内存中的数组全部初始化为0,读取文件中的数据,并将数据放入内存.比如读到一个数据为314332897这个数据,那就先在内存中找到314332897这个bit,并将bit值置为1,遍历整个bit数组,将bit为1的数组下标存入文件,最终得到排序的内容.

参考

留言区

还能输入500个字符