ENOSUCHBLOG

Programming, philosophy, pedaling.


Pipelines in Ruby with Open3

Jun 9, 2019

Tags: ruby, data, devblog

When working on projects that involve data analysis, I regularly find myself implementing a poor man's MapReduce (and a plugin system, but that's a subject for another blog post). My conceptual model is pretty simple:

  1. I have a big stream of data, unambiguously divided into records.
  2. I want to pass my stream through a series of transforms and filters, preferably in rough increasing order of computational intensity.
  3. I want the series to be record-oriented, such that the only blocker between steps is each individual record, not the entire stream.

These requirements inevitably bring me back to good old Unix pipelines:

  1. In a pipeline executed in the shell, each command runs in parallel.
  2. Each command's input and output are conceptually entire streams, but when commands read and write individual (line- or size-delimited) records, every command can (in the best case) be doing work for the vast majority of the pipeline's execution time.
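To see that concurrency and record-at-a-time behavior in action, here's a small sketch using pipeline_rw (discussed later in this post). It pushes records through two cat processes one at a time and reads each one back before sending the next. The helper name echo_records is made up for illustration, and the sketch assumes a POSIX system whose cat passes data through unbuffered (true of GNU and BSD cat; many other programs block-buffer output sent to a pipe, which would make this hang).

```ruby
require "open3"

# Push records through `cat | cat` one at a time, reading each one back out of
# the far end before sending the next. This only works because both cat
# processes run concurrently with this Ruby process.
# Assumption: cat passes data through without buffering it (true of GNU and BSD
# cat); many programs block-buffer their output when it goes to a pipe.
def echo_records(records)
  Open3.pipeline_rw("cat", "cat") do |stdin, stdout, _threads|
    replies = records.map do |record|
      stdin.puts(record) # pipeline_rw sets stdin.sync = true, so this isn't buffered
      stdout.gets        # blocks until this record has passed through both stages
    end
    stdin.close          # EOF lets both cats exit
    replies
  end
end

p echo_records(%w[alpha beta gamma]) # => ["alpha\n", "beta\n", "gamma\n"]
```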

So, I end up writing half-baked pipeline implementations. Normally, I do this the old-fashioned way: pipe(2), fork(2) and dup2(2), or whatever the equivalent proxies are in my managed language of choice. However, this gets really tedious, especially in a language like Ruby, where a lot of low-level process and stream management feels bolted-on and unidiomatic.
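For contrast, here's a rough sketch of that approach in Ruby, wiring up printf 'b\na\nc\n' | sort by hand. IO.pipe stands in for pipe(2), fork for fork(2), and IO#reopen on $stdin/$stdout does the dup2(2). The method name manual_pipeline is hypothetical, and the sketch assumes a POSIX system with fork support and printf and sort on the PATH.

```ruby
# Wire up `printf 'b\na\nc\n' | sort` by hand: IO.pipe is pipe(2), fork is
# fork(2), and IO#reopen on $stdin/$stdout dup2(2)s onto descriptors 0 and 1.
def manual_pipeline
  link_r, link_w = IO.pipe # printf's stdout -> sort's stdin
  out_r, out_w = IO.pipe   # sort's stdout -> this process

  producer = fork do
    [link_r, out_r, out_w].each(&:close)
    $stdout.reopen(link_w)
    exec("printf", 'b\na\nc\n') # single quotes: printf itself expands the \n escapes
  end

  consumer = fork do
    [link_w, out_r].each(&:close)
    $stdin.reopen(link_r)
    $stdout.reopen(out_w)
    exec("sort")
  end

  # The parent must close every end it doesn't use: a stray copy of link_w
  # would keep sort waiting for EOF, and a stray out_w would hang our read.
  [link_r, link_w, out_w].each(&:close)

  output = out_r.read
  out_r.close
  Process.wait(producer)
  Process.wait(consumer)
  output
end

puts manual_pipeline
```

Most of that is bookkeeping about which process owns which pipe end, which is exactly the part that's easy to get wrong.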

To get to the point: I've found a better way to dig my own grave — Open3's pipeline methods. This blog post will provide some short examples of using pipeline and pipeline_rw to create your own terrible processing pipelines.

Open3

Open3 is a severely underrated (and underdocumented) member of the Ruby stdlib: its popen and capture methods are almost always better suited for process management than the methods provided by either Process or Kernel, and it isn't full of cute (and even more severely undocumented) block and operator abuse like Shell.
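As a small sketch of why the capture methods are so handy (assuming a POSIX system with tr and sh on the PATH): capture3 runs a command to completion and hands back its stdout, its stderr and a Process::Status, with no shell involved when the command is given as separate arguments.

```ruby
require "open3"

# capture3 runs a command to completion and returns its stdout, its stderr and
# a Process::Status. Passing the command as separate arguments bypasses the
# shell; stdin_data: is written to the command's stdin.
out, err, status = Open3.capture3("tr", "a-z", "A-Z", stdin_data: "hello\n")
p [out, err, status.success?] # => ["HELLO\n", "", true]

# stderr and the exit code come back separately, with no $? juggling.
_, err, status = Open3.capture3("sh", "-c", "echo oops >&2; exit 3")
p [err, status.exitstatus] # => ["oops\n", 3]
```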

Anyways, despite my love for (and frequent use of) Open3's popen methods, I had never really paid attention to the other things it provides. pipeline and pipeline_rw are two of those things.

Open3.pipeline

Open3.pipeline is beautifully simple:

require "open3"

Open3.pipeline *cmds, opts

That's it. For example, here's how you'd pipeline three commands together:

Open3.pipeline "foo", "bar", "baz"
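It's worth knowing what pipeline gives back: it waits for every command to exit and returns an array of Process::Status objects, one per command, in pipeline order. A POSIX shell, by default, reports only the last command's status. A minimal sketch, assuming the standard true and false utilities are on the PATH:

```ruby
require "open3"

# One Process::Status per command, in the order the commands were given.
statuses = Open3.pipeline("true", "false", "true")
p statuses.map(&:success?) # => [true, false, true]
```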

Performing the full round-trip with stdin and stdout should remind you of Process.spawn:

Open3.pipeline "foo", "bar", "baz", in: STDIN, out: STDOUT
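The resemblance goes further: in: and out: accept the same kinds of values as their Process.spawn counterparts, including plain file names (opened for reading and writing, respectively). in: applies to the first command and out: to the last. A sketch, where count_words is a hypothetical helper and sort and uniq are assumed to be on the PATH:

```ruby
require "open3"
require "tmpdir"

# Count word occurrences with `sort | uniq -c`, reading from and writing to
# files by name, just as Process.spawn's in:/out: options allow.
def count_words(words)
  Dir.mktmpdir do |dir|
    input = File.join(dir, "words.txt")
    output = File.join(dir, "counts.txt")
    File.write(input, words.join("\n") + "\n")

    # An Array command is [cmdname, arg1, ...]; no shell is involved.
    Open3.pipeline("sort", ["uniq", "-c"], in: input, out: output)

    # uniq -c pads its counts with spaces; normalize each line to "COUNT WORD".
    File.readlines(output).map { |line| line.split.join(" ") }
  end
end

p count_words(%w[pear apple pear]) # => ["1 apple", "2 pear"]
```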

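Internally, pipeline sends each command to Process.spawn. That means each command can also be an array of the form [env, cmdname, arg1, ..., opts], where the optional leading Hash is that command's environment and the optional trailing Hash holds Process.spawn options for that command alone. (Process.spawn has no env: option; the environment always goes first.) So you can do things like this:

Open3.pipeline "foo", [{ "VERBOSE" => "0" }, "bar", umask: 0777], "baz"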
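Here's a runnable sketch of per-command settings, assuming sh and cat are available. OPEN3_DEMO_GREETING is a made-up variable name (assumed to be unset in the parent), and per_command_env is a hypothetical helper. The environment is set only for the first command, and the second command's stderr is sent to File::NULL via its own trailing options Hash.

```ruby
require "open3"
require "tmpdir"

# The leading Hash sets an environment for the first command only; the trailing
# Hash passes err: to Process.spawn for the second command only.
def per_command_env
  Dir.mktmpdir do |dir|
    output = File.join(dir, "out.txt")
    Open3.pipeline(
      [{ "OPEN3_DEMO_GREETING" => "hello" }, "sh", "-c", 'echo "$OPEN3_DEMO_GREETING"'],
      ["sh", "-c", 'cat; echo "${OPEN3_DEMO_GREETING:-unset}"; echo noise >&2', err: File::NULL],
      out: output
    )
    File.read(output)
  end
end

p per_command_env # => "hello\nunset\n"
```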

Open3.pipeline_rw

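pipeline_rw is almost identical to pipeline, but with two key differences: it overrides any in: or out: in its trailing options, since it connects the first command's stdin and the last command's stdout to pipes of its own, and it hands those pipes to you (along with an array of wait threads, one per command) rather than simply running the pipeline to completion and returning its exit statuses.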

pipeline_rw makes feeding from a program-generated source easy:

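out = Open3.pipeline_rw "foo", "bar", "baz" do |stdin, stdout, _ts|
  stdin.write data
  stdin.close
  stdout.read # the block's value is what pipeline_rw returns
end
# no Process.waitall needed: pipeline_rw waits on every command once the block returns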
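One caveat with writing everything before reading: if the input is large and the pipeline's output grows with it, the last command can fill its stdout pipe and block, the stall backs up through the pipeline, and stdin.write never returns. A sketch of one way around that, feeding stdin from a separate thread while the main thread drains stdout. The helper run_pipeline is hypothetical, and cat and tr are assumed to be on the PATH.

```ruby
require "open3"

# Feed stdin from a writer thread so stdout can be drained concurrently,
# avoiding a deadlock when the data exceeds the pipe buffers.
def run_pipeline(input, *cmds)
  Open3.pipeline_rw(*cmds) do |stdin, stdout, _threads|
    writer = Thread.new do
      stdin.write(input)
    rescue Errno::EPIPE
      # a downstream command (e.g. head) exited early; stop feeding it
    ensure
      stdin.close
    end
    output = stdout.read
    writer.join
    output
  end
end

data = (1..100_000).map { |i| "record #{i}\n" }.join # over a megabyte of records
out = run_pipeline(data, "cat", ["tr", "a-z", "A-Z"])
puts out.lines.last # prints RECORD 100000
```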

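A blockless version is also available, in which case closing the pipes and waiting on the commands is up to you:

stdin, stdout, ts = Open3.pipeline_rw "foo", "bar", "baz"
stdin.write data
stdin.close
out = stdout.read
stdout.close
statuses = ts.map(&:value) # waits for each command, returning its Process::Status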
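Because each wait thread's value is that command's Process::Status, the blockless form makes it straightforward to report exactly which stage failed. A sketch of a hypothetical pipeline_output! helper; it writes all of its input up front, so it's only suitable for small inputs that fit in the pipe buffers.

```ruby
require "open3"

# Run a pipeline on a small input and raise if any stage exits unsuccessfully,
# naming the failing command(s).
def pipeline_output!(input, *cmds)
  stdin, stdout, threads = Open3.pipeline_rw(*cmds)
  stdin.write(input)
  stdin.close
  output = stdout.read
  stdout.close

  # Thread#value blocks until that command exits and returns its Process::Status.
  failures = cmds.zip(threads.map(&:value)).reject { |_, status| status.success? }
  unless failures.empty?
    details = failures.map { |cmd, status| "#{cmd.inspect} (#{status})" }
    raise "pipeline stage(s) failed: #{details.join(', ')}"
  end
  output
end

p pipeline_output!("b\na\n", "sort") # => "a\nb\n"
```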