Wednesday, February 11, 2009

Streams

I just got a question via email that threw me back to my days at Netscape/Mozilla implementing nsIStreamConverter. The question is "how do I process streamed data?" There are many answers, but I thought I'd provide a fairly generic one here, along with some pseudocode (a nice mix of C, Python, and JavaScript, for your viewing pleasure). But first, I want to try to give a high-level explanation of streams, because they can be confusing to folks who are used to dealing exclusively in discrete, bounded chunks of data.

It's important to note that all I/O, network and disk alike, is stream-based at some level. When you read a file from disk, the bytes are streamed off the disk, through the lower-level operating system I/O layers, and then presented to your application via some "read()" function or method. Many languages do some demarcation for you as a convenience, offering things like "readline()" so you can easily read one line at a time from a file that is broken apart by EOL markers. If you find yourself on the other end of a raw byte "read()" routine (whether from a socket or a file; in other words, a basic file descriptor), then you're dealing with "streams", and you'll need some incarnation of the following code if you're trying to parse the data.
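A small Python sketch of that difference, using os.read() on a raw file descriptor. The file contents, the 8-byte chunk size, and the helper names read_raw_chunks and read_lines are made up for illustration. The raw reads come back in fixed-size pieces that cut straight through lines, while readlines() (the list-returning cousin of readline()) hands back one element per EOL-terminated line.

```python
import os
import tempfile


def read_raw_chunks(path, chunk_size=8):
    """Read a file through a bare file descriptor, chunk_size bytes at a time."""
    chunks = []
    fd = os.open(path, os.O_RDONLY)
    try:
        while True:
            data = os.read(fd, chunk_size)  # b"" signals end of file
            if not data:
                break
            chunks.append(data)
    finally:
        os.close(fd)
    return chunks


def read_lines(path):
    """Let the language do the demarcation: one element per EOL-terminated line."""
    with open(path, "rb") as f:
        return f.readlines()


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
        tmp.write(b"first line\nsecond line\nthird\n")
        path = tmp.name
    try:
        print(read_raw_chunks(path))  # chunk edges ignore the newlines
        print(read_lines(path))       # one element per line
    finally:
        os.unlink(path)
```

Nothing in the raw chunks says where a line ends; recovering that structure is exactly the parsing job the rest of this post is about.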

Some streams can be consumed and acted upon in small pieces (byte by byte, or a few bytes at a time); others only make sense in larger chunks. Some streams are binary, and some are text-based. Today's web deals in lots of text-based streams, so the example below follows that lead.
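In the binary case, one common way to know you have "enough" data is a byte count rather than a token. Here is a minimal Python sketch assuming fixed-size records; the record layout (a 4-byte id plus a 2-byte value, both big-endian) and the name parse_records are hypothetical, and the standard library struct module does the decoding. It yields the same records whether the bytes arrive one at a time or all at once.

```python
import struct

# Hypothetical record layout, for illustration only: a 4-byte big-endian
# unsigned id followed by a 2-byte big-endian unsigned value (6 bytes total).
RECORD = struct.Struct(">IH")


def parse_records(chunks):
    """Yield (id, value) tuples from an iterable of byte chunks of any size."""
    buffer = b""
    for data in chunks:
        buffer += data
        # Consume as many complete records as the buffer currently holds.
        while len(buffer) >= RECORD.size:
            yield RECORD.unpack(buffer[:RECORD.size])
            buffer = buffer[RECORD.size:]
    # Anything still in the buffer here is an incomplete trailing record.


if __name__ == "__main__":
    payload = RECORD.pack(1, 10) + RECORD.pack(2, 20) + RECORD.pack(3, 30)
    # Feed the 18-byte payload in awkward 4-byte pieces.
    chunks = [payload[i:i + 4] for i in range(0, len(payload), 4)]
    print(list(parse_records(chunks)))  # [(1, 10), (2, 20), (3, 30)]
```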

Hopefully you find this useful.

Imagine some data source providing data to your routine (get_data_from_stream() in this example), and imagine that you want to act on the data as it comes in.
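To see why a local buffer is needed at all, here is a hypothetical stand-in for get_data_from_stream(), sketched in Python. The factory make_stream, the sample text, and the assumption that entries end with a "</entry>" tag are all invented for illustration. The stand-in hands back a fixed number of characters per call and "" once exhausted; even with 10-character reads, longer than the 8-character "</entry>" token, no single read happens to contain a complete token.

```python
def make_stream(text, chunk_size):
    """Hypothetical stand-in for get_data_from_stream(): returns a callable
    that hands back the next chunk_size characters, or "" once exhausted."""
    pos = 0

    def get_data_from_stream():
        nonlocal pos
        data = text[pos:pos + chunk_size]
        pos += len(data)
        return data

    return get_data_from_stream


if __name__ == "__main__":
    get_data_from_stream = make_stream("<entry>a</entry><entry>b</entry>", 10)
    while data := get_data_from_stream():
        # Each read is checked on its own: the token never appears whole.
        print(repr(data), "</entry>" in data)
```

Only once the reads are concatenated does "</entry>" show up, which is what the buffer in the loop is for.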


// this is the local buffer we'll use to
// accumulate data from the stream
buffer = ''

// when processing a stream, you need
// to know when you have enough
// data to process. sometimes this is
// token based (a string, or a
// character); sometimes it is after a
// certain number of bytes (in
// which case this token is irrelevant).
// In this case, I want to do
// something once I've reached the end
// of an RSS entry, so this
// processor handles "entries".
demarcationToken = "</entry>"

while ( data = get_data_from_stream() ) {
    // take the data from the stream, and
    // append it to the local buffer. this
    // allows us to grow the local buffer
    // until we have enough data to digest
    buffer += data

    // determine whether or not our processing-specific
    // demarcation exists yet. a single read can complete
    // zero, one, or several chunks, so keep looping until
    // no complete chunk is left. find() returns -1 when the
    // token is absent (0 is a valid position, so don't just
    // test the result for truthiness).
    while ( (tokenPosition = buffer.find(demarcationToken)) != -1 ) {
        // we found a point in the buffer that will allow
        // us to do some processing

        // define the chunk to process as the buffer
        // up to the end of the demarcationToken.
        chunkEnd = tokenPosition + len(demarcationToken)
        chunk = buffer.subString(0, chunkEnd)

        // now do something with it
        do_something_with_the_chunk(chunk)

        // reset the buffer to everything beyond the chunk you
        // just processed; a partial entry stays in the buffer
        // until more data arrives.
        buffer = buffer[chunkEnd:]

        // rinse and repeat
    } // end inner while
} // end while
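Here is one way the pseudocode above might look as runnable Python. It is a minimal sketch assuming a text stream and a string token; process_stream and its parameter names are hypothetical, and get_data_from_stream is taken to be any callable that returns "" at end of stream.

```python
def process_stream(get_data_from_stream, demarcation_token, do_something_with_the_chunk):
    """Buffer incoming text and hand each token-terminated chunk to a callback.

    Returns whatever is left over (an incomplete chunk) when the stream ends.
    """
    if not demarcation_token:
        # An empty token would "match" at position 0 forever.
        raise ValueError("demarcation_token must be non-empty")
    buffer = ""
    while data := get_data_from_stream():
        # Grow the local buffer until it holds enough data to digest.
        buffer += data
        # One read may complete zero, one, or several chunks.
        while (token_position := buffer.find(demarcation_token)) != -1:
            chunk_end = token_position + len(demarcation_token)
            do_something_with_the_chunk(buffer[:chunk_end])
            # Keep only what follows the chunk just processed.
            buffer = buffer[chunk_end:]
    return buffer


if __name__ == "__main__":
    pieces = iter(["<entry>a</", "entry><ent", "ry>b</entr", "y><entry>c"])
    entries = []
    leftover = process_stream(lambda: next(pieces, ""), "</entry>", entries.append)
    print(entries)         # ['<entry>a</entry>', '<entry>b</entry>']
    print(repr(leftover))  # '<entry>c'
```

Returning the leftover buffer lets the caller decide whether a trailing partial entry is an error or just a truncated feed. The repeated slicing copies the buffer each time, which is fine for small entries; for very large buffers, tracking an offset instead would avoid the copies.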
