0%

随机抽样/洗牌算法--Knuth-Durstenfeld Shuffle && 蓄水池算法

随机抽样/洗牌算法–Knuth-Durstenfeld Shuffle && 蓄水池算法

洗牌算法有很多种, 蓄水池算法则是适合大量数据场景或者数据流下的等概率抽样

一种洗牌算法 Knuth-Durstenfeld Shuffle :

对于一个已知长度n的数组, 可以从数组结尾的位置开始, 每次随机一个[0,i)范围(数组下标从0开始到n-1结束)内的随机数x, i为当前数据在数组中的下标,交换i位置和x位置的数据, 然后i-1并继续重复之前的过程直到到达数组的头部即下标为0的位置以后结束循环.

证明:

设P(n) 为位置n放入i号数据的概率
第n位置的概率是 1/n
第n-1位置的概率是 1/(n-1) * (1 - P(n)) = 1/(n-1) * (1- 1/n) = 1/n
第n-k位置的概率是 1/(n-k) * (1- P(n))(1- P(n-1)) … (1 - P(n-k+1))= 1/n

算法复杂度: O(n) 空间复杂度 O(1)
缺点: 需要事先知道确定的数据长度. 如果数据很长或者是个接近无限的数据流,计算一次数据的长度很耗时/遍历一遍所有数据(数据量大到内存放不下, 只能流式读取),

在这种限定下 : 给定一个数据流,数据流长度N很大,且N直到处理完所有数据之前都不可知,请问如何在只遍历一遍数据(O(N))的情况下,能够随机选取出m个不重复的数据。 这种情况就需要使用蓄水池算法.

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 原地随机
func shuffle(data []interface{}) {
l:= len(data)
var rnd int
var n *big.Int
var err error
for i:=l-1;i>0;i-- {
n, err = rand.Int(rand.Reader,big.NewInt(int64(i)))
if err!=nil {
panic(err)
}
rnd = int(n.Int64())
data[i], data[rnd] = data[rnd], data[i]
}
}

// 不改变原数据
func shuffleCopy(data []interface{}) []interface{}{
l:= len(data)
res := make([]interface{},l)
copy(res,data)
var rnd int
var n *big.Int
var err error
for i:=l-1;i>0;i-- {
n, err = rand.Int(rand.Reader,big.NewInt(int64(i)))
if err!=nil {
panic(err)
}
rnd = int(n.Int64())
res[i], res[rnd] = res[rnd], res[i]
}
return res
}

蓄水池算法

算法思路如下:

  1. 准备: 创建一个m大小的数据池, 当作蓄水池. 通过i对已经处理的数据进行计数
  2. 如果接收的数据量小于m,则依次放入蓄水池。(先将前m个数据放入其中)
  3. 当接收到第i个数据时,i >= m,在[0, i]范围内取以随机数d,若d的落在[0, m-1]范围内,则用接收到的第i个数据替换蓄水池中的第d个数据。(当池子满了, 就在当前计数范围内随机一个数字d, 如果小于池子的容量, 就用数据i替换池子中编号d对应的数据)
  4. 重复步骤3, 直到遍历完所有数据N。

当处理完所有的数据时,蓄水池中的每个数据都是以m/N的概率获得的。这个算法不需要事先获知N的大小.

证明:

抄自: https://www.jianshu.com/p/7a9ea6ece2af
假设数据开始编号为1.第i个接收到的数据最后能够留在蓄水池中的概率=第i个数据进入过蓄水池的概率*之后第i个数据不被替换的概率(第i+1到第N次处理数据都不会被替换)。

  1. 当i<=m时,数据直接放进蓄水池,所以第i个数据进入过蓄水池的概率=1。
  2. 当i>m时,在[1,i]内选取随机数d,如果d<=m,则使用第i个数据替换蓄水池中第d个数据,因此第i个数据进入过蓄水池的概率=m/i。
  3. 当i<=m时,程序从接收到第m+1个数据时开始执行替换操作,第m+1次处理会替换池中数据的为m/(m+1),会替换掉第i个数据的概率为1/m,则第m+1次处理替换掉第i个数据的概率为(m/(m+1))(1/m)=1/(m+1),不被替换的概率为1-1/(m+1)=m/(m+1)。依次,第m+2次处理不替换掉第i个数据概率为(m+1)/(m+2)…第N次处理不替换掉第i个数据的概率为(N-1)/N。所以,之后第i个数据不被替换的概率=m/(m+1)(m+1)/(m+2)(N-1)/N=m/N。
  4. 当i>m时,程序从接收到第i+1个数据时开始有可能替换第i个数据。则参考上述第3点,之后第i个数据不被替换的概率=i/N。
  5. 结合第1点和第3点可知,当i<=m时,第i个接收到的数据最后留在蓄水池中的概率=1m/N=m/N。结合第2点和第4点可知,当i>m时,第i个接收到的数据留在蓄水池中的概率=m/ii/N=m/N。

综上可知,每个数据最后被选中留在蓄水池中的概率为m/N。

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func waterPoolSample(data []interface{}, count int) []interface{} {
if len(data) < count {
count = len(data)
}
pool := make([]interface{},count)
copy(pool, data)
var rnd int
var n *big.Int
var err error
for i := count; i < len(data); i++ {
n, err = rand.Int(rand.Reader,big.NewInt(int64(i)))
if err!=nil {
panic(err)
}
rnd = int(n.Int64())
if rnd >= count {
continue
}
pool[rnd] = data[i]
}
return pool
}

分布式蓄水池:

参考 https://www.jianshu.com/p/7a9ea6ece2af

如果需要抽出m个数据

  1. 假设有K台机器,将大数据集分成K个数据流,每台机器使用单机版蓄水池抽样处理一个数据流,抽样m个数据,并最后记录处理的数据量为N1, N2, …, Nk, …, NK(假设m<Nk)。N1+N2+…+NK=N。(首先对子数据集Nk执行蓄水池抽样算法, K个执行节点就K个水池, 这样总共抽出m*k个数据)
  2. 取[1, N]一个随机数d(N由第一步中计算得来),若d<N1,则在第一台机器的蓄水池中等概率不放回地(1/m)选取一个数据;若N1<=d<(N1+N2),则在第二台机器的蓄水池中等概率不放回地选取一个数据;一次类推,重复m次,则最终从N大数据集中选出m个数据。(在最终的m*k个数据中随机抽取m个数据)

第2步m/N的概率验证如下:

  1. 第k台机器中的蓄水池数据被选取的概率为m/(Nk)。
  2. 从第k台机器的蓄水池中选取一个数据放进最终蓄水池的概率为Nk/N。
  3. 第k台机器蓄水池的一个数据被选中的概率为1/m。(不放回选取时等概率的)
  4. 重复m次选取,则每个数据被选中的概率为m(m/NkNk/N*1/m)=m/N。