Optimal and Elegant Distributed Uniform Sampling

Introduction

Sampling uniformly random subsets of data is an important tool in a data team's toolbox. Uniform sampling can extract representative data sets for labeling, for training or testing models, or for manually examining large data sets. Typically we know how much data we want, based on the human resources available to label it, the computational resources available to train on it, the margin of error in test set evaluation, or the time available to peruse the data. We usually don't care how much data is available, as long as there is at least as much as we want.

Sometimes there is too much data to store or process on one machine. When sampling big data, we would like to use distributed algorithms to spread the work across a cluster. In this article we will discuss how to efficiently sample a fixed amount of data from a data set of unknown size in a distributed setting.

For mathematical convenience all arrays will be indexed starting with 1.

Reservoir Sampling

In the single-threaded context, an algorithm called reservoir sampling solves this problem optimally. This algorithm has several nice properties:

  1. it does not need to know the size of the data set in advance;
  2. it uses only as much memory as is necessary to store the sample;
  3. it performs one pass over the data, drawing one random natural number per item.

The key to achieving the above properties is that, as the data streams through the algorithm, the algorithm maintains a uniform sample of the data seen so far. When the stream of data ends, it simply returns what it has.

The algorithm first allocates an array of capacity \(c\), which is the number of items to sample. It reads the first \(c\) items into the array. For each subsequent item \(x\) encountered, let \(s\) be the number of items the sampler has seen up to and including \(x\). It chooses a random integer \(r\) from \(1\) to \(s\) (inclusive) and if \(r \le c\), replaces the \(r\)th item in the array with \(x\). The algorithm's state consists of the sample so far and the number of items seen.
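
As a concrete illustration, here is a minimal sketch of the algorithm in Python (function and variable names are our own, and Python lists are 0-indexed rather than the 1-indexed arrays used in the text):

import random

def reservoir_sample(stream, c):
    # Maintain a uniform sample of up to c items from a stream of unknown length,
    # drawing one random integer per item after the first c.
    sample = []
    seen = 0
    for x in stream:
        seen += 1
        if seen <= c:
            sample.append(x)              # the first c items fill the buffer
        else:
            r = random.randint(1, seen)   # uniform over 1..seen, inclusive
            if r <= c:
                sample[r - 1] = x         # replace the r-th slot
    return sample

print(reservoir_sample(range(1000), 10))  # ten items sampled uniformly from 0..999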

A short inductive argument, sketched below, shows that after the sampler has seen \(s \ge c\) items, each of those items is in the buffer with probability \(\frac{c}{s}\). Every item seen so far is therefore equally likely to be in the sample, which is the sense in which the sample is uniform.
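
The inductive step goes as follows. When item \(s > c\) arrives, it enters the buffer with probability \(\frac{c}{s}\). An item that was already in the buffer with probability \(\frac{c}{s-1}\) is evicted only if item \(s\) is accepted and lands on its slot, which happens with probability \(\frac{c}{s} \cdot \frac{1}{c} = \frac{1}{s}\), so $$\begin{align} \Pr[\text{item still in buffer after } s \text{ items}] &= \frac{c}{s-1}\left(1 - \frac{1}{s}\right) = \frac{c}{s-1} \cdot \frac{s-1}{s} = \frac{c}{s}. \end{align}$$ The base case is \(s = c\), where every item is in the buffer with probability \(1 = \frac{c}{c}\).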

Reservoir sampling is ideal on a single thread, but it must encounter and choose items one at a time, so it doesn't parallelize. That makes it unsuitable on its own for sampling very big data sets.

Problem Statement

To make the problem of distributed sampling more concrete, we will consider sampling by using the aggregate operation defined in Apache Spark. This operation aggregates a partitioned data set down to a single value, which will be our sample. It takes three parameters: an initial aggregate value for each partition, an operation that folds an item into an aggregate value, and an operation that combines two aggregate values. The aggregate operation first folds each partition's items into a copy of the initial value, producing one aggregate per partition, and then combines the partition aggregates pairwise in an arbitrary order.
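
The sketch below is a toy, single-machine model of this contract in Python, not Spark's actual implementation (the analogous call in PySpark is RDD.aggregate). We represent the initial value as a factory so each partition folds into its own fresh copy:

from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    # Fold each partition into its own copy of the initial value,
    # then combine the per-partition aggregates pairwise.
    partials = [reduce(seq_op, partition, zero()) for partition in partitions]
    return reduce(comb_op, partials)

# For example, summing with this pattern:
total = aggregate([[1, 2, 3], [4, 5], [6]],
                  zero=lambda: 0,
                  seq_op=lambda acc, x: acc + x,
                  comb_op=lambda a, b: a + b)
print(total)  # 21

Since the partition aggregates may be combined in any order, comb_op should not rely on a particular combining order; using a factory for the initial value keeps mutable aggregates from being shared across partitions in this toy model.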

Prior Work

Variations of distributed sampling have been studied in the literature. One line of work considers sampling in the setting where there is a central coordinator server ([1], [2], [3], among others). The downside of this approach is that it requires a custom protocol for communication between the coordinator and the other servers, so it does not integrate well with frameworks like Apache Spark.

Methods to uniformly sample a set by partitioning it and sampling the partitions separately have been studied ([6], [7]). They sample from hypergeometric distributions to determine the sizes of subsets to split a set of items into. They run in linear time with high probability. However, they are more complex than our approach and don't fit the aggregate pattern, making them harder to run on a cluster.

The problem of merging two uniform samples computed in parallel has been studied ([8]). This approach is optimal, up to a constant factor, in time and space. However it leaves out an important detail: how to seek and delete at arbitrary indices in lists in constant time. We know of no data structures that can do that and believe that none can exist, in the same vein as [9]. A more tailored approach is to shuffle lists into queues and pop from their heads but this approach requires additional randomness and space and is more complex than our approach.

Another approach to merging two uniform samples is weighted sampling ([4], [5]). These approaches can be used to merge uniform samples in an aggregate operation. However, [4] suffers from technical difficulties that require extra complexity to avoid and [5] requires extra space and more mathematical calculations than our approach.

On the other hand, our method can be implemented simply, with optimal time and randomness, in-place, and with no additional subroutines or data structures. We also believe subjectively that our approach has a nice symmetry with reservoir sampling, making it aesthetically pleasing.

Our Method

Reservoir sampling could be used in an aggregate operation were it not for the requirement to merge two aggregates. The initial state would be an empty reservoir sampler with capacity \(c\) and folding an item into a sampler would sample it according to the reservoir sampling algorithm. The rest of this section will be devoted to figuring out how to complete the puzzle by merging two reservoir samplers.
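
In terms of the aggregate parameters from the problem statement, the first two pieces might look like the sketch below (the dict-based sampler state is an assumption of ours, not taken from any reference implementation); the third piece, the combine operation, is the merge developed in the rest of this section:

import random

c = 100  # the desired sample size

def zero():
    # Initial aggregate value: an empty reservoir sampler of capacity c.
    return {"sample": [], "seen": 0}

def seq_op(sampler, x):
    # Fold one item into a sampler: the reservoir sampling update.
    sampler["seen"] += 1
    if sampler["seen"] <= c:
        sampler["sample"].append(x)
    else:
        r = random.randint(1, sampler["seen"])
        if r <= c:
            sampler["sample"][r - 1] = x
    return sampler

# comb_op(sampler1, sampler2), which merges two samplers, is derived below.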

Precisely, merging two samplers means taking two arrays of capacity \(c\), the first array uniformly sampled from some quantity \(s_1\) of items and the second array uniformly sampled from some \(s_2\) items, and producing an array of capacity \(c\) uniformly sampled from \(s_1 + s_2\) items. Note that if a reservoir sampler has not yet seen more than \(c\) items, i.e. \(s \le c\), it has taken every item it has seen with probability \(1\) so it can be merged into another sampler by simply reservoir sampling its sample into the other sampler. Therefore the only nontrivial case is when both samplers' arrays are full and both \(s_1\) and \(s_2 > c\).

Suppose we sample items one at a time from one sampler into the other. Specifically, consider iteratively replacing a random item in sampler 1's sample with item \(i\) in sampler 2's sample with some probability \(q(i)\), where \(i\) varies from \(1\) to the capacity of the samplers, \(c\). Note that it isn't even clear if there exists a function \(q\) that results in a uniform sample over all seen items. In this section we define the problem precisely, propose a function \(q\), and prove that it produces a uniform sample. We will see that there is a simple algorithm using this \(q\) that, like reservoir sampling, performs one pass over the items in sampler 2, draws one random natural number per element of sampler 2, and uses no additional space.

Let \(p_1(i)\) be the probability that an item that sampler 1 saw is in sampler 1's sample after iteration \(i\) of this merging process. \(p_1\) satisfies the recurrence: $$\begin{align} p_1(0) &= \frac{c}{s_1}; \\ p_1(i) &= p_1(i-1)\left(1 - \frac{q(i)}{c}\right). \end{align}$$

Consider the probability that an item that sampler 2 saw is in sampler 1's sample after iteration \(i\) of the merging process. For such an item to reach sampler 1's sample, it must first be in sampler 2's sample, at some position \(j\). As this probability is 0 if \(i < j\), we only consider the case that \(i \ge j\). Let \(p_2(j, i)\) be this probability. \(p_2\) satisfies the recurrence: $$\begin{align} p_2(i, i) &= \frac{c}{s_2}q(i); \\ p_2(j, i) &= p_2(j, i-1)\left(1 - \frac{q(i)}{c}\right). \end{align}$$

We want to choose \(q\) such that \(p_1(i) = p_2(j, i)\) for all \(i\) and \(j\). As \(p_1(i)\) and \(p_2(j, i)\) depend on \(p_1(i-1)\) and \(p_2(j, i-1)\) in the same way, it is sufficient to show that \(p_1(i) = p_2(i, i)\) for all \(i\).

We propose \(q(i) = \frac{c s_2}{c s_1 + i s_2}\). Note that \(q(i) < 1\) for all \(i \ge 1\) whenever \(s_1 \ge s_2\): the inequality \(q(i) < 1\) is equivalent to \((c - i) s_2 < c s_1\), and \((c - i) s_2 \le (c - 1) s_2 < c s_2 \le c s_1\). The algorithm in the next section guarantees \(s_1 \ge s_2\) by always merging the sampler that has seen fewer items into the one that has seen more. We prove that this choice of \(q\) gives \(p_1(i) = p_2(i, i)\) for all \(i\) by induction. While the sampling process starts with \(i = 1\), our proof starts with \(i = 0\) for simplicity.

Base case: \(p_1(0) = p_2(0, 0)\). Indeed, \(p_2(0, 0) = \frac{c}{s_2} q(0) = \frac{c}{s_2} \cdot \frac{c s_2}{c s_1} = \frac{c}{s_1} = p_1(0)\).

Induction case: \(p_1(i-1) = p_2(i-1, i-1) \Rightarrow p_1(i) = p_2(i, i)\). This is equivalent to \(p_1(i-1) = p_2(i-1, i-1) \Rightarrow p_1(i-1) = \frac{p_2(i, i)}{1-\frac{q(i)}{c}}\), which holds iff \(q(i-1) = \frac{q(i)}{1-\frac{q(i)}{c}}\). This identity is verified by a short computation: $$\begin{align} \frac{q(i)}{1 - \frac{q(i)}{c}} &= \frac{1}{\frac{1}{q(i)} - \frac{1}{c}} \\ &= \frac{1}{\frac{c s_1 + i s_2}{c s_2} - \frac{s_2}{c s_2}} \\ &= \frac{1}{\frac{c s_1 + (i-1) s_2}{c s_2}} \\ &= q(i-1). \end{align}$$

We have established that every item that either sampler 1 or sampler 2 saw ends up in sampler 1 with the same probability at the end of the merging process so the sample is uniform.
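
As a quick sanity check, take \(c = 2\), \(s_1 = 3\), and \(s_2 = 3\) (illustrative values of our own choosing). Then \(q(1) = \frac{2 \cdot 3}{2 \cdot 3 + 1 \cdot 3} = \frac{2}{3}\) and \(q(2) = \frac{2 \cdot 3}{2 \cdot 3 + 2 \cdot 3} = \frac{1}{2}\), and tracking an item from sampler 1 through both iterations gives $$\begin{align} p_1(2) &= \frac{c}{s_1}\left(1 - \frac{q(1)}{c}\right)\left(1 - \frac{q(2)}{c}\right) = \frac{2}{3} \cdot \frac{2}{3} \cdot \frac{3}{4} = \frac{1}{3} = \frac{c}{s_1 + s_2}, \end{align}$$ exactly the inclusion probability a single reservoir sampler would give after seeing all \(s_1 + s_2 = 6\) items. The items from sampler 2 work out the same way: \(p_2(1, 2) = \frac{c}{s_2} q(1) \left(1 - \frac{q(2)}{c}\right) = \frac{2}{3} \cdot \frac{2}{3} \cdot \frac{3}{4} = \frac{1}{3}\) and \(p_2(2, 2) = \frac{c}{s_2} q(2) = \frac{2}{3} \cdot \frac{1}{2} = \frac{1}{3}\).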

Putting It Together

When the dust settles we're left with the following algorithm. It makes one pass over the items in one sampler, drawing one random integer for each, and uses no additional memory. Once again, arrays are 1-indexed.

class sampler:
    array sample
    int seen

merge(sampler sampler1, sampler sampler2): // returns the merged sampler
    if sampler1.seen < sampler2.seen:
        return merge(sampler2, sampler1) // merge the sampler that has seen fewer items into the other
    else if sampler2.seen < c:
        reservoir sample sampler2.sample into sampler1 // sampler2 holds every item it has seen; this updates sampler1.seen
        return sampler1
    else:
        for i = 1 to c:
            roll <- draw random integer from 1 to c * sampler1.seen + i * sampler2.seen // inclusive
            if roll <= c * sampler2.seen: // happens with probability q(i)
                sampler1.sample[((roll - 1) mod c) + 1] <- sampler2.sample[i] // replace a uniformly random slot
        sampler1.seen <- sampler1.seen + sampler2.seen
        return sampler1
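
For the skeptical reader, below is a self-contained Python sketch of the same procedure (names and the flat list-plus-count representation are our own), followed by a small Monte Carlo check that every item ends up in the merged sample with probability close to \(\frac{c}{s_1 + s_2}\):

import random
from collections import Counter

c = 3  # small capacity so the check runs quickly

def feed(sample, seen, x):
    # Reservoir-sample one item into (sample, seen); returns the updated pair.
    seen += 1
    if seen <= c:
        sample.append(x)
    else:
        r = random.randint(1, seen)
        if r <= c:
            sample[r - 1] = x
    return sample, seen

def merge(sample1, seen1, sample2, seen2):
    # Merge two reservoir samples of capacity c (0-indexed lists).
    if seen1 < seen2:
        return merge(sample2, seen2, sample1, seen1)
    if seen2 < c:                       # sampler 2 holds every item it has seen
        for x in sample2:
            sample1, seen1 = feed(sample1, seen1, x)
        return sample1, seen1
    for i in range(1, c + 1):
        roll = random.randint(1, c * seen1 + i * seen2)
        if roll <= c * seen2:           # happens with probability q(i)
            sample1[(roll - 1) % c] = sample2[i - 1]
    return sample1, seen1 + seen2

def sample_then_merge(n1, n2):
    # Sample two disjoint streams separately, then merge the samplers.
    s1, k1 = [], 0
    for x in range(n1):
        s1, k1 = feed(s1, k1, x)
    s2, k2 = [], 0
    for x in range(n1, n1 + n2):
        s2, k2 = feed(s2, k2, x)
    return merge(s1, k1, s2, k2)[0]

trials = 20000
counts = Counter()
for _ in range(trials):
    counts.update(sample_then_merge(7, 5))
print({item: round(counts[item] / trials, 3) for item in range(12)})
# every frequency should be close to c / (7 + 5) = 0.25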

Conclusion

We have learned an algorithm for uniform sampling in a distributed setting. This algorithm is simple and efficient and works with Apache Spark. It may be seen as an extension of the popular reservoir sampling algorithm. It can be applied out of the box to data sets both small and large.

Code

An implementation of this algorithm is available here.

References

  1. Cormode, G., Muthukrishnan, S., Yi, K., Zhang, Q. Optimal sampling from distributed streams. Proceedings of the twenty-ninth ACM SIGMOD-SIGACT-SIGART symposium on Principles of database systems, 2010-06-06 (pp. 77-86).
  2. Tirthapura, S., Woodruff, D.P. Optimal random sampling from distributed streams revisited. International Symposium on Distributed Computing, 2011-09-20 (pp. 283-297).
  3. Chung, Y.Y., Tirthapura, S. Distinct random sampling from a distributed stream. 2015 IEEE International Parallel and Distributed Processing Symposium, 2015-05-25 (pp. 532-541).
  4. Weighted Random Sampling Over Data Streams [3]
  5. Weighted Random Sampling Over Data Streams [8]
  6. Sanders, P., Lamm, S., Hübschle-Schneider, L., Schrade, E., Dachsbacher, C. Efficient random sampling-parallel, vectorized, cache-efficient, and online. arXiv preprint arXiv:1610.05141, 2016-10-17.
  7. Distributed reservoir sampling for web applications
  8. Distributed/Parallel Reservoir Sampling
  9. Fredman, M., Saks, M. The cell probe complexity of dynamic data structures. STOC '89: Proceedings of the twenty-first annual ACM symposium on Theory of computing, 1989-02 (pp. 345-354).