Merge K Sorted Iterators

This is a quick example of an algorithm for merging K sorted iterators into a single sorted iterator. It's a common enough task that there's even a LeetCode question for it. More practically, it can be useful for reading over multiple sorted runs of data, as in a database built on a log-structured merge-tree.

One important note upfront: If you actually had reason to do this in a production application, there's probably something already out there and you shouldn't jump to writing your own. This example is in Rust, and itertools::kmerge would almost certainly be better than rolling your own like I am doing here. I'm also making things a little more difficult than they need to be by not using a peekable iterator, but that's a minor detail.

The general idea of the algorithm is to have a single struct containing a list of all iterators to be merged. That struct is itself an iterator, and to yield its next value it selects the next single item in order from the iterators it contains. The tricky part is that (generally, see note above about peekable iterator) once something is taken out of an iterator it can't be put back. This poses a problem because there has to be a way to decide which iterator is "next" to yield its value.
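For comparison, here is a rough sketch of the peekable approach I'm skipping. It assumes std's Peekable adapter and a hypothetical function name, merge_by_peeking, that is not part of the code in this post. It looks at the head of every iterator without consuming it, then consumes only from the iterator with the smallest head.

```rust
use std::iter::Peekable;

/// Hypothetical helper (not part of the post's code): merges already-sorted
/// iterators by peeking at the head of each one and consuming only the smallest.
fn merge_by_peeking<T: Ord, I: Iterator<Item = T>>(iters: Vec<I>) -> Vec<T> {
    let mut iters: Vec<Peekable<I>> = iters.into_iter().map(|i| i.peekable()).collect();
    let mut out = Vec::new();

    loop {
        // Look at every head without consuming it and remember which
        // iterator currently holds the smallest one.
        let next_idx = iters
            .iter_mut()
            .enumerate()
            .filter_map(|(idx, it)| it.peek().map(|head| (idx, head)))
            .min_by(|a, b| a.1.cmp(b.1))
            .map(|(idx, _)| idx);

        match next_idx {
            // Only now is the winning item actually taken out of its iterator.
            Some(idx) => {
                if let Some(val) = iters[idx].next() {
                    out.push(val);
                }
            }
            // Every iterator is exhausted.
            None => return out,
        }
    }
}
```

This version scans every iterator for each item it yields, so it is the simple baseline rather than the efficient one.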

Buffered Iterator Wrapper

A way to solve this is to add a wrapper around an iterator with a buffer that can hold a single item from the iterator. The items in all the merged iterators' buffers can then be compared and the next one up can be yielded. The item is taken out of the buffer and yielded to the caller, and at the same time the next item from that iterator is pulled out to re-populate the buffer. This wrapper is very simple, and could look something like this:

struct IterBuf<T: Ord, I: Iterator<Item = T>> {
    iter: I,
    buf: Option<T>,
}
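To make the "take out and re-populate" step concrete, here is a minimal sketch of how the wrapper could be driven. The new, peek and take_and_refill methods are hypothetical names for illustration. The full code further down does the same work inline instead.

```rust
struct IterBuf<T: Ord, I: Iterator<Item = T>> {
    iter: I,
    buf: Option<T>,
}

impl<T: Ord, I: Iterator<Item = T>> IterBuf<T, I> {
    /// Hypothetical constructor: pre-fills the buffer with the first item.
    fn new(mut iter: I) -> Self {
        let buf = iter.next();
        IterBuf { iter, buf }
    }

    /// Hypothetical helper: look at the buffered item without removing it.
    fn peek(&self) -> Option<&T> {
        self.buf.as_ref()
    }

    /// Hypothetical helper: hand out the buffered item and refill the buffer
    /// from the underlying iterator. Once the buffer is empty the iterator is
    /// not polled again.
    fn take_and_refill(&mut self) -> Option<T> {
        let current = self.buf.take()?;
        self.buf = self.iter.next();
        Some(current)
    }
}
```

An empty buffer then doubles as the signal that the underlying iterator is exhausted.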

Efficiently Selecting the Next Item

Checking every single buffer probably won't be too bad. More than likely the number of distinct iterators is relatively small compared to the number of items they contain. But it is still possible to do better than checking every single one each time an item must be yielded. A way to keep things sorted is needed, and a heap is a good data structure for this. The heap will contain our buffered iterators and sort them based on the current value in the buffer. When the next item is needed, a buffered iterator is popped from the heap and its buffered value taken, yielded, and replaced. The buffered iterator can then simply be put back on the heap. The next time a buffered iterator is popped from the heap it may or may not be that one, but it is guaranteed to be the next in sorted order.

In slightly more technical terms: If N is the total number of items across all the iterators and M is the number of iterators, checking every iterator to yield every item takes on the order of N*M operations. Peeking at the top of a heap is constant time, but popping from it and pushing back onto it each take on the order of log(M) operations in the worst case, so using a heap to yield every item takes on the order of N*log(M) operations. This can be quite a bit faster if there is a large number of iterators.
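As a rough illustration of the pop-then-push cycle, here is a compact sketch that takes a different route from the code in this post: rather than implementing a reversed Ord by hand, it keeps the iterators in a Vec and stores std::cmp::Reverse pairs of (head, index) in the heap. The function name merge_with_heap is hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical alternative to a hand-written reversed `Ord`: `BinaryHeap` is a
/// max-heap, so wrapping entries in `Reverse` makes the smallest head pop first.
fn merge_with_heap<T: Ord, I: Iterator<Item = T>>(mut iters: Vec<I>) -> Vec<T> {
    let mut heap = BinaryHeap::new();

    // Seed the heap with the first item of every non-empty iterator.
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(head) = it.next() {
            heap.push(Reverse((head, idx)));
        }
    }

    let mut out = Vec::new();
    // Each yielded item costs one pop and at most one push on a heap that
    // never holds more than M entries.
    while let Some(Reverse((val, idx))) = heap.pop() {
        out.push(val);
        if let Some(head) = iters[idx].next() {
            heap.push(Reverse((head, idx)));
        }
    }
    out
}
```

Storing the index next to the value means ties between equal heads are broken by iterator position, and exhausted iterators simply never re-enter the heap.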

Here's the full code. Using a heap in Rust requires its items to implement traits related to ordering and a lot of the logic is related to that. Also pay attention to the handling of empty iterators: Instead of putting them back in the heap they can just be thrown away.

use std::collections::BinaryHeap;

struct MergedIterator<T: Ord, I: Iterator<Item = T>> {
    items: BinaryHeap<IterBuf<T, I>>,
}

impl<T: Ord, I: Iterator<Item = T>> MergedIterator<T, I> {
    fn new() -> Self {
        MergedIterator {
            items: BinaryHeap::new(),
        }
    }

    fn add(&mut self, mut new: I) {
        self.items.push(IterBuf {
            buf: new.next(),
            iter: new,
        })
    }
}

impl<T: Ord, I: Iterator<Item = T>> Iterator for MergedIterator<T, I> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        match self.items.pop() {
            Some(mut next) => {
                if let Some(val) = next.buf {
                    // Advance the selected iterator and put it back in the heap if it has anything left.
                    next.buf = next.iter.next();
                    if next.buf.is_some() {
                        self.items.push(next);
                    }

                    return Some(val);
                }

                None // All iterators are empty
            }
            None => None, // Empty heap
        }
    }
}

struct IterBuf<T: Ord, I: Iterator<Item = T>> {
    iter: I,
    buf: Option<T>,
}

// BinaryHeap is a max heap, so the comparison between buffered values is
// reversed to make it behave like a min heap. Empty buffers compare lowest,
// which means they are only popped once every other buffer is empty too.
impl<T: Ord, I: Iterator<Item = T>> Ord for IterBuf<T, I> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match (&self.buf, &other.buf) {
            (None, None) => std::cmp::Ordering::Equal,
            (None, Some(_)) => std::cmp::Ordering::Less,
            (Some(_), None) => std::cmp::Ordering::Greater,
            (Some(this_one), Some(other_one)) => other_one.cmp(this_one),
        }
    }
}

impl<T: Ord, I: Iterator<Item = T>> PartialOrd for IterBuf<T, I> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord, I: Iterator<Item = T>> Eq for IterBuf<T, I> {}

impl<T: Ord, I: Iterator<Item = T>> PartialEq for IterBuf<T, I> {
    fn eq(&self, other: &Self) -> bool {
        self.buf == other.buf
    }
}

fn main() {
    // Three sorted inputs with overlapping ranges.
    let first = 1..5;
    let second = 1..10;
    let third = 8..13;

    let mut merged = MergedIterator::new();

    for item in [first, second, third] {
        merged.add(item);
    }

    // Prints the merged values in ascending order, one per line.
    for next in merged {
        println!("{}", next);
    }
}

rust

2023-02-24