Conditional “tee” with Python

This post describes the conditional tee ("ctee") module I wrote to split a sequence into two generators, according to a filter function.

The problem

David Beazley has a great article about generator pipelining using Python. This is a technique for handling (potentially very large) streams of data in a flexible yet efficient way. As an example, here's a code snippet he gives for summing the total bytes from a log file:

wwwlog = open("access-log")
bytecolumn = (line.rsplit(None,1)[1] for line in wwwlog)
bytes = (int(x) for x in bytecolumn if x != '-')

print "Total", sum(bytes)

The code above first opens a log file, then takes the last column (the byte count) of each entry. Each byte value is then converted to an integer, skipping entries whose byte column is '-' (no bytes sent). Finally, sum consumes (or "pumps") the generator, yielding the total.

Since the entire file is never loaded into memory at once, you could run this on very large log files, or even add a few steps and run it on collections of log files, without running out of memory. The technique is also very flexible: you can add steps, combine several steps into a single step, rearrange them, and so on.
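To make the flexibility point concrete, here is a minimal sketch (written for Python 3) in which each stage of the log pipeline above becomes its own generator function, so stages can be reused, reordered or combined. The function names and sample log lines are hypothetical; the parsing rule (last whitespace-separated field, '-' meaning no bytes) is the same one used in the snippet above.

```python
def byte_column(lines):
    """Yield the last whitespace-separated field of each log line."""
    for line in lines:
        yield line.rsplit(None, 1)[1]

def byte_counts(columns):
    """Convert byte fields to int, skipping '-' (no bytes sent)."""
    for field in columns:
        if field != '-':
            yield int(field)

def total_bytes(lines):
    """Two stages combined into one reusable step; still lazy until sum()."""
    return sum(byte_counts(byte_column(lines)))

# Hypothetical sample lines; a real run would pass open("access-log").
sample = [
    '1.2.3.4 - - [01/Jan/2008] "GET / HTTP/1.1" 200 512\n',
    '1.2.3.4 - - [01/Jan/2008] "GET /x HTTP/1.1" 304 -\n',
    '5.6.7.8 - - [01/Jan/2008] "GET /y HTTP/1.1" 200 1024\n',
]
print(total_bytes(sample))  # 1536
```

Because every stage takes an iterable and returns an iterator, inserting a new step (say, filtering by status code) is just one more function wrapped around the others.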

This works great as long as your pipe doesn't branch. If you want to split your pipe (say, dividing a stream of integers into one stream of even numbers and another of odd numbers), things get a little more complicated. One really elegant way to handle this situation is the itertools.tee function: tee takes an iterable and returns n independent iterators over it (two by default), which can be consumed separately.

Using itertools.tee

import itertools

lines = open("numbers.txt")
numbers = (int(line) for line in lines)

first, second = itertools.tee(numbers)

evens = (i for i in first if not i % 2)
odds = (i for i in second if i % 2)

print "Evens total:", sum(evens)
print "Odds total:", sum(odds)

The code first opens a file containing a bunch of random integers, and creates a generator that's a stream of integers. It then uses itertools.tee to make two copies of that generator (first and second), and applies generator expressions to create two streams: one of even numbers, and one of odd numbers. The built-in sum function is then used to consume each tee.

The number file I used for this code is a list of 1,000 random integers between 1 and 1,000,000.

That's fine in this case, where the filter expression is cheap. But what if we have an expensive filter, like testing whether a number is prime, or making a database query? With tee, every element is tested twice (once in each generator expression), which could really hurt our performance. Ideally, we'd perform the test only once for each element in our pipeline.
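To make that cost concrete, the sketch below (written for Python 3) counts how many times the condition runs when a stream is split with itertools.tee, as in the example above. is_even stands in for an expensive test, and the ten-item input is hypothetical.

```python
import itertools

calls = [0]  # how many times the condition has run

def is_even(x):
    """Stand-in for an expensive test; counts its own calls."""
    calls[0] += 1
    return x % 2 == 0

numbers = range(1, 11)  # hypothetical input: 1..10
first, second = itertools.tee(numbers)
evens = (i for i in first if is_even(i))
odds = (i for i in second if not is_even(i))

print(sum(evens), sum(odds))  # 30 25
print(calls[0])               # 20: every item was tested twice
```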

There are lots of ways to handle that situation. One common way is "continuation-passing" style, where you pass some data, a filter condition, and one or more functions to call depending on the result of the test.
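A minimal sketch of that style (written for Python 3; dispatch and its parameter names are hypothetical): each item is tested exactly once and handed to one of two callbacks.

```python
def dispatch(items, condition, on_true, on_false):
    """Test each item once, then pass it to the matching callback."""
    for item in items:
        if condition(item):
            on_true(item)
        else:
            on_false(item)

evens, odds = [], []
dispatch(range(1, 11), lambda x: x % 2 == 0, evens.append, odds.append)
print(evens)  # [2, 4, 6, 8, 10]
print(odds)   # [1, 3, 5, 7, 9]
```

The catch is visible in the call itself: both callbacks must be chosen at the point where the stream is split.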

This works, but it disrupts the pipeline: the functions that handle each branch have to be supplied at the point where the stream is split, so we lose the flexibility and dynamic nature of the generator paradigm.

I wrote the conditional tee (ctee) module for cases when you want to use a generator pipeline, but you need to split the sequence into two generators, and the filter condition is expensive. It creates a pair of instances of the ConditionalTee class, which are linked to each other.

Using conditional tee

Here's the meat of the code. The module can be downloaded here (ctee.zip).

from collections import deque

class ConditionalTee(object):
    """A conditional tee class"""

    def __init__(self, sequence, condition):
        self.sequence = sequence    # iterator shared with the other tee
        self.condition = condition
        self.othertee = None        # the linked tee; set by ctee()
        self.q = deque()            # items already tested and routed to us

    def next(self):
        """
        Get the next item that matches the condition.
        Adds items to the queue of the other sequence until
        one matching this condition is reached.
        """
        if self.q:
            return self.q.popleft()

        # next() raises StopIteration once the sequence is exhausted
        item = next(self.sequence)
        while not self.condition(item):
            self.othertee.q.append(item)
            item = next(self.sequence)
        return item

    __next__ = next    # the same method under its Python 3 name

    def __iter__(self):
        """We are an iterator"""
        return self

def ctee(sequence, condition):
    """
    Creates two sequences from sequence: one where
    condition holds, and the other where it doesn't
    sequence -> (x for x in sequence if condition(x)),
            (x for x in sequence if not condition(x))
    """
    # Both tees must pull from one shared iterator; iter() also lets
    # callers pass a list or other iterable that has no next method.
    sequence = iter(sequence)
    yes_iter = ConditionalTee(sequence, condition)
    nocond = lambda x : not condition(x)
    no_iter = ConditionalTee(sequence, nocond)
    yes_iter.othertee = no_iter
    no_iter.othertee = yes_iter

    return yes_iter, no_iter

The ConditionalTee class takes a sequence and a filter condition as arguments to its __init__ method. The __init__ method also creates an empty queue member, and an othertee member that's initialized to None.

When the next method of a ConditionalTee instance is called, it first checks its queue. If the queue isn't empty, it returns the oldest item on it. Otherwise, it pulls items from the shared sequence, adding each one that doesn't match to the queue of its othertee member, until it either finds an item that matches (and returns it) or the sequence is exhausted and raises StopIteration.

The ctee function also takes a sequence and a filter condition as arguments. It creates two ConditionalTee instances over the same sequence, one testing the condition and one testing its negation, sets their othertee members to each other, and returns the two instances as a pair.

Here's some sample code using ctee:

import ctee

lines = open("numbers.txt")
nums = (int(line) for line in lines)
iseven = lambda x : not x % 2
evens, odds = ctee.ctee(nums, iseven)

There's still a problem if you "pump" each of these generators in succession, though: if the amount of data is large, the generator you haven't pumped yet will accumulate a huge queue of data. It would be better to pump the generators alternately, taking one item from each in turn and processing it, so that neither queue builds up.
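To see the difference, here is a small experiment (written for Python 3). So that it runs on its own, it carries a trimmed Python 3 port of ConditionalTee and ctee (__next__ instead of next, a collections.deque for the queue). The helper peak_queue_alternating and the test data are hypothetical, and zip stands in for alternate pumping by taking one item from each tee per round.

```python
from collections import deque

class ConditionalTee(object):
    """Trimmed Python 3 port of the ConditionalTee class above."""

    def __init__(self, sequence, condition):
        self.sequence = sequence   # iterator shared with the partner tee
        self.condition = condition
        self.othertee = None       # partner tee, linked by ctee()
        self.q = deque()           # items routed here, not yet returned

    def __next__(self):
        if self.q:
            return self.q.popleft()
        item = next(self.sequence)  # StopIteration here ends this tee
        while not self.condition(item):
            self.othertee.q.append(item)
            item = next(self.sequence)
        return item

    def __iter__(self):
        return self

def ctee(sequence, condition):
    sequence = iter(sequence)
    yes_iter = ConditionalTee(sequence, condition)
    no_iter = ConditionalTee(sequence, lambda x: not condition(x))
    yes_iter.othertee, no_iter.othertee = no_iter, yes_iter
    return yes_iter, no_iter

def is_even(x):
    return x % 2 == 0

def peak_queue_alternating(data):
    """Take one item from each tee per round (via zip); return the
    largest queue length seen at the end of any round."""
    evens, odds = ctee(data, is_even)
    peak = 0
    for _pair in zip(evens, odds):
        peak = max(peak, len(evens.q), len(odds.q))
    return peak

# Consecutive pumping: drain evens completely before touching odds.
evens, odds = ctee(range(1000), is_even)
for _ in evens:
    pass
print(len(odds.q))  # 500: every odd number is waiting in the queue

# Alternating pumping on perfectly interleaved input keeps the queues empty...
print(peak_queue_alternating(range(1000)))  # 0
# ...but sorted input (all evens, then all odds) still builds a queue.
print(peak_queue_alternating(list(range(0, 1000, 2)) + list(range(1, 1000, 2))))  # 499
```

The last case is the kind of pathological input where alternating alone doesn't help.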

Here's a function that pumps a set of generators alternately:

Pumping generators alternately instead of consecutively

def diagonalize(sequences):
    """
    Takes each sequence in turn, retrieving one item from that
    sequence and performing action on it, until all sequences
    are exhausted.
    sequences is a sequence of (iterable, action) pairs.
    """
    sequences = [(iter(s), a) for (s, a) in sequences]
    while sequences:
        for sequence, action in sequences:
            try:
                item = next(sequence)
                action(item)
            except StopIteration:
                # remove the exhausted sequence from the list; this
                # for loop keeps walking the old list, and the new one
                # takes effect on the next pass of the while loop
                sequences = [(s, a)
                    for (s, a) in sequences
                    if s is not sequence]

This takes a sequence of (iterable, action) pairs. It iterates through each pair, taking the next item in the sequence and applying the action to it. If the sequence raises StopIteration, it's removed from the list of sequences. The list comprehension at the start of the function makes sequences a list, which tests False once every sequence has been removed (ending the while loop), and calls iter on each sequence so that it's an iterator (i.e. supports next), even if you pass in something like a plain list.

Here's an example of using this function:

from ctee import ctee

lines = open("numbers.txt")
numbers = (int(line) for line in lines)

evens, odds = ctee(numbers, lambda x : not x % 2)

evenout = open("evens.txt", "w")
oddout = open("odds.txt", "w")

def writeline(out, item):
    print >> out, item

evenaction = lambda x : writeline(evenout, x)
oddaction = lambda x : writeline(oddout, x)

diagonalize(((evens, evenaction), (odds, oddaction)))

# flush and close the output files
evenout.close()
oddout.close()

This code will write all the even numbers to "evens.txt", and all the odd numbers to "odds.txt".
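One detail in diagonalize: action(item) sits inside the try block, so a StopIteration raised by an action would be mistaken for an exhausted sequence. A minimal alternative sketch (written for Python 3; diagonalize_rr is a hypothetical name) keeps the active iterators in a collections.deque, calls each action outside the try, and simply doesn't put an iterator back once it's exhausted.

```python
from collections import deque

def diagonalize_rr(pairs):
    """Round-robin over (iterable, action) pairs until all are exhausted."""
    active = deque((iter(s), a) for s, a in pairs)
    while active:
        it, action = active.popleft()
        try:
            item = next(it)
        except StopIteration:
            continue  # exhausted: drop it from the rotation
        action(item)  # outside the try, so its errors propagate normally
        active.append((it, action))  # back of the line for its next turn

seen = []
diagonalize_rr([("ab", seen.append), ([1, 2, 3], seen.append)])
print(seen)  # ['a', 1, 'b', 2, 3]
```

Dropping an exhausted iterator costs constant time here, rather than a rebuild of the whole list.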

You might ask how this is different from continuation-passing style. And you'd have a point: at the very end, this is essentially continuation passing.

The thing is that here, the pumping only happens at the end. You can still go on wrapping all sorts of other filtering and transforming generators around your two conditional tees; the sequence won't actually be processed until you start pumping the generators at the end, and because diagonalize pumps them alternately, the queues stay small as long as matching and non-matching items are reasonably well mixed.
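As a sketch of that point (written for Python 3), the example below splits a stream, keeps wrapping generator expressions around both branches, and only then pumps them alternately, with zip standing in for diagonalize. So that it runs on its own, it uses ctee_gen, a hypothetical generator-based variant of ctee (not part of the downloadable module) that shares one source iterator and keeps a deque per branch. A counter in the condition shows that nothing is tested until pumping starts, and that each item is tested exactly once.

```python
from collections import deque

def ctee_gen(iterable, condition):
    """Hypothetical generator-based ctee: condition runs once per item."""
    source = iter(iterable)
    queues = {True: deque(), False: deque()}  # items routed to each branch

    def branch(want):
        mine, other = queues[want], queues[not want]
        while True:
            if mine:
                yield mine.popleft()
                continue
            for item in source:
                if bool(condition(item)) == want:
                    yield item
                    break
                other.append(item)  # belongs to the other branch
            else:
                return  # source exhausted and our queue is empty

    return branch(True), branch(False)

calls = [0]

def is_even(x):
    calls[0] += 1
    return x % 2 == 0

evens, odds = ctee_gen(range(1, 11), is_even)
# Keep building the pipeline downstream of the split...
halves = (e // 2 for e in evens)
squares = (o * o for o in odds)
print(calls[0])  # 0: nothing has been pulled from the source yet

# ...and only now pump both branches alternately.
out = []
for h, s in zip(halves, squares):
    out.append((h, s))
print(calls[0])  # 10: one test per item
print(out)       # [(1, 1), (2, 9), (3, 25), (4, 49), (5, 81)]
```

Using for/else inside the generator, rather than calling next() on the source, avoids letting StopIteration escape from a generator body, which Python 3.7+ turns into a RuntimeError.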

2 comments to Conditional “tee” with Python

  • Nice!
    Note that tee caches previous results. So if you iterate on just one of the resulting streams, you would still incur the memory overhead.
    Similarly, a conditional tee would require you to cache all false-condition elements when iterating on the true-condition sequence, and vice versa.

    Also, this reminds me very much of my “classify_to_dict” function (see http://www.algorithm.co.il/blogs/index.php/programming/python/small-python-utility-functions/ , now renamed in my code to just “classify”).

    Had I written classify_to_dict to return generators, it would be very similar indeed. (Although returning a dict as a generator is not really possible today)

  • @lorg

    Quite right about accumulating large queues of data. The diagonalize function in the post alleviates that somewhat, but in pathological cases (like sorted input, or where one of the cases is missing), you’ll still build up a queue. Much like with choosing sorting algorithms, you’ll have to choose your algorithm based somewhat on what kind of data you’ll have.
