Jun 9, 2019 Tags: data, devblog, ruby
When working on projects that involve data analysis, I regularly find myself implementing a poor man’s MapReduce (and a plugin system, but that’s a subject for another blog post). My conceptual model is pretty simple:
These requirements inevitably bring me back to good old Unix pipelines:
So, I end up writing half-baked pipeline implementations. Normally, I do this the old-fashioned way: pipe(2), fork(2), and dup2(2), or whatever the equivalent proxies are in my managed language of choice. However, this gets really tedious, especially in a language like Ruby, where a lot of low-level process and stream management feels bolted-on and unidiomatic.
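For contrast, here’s a rough sketch of that manual approach in Ruby, wiring echo hello into tr a-z A-Z with IO.pipe, fork, and IO#reopen (which does the dup2 for you). It assumes a Unix system with echo and tr on the PATH, and manual_pipeline is a hypothetical helper made up for illustration. It relies on Ruby marking the pipe descriptors close-on-exec, so the children’s stray copies disappear at exec time:

```ruby
# A sketch of the manual approach: echo hello | tr a-z A-Z, built from
# IO.pipe (pipe(2)), fork (fork(2)), and IO#reopen (dup2(2)).
# manual_pipeline is a hypothetical helper, not part of any library.
def manual_pipeline
  link_r, link_w = IO.pipe # echo's stdout -> tr's stdin
  out_r, out_w = IO.pipe   # tr's stdout -> this process

  echo_pid = fork do
    $stdout.reopen(link_w) # fd 1 becomes a copy of the pipe's write end
    exec("echo", "hello")
  end

  tr_pid = fork do
    $stdin.reopen(link_r)  # fd 0 reads from echo
    $stdout.reopen(out_w)  # fd 1 writes back to the parent
    exec("tr", "a-z", "A-Z")
  end

  # The parent must close its copies of the child-side ends; otherwise
  # tr never sees EOF and out_r.read never returns.
  [link_r, link_w, out_w].each(&:close)
  output = out_r.read
  out_r.close
  Process.wait(echo_pid)
  Process.wait(tr_pid)
  output
end

puts manual_pipeline # prints "HELLO"
```

Every line of bookkeeping here (who closes which end, when to wait) is exactly the tedium described above.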
To get to the point, I’ve found a better way to dig my own grave: Open3’s pipeline methods.
This blog post will provide some short examples of using pipeline and pipeline_rw to create your own terrible processing pipelines.
Open3 is a severely underrated (and underdocumented) member of the Ruby stdlib: its popen and capture methods are almost always better suited for process management than the methods provided by either Process or Kernel, and it isn’t full of cute (and even more severely undocumented) block and operator abuse like Shell.
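As a quick taste of the capture family, here’s a sketch that uses Open3.capture2 to run one command with some input and collect its output and exit status in a single call. The tr command is just an example and assumes a Unix system:

```ruby
require "open3"

# capture2 feeds stdin_data to the command, waits for it to exit,
# and returns its stdout along with its Process::Status.
out, status = Open3.capture2("tr", "a-z", "A-Z", stdin_data: "hello\n")

puts out             # prints "HELLO"
puts status.success? # prints "true"
```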
Anyways, despite my love for (and frequent use of) Open3’s popen methods, I had never really paid attention to the other things it provides. pipeline and pipeline_rw are two of those things.
Open3.pipeline
Open3.pipeline is beautifully simple:
```ruby
require "open3"

# cmds: command Strings or Arrays; opts: an optional Hash of options
Open3.pipeline *cmds, opts
```
That’s it. For example, here’s how you’d pipeline three commands together:
```ruby
Open3.pipeline "foo", "bar", "baz"
```
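With real commands in place of the placeholders, a sketch might look like this. pipeline blocks until every command has exited and returns one Process::Status per command, in pipeline order, so you can check each stage. The commands are arbitrary Unix examples, and out: File::NULL just discards the final output:

```ruby
require "open3"

# seq 3 | grep 7 | wc -l, discarding wc's output.
statuses = Open3.pipeline "seq 3", "grep 7", "wc -l", out: File::NULL

# grep finds no "7", so it exits with status 1; the other stages succeed.
p statuses.map(&:exitstatus) # prints [0, 1, 0]
```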
Performing the full round-trip with stdin and stdout should remind you of Process.spawn:
```ruby
Open3.pipeline "foo", "bar", "baz", in: STDIN, out: STDOUT
```
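Since those options end up in Process.spawn, in: and out: should also accept file paths, not just IO objects. Here’s a sketch that counts duplicate lines from one temporary file into another; the file names and the sort/uniq stages are illustrative assumptions:

```ruby
require "open3"
require "tmpdir"

result = Dir.mktmpdir do |dir|
  input = File.join(dir, "words.txt")   # illustrative input file
  output = File.join(dir, "counts.txt") # spawn creates/truncates it for writing
  File.write(input, "b\na\nb\n")

  # Roughly: sort < words.txt | uniq -c > counts.txt
  Open3.pipeline "sort", "uniq -c", in: input, out: output

  File.read(output) # Dir.mktmpdir returns the block's value
end

puts result
```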
Internally, pipeline passes each command to Process.spawn. That means an array command can carry its own environment and spawn options, just as it would in a direct Process.spawn call:
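```ruby
# As with Process.spawn, the environment Hash comes first; only "bar"
# gets VERBOSE=0 and the 0777 umask.
Open3.pipeline "foo", [{ "VERBOSE" => "0" }, "bar", umask: 0777], "baz"
```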
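To see a per-command environment in action, here’s a sketch that gives only the first stage a variable and collects the pipeline’s output through an IO.pipe passed as out:. GREETING is made up for the example, and the output is assumed to be small enough to fit in the pipe buffer, since pipeline doesn’t return until every command has exited:

```ruby
require "open3"

reader, writer = IO.pipe

# Only the first stage gets GREETING in its environment. Because the
# command line contains "$", spawn runs it through /bin/sh, which expands it.
Open3.pipeline([{ "GREETING" => "hello" }, "echo $GREETING"], "tr a-z A-Z", out: writer)

writer.close # close our copy of the write end so read sees EOF
output = reader.read
reader.close
puts output # prints "HELLO"
```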
Open3.pipeline_rw
pipeline_rw is almost identical to pipeline, but with two key differences:
1. It returns (or yields to a block) IO objects for the first command’s stdin and the last command’s stdout, along with a list of “wait threads”. These are threads created by Process.detach: each has a pid method that returns the child’s PID and a value method that waits for the child to exit and returns its Process::Status.
2. It doesn’t wait for the pipeline to finish before handing control back to you. In the blockless form, you need to wait for the children yourself, for example by calling value on each wait thread. In the block form, pipeline_rw closes both streams and joins the wait threads once the block returns.
pipeline_rw makes feeding from a program-generated source easy:
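```ruby
require "open3"

data = "some program-generated input\n" # placeholder input

# The block's value becomes pipeline_rw's return value; once the block
# returns, pipeline_rw closes both streams and joins the wait threads.
out = Open3.pipeline_rw "foo", "bar", "baz" do |stdin, stdout, _ts|
  stdin.write data
  stdin.close # send EOF to "foo"
  stdout.read
end
```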
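One caveat with writing all the input up front: if it’s larger than the OS pipe buffers and the pipeline streams its output, the last command can block on a full stdout pipe while you’re still blocked writing to stdin, and nothing makes progress. A common fix, sketched here under the assumption that tr and sed are available, is to write from a separate thread while the main thread reads:

```ruby
require "open3"

# About a megabyte of program-generated input, far more than a typical pipe buffer.
lines = (1..100_000).map { |i| "line #{i}\n" }

out = Open3.pipeline_rw "tr a-z A-Z", "sed s/LINE/ROW/" do |stdin, stdout, _ts|
  # Feed stdin from another thread so the stdout.read below can
  # drain the pipeline at the same time.
  writer = Thread.new do
    lines.each { |line| stdin.write(line) }
    stdin.close
  end
  result = stdout.read
  writer.join
  result
end

puts out.lines.first # prints "ROW 1"
puts out.lines.size  # prints "100000"
```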
A blockless version is also available:
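```ruby
require "open3"

data = "some program-generated input\n" # placeholder input
stdin, stdout, ts = Open3.pipeline_rw "foo", "bar", "baz"
stdin.write data
stdin.close # send EOF to "foo"
out = stdout.read
stdout.close
# Wait for every command; each value is that command's Process::Status.
statuses = ts.map(&:value)
```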
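Because each wait thread carries its command’s PID and exit status, the blockless form also makes it easy to see which stage of a pipeline failed. In this sketch (commands chosen just for illustration), grep exits with status 1 because nothing matches:

```ruby
require "open3"

stdin, stdout, ts = Open3.pipeline_rw "cat", "grep nomatch"
stdin.write "hello\n"
stdin.close
stdout.read # grep prints nothing
stdout.close

# pid is available immediately; value waits for the process to exit.
codes = ts.map { |t| t.value.exitstatus }
ts.each { |t| puts "#{t.pid}: exit #{t.value.exitstatus}" }
p codes # prints [0, 1]
```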