#!/usr/bin/env perl # nfu: Command-line numeric fu | Spencer Tipping # Licensed under the terms of the MIT source code license use v5.14; use strict; use warnings; use utf8; use Fcntl; use Socket; use Time::HiRes qw/time/; use POSIX qw/dup2 mkfifo setsid :sys_wait_h/; use File::Temp qw/tmpnam/; use Math::Trig; use constant VERBOSE_INTERVAL => 50; use constant HADOOP_VERBOSE_INTERVAL => 10000; use constant DELAY_BEFORE_VERBOSE => 500; use constant SQL_INFER_PEEK_LINES => 20; use constant LOG_2 => log(2); # 64-bit hex constants in geohash encoder won't work on 32-bit architectures no warnings 'portable'; our $diamond_has_data = 1; ++$|; # Setup child capture. All we need to do is wait for child pids; there's no # formal teardown. $SIG{CHLD} = sub { local ($!, $?); 1 while waitpid(-1, WNOHANG) > 0; }; # NB: This import is not used in nfu directly; it's here so you can use these # functions inside aggregators. use List::Util qw(first max maxstr min minstr reduce shuffle sum); sub prod { my $p = 1; $p *= $_ for @_; $p; } # Same for this, which is especially useful from aggregators because multiple # values create multiple output rows, not multiple columns on the same output # row. sub row {join "\t", map s/\n//gr, @_} # Order-preserving unique values for strings. This is just too useful not to # provide. sub uniq { local $_; my %seen; my @order; $seen{$_}++ or push @order, $_ for @_; @order; } sub frequencies { local $_; my %freqs; ++$freqs{$_} for @_; %freqs; } sub reductions(&$@) { my ($f, $x, @xs) = @_; my @ys; push @ys, $x = $f->($x, $_) for @xs; @ys; } sub cp { # Cartesian product of N arrays, each passed in as a ref return () if @_ == 0; return map [$_], @{$_[0]} if @_ == 1; my @ns = map scalar(@$_), @_; my @shifts = reverse reductions {$_[0] * $_[1]} 1 / $ns[0], reverse @ns; map { my $i = $_; [map $_[$_][int($i / $shifts[$_]) % $ns[$_]], 0..$#_]; } 0..prod(@ns) - 1; } sub round_to { my ($x, $quantum) = @_; $quantum ||= 1; my $sign = $x < 0 ? -1 : 1; int(abs($x) / $quantum + 0.5) * $quantum * $sign; } sub mean {scalar @_ && sum(@_) / @_} sub log2 {log($_[0]) / LOG_2} sub entropy { local $_; my $s = sum(@_) || 1; my $t = 0; $t -= ($_ / $s) * log($_ / $s) for @_; $t / LOG_2; } sub dot { local $_; my ($u, $v) = @_; my $s = 0; $s += $$u[$_] * $$v[$_] for 0 .. min($#{$u}, $#{$v}); $s; } sub dist { # Euclidean distance (you specify the deltas) local $_; my $s = 0; $s += $_*$_ for @_; sqrt($s); } sub sdist { # Spherical-coordinate great circle distance; you specify theta1, phi1, # theta2, phi2, each in degrees; radius is assumed to be 1. Math from # http://stackoverflow.com/questions/27928/how-do-i-calculate-distance-between-two-latitude-longitude-points local $_; my ($t1, $p1, $t2, $p2) = map $_ / 180 * pi, @_; my $dt = $t2 - $t1; my $dp = $p2 - $p1; my $a = sin($dp / 2) * sin($dp / 2) + cos($p1) * cos($p2) * sin($dt / 2) * sin($dt / 2); 2 * atan2(sqrt($a), sqrt(1 - $a)); } sub edist { # Earth distance between two latitudes/longitudes, in km; up to 0.5% error my ($lat1, $lng1, $lat2, $lng2) = @_; 6371 * sdist $lng1, $lat1, $lng2, $lat2; } sub line_opposite { # Returns true if two points are on opposite sides of the line starting at # (x0, y0) and whose direction is (dx, dy). my ($x0, $y0, $dx, $dy, $x1, $y1, $x2, $y2) = @_; return (($x1 - $x0) * $dy - ($y1 - $y0) * $dx) * (($x2 - $x0) * $dy - ($y2 - $y0) * $dx) < 0; } sub evens(@) {local $_; @_[map $_ * 2, 0 .. $#_ >> 1]} sub odds(@) {local $_; @_[map $_ * 2 + 1, 0 .. 
$#_ >> 1]}

sub parse_wkt {
  my @rings = map [/([-0-9.]+)\s+([-0-9.]+)/g], split /\)\s*,\s*\(/, $_[0];
  { rings  => [map [map $_ + 0, @$_], @rings],
    ylimit => 1 + max(map max(@$_), @rings),
    bounds => [min(map evens(@$_), @rings), max(map evens(@$_), @rings),
               min(map odds(@$_), @rings),  max(map odds(@$_), @rings)],
  }
}

sub in_poly {
  # Returns true if a point resides in the given parsed polygon.
  my ($x, $y, $parsed) = @_;
  my $ylimit = $parsed->{ylimit};
  my @bounds = @{$parsed->{bounds}};
  return 0 if $x < $bounds[0] || $x > $bounds[1]
           || $y < $bounds[2] || $y > $bounds[3];
  my $hits = 0;
  for my $r (@{$parsed->{rings}}) {
    my ($lx, $ly) = @$r[0, 1];
    for (my $i = 2; $i < @$r; $i += 2) {
      my $cx = $$r[$i];
      my $cy = $$r[$i + 1];
      ++$hits if $lx <= $x && $x < $cx || $lx >= $x && $x > $cx
             and line_opposite $lx, $ly, $cx - $lx, $cy - $ly,
                               $x, $y, $x, $ylimit;
      $lx = $cx;
      $ly = $cy;
    }
  }
  $hits & 1;
}

sub rect_polar { (dist(@_), atan2($_[0], $_[1]) / pi * 180) }
sub polar_rect { ($_[0] * sin($_[1] / 180 * pi), $_[0] * cos($_[1] / 180 * pi)) }
sub degrees_radians { $_[0] / 180 * pi }
sub radians_degrees { $_[0] * 180 / pi }

# JSON support (if available)
our $json;
if (eval {require JSON}) {
  JSON->import;
  no warnings qw(uninitialized);
  $json = JSON->new->allow_nonref->utf8(1);
} elsif (eval {require JSON::PP}) {
  JSON::PP->import;
  no warnings qw(uninitialized);
  $json = JSON::PP->new->allow_nonref->utf8(1);
} else {
  print STDERR "note: no JSON support detected (try 'cpan install JSON')\n";
  print STDERR "nfu will soon have its own JSON parser rather than using ";
  print STDERR "a native library for this. Sorry for the inconvenience.\n";
}

# These are callable from evaled code
sub expand_filename_shorthands;
sub read_file {
  open my $fh, expand_filename_shorthands $_[0], 1;
  my $result = join '', <$fh>;
  close $fh;
  $result;
}

sub write_file {
  open my $fh, '>', $_[0];
  $fh->print($_[1]);
  close $fh;
  $_[0];
}

sub read_lines {
  local $_;
  open my $fh, expand_filename_shorthands $_[0], 1;
  my @result;
  chomp, push @result, $_ for <$fh>;
  close $fh;
  @result;
}

sub write_lines {
  local $_;
  my $filename = shift @_;
  open my $fh, '>', $filename;
  $fh->print($_, "\n") for @_;
  close $fh;
  $filename;
}

sub open_file { open my $fh, expand_filename_shorthands $_[0], 1; $fh; }

sub json_encode {$json->encode(@_)}
sub json_decode {$json->decode(@_)}

sub hadoop_counter {
  printf STDERR "reporter:counter:nfu,%s_%s,%d\n",
                $_[0] =~ y/,/_/r, $_[1] =~ y/,/_/r, $_[2] // 1;
}

sub F {my $l = shift @_; (split /\t/, $l)[@_]}

our @gh_alphabet = split //, '0123456789bcdefghjkmnpqrstuvwxyz';
our %gh_decode   = map(($gh_alphabet[$_], $_), 0..$#gh_alphabet);

sub gap_bits {
  my ($x) = @_;
  $x |= $x << 16; $x &= 0x0000ffff0000ffff;
  $x |= $x << 8;  $x &= 0x00ff00ff00ff00ff;
  $x |= $x << 4;  $x &= 0x0f0f0f0f0f0f0f0f;
  $x |= $x << 2;  $x &= 0x3333333333333333;
  return ($x | $x << 1) & 0x5555555555555555;
}

sub ungap_bits {
  my ($x) = @_;
  $x &= 0x5555555555555555;
  $x ^= $x >> 1;  $x &= 0x3333333333333333;
  $x ^= $x >> 2;  $x &= 0x0f0f0f0f0f0f0f0f;
  $x ^= $x >> 4;  $x &= 0x00ff00ff00ff00ff;
  $x ^= $x >> 8;  $x &= 0x0000ffff0000ffff;
  return ($x ^ $x >> 16) & 0x00000000ffffffff;
}

sub geohash_encode {
  local $_;
  my ($lat, $lng, $precision) = @_;
  $precision //= 12;
  my $bits = $precision > 0 ? $precision * 5 : -$precision;
  my $gh = (gap_bits(int(($lat + 90) / 180 * 0x40000000))
         |  gap_bits(int(($lng + 180) / 360 * 0x40000000)) << 1)
         >> 60 - $bits;
  $precision > 0
    ? join '', reverse map $gh_alphabet[$gh >> $_ * 5 & 31], 0 ..
$precision - 1 : $gh; } sub geohash_decode { local $_; my ($gh, $bits) = @_; unless (defined $bits) { # Decode gh from base-32 $bits = length($gh) * 5; my $n = 0; $n = $n << 5 | $gh_decode{lc $_} for split //, $gh; $gh = $n; } $gh <<= 60 - $bits; return (ungap_bits($gh) / 0x40000000 * 180 - 90, ungap_bits($gh >> 1) / 0x40000000 * 360 - 180); } # HTTP sub http_send { sprintf "HTTP/1.0 %s\nContent-Length: %d\n\n%s\n", $_[0], length($_[1]), $_[1] } sub httpok { sprintf "HTTP/1.0 200 OK\nContent-Length: %d\n\n%s\n", length($_[0]), $_[0] } sub chunk { sprintf "%x\r\n%s\r\n", length($_[0]), $_[0] } sub endchunk { chunk '' } # Function shorthands BEGIN { *dr = \°rees_radians; *rd = \&radians_degrees; *je = \&json_encode; *jd = \&json_decode; *hc = \&hadoop_counter; *rf = \&read_file; *rl = \&read_lines; *wf = \&write_file; *wl = \&write_lines; *of = \&open_file; *ghe = \&geohash_encode; *ghd = \&geohash_decode; } # File functions sub random128 {join '', map sprintf('%04x', rand 65536), 0 .. 8} sub hadoop_ls; sub hadoop_matched_partfiles; sub shell_quote; sub expand_sql_shorthands; sub expand_sqlite_db; sub expand_postgres_db; sub expand_filename_shorthands { # NB: we prepend a shell comment containing the original $f so anyone # downstream can get it back. This is currently used by Hadoop to reuse data # stored on HDFS if you write it on the command-line. (Otherwise nfu would # hadoop fs -text to download it, then re-upload to a tempfile.) my ($f, $always_make_a_command) = @_; my $result; my $original = ($f =~ s/^/#/mgr) . "\n"; no warnings 'newline'; if (-e $f || $f =~ s/^file://) { # It's really a filename, so push it onto @ARGV. If it's compressed, run it # through the appropriate decompressor first. my $piped = $f =~ s/^(.*\.gz)/cat '$1' | gzip -d/ri =~ s/^(.*\.bz2)/cat '$1' | bzip2 -d/ri =~ s/^(.*\.xz)/cat '$1' | xz -d/ri =~ s/^(.*\.lzo)/cat '$1' | lzop -d/ri; $result = $piped =~ /\|/ ? "$original$piped |" : $piped; } elsif ($f =~ /^https?:\/\//) { # Assume a URL and curl it $f = shell_quote $f; $result = "${original}curl $f |"; } elsif ($f =~ s/^sh://) { # Execute a command and capture stdout $result = "$original$f |"; } elsif ($f =~ /^s3:/) { # Use s3cmd and cat to stdout $result = "${original}s3cmd get '$f' - |"; } elsif ($f =~ s/^hdfs-ls:(?:\/\/)?//) { # Just list a directory. This is used by hdfs: below to provide # indirection. $result = "${original}$0 n:1 -m '" . q{map "hdfs:" . shell_quote($_), hadoop_ls %[q[} . $f . q{]%]} . "' |"; } elsif ($f =~ s/^hdfs:(?:\/\/)?//) { # Use Hadoop commands to read stuff. The command itself needs to be # deferred because the hdfs path might not exist when the pseudofile is # resolved. Indirected through $0 hdfs-ls:X because some HDFS directories # contain enough files that we'll get arg-list-too-long errors if we invoke # hadoop fs -text directly. This workaround lets us stream the output from # hadoop fs -ls. # # Redirecting errors to /dev/null because hadoop fs -text complains loudly # if you close its output stream, and this is really annoying. $f = shell_quote "hdfs-ls:$f"; $result = "${original}$0 $f | xargs hadoop fs -text 2>/dev/null |"; } elsif ($f =~ s/^hdfsjoin://) { $result = "${original}$0 " . shell_quote(map "hdfs:$_", hadoop_matched_partfiles $f, $ENV{map_input_file}) . 
" |"; } elsif ($f =~ s/^perl://) { # Evaluate a Perl expression $f =~ s/'/'"'"'/g; $result = "${original}perl -e 'print \$_, \"\\n\" for ($f)' |"; } elsif ($f =~ s/^n://) { $result = "${original}perl -e 'print \$_, \"\\n\" for (1..($f))' |"; } elsif ($f =~ s/^id://) { $result = "${original}echo " . shell_quote($f) . " |"; } elsif ($f =~ s/^sql://) { # Run a postgres or sqlite3 query, exporting results as TSV my ($db, $query) = split /:/, $f, 2; $query = expand_sql_shorthands $query; if ($db =~ s/^P//) { # postgres $db = expand_postgres_db $db; $query = shell_quote "COPY ($query) TO STDOUT WITH NULL AS ''"; $result = "${original}psql -c $query $db |"; } elsif ($db =~ s/^S//) { # sqlite3 $db = shell_quote expand_sqlite_db $db; $result = "${original}echo " . shell_quote(".mode tabs\n$query;") . "| sqlite3 $db |"; } else { die "unknown database prefix " . substr($db, 0, 1) . " for pseudofile $original (valid prefixes are P and S)"; } } elsif ($f =~ /(\w*@?[^:]+):(.*)$/) { # Access file over SSH. We need to make sure nfu is running on the remote # end, since the remote file might be gzipped or some such. Because the # remote machine might not have nfu, this involves us piping ourselves over # to that machine. $result = "${original}cat '$0' | ssh -C '$1' perl - '$2' |"; } else { return undef; } $always_make_a_command && $result !~ /\|/ ? "${original}cat '$result' |" : $result; } sub unexpand_filename_shorthands { my ($f) = @_; $f =~ /^#(.*)/ ? $1 : $f; } my %pseudofile_docs = ( 'file.gz' => 'decompress file with gzip -dc', 'file.bz2' => 'decompress file with bzip2 -dc', 'file.xz' => 'decompress file with xz -dc', 'file.lzo' => 'decompress file with lzop -dc', 'http[s]://url' => 'retrieve url with curl', 'sh:stuff' => 'run sh -c "stuff", take stdout', 's3://url' => 'access S3 using s3cmd', 'hdfs:path' => 'read HDFS file(s) with hadoop fs -text', 'hdfs-ls:path' => 'pseudofiles parsed from hadoop fs -ls', 'hdfsjoin:path' => 'mapside join pseudofile (a subset of hdfs:path)', 'sql:db:query' => 'results of query as TSV', 'perl:expr' => 'perl -e \'print "$_\n" for (expr)\'', 'n:number' => 'numbers from 1 to n, inclusive', 'id:X' => 'verbatim text X', 'user@host:x' => 'remote data access (x can be a pseudofile)', ); # Flags our $is_child = 0; our $verbose = 0; our $n_lines = 0; our $n_bytes = 0; our $start_time = undef; our $verbose_command = ''; our @verbose_args; our $verbose_command_formatted = undef; our $inside_hadoop_job = length $ENV{mapred_job_id}; our $verbose_interval = $inside_hadoop_job ? HADOOP_VERBOSE_INTERVAL : VERBOSE_INTERVAL; our $empirical_verbosity = 0; $verbose ||= $inside_hadoop_job; $verbose ||= length $ENV{NFU_ALWAYS_VERBOSE}; our $last_verbose_report = 0; our $verbose_row = 0; # Call it like this: # while (<>) { # be_verbose_as_appropriate length; # ... 
# } sub be_verbose_as_appropriate { return if $is_child; return unless $verbose; local $_; my ($record_length) = @_; $n_lines += !!$record_length; $n_bytes += $record_length; my $now = time; return unless $record_length == 0 || ($now - $last_verbose_report) * 1000 > $verbose_interval; $last_verbose_report = $now; $verbose_command_formatted //= join ' ', $verbose_command, @verbose_args; $start_time //= $now; my $runtime = $now - $start_time || 0.001; return if $runtime * 1000 < DELAY_BEFORE_VERBOSE; ++$empirical_verbosity; unless ($inside_hadoop_job) { # Print status updates straight to the terminal printf STDERR "\033[%d;1H\033[K%10dl %8.1fl/s %10dk %8.1fkB/s %s", $verbose_row, $n_lines, $n_lines / $runtime, $n_bytes / 1024, $n_bytes / 1024 / $runtime, substr($verbose_command_formatted, 0, 40); } else { # Use Hadoop-specific syntax to update job counters. Smaller units are # better because Hadoop counters are integers, so they'll suffer from # truncation for fractional quantities. $verbose_command_formatted =~ s/[,\n]/_/g; hc $verbose_command_formatted, @$_ for (['lines', $n_lines], ['runtime ms', $runtime * 1000], ['bytes', $n_bytes]); # Reset variables because Hadoop treats them as incremental $n_lines = 0; $n_bytes = 0; $start_time = $now; } } END { be_verbose_as_appropriate 0; print STDERR "\n" if $empirical_verbosity; } # This variable will keep track of any state accumulated from --use or --run # arguments. This is required for --pmap to work correctly. my @evaled_code; sub shell_quote {join ' ', map /[^-\/\w]/ ? "'" . s/(['\\])/'\\$1'/gr . "'" : length $_ ? $_ : "''", @_} sub quote_self {shell_quote $0, @_} my %explosions = ( a => '--average', A => '--aggregate', b => '--branch', c => '--count', C => '--uncount', D => '--drop', e => '--each', E => '--every', f => '--fields', F => '--fieldsplit', g => '--group', G => '--rgroup', h => '--hadoopc', H => '--hadoop', i => '--index', I => '--indexouter', j => '--join', J => '--joinouter', k => '--keep', K => '--remove', l => '--log', L => '--exp', m => '--map', M => '--pmap', n => '--number', N => '--ntiles', o => '--order', O => '--rorder', p => '--plot', P => '--poll', q => '--quant', Q => '--sql', r => '--read', R => '--buffer', s => '--sum', S => '--delta', T => '--take', # v => '--verbose' # handled during option parsing V => '--variance', w => '--with', z => '--intify', ); my %implosions; $implosions{$explosions{$_}} = $_ for keys %explosions; # Minimum number of required arguments for each function. Numeric arguments are # automatically forwarded, so are always optional. 
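# As an illustrative sketch (nothing here is executed), this is roughly how a
# short-option bundle is taken apart by explode() further down and then bound
# to arguments using these arities:
#
#   my $bundle = '-gcT10';
#   explode $bundle;          # => ('--group', '--count', '--take', '10')
#
# --take's arity is 0, but the parser still hands it the '10' because trailing
# numeric tokens that don't name existing files are forwarded as arguments.
# The values below are therefore minimums, not exact argument counts.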
my %arity = ( average => 0, aggregate => 1, branch => 1, count => 0, uncount => 0, delta => 0, drop => 0, each => 1, every => 1, fields => 0, fieldsplit => 1, fold => 1, group => 0, rgroup => 0, hadoop => 3, hadoopc => 4, index => 2, indexouter => 2, join => 2, joinouter => 2, keep => 1, log => 0, exp => 0, map => 1, pmap => 1, number => 0, ntiles => 1, order => 0, rorder => 0, plot => 1, poll => 2, read => 0, buffer => 1, sum => 0, quant => 1, remove => 1, sample => 1, take => 0, variance => 0, with => 1, intify => 0, # Commands with no shorthands append => 1, prepend => 1, tee => 1, duplicate => 2, partition => 2, splot => 1, sd => 0, mplot => 1, preview => 0, pipe => 1, entropy => 0, sql => 3, tcp => 1, http => 1, repeat => 2, octave => 1, numpy => 1, ); my %usages = ( average => 'window size (0 for full average) -- running average', aggregate => 'aggregator fn', branch => 'branch (takes a pattern map)', count => 'counts by first column value; like uniq -c', uncount => 'the opposite of --count; repeats each row N times', delta => 'value -> difference from last value', drop => 'number of records to drop', each => 'template; executes with {} set to each value', every => 'n (returns every nth row)', fields => 'string of digits, each a zero-indexed column selector', fieldsplit => 'regexp to use for splitting', fold => 'function that returns true when line should be folded', group => 'sorts ascending, takes optional column list', rgroup => 'sorts descending, takes optional column list', hadoop => 'hadoop streaming: outpath|.|@, mapper|:, reducer|:|_', hadoopc => 'hadoop streaming: ..., combiner|:|_, reducer|:|_', index => 'field index, unsorted pseudofile to join against', indexouter => 'field index, unsorted pseudofile to join against', join => 'field index, sorted pseudofile to join against', joinouter => 'field index, sorted pseudofile to join against', keep => 'row filter fn', log => 'optional base (default e)', exp => 'optional base (default e)', map => 'row map fn', pmap => 'row map fn (executed multiple times in parallel)', number => 'prepends line number to each line', ntiles => 'takes N, produces ntiles of numbers', order => 'sorts ascending by general numeric value', rorder => 'sorts descending by general numeric value', plot => 'gnuplot arguments', poll => 'interval in seconds, command whose output to collect', sum => 'value -> total += value', quant => 'number to round to', read => 'reads pseudofiles from the data stream', buffer => 'creates a pseudofile from the data stream', remove => 'inverted row filter fn', sample => 'row selection probability in [0, 1]', take => 'n to take first n, +n to take last n', variance => 'running variance', with => 'pseudofile to join column-wise onto input', intify => 'convert column to dense integers (linear space)', append => 'pseudofile; appends its contents to current stream', prepend => 'pseudofile; prepends its contents to current stream', tee => 'shell command; duplicates data to stdin of command', duplicate => 'two shell commands as separate arguments', partition => 'partition id fn, shell command (using {})', splot => 'gnuplot arguments', sd => 'running standard deviation', mplot => 'gnuplot arguments per column, separated by ;', preview => '', pipe => 'shell command to pipe through', entropy => 'running entropy of relative probabilities/frequencies', sql => 'create/query SQL table: db[:[+]table], schema|_, query|_', tcp => 'TCP server (emits fifo filenames)', http => 'HTTP adapter for TCP server output', repeat => 'repeat count, 
pseudofile to repeat', octave => 'pipe through octave; vector is called xs', numpy => 'pipe through numpy; vector is called xs', ); my %env_docs = ( NFU_SORT_BUFFER => 'default 64M; size of in-memory sort for -g and -o', NFU_SORT_PARALLEL => 'default 4; number of concurrent sorts to run', NFU_SORT_COMPRESS => 'default none; compression program for sort tempfiles', NFU_SORT_OPTIONS => 'override all sort options except column spec', NFU_ALWAYS_VERBOSE => 'if set, nfu will be verbose all the time', NFU_NO_PAGER => 'if set, nfu will not use "less" to preview stdout', NFU_PMAP_PARALLELISM => 'number of subprocesses for -M', NFU_MAX_FILEHANDLES => 'default 64; maximum #subprocesses for --partition', NFU_HADOOP_FILES => 'comma-separated files to include with streaming job', NFU_HADOOP_STREAMING => 'absolute location of hadoop-streaming.jar', NFU_HADOOP_OPTIONS => '-D options for hadoop streaming jobs', NFU_HADOOP_COMMAND => 'hadoop executable; e.g. hadoop jar, hadoop fs -ls', NFU_HADOOP_TMPDIR => 'default /tmp; temp dir for hadoop uploads', ); my %gnuplot_aliases = ( '%l' => ' with lines', '%d' => ' with dots', '%i' => ' with impulses', '%v' => ' with vectors ', '%u' => ' using ', '%t' => ' title ', '%p' => ' lc palette ', ); my %fieldsplit_shorthands = ( S => '\s+', W => '\W+', C => ',', ); sub expand_gnuplot_options { my @transformed_opts; for my $opt (@_) { $opt =~ s/$_/$gnuplot_aliases{$_}/g for keys %gnuplot_aliases; push @transformed_opts, $opt; } @transformed_opts; } my %sql_aliases = ( '%\*' => ' select * from ', '%c' => ' select count(1) from ', '%d' => ' select distinct * from ', '%g' => ' group by ', '%j' => ' inner join ', '%l' => ' outer left join ', '%r' => ' outer right join ', '%w' => ' where ', ); sub expand_sql_shorthands { my ($sql) = @_; $sql =~ s/$_/$sql_aliases{$_}/eg for keys %sql_aliases; $sql; } sub expand_sqlite_db { my $tempdir = tmpnam =~ s/\/[^\/]+$//r; return "$tempdir/nfu-$ENV{USER}-sqlite.db" if $_[0] eq '@'; return $_[0]; } sub expand_postgres_db { # Expands a DB descriptor into a series of properly-shellquoted options to # pass to the 'psql' command. my ($host, $user, $db) = ('localhost', $ENV{USER}, $ENV{USER}); if ($_[0] =~ m#^(?:([^\@/:]+)@)?([^\@/:]+)/([^/]+)$#) { # NB: don't try to change this to use $1, $2, etc as arguments to # shell_quote. Perl references arguments, so this will fail horribly. ($user, $host, $db) = ($1 // $ENV{USER}, $2 // 'localhost', $3); shell_quote '-U', $user, '-h', $host, '-d', $db; } elsif ($_[0] =~ m#^(\w+)@(\w+)$#) { # Simple DB connection; assume localhost ($user, $db) = ($1, $2); shell_quote '-U', $user, '-d', $db; } elsif ($_[0] =~ m#^[^\@:/]+$#) { # Really simple connection shell_quote '-d', $_[0]; } elsif ($_[0] eq '@') { # Really simple connection; use username as DB shell_quote '-d', $ENV{USER}; } else { die "not sure how to parse postgres DB '$_[0]'"; } } sub expand_eval_shorthands { my $code = $_[0]; my @pieces = split /%\[(.*?)%\]/, $code; for my $i (0..$#pieces) { unless ($i & 1) { $pieces[$i] =~ s/%(\d+)/\$_[$1]/g; 1 while $pieces[$i] =~ s/([a-zA-Z0-9_\)\}\]?\$]) \. ([\$_a-zA-Z](?:-[0-9\w?\$]|[0-9_\w?\$])*) /$1\->{'$2'}/x; } } join '', @pieces; } sub parse_join_options { my ($f1, $f2, $file) = @_ == 3 ? 
@_ : ($_[0], 0, $_[1]); ($f1 + 1, $f2 + 1, $file); } sub compile_eval_into_function { my ($code, $name) = @_; $code = expand_eval_shorthands $code; eval "sub {\n$code\n}" or die "failed to compile $name function: $@\n (code was $code)"; } sub stateless_unary_fn { my ($name, $f) = @_; my $arity = $arity{$name}; ($name, sub { my @columns = split //, (@_ > $arity ? shift : undef) // '0'; while (<>) { be_verbose_as_appropriate length; chomp; my @fs = split /\t/; $fs[$_] = $f->($fs[$_], @_) for @columns; print row(@fs), "\n"; } }); } sub stateful_unary_fn { my ($name, $setup, $f) = @_; my $arity = $arity{$name}; ($name, sub { my @columns = split //, (@_ > $arity ? shift : undef) // '0'; my %states; $states{$_} = $setup->(@_) for @columns; while (<>) { be_verbose_as_appropriate length; chomp; my @fs = split /\t/; $fs[$_] = $f->($fs[$_], $states{$_}, @_) for @columns; print row(@fs), "\n"; } }); } sub exec_with_stdin { open my $fh, '|' . shell_quote @_ or die "failed to exec @_"; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; } sub exec_with_diamond { if ($verbose || grep /\|/, @ARGV) { # Arguments are specified in filenames and involve processes, so use perl # to forward data. exec_with_stdin @_; } else { # Faster option: just exec the program in-place. This avoids a layer of # interprocess piping. Assume filenames follow arguments. exec @_, @ARGV or die "failed to exec @_ @ARGV"; } } sub sort_options { my ($column_spec) = @_; my @columns = split //, $column_spec // ''; my @options = exists $ENV{NFU_SORT_OPTIONS} ? split /\s+/, $ENV{NFU_SORT_OPTIONS} : ('-S', $ENV{NFU_SORT_BUFFER} || '64M', '--parallel=' . ($ENV{NFU_SORT_PARALLEL} || 4), $ENV{NFU_SORT_COMPRESS} ? ("--compress-program=$ENV{NFU_SORT_COMPRESS}") : ()); return @options, (@columns ? ('-t', "\t", map {('-k', sprintf "%d,%d", $_ + 1, $_ + 1)} @columns) : ()); } sub sort_cmd {join ' ', 'sort', sort_options, @_} sub fifo_for { my ($file, @transforms) = @_; my $fifo_name = tmpnam; mkfifo $fifo_name, 0700 or die "failed to create fifo: $!"; return $fifo_name if fork; my $command = expand_filename_shorthands($file, 1) . join '', map {"$_ |"} @transforms; open my $into_fifo, '>', $fifo_name or die "failed to open fifo $fifo_name for writing: $!"; open my $from_file, $command or die "failed to open file/command $command for reading: $!"; be_verbose_as_appropriate(length), $into_fifo->print($_) while <$from_file>; close $into_fifo; close $from_file; unlink $fifo_name or warn "failed to unlink temporary fifo $fifo_name: $!"; exit 0; } sub hadoop { # Generates a hadoop command string, quoting all args as necessary join ' ', $ENV{NFU_HADOOP_COMMAND} // "hadoop", @_[0, 1], shell_quote(@_ > 2 ? @_[2..$#_] : ()); } sub hadoop_ls { # Now get the output file listing. This is a huge mess because Hadoop is a # huge mess. my $ls_command = hadoop('fs', '-ls', @_); grep /\/[^_][^\/]*$/, map +(split " ", $_, 8)[7], grep !/^Found/, split /\n/, ''.qx/$ls_command/; } sub gcd { my ($a, $b) = @_; while ($b) { if ($b > $a) { ($a, $b) = ($b, $a) } else { $a %= $b } } $a; } sub hadoop_partfile_n { $_[0] =~ /[^0-9]([0-9]+)(?:\.[^\/]+)?$/ ? $1 : 0 } sub hadoop_partsort { sort {hadoop_partfile_n($a) <=> hadoop_partfile_n($b)} @_ } sub hadoop_matched_partfiles { my ($path, $partfile) = @_; my $partfile_dirname = $partfile =~ s/\/[^\/]+$//r; my @left_files; my @possibilities; # We won't be able to do anything until both sides of the join exist. 
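# (Worked example of the matching logic below, assuming Hadoop's default
# hash-modulus partitioning: if our side of the join has 4 partfiles and the
# other side has 6, then gcd(4, 6) == 2 and $left_redundancy == 6 / 2 == 3, so
# map task n reads the other side's partfiles n % 2, 2 + n % 2, and 4 + n % 2.
# Keys in our partfile n satisfy H(key) % 4 == n, hence H(key) % 2 == n % 2,
# which is exactly the set of right-side partfiles selected.)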
until (@left_files = hadoop_partsort hadoop_ls $partfile_dirname) { print STDERR "hadoop_matched_partfiles: waiting for input...\n"; hc qw/hadoop_matched_partfiles left_wait/; sleep 1; } until (@possibilities = hadoop_partsort hadoop_ls $path) { print STDERR "hadoop_matched_partfiles: waiting for join data...\n"; hc qw/hadoop_matched_partfiles right wait/; sleep 1; } my $n = hadoop_partfile_n $partfile; unless ($n < @left_files && $left_files[$n] eq $partfile) { my $partfile_n = $n; $n = 0; ++$n until $n >= @left_files || $partfile_n == hadoop_partfile_n $partfile; } die "hadoop_matched_partfiles couldn't find the index of the specified " . "partfile ($partfile) within the list of map inputs (@left_files)" if $n >= @left_files; # Assume Hadoop's default partitioning strategy using hashcode modulus. This # means that the Kth of N files contains all keys for which H(key) % N == K. my $reduction_factor = gcd scalar(@left_files), scalar(@possibilities); my $left_redundancy = @possibilities / $reduction_factor; my @files_to_read = @possibilities[map $_ * $reduction_factor + $n % $reduction_factor, 0 .. $left_redundancy - 1]; # Log some stuff to job counters so any problems are more evident. if ($verbose) { printf STDERR "hdfsjoin:$path [$partfile] (inferred partfile $n): %s\n", join(' ', @files_to_read); hc "hdfsjoin $path", "joins attempted", 1; hc "hdfsjoin $path", "left/right reads", $left_redundancy; hc "hdfsjoin $path", "overreads", $left_redundancy - 1; } @files_to_read; } sub hadoop_tempfile { my $randomness = random128; my $dir = $ENV{NFU_HADOOP_TMPDIR} // '/tmp'; "$dir/nfu-hadoop-$ENV{USER}-$randomness"; } sub find_hadoop_streaming { my @files = split /\n/, ''.qx|locate "*hadoop-streaming.jar*"|; die "failed to locate hadoop streaming jar automatically; " . "you should set NFU_HADOOP_STREAMING" unless @files; print STDERR "nfu found hadoop streaming jar: $files[0]\n"; $files[0]; } sub hadoop_into { my ($outfile, $mapper, $combiner, $reducer) = @_; my $streaming_jar = $ENV{NFU_HADOOP_STREAMING} // find_hadoop_streaming; # Various shorthands for common cases. Both mapper and reducer being the # identity function is used to repartition stuff, so we want this to be easy # to type and recognize. $mapper = $0 if $mapper eq ':'; $reducer = $0 if $reducer eq ':'; $reducer = 'NONE' if $reducer =~ /^[-_]$/; $combiner = $0 if $combiner eq ':'; $combiner = 'NONE' if $combiner =~ /^[-_]$/; $combiner = 'NONE' if $reducer eq 'NONE'; my @input_files; my @delete_afterwards; # Figure out where our input seems to be coming from. This is a little hacky, # but I think it's worthwhile for the flexibility we get. # # Hadoop jobs output their list of output partfile names because the data is # often too large. It's possible we'll get one of these as stdin, so each # line will look like "hdfs:/...". In that case, we want to use each as an # input file. Otherwise we'll want to upload stdin and all non-HDFS files to # HDFS and use those. # # It's actually a bit tricky to figure out which files started out as hdfs: # locations because by now they've all been filename alias-expanded. We need # to reverse-engineer the alias if we want to reuse stuff already on HDFS. 
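# For instance (illustrative only; the path is made up): a command-line
# argument 'hdfs:/data/clicks' was expanded during option parsing into
# something like
#
#   "#hdfs:/data/clicks\n$0 'hdfs-ls:/data/clicks' | xargs hadoop fs -text 2>/dev/null |"
#
# and unexpand_filename_shorthands() recovers 'hdfs:/data/clicks' from that
# leading shell-comment line, so the loop below can hand the HDFS path straight
# to -input rather than streaming it down and re-uploading it.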
  my @other_argv;
  while (@ARGV) {
    local $_ = unexpand_filename_shorthands shift @ARGV;
    if (s/^hdfs:(?:\/\/)?//) {
      push @input_files, '-input', $_;
    } else {
      push @other_argv, expand_filename_shorthands $_;
    }
  }
  @ARGV = @other_argv;

  my $line = undef;
  unless (-t STDIN) {
    chomp($line), push @input_files, '-input', $line
      while defined($line = <STDIN>) && $line =~ s/^hdfs://;
  }

  # At this point $line is either undefined or contains a line we shouldn't
  # have read from stdin (i.e. it's data). If the latter, prepend it to the
  # upload to HDFS.
  if (defined $line) {
    my $tempfile = hadoop_tempfile;
    open my $fh, "| " . hadoop('fs', '-put', '-', $tempfile) . ' 1>&2'
      or die "failed to open hadoop fs -put process for uploading: $!";
    print $fh $line;
    be_verbose_as_appropriate(length), print $fh $_ while <STDIN>;
    close $fh;
    push @input_files, '-input', $tempfile;
    push @delete_afterwards, $tempfile;
  }

  if (@other_argv) {
    my $tempfile = hadoop_tempfile;
    open my $fh, "| " . hadoop('fs', '-put', '-', $tempfile) . ' 1>&2'
      or die "failed to open hadoop fs -put process for uploading: $!";
    be_verbose_as_appropriate(length), print $fh $_ while <>;
    close $fh;
    push @input_files, '-input', $tempfile;
    push @delete_afterwards, $tempfile;
  }

  # Now all the input files are in place, so we can kick off the job.
  my $extra_args = $ENV{NFU_HADOOP_OPTIONS} // '';
  my @file_deps  = map {('-file', $_)}
                       ($0, split /,/, $ENV{NFU_HADOOP_FILES} // '');
  my $dirname    = $0 =~ s/\/nfu$/\//r;

  # We're uploading ourselves, so we'll need a current-directory reference to
  # nfu. When nfu quotes itself, it produces an absolute path instead; this
  # code rewrites those into ./nfu.
  my $transformed_mapper   = $mapper   =~ s|\Q$dirname\E|./|gr;
  my $transformed_combiner = $combiner =~ s|\Q$dirname\E|./|gr;
  my $transformed_reducer  = $reducer  =~ s|\Q$dirname\E|./|gr;

  # Write the mapper and reducer commands into files rather than passing them
  # straight to hadoop streaming. This bypasses two problems:
  #
  # 1. Hadoop streaming might chop long arguments sooner than bash.
  # 2. It word-splits differently from bash, which breaks shell_quote.
  my $mapper_file   = tmpnam;
  my $combiner_file = tmpnam;
  my $reducer_file  = tmpnam;

  open my $mapper_fh, '>', $mapper_file
    or die "failed to create tempfile $mapper_file for map job: $!";
  open my $combiner_fh, '>', $combiner_file
    or die "failed to create tempfile $combiner_file for combine job: $!";
  open my $reducer_fh, '>', $reducer_file
    or die "failed to create tempfile $reducer_file for reduce job: $!";

  print $mapper_fh   "#!/bin/bash\n$transformed_mapper\n";
  print $combiner_fh "#!/bin/bash\n$transformed_combiner\n";
  print $reducer_fh  "#!/bin/bash\n$transformed_reducer\n";

  close $mapper_fh;
  close $combiner_fh;
  close $reducer_fh;

  chmod 0755, $mapper_file;
  chmod 0755, $combiner_file;
  chmod 0755, $reducer_file;

  my $jobname = "nfu streaming ["
              . join(' ', grep $_ ne '-input', @input_files)
              . "]: map($transformed_mapper), "
              . "combine($transformed_combiner), "
              . "reduce($transformed_reducer), "
              . "options($extra_args)"
              . " > $outfile";

  my $hadoop_command =
    hadoop('jar',
           shell_quote($streaming_jar)
             . " -D mapred.job.name=" . shell_quote($jobname)
             . " $extra_args",
           @file_deps,
           @input_files,
           '-file', $mapper_file,
           '-file', $combiner_file,
           '-file', $reducer_file,
           '-output', $outfile,
           '-mapper', "./" . ($mapper_file =~ s/^.*\///r),
           $combiner ne 'NONE'
             ? ('-combiner', "./" . ($combiner_file =~ s/^.*\///r))
             : (),
           '-reducer', $reducer eq 'NONE'
             ? $reducer
             : "./" . ($reducer_file =~ s/^.*\///r));

  system $hadoop_command .
' 1>&2' and die "failed to execute hadoop command $hadoop_command: $!"; unlink $mapper_file; unlink $combiner_file; unlink $reducer_file; system hadoop('fs', '-rm', '-r', @delete_afterwards) . ' 1>&2' if @delete_afterwards; "hdfs:$outfile"; } sub sql_infer_column_type { # Try to figure out the right type for a column based on some values for it. # The possibilities are 'text', 'integer', or 'real'; this is roughly the set # of stuff supported by both postgres and sqlite. return 'integer' unless grep length && !/^-?[0-9]+$/, @_; return 'real' unless grep length && !/^-?[0-9]+$ | ^-?[0-9]+(?:\.[0-9]+)?(?:[eE][-+]?[0-9]+)?$ | ^-?[0-9]* \.[0-9]+ (?:[eE][-+]?[0-9]+)?$/x, @_; return 'text'; } sub sql_infer_schema { # Takes a list of TSV lines and generates a table schema that will store # them. my $n = max map scalar(split /\t/), @_; my @columns = map [], 1 .. $n; for (@_) { my @vs = split /\t/; push @{$columns[$_]}, $vs[$_] for 0 .. $#vs; } my @types = map sql_infer_column_type(@$_), @columns; join ', ', map sprintf("f%d %s", $_, $types[$_]), 0 .. $#columns; } sub sql_schema_and_buffer { # WARNING: this function modifies its argument my ($schema) = @_; my @read; if ($schema eq '_') { # Infer the schema, which involves reading some data up front. push @read, $_ while @read < SQL_INFER_PEEK_LINES and $diamond_has_data &&= defined($_ = <>); $schema = sql_infer_schema @read; } $_[0] = $schema; @read; } sub sql_parse_args { my ($dbt, $schema, $query) = @_; my @pieces = split /:/, $dbt; if (@pieces > 1) { my $table = pop @pieces; my $db = join ':', @pieces; ($db, $table, $schema, $query); } else { # Use a default table called 't' with an automatic index ($pieces[0], '+t', $schema, $query); } } sub write_buffer_and_stdin { my ($fh, @buffer) = @_; be_verbose_as_appropriate(length), $fh->print($_) for @buffer; be_verbose_as_appropriate(length), $fh->print($_) while $diamond_has_data &&= defined($_ = <>); } sub sql_first_column_index { # WARNING: this function modifies its first argument my ($table, $schema) = @_; my $column = $schema =~ s/\s.*$//r; my $index = $table =~ s/^\+// ? "CREATE INDEX $table$column ON $table($column);\n" : ''; $_[0] = $table; $index; } my %functions = ( read => sub { while (<>) { chomp; my $f = expand_filename_shorthands $_, 1; open my $fh, $f or die "failed to open pseudofile $_ ($f): $!"; be_verbose_as_appropriate(length), print while <$fh>; close $fh; } }, buffer => sub { my $f = $_[0] =~ /^[-_:]$/ ? tmpnam : $_[0]; open my $fh, '>', $f or die "failed to open buffer file $f: $!"; be_verbose_as_appropriate(length), $fh->print($_) while <>; close $fh; print $f, "\n"; }, branch => sub { my (@cases) = split /\n/, $_[0]; my %branches; my %branch_matchers; my @order; for (@cases) { my ($k, $v) = map unpack('u', $_), split /\t/; $v //= ''; open my $fh, "| $0 $v" or die "failed to open branch subprocess '$0 $v': $!"; push @order, $k; $branches{$k} = $fh; $branch_matchers{$k} = $k =~ s/^:// ? compile_eval_into_function $k, 'branch matcher function' : $k eq '_' ? sub { 1 } : sub { $_[0] eq $k }; } while (<>) { be_verbose_as_appropriate length; chomp; my @xs = split /\t/; my @matching = grep $branch_matchers{$_}->(@xs), @order; die "branch failed to match '$xs[0]' against any of ( @order )" unless @matching; $branches{$matching[0]}->print(row(@xs), "\n"); } close for values %branches; }, tcp => sub { my ($hostport) = @_; my ($host, $port) = $hostport =~ /:/ ? 
split /:/, $hostport : ('0.0.0.0', $hostport); socket my($serversock), PF_INET, SOCK_STREAM, getprotobyname 'tcp' or die "socket failed: $!"; setsockopt $serversock, SOL_SOCKET, SO_REUSEADDR, pack 'l', 1 or die "setsockopt failed: $!"; bind $serversock, sockaddr_in $port, INADDR_ANY or die "bind failed: $!"; listen $serversock, SOMAXCONN or die "listen failed: $!"; while (1) { # Fork twice per connection. This is egregious but necessary because the # open() call blocks on a FIFO until the other end is connected, and we # want the FIFO consumer to be at liberty to open them in either order # without creating a deadlock. my ($paddr, $client); unless ($paddr = accept $client, $serversock) { next if $!{EINTR}; die "accept failed: $!"; } my ($port, $iaddr) = sockaddr_in $paddr; my $iname = gethostbyaddr $iaddr, AF_INET; my $r = tmpnam; mkfifo $r, 0700 or die "failed to create FIFO $r: $!"; my $w = tmpnam; mkfifo $w, 0700 or die "failed to create FIFO $w: $!"; print join("\t", $r, $w, $iname, $port), "\n"; # Socket reads: write to the "reader" fifo unless (fork) { my $buf; sysopen my $fh, $r, O_WRONLY or die "failed to open $r for writing: $!"; syswrite $fh, $buf while sysread $client, $buf, 8192; close $fh; close $client; unlink $r; exit; } # Socket writes: read from the "writer" fifo unless (fork) { my $buf; sysopen my $fh, $w, O_RDONLY or die "failed to open $w for writing: $!"; syswrite $client, $buf while sysread $fh, $buf, 8192; close $fh; close $client; unlink $w; exit; } close $client; } }, http => sub { # Connect this to a --tcp thing to do HTTP stuff. In practice this just # means parsing out the headers and handing off parsed data to the command. my ($command) = @_; open my $fh, "| $command" or die "http failed to launch $command: $!"; select((select($fh), $| = 1)[0]); while (<>) { chomp; my ($in, $out, $port, $iaddr) = split /\t/; open my $indata, '<', $in or die "http failed to read socket $in: $!"; my $url; my %headers; while (<$indata>) { chomp; last if length($_) <= 1; $headers{$1} = $2 if defined $url && /^([^:]+):\s*(.*)$/; $url = $1 if !defined $url && /^[A-Z]+\s+(\S+)/; } close $indata; my $http_data = row($out, $url, je({%headers}), $port, $iaddr) . "\n"; $fh->print($http_data); } }, group => sub {exec_with_diamond 'sort', sort_options @_}, rgroup => sub {exec_with_diamond 'sort', '-r', sort_options @_}, order => sub {exec_with_diamond 'sort', '-g', sort_options @_}, rorder => sub {exec_with_diamond 'sort', '-rg', sort_options @_}, count => sub { # Same behavior as uniq -c, but delimits counts with \t; also takes an # optional series of columns to uniq by, rather than using the whole row. my @columns = split //, shift // ''; my $last; my @last; my $count = -1; while (<>) { be_verbose_as_appropriate length; chomp; my @xs = split /\t/; @xs = @xs[@columns] if @columns; $last = $_, @last = @xs unless ++$count; for (my $i = 0; $i < max scalar(@xs), scalar(@last); ++$i) { if (!defined $xs[$i] || !defined $last[$i] || $xs[$i] ne $last[$i]) { print "$count\t$last\n"; $count = 0; @last = @xs; $last = $_; last; } } } ++$count; print "$count\t$last\n" if defined $last; }, uncount => sub { while (<>) { be_verbose_as_appropriate length; my ($n, $line) = split /\t/, $_, 2; $line //= "\n"; print $line for 1..$n; } }, index => sub { # Inner join by appending joined fields to the end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file, sort_cmd "-t '\t' -k${f2}b,$f2"; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . 
"| join -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, indexouter => sub { # Outer left join by appending joined fields to the end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file, sort_cmd "-t '\t' -k ${f2}b,$f2"; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . "| join -a 1 -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, join => sub { # Inner join against sorted data by appending joined fields to the end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . "| join -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, joinouter => sub { # Outer left join against sorted data by appending joined fields to the # end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . "| join -a 1 -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, with => sub { # Like 'paste'. Joins lines with \t. my ($f) = @_; open my $fh, expand_filename_shorthands $f, 1 or die "failed to open --with pseudofile $f: $!"; my ($part1, $part2); while (defined($part1 = <>) and defined($part2 = <$fh>)) { be_verbose_as_appropriate length($part1) + length($part2); chomp $part1; chomp $part2; print $part1, "\t", $part2, "\n"; } close $fh; }, repeat => sub { my ($n, $f) = @_; my $count = 0; while (!$n || $count++ < $n) { open my $fh, expand_filename_shorthands $f, 1 or die "failed to open --repeat pseudofile $f: $!"; be_verbose_as_appropriate(length), print while <$fh>; close $fh; } }, octave => sub { my ($commands) = @_; my $temp = tmpnam; open my $fh, '>', $temp or die $!; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; system 'octave', '-q', '--eval', "xs = load(\"$temp\");" . "unlink(\"$temp\");" . "save_precision(48);" . "$commands;" . "save -text $temp xs" and die "octave command failed"; open $fh, '<', $temp; /^\s*#/ or /^\s*$/ or print join("\t", map $_ eq "NA" ? $_ : 0 + $_, grep length, split /\s+/), "\n" while <$fh>; close $fh; unlink $temp; }, numpy => sub { my ($commands) = @_; my $temp = tmpnam; open my $fh, '>', $temp or die $!; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; # Fix up the indentation for cases like this: # $ nfu ... --numpy 'xs += 1 # xs *= 4' # no real indent here # # $ nfu ... --numpy 'if something: # xs += 1' # minor indent here (assume 2) my @lines = split /\n/, $commands; my @indents = map length(s/\S.*$//r), @lines; my $indent = @lines > 1 ? $indents[1] - $indents[0] : 0; # If we're expecting an indentation of some amount after the first line, we # need to be careful: we don't know how much the user decided to indent the # block, and if we get it wrong then Python will complain at the next # outdent. (If there's no outdent, then we can use anything.) 
$indent = min $indent - 1, @indents[2..$#indents] if $lines[0] =~ /:\s*(#.*)?$/ && @lines > 2; my $spaces = ' ' x $indent; $lines[$_] =~ s/^$spaces// for 1..$#lines; $commands = join "\n", @lines; system 'python', '-c', " import numpy as np xs = np.loadtxt(\"$temp\") $commands np.savetxt(\"$temp\", xs, delimiter=\"\\t\")" and die "numpy command failed"; open $fh, '<', $temp; /^\s*#/ or /^\s*$/ or print join("\t", map $_ eq "NA" ? $_ : 0 + $_, grep length, split /\s+/), "\n" while <$fh>; close $fh; unlink $temp; }, stateful_unary_fn('average', sub {my ($size, $n, $total) = ($_[0] // 0, 0, 0); [$size, $n, $total, []]}, sub { my ($x, $state) = @_; my ($size, $n, $total, $window) = @$state; $total += $x; ++$n; my $v = $total / ($n > $size && $size ? $size : $n); $total -= shift @$window if $size and push(@$window, $x) >= $size; $$state[1] = $n; $$state[2] = $total; $v; }), stateful_unary_fn('intify', sub {[{}, 0]}, sub { my ($x, $state) = @_; $state->[0]->{$x} //= $state->[1]++; }), aggregate => sub { my $f = compile_eval_into_function $_[0], 'aggregate function'; my @columns; while (my $line = <>) { be_verbose_as_appropriate length $line; chomp $line; my @fields = split /\t/, $line; # Two cases here. If the new record is compatible with the most recent # existing one, or there aren't any existing ones, then group it and # don't call the aggregator yet. # # If we see a change, then call the aggregator and empty out the group. # # Note that the aggregator function is called on columns, not rows. my $n = @columns && @{$columns[0]}; if (!$n or $fields[0] eq ${$columns[0]}[0]) { $columns[$_][$n] = $fields[$_] for 0 .. $#fields; } else { $_ = ${$columns[0]}[0]; print $_, "\n" for $f->(@columns); @columns = (); $columns[$_][0] = $fields[$_] for 0 .. $#fields; } } if (@columns) { $_ = ${$columns[0]}[0]; print $_, "\n" for $f->(@columns); } }, fold => sub { my $f = compile_eval_into_function $_[0], 'fold function'; my @saved; while (<>) { be_verbose_as_appropriate length; chomp; my $line = $_; if ($f->(split /\t/)) { push @saved, $line; } else { print row(@saved), "\n" if @saved; @saved = ($line); } } print row(@saved), "\n" if @saved; }, stateless_unary_fn('log', sub { my ($x, $base) = @_; my $log = log $x; $log /= log $base if defined $base; $log; }), stateless_unary_fn('exp', sub { my ($x, $base) = @_; defined $base ? $base ** $x : exp $x; }), stateless_unary_fn('quant', sub { my ($x, $quantum) = @_; round_to $x, $quantum; }), # Note: this needs to be stdin; otherwise "nfu -p %l filename" will fail # (since exec_with_diamond trieds to pass filename straight into gnuplot). plot => sub { exec_with_stdin 'gnuplot', '-e', 'plot "-" ' . join(' ', expand_gnuplot_options @_), '-persist'; }, splot => sub { exec_with_stdin 'gnuplot', '-e', 'splot "-" ' . join(' ', expand_gnuplot_options @_), '-persist'; }, mplot => sub { my @gnuplot_options = split /;/, join ' ', expand_gnuplot_options @_; my $fname = tmpnam; my $cols = 0; open my $fh, '>', $fname or die "failed to open tempfile for mplot: $!"; while (<>) { be_verbose_as_appropriate length; $cols = max $cols, 1 + scalar(my @xs = /\t/g); print $fh $_; } close $fh; # If we're requesting only one plot, assume the intent is to replicate # those settings across every observed column. my $plot_command = 'plot ' . join ',', @gnuplot_options > 1 ? 
map("\"$fname\" $_", @gnuplot_options) : map("\"$fname\" using $_ $gnuplot_options[0]", 1..$cols); system 'gnuplot', '-e', $plot_command, '-persist'; # HACK: the problem is that gnuplot internally forks a subprocess for the # plot window, which we won't be able to see from here (that I know of). If # we delete the file before that subprocess exits, then any zoom operations # will cause gnuplot to abruptly exit. # # I'm sure there's a better way to solve this, but for now this should do # the job for now. unless (fork) { setsid; close STDIN; close STDOUT; unless (fork) { sleep 3600; unlink $fname or die "failed to unlink $fname: $!"; } } }, poll => sub { my ($sleep, $command) = @_; die "usage: --poll sleep-amount 'command ...'" unless defined $sleep and defined $command; system($command), sleep $sleep while 1; }, stateful_unary_fn('delta', sub {[0]}, sub {my ($x, $state) = @_; my $v = $x - $$state[0]; $$state[0] = $x; $v}), stateful_unary_fn('sum', sub {[0]}, sub {my ($x, $state) = @_; $$state[0] += $x}), stateful_unary_fn('variance', sub {[0, 0, 0]}, sub {my ($x, $state) = @_; $$state[0] += $x; $$state[1] += $x * $x; $$state[2]++; my ($sx, $sx2, $count) = @$state; ($sx2 - ($sx * $sx / $count)) / ($count - 1 || 1)}), stateful_unary_fn('sd', sub {[0, 0, 0]}, sub {my ($x, $state) = @_; $$state[0] += $x; $$state[1] += $x * $x; $$state[2]++; my ($sx, $sx2, $count) = @$state; sqrt(($sx2 - ($sx * $sx / $count)) / ($count - 1 || 1))}), stateful_unary_fn('entropy', # state contains [$total, $entropy_so_far] and uses the following # associative combiner (where F(X) = frequency of X, unscaled probability): # # let t = F(A) + F(B) # H(A + B) = F(A)/t * (-log(F(A)/t) + H(A)) # + F(B)/t * (-log(F(B)/t) + H(B)) sub {[0, 0]}, sub {my ($x, $state) = @_; my ($f0, $h0) = @$state; my $f = $$state[0] += $x; my $p = $x / $f; my $p0 = $f0 / $f; $$state[1] = $p0 * (($p0 > 0 ? -log($p0) / LOG_2 : 0) + $h0) + $p * ($p > 0 ? -log($p) / LOG_2 : 0)}), take => sub { if ($_[0] =~ s/^\+//) { # Take last n, so we need a line queue my @q; my $i = 0; be_verbose_as_appropriate(length), $q[$i++ % $_[0]] = $_ while <>; print for @q[$i % $_[0] .. $#q]; print for @q[0 .. $i % $_[0] - 1]; } else { my $n = $_[0] // 1; while (<>) { be_verbose_as_appropriate length; last if --$n < 0; print; } } }, sample => sub { while (<>) { be_verbose_as_appropriate length; print if rand() < $_[0]; } }, drop => sub { my $n = $_[0] // 1; if ($n) { while (<>) { be_verbose_as_appropriate length; last if --$n <= 0; } } be_verbose_as_appropriate(length), print while <>; }, map => sub { my $f = compile_eval_into_function $_[0], 'map function'; while (<>) { be_verbose_as_appropriate length; chomp; print "$_\n" for $f->(split /\t/); } }, pmap => sub { my @fhs; my $wbits = ''; my $wout = ''; my $i = 0; for (1 .. $ENV{NFU_PMAP_PARALLELISM} // 16) { my $mapper = quote_self '--child', @evaled_code, '--map', $_[0]; open my $fh, "| $mapper" or die "failed to open child process $mapper: $!"; vec($wbits, fileno($fh), 1) = 1; push @fhs, $fh; } while (<>) { be_verbose_as_appropriate length; select undef, $wout = $wbits, undef, undef; ++$i until vec($wout, fileno $fhs[$i % @fhs], 1); syswrite $fhs[$i++ % @fhs], $_; } close for @fhs; }, keep => sub { my $f = $_[0] =~ /^\d+$/ ? eval "sub {" . join("&&", map "\$_[$_]", split //, $_[0]) . "}" : compile_eval_into_function $_[0], 'keep function'; while (<>) { my $line = $_; be_verbose_as_appropriate length; chomp; my @xs = split /\t/; print $line if $f->(@xs); } }, remove => sub { my $f = $_[0] =~ /^\d+$/ ? eval "sub {" . 
join("&&", map "\$_[$_]", split //, $_[0]) . "}" : compile_eval_into_function $_[0], 'remove function'; while (<>) { my $line = $_; be_verbose_as_appropriate length; chomp; my @xs = split /\t/; print $line unless $f->(@xs); } }, each => sub { my ($template) = @_; while (<>) { be_verbose_as_appropriate length; chomp; my $c = $template =~ s/\{\}/$_/gr; system $c and die "each: failed to run $c: $!"; } }, every => sub { my ($n) = @_; my $i = 0; while (<>) { be_verbose_as_appropriate length; print unless $i++ % $n; } }, fields => sub { my ($fields) = @_; my $everything = $fields =~ s/\.$//; my @fs = split //, $fields; $everything &&= 1 + max @fs; while (<>) { be_verbose_as_appropriate length; chomp; my @xs = split /\t/; my @ys = @xs[@fs]; push @ys, @xs[$everything .. $#xs] if $everything; print join("\t", map $_ // '', @ys), "\n"; } }, fieldsplit => sub { my $pattern = $implosions{$_[0]} // $_[0]; $pattern = $fieldsplit_shorthands{$pattern} // $pattern; my $delim = qr/$pattern/; while (<>) { be_verbose_as_appropriate length; chomp; print join("\t", split /$delim/), "\n"; } }, number => sub { my $n = 0; while (<>) { be_verbose_as_appropriate length; chomp; print row(++$n, $_), "\n"; } }, ntiles => sub { my ($n) = @_; my $line_count = 0; my $fifo = tmpnam; mkfifo $fifo, 0700 or die "failed to create fifo: $!"; open my $sorted, sort_cmd('-g', $fifo) . " |" or die "failed to create sort process: $!"; open my $fifo_fh, '>', $fifo or die "failed to write to fifo: $!"; # Push data into the sort process, keeping track of the number of lines. # We'll use this count later to use constant rather than linear space. while (<>) { ++$line_count; be_verbose_as_appropriate(length); $fifo_fh->print($_); } close $fifo_fh; unlink $fifo; # Ok, now grab the data for each of the N sampling points. Here's what # we're doing: # # 0 1 2 <- things we grab for 2-tiles # 1 2 3 4 5 6 7 8 9 a b c d <- data points # # 0 1 2 <- things we grab for 2-tiles # 1 2 3 4 5 6 7 8 9 a b c <- data points # # 0 1 2 3 4 <- quartiles # 1 2 3 4 5 6 7 <- data points # # To make this work, we just keep track of the previous data point as we're # reading. # Normal case my $i = 0; my $previous = undef; my $max_line = $line_count - 1; while (<$sorted>) { chomp; $previous = $_ unless defined $previous; my $break = int($i * $n / $max_line) / $n * $max_line; if ($i >= $break && $i - $break < 1) { # Take this row, performing a weighted average with the previous one: # # break # x V y # | | # ----|------------ # $w 1-$w # # We want x*(1-$w) + y*$w. 
my $w = $break - int $break || 1; my $v = $previous * (1 - $w) + $_ * $w; print $v, "\n"; } ++$i; $previous = $_; } close $sorted; }, prepend => sub { open my $fh, expand_filename_shorthands $_[0], 1 or die "failed to open --prepend pseudofile $_[0]: $!"; be_verbose_as_appropriate(length), print while <$fh>; close $fh; print while <>; }, append => sub { open my $fh, expand_filename_shorthands $_[0], 1 or die "failed to open --append pseudofile $_[0]: $!"; print while <>; be_verbose_as_appropriate(length), print while <$fh>; close $fh; }, pipe => sub { open my $fh, "| $_[0]" or die "failed to launch $_[0]: $!"; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; }, tee => sub { open my $fh, "| $_[0]" or die "failed to launch $_[0]: $!"; $SIG{PIPE} = 'IGNORE'; while (<>) { be_verbose_as_appropriate length; $fh->print($_); print; } close $fh; }, duplicate => sub { open my $fh1, "| $_[0]" or die "failed to launch $_[0]: $!"; open my $fh2, "| $_[1]" or die "failed to launch $_[1]: $!"; # Important: keep going even if a subprocess rejects data. Otherwise things # like "nfu --duplicate ^T1 ^T+1" will produce truncated output. $SIG{PIPE} = 'IGNORE'; while (<>) { be_verbose_as_appropriate length; $fh1->print($_); $fh2->print($_); } close $fh1; close $fh2; }, partition => sub { my ($splitter, $cmd) = @_; my %fhs; my $f = compile_eval_into_function $splitter, 'partition function'; # Important: keep going even if a subprocess rejects data. Otherwise things # like "nfu --partition ... ^T10" will produce truncated output. $SIG{PIPE} = 'IGNORE'; my @open_partitions; while (<>) { be_verbose_as_appropriate length; my $line = $_; chomp(my $cline = $line); my $p = $f->(split /\t/, $cline); unless (exists $fhs{$p}) { my $cmdsub = $cmd =~ s/\{\}/$p/gr =~ s/\{\.(\.*)\}/\{$1\}/gr; open $fhs{$p}, "| $cmdsub" or die "failed to launch $cmdsub: $!"; push @open_partitions, $p; } $fhs{$p}->print($line); close($fhs{$p = shift @open_partitions}), delete $fhs{$p} while @open_partitions > ($ENV{NFU_MAX_FILEHANDLES} // 64); } close for values %fhs; }, hadoopc => sub { my ($outfile, $mapper, $combiner, $reducer) = @_; if ($outfile eq '.') { # Print output data to stdout, then delete outfile my $filename = hadoop_tempfile; my @partfiles = map s/^hdfs:(?:\/\/)?//r, hadoop_ls hadoop_into $filename, $mapper, $combiner, $reducer; open my $fh, "| xargs hadoop fs -text" or die "failed to execute xargs: $!"; $fh->print("$_\n") for @partfiles; close $fh; system hadoop('fs', '-rm', '-r', $filename) . ' 1>&2'; } else { $outfile = hadoop_tempfile if $outfile eq '@'; print hadoop_into($outfile, $mapper, $combiner, $reducer), "\n"; } }, hadoop => sub { my ($outfile, $mapper, $reducer) = @_; if ($outfile eq '.') { # Print output data to stdout, then delete outfile my $filename = hadoop_tempfile; my @partfiles = map s/^hdfs:(?:\/\/)?//r, hadoop_ls hadoop_into $filename, $mapper, 'NONE', $reducer; open my $fh, "| xargs hadoop fs -text" or die "failed to execute xargs: $!"; $fh->print("$_\n") for @partfiles; close $fh; system hadoop('fs', '-rm', '-r', $filename) . ' 1>&2'; } else { $outfile = hadoop_tempfile if $outfile eq '@'; print hadoop_into($outfile, $mapper, 'NONE', $reducer), "\n"; } }, sql => sub { my ($db, $table, $schema, $query) = sql_parse_args @_; my @read = sql_schema_and_buffer $schema; my $index = sql_first_column_index $table, $schema; $query = expand_sql_shorthands $query; if ($db =~ s/^P//) { # postgres my $edb = expand_postgres_db $db; my $q = $query eq '_' ? 
'' : "COPY ($query) TO STDOUT WITH NULL AS '';\n"; open my $fh, "| psql -c " . shell_quote("DROP TABLE IF EXISTS $table;\n" . "CREATE TABLE $table ($schema);\n" . "COPY $table FROM STDIN;\n" . $index . $q) . " $edb 1>&2" or die "psql: failed to open psql for table $table $!"; write_buffer_and_stdin $fh, @read; close $fh; print "sql:P$db:select * from $table\n" unless length $q; } elsif ($db =~ s/^S//) { # sqlite3 my $fifo_name = tmpnam; mkfifo $fifo_name, 0700 or die "failed to create fifo: $!"; my $child = fork; unless ($child) { system "echo " . shell_quote(".mode tabs\n" . "DROP TABLE IF EXISTS $table;\n" . "CREATE TABLE $table ($schema);\n" . ".import $fifo_name $table\n" . $index . ($query eq '_' ? '' : "$query;\n")) . "| sqlite3 " . shell_quote expand_sqlite_db $db; } else { open my $fh, '>', $fifo_name or die "failed to open fifo into sqlite: $!"; write_buffer_and_stdin $fh, @read; close $fh; unlink $fifo_name; waitpid $child, 0; print "sql:S$db:select * from $table\n" if $query eq '_'; } } else { die "unknown SQL prefix: " . substr($db, 0, 1) . " (valid prefixes are P and S)"; } }, preview => sub { $verbose = 0; # don't print over the pager my $have_less = !system 'which less > /dev/null'; my $have_more = !system 'which more > /dev/null'; my $less_program = $have_less ? 'less' : $have_more ? 'more' : 'cat'; exec_with_stdin $less_program; }, ); my %bracket_handlers = ( '' => sub {my $stuff = shell_quote @_; "".qx|$0 --quote $stuff| =~ s/\s*$//r}, '@' => sub {my $stuff = shell_quote @_; "sh:".(qx|$0 --quote $stuff| =~ s/\s*$//r)}, q => sub {shell_quote @_}, ); my %bracket_docs = ( '' => 'nfu as function: [ -gc ] == "$(nfu --quote -gc)"', '@' => 'nfu as data: @[ -gc foo ] == sh:"$(nfu --quote -gc foo)"', q => 'quote things: q[ foo bar ] == "foo bar"', ); # Print usage if the user clearly doesn't know what they're doing. if (@ARGV ? $ARGV[0] =~ /^-[h?]$/ || $ARGV[0] =~ /^--(usage|help)$/ : -t STDIN) { # Some checks for me to make sure I'm keeping the code well-maintained exists $functions{$_} or die "no function for $_" for keys %usages; exists $usages{$_} or die "no usage for $_" for keys %functions; exists $arity{$_} or die "no arity for $_" for keys %usages; exists $usages{$_ =~ s/--//r} or die "no usage for $_" for values %explosions, keys %usages; exists $bracket_docs{$_} or die "no bracket doc for $_" for keys %bracket_handlers; print STDERR "usage: nfu [prefix-commands...] [input-files...] commands...\n"; print STDERR "where each command is one of the following:\n\n"; my $len = 1 + max map length, keys %usages; my %short_lookup; $short_lookup{$explosions{$_} =~ s/^--//r} = $_ for keys %explosions; for my $cmd (sort keys %usages) { my $short = $short_lookup{$cmd}; $short = defined $short ? "-$short|" : ' '; printf STDERR " %s--%-${len}s(%d) %s\n", $short, $cmd, $arity{$cmd}, $usages{$cmd} ? $arity{$cmd} ? 
"<$usages{$cmd}>" : "-- $usages{$cmd}" : ''; } print STDERR "\nand prefix commands are:\n\n"; print STDERR " documentation (not used with normal commands):\n"; print STDERR " --explain \n"; print STDERR " --expand-pseudofile \n"; print STDERR " --expand-code \n"; print STDERR " --expand-gnuplot \n"; print STDERR " --expand-sql \n"; print STDERR "\n pipeline modifiers:\n"; print STDERR " --quote -- quotes args: eval \$(nfu --quote ...)\n"; print STDERR " --use \n"; print STDERR " --run \n"; print STDERR "\nargument bracket preprocessing:\n\n"; print STDERR " ^stuff -> [ -stuff ]\n\n"; my $bracket_max = max map length, keys %bracket_docs; printf STDERR " %${bracket_max}s[ ] %s\n", $_, $bracket_docs{$_} for sort keys %bracket_docs; my $pseudofile_len = 1 + max map length, keys %pseudofile_docs; print STDERR "\npseudofile patterns:\n\n"; printf STDERR " %-${pseudofile_len}s %s\n", $_, $pseudofile_docs{$_} for sort keys %pseudofile_docs; print STDERR "\ngnuplot expansions:\n\n"; printf STDERR " %2s -> '%s'\n", $_, $gnuplot_aliases{$_} for sort keys %gnuplot_aliases; print STDERR "\nSQL expansions:\n\n"; printf STDERR " %2s -> '%s'\n", $_, $sql_aliases{$_} for sort keys %sql_aliases; print STDERR "\ndatabase prefixes:\n\n"; printf STDERR " %s = %s\n", @$_ for ['P' => 'PostgreSQL'], ['S' => 'SQLite 3']; my $env_len = 1 + max map length, keys %env_docs; print STDERR "\nenvironment variables:\n\n"; printf STDERR " %-${env_len}s %s\n", $_, $env_docs{$_} for sort keys %env_docs; print STDERR "\n"; print STDERR "see https://github.com/spencertipping/nfu for documentation\n"; print STDERR "\n"; exit 1; } if (@ARGV && $ARGV[0] =~ /^--expand/) { my ($command, $x, @others) = @ARGV; if ($command =~ /-pseudofile$/) { print expand_filename_shorthands($x) // '', "\n"; } elsif ($command =~ /-code$/) { print expand_eval_shorthands($x), "\n"; } elsif ($command =~ /-gnuplot$/) { print expand_gnuplot_options($x), "\n"; } elsif ($command =~ /-sql$/) { print expand_sql_shorthands($x), "\n"; } else { print STDERR "unknown expansion command: $command\n"; exit 1; } exit 0; } my @args_to_parse; # Preprocess args to look for bracketed groups. These need to be collapsed into # single args, which must happen before we start assigning arguments to # commands. while (@ARGV) { my $x = shift @ARGV; last if $x eq '--'; if ($x =~ s/\[$//) { die "unknown bracket prefix: $x" unless exists $bracket_handlers{$x}; my @xs; my $depth = 1; while (@ARGV) { my $next = shift @ARGV; $depth-- if $next eq ']'; last unless $depth; push @xs, $next; $depth++ if $next =~ /\[$/; } unshift @ARGV, $bracket_handlers{$x}->(@xs); } elsif ($x =~ s/^\^//) { # Lift the command (as a short option) into a quoted nfu instance. $x = shell_quote $x; unshift @ARGV, ''.qx|$0 --quote -$x|; } elsif ($x =~ s/\{$//) { # Parse a branching map, which has the form { 'pattern' stuff... , # 'pattern' stuff... , ... }. We generate a quoted TSV of packed base-64 # values. die "unknown brace prefix: $x" if length $x; my @lines; my $key; my @value; my $depth = 1; while (@ARGV) { my $next = shift @ARGV; $depth-- if $next eq '}'; if (!$depth || defined $key && $next eq ',') { push @lines, row pack('u', $key), pack('u', shell_quote @value); @value = (); $key = undef; } elsif (defined $key) { push @value, $next; } else { $key = $next; } last unless $depth; $depth++ if $next =~ /\{$/; } unshift @ARGV, join "\n", @lines; } elsif ($x eq '%') { # Everything else is a variable binding of the form 'x=y'. Then go back # through and rewrite all %x to be y. 
    my %bindings = map split(/=/, $_, 2), @ARGV;
    my $names = join '|', keys %bindings;
    @ARGV = ();
    s#%($names)#$bindings{$1} // "%$1"#ge for @args_to_parse;
  } else {
    push @args_to_parse, $x;
  }
}

sub explode {
  return $_[0] unless $_[0] =~ s/^-([^-])/$1/;
  map {$explosions{$_} // $_} grep length, split /([-+.\d]*),?/, $_[0];
}

my %custom_env;
my @parsed;
my $quote_self = 0;
my $explain = 0;

while (@args_to_parse) {
  unshift @args_to_parse, explode shift @args_to_parse;
  (my $command = shift @args_to_parse) =~ s/^--//;

  if (defined(my $arity = $arity{$command})) {
    my @args;
    push @args, shift @args_to_parse
      while @args_to_parse
         && (--$arity >= 0
             || ! -e $args_to_parse[0] && $args_to_parse[0] =~ /^[-+]?\d+/);
    push @parsed, [$command, @args];
  } elsif ($command =~ /(\w+)=(.*)/) {
    $ENV{$1} = $custom_env{$1} = $2;
  } elsif ($command eq 'run') {
    my $x = shift @args_to_parse;
    push @evaled_code, '--run', $x;
    eval $x;
    die "failed to run '$x': $@" if $@;
  } elsif ($command eq 'use') {
    my $x = shift @args_to_parse;
    my $s = read_file $x;    # you can --use pseudofiles, woot!
    push @evaled_code, '--use', $x;
    eval $s;
    die "failed to use '$x': $@" if $@;
  } elsif ($command eq 'explain') {
    $explain = 1;
  } elsif ($command eq 'verbose' || $command eq 'v') {
    print STDERR "\033[2J" unless $quote_self;
    $verbose = 1;
  } elsif ($command eq 'child') {
    $is_child = 1;
  } elsif ($command eq 'quote') {
    $quote_self = 1;
  } else {
    if ($quote_self) {
      # Defer pseudofile resolution. This matters for things like intermediate
      # Hadoop outputs.
      push @ARGV, $command;
    } else {
      my $f = expand_filename_shorthands $command;
      die "nonexistent pseudofile: $command" unless defined $f;
      push @ARGV, $f;
    }
  }
}

if ($quote_self) {
  # Quote all other arguments so a shell will parse them correctly.
  print quote_self($verbose ? ('--verbose') : (),
                   map("$_=$custom_env{$_}", keys %custom_env),
                   @evaled_code,
                   map(("--$$_[0]", @$_[1..$#$_]), @parsed),
                   @ARGV), "\n";
  exit 0;
}

# Open output in an interactive previewer if...
push @parsed, ['preview'] if !$ENV{NFU_NO_PAGER}       # we can page
                          && (!-t STDIN || @ARGV)      # not interacting for input
                          && -t STDOUT;                # interacting for output

if ($explain) {
  # Explain what we would have done with the given command line.
  printf "file\t%s\n", $_ =~ s/#.*\n//gr for @ARGV;
  printf "--%s\t%s\n", ${$_}[0], join "\t", @{$_}[1 .. $#$_] for @parsed;
} elsif (@parsed) {
  my $reader = undef;

  # Note: the loop below uses pipe/fork/dup2 instead of a more idiomatic Open2
  # call. I don't have a good reason for this other than to figure out how the
  # low-level stuff worked.
  for (my $i = 0; $i < @parsed; ++$i) {
    my ($command, @args) = @{$parsed[$i]};

    # Here's where things get fun. The question right now is, "do we need to
    # fork, or can we run in-process?" -- i.e. are we in the middle, or at the
    # end? When we're in the middle, we want to redirect STDOUT to the pipe's
    # writer and fork; otherwise we run in-process and write directly to the
    # existing STDOUT.
    ++$verbose_row;
    if ($i < @parsed - 1) {
      # We're in the middle, so allocate a pipe and fork.
      pipe my($new_reader), my($writer);
      $verbose_command = $command;
      @verbose_args = @args;
      unless (fork) {
        # We're the child, so do STDOUT redirection.
        close $new_reader or die "failed to close pipe reader: $!";
        POSIX::close(0) or die "failed to close stdin" if defined $reader;
        dup2(fileno($reader), 0) or die "failed to dup input: $!"
          if defined $reader;
        POSIX::close(1);
        dup2(fileno($writer), 1) or die "failed to dup stdout: $!";
        close $reader or die "failed to close reader: $!" if defined $reader;
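        # At this point fd 0 is the previous stage's pipe (or the original
        # stdin for the first stage) and fd 1 is the pipe feeding the next
        # stage; the now-redundant Perl-level handles are closed before running
        # this stage's command in the child process.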
        close $writer or die "failed to close writer: $!";

        # The function here may never return.
        $functions{$command}->(@args);
        exit;
      } else {
        close $writer or die "failed to close pipe writer: $!";
        close $reader if defined $reader;
        $reader = $new_reader;
      }
    } else {
      # We've hit the end of the chain. Preserve stdout, redirect stdin from
      # current reader.
      POSIX::close(0) or die "failed to close stdin" if defined $reader;
      dup2(fileno($reader), 0) or die "failed to dup input: $!"
        if defined $reader;
      close $reader or die "failed to close reader: $!" if defined $reader;
      $verbose_command = $command;
      @verbose_args = @args;
      $functions{$command}->(@args);
    }

    # Prevent <> from reading files after the first iteration (this is such a
    # hack).
    @ARGV = ();
  }
} else {
  # Behave like cat, which is useful for auto-decompressing things.
  be_verbose_as_appropriate(length), print while <>;
}
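# End-of-pipeline note: each entry in @parsed becomes one stage.  Every stage
# except the last forks with its stdout wired into a pipe, and the final stage
# runs in-process against the original stdout (often the 'preview' pager added
# above).  As a sketch, something like "nfu somefile -gc" (borrowing the -gc
# example from the bracket docs) would be exploded into one stage per short
# option via %explosions and run as such a chain; the exact expansion of each
# short option is defined earlier in the script.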