#!/usr/bin/env perl # nfu: Command-line numeric fu | Spencer Tipping # Licensed under the terms of the MIT source code license use v5.14; use strict; use warnings; use utf8; use Fcntl; use Socket; use Time::HiRes qw/time/; use POSIX qw/dup2 mkfifo setsid :sys_wait_h/; use File::Temp qw/tmpnam/; use Math::Trig; use constant VERBOSE_INTERVAL => 50; use constant HADOOP_VERBOSE_INTERVAL => 10000; use constant DELAY_BEFORE_VERBOSE => 500; use constant SQL_INFER_PEEK_LINES => 20; use constant LOG_2 => log(2); # 64-bit hex constants in geohash encoder won't work on 32-bit architectures no warnings 'portable'; our $diamond_has_data = 1; ++$|; # Setup child capture. All we need to do is wait for child pids; there's no # formal teardown. $SIG{CHLD} = sub { local ($!, $?); 1 while waitpid(-1, WNOHANG) > 0; }; # NB: This import is not used in nfu directly; it's here so you can use these # functions inside aggregators. use List::Util qw(first max maxstr min minstr reduce shuffle sum); sub prod { my $p = 1; $p *= $_ for @_; $p; } # Same for this, which is especially useful from aggregators because multiple # values create multiple output rows, not multiple columns on the same output # row. sub row {join "\t", map s/\n//gr, @_} # Order-preserving unique values for strings. This is just too useful not to # provide. sub uniq { local $_; my %seen; my @order; $seen{$_}++ or push @order, $_ for @_; @order; } sub frequencies { local $_; my %freqs; ++$freqs{$_} for @_; %freqs; } sub reductions(&$@) { my ($f, $x, @xs) = @_; my @ys; push @ys, $x = $f->($x, $_) for @xs; @ys; } sub cp { # Cartesian product of N arrays, each passed in as a ref return () if @_ == 0; return map [$_], @{$_[0]} if @_ == 1; my @ns = map scalar(@$_), @_; my @shifts = reverse reductions {$_[0] * $_[1]} 1 / $ns[0], reverse @ns; map { my $i = $_; [map $_[$_][int($i / $shifts[$_]) % $ns[$_]], 0..$#_]; } 0..prod(@ns) - 1; } sub round_to { my ($x, $quantum) = @_; $quantum ||= 1; my $sign = $x < 0 ? -1 : 1; int(abs($x) / $quantum + 0.5) * $quantum * $sign; } sub mean {scalar @_ && sum(@_) / @_} sub log2 {log($_[0]) / LOG_2} sub entropy { local $_; my $s = sum(@_) || 1; my $t = 0; $t -= ($_ / $s) * log($_ / $s) for @_; $t / LOG_2; } sub dot { local $_; my ($u, $v) = @_; my $s = 0; $s += $$u[$_] * $$v[$_] for 0 .. min($#{$u}, $#{$v}); $s; } sub dist { # Euclidean distance (you specify the deltas) local $_; my $s = 0; $s += $_*$_ for @_; sqrt($s); } sub sdist { # Spherical-coordinate great circle distance; you specify theta1, phi1, # theta2, phi2, each in degrees; radius is assumed to be 1. Math from # http://stackoverflow.com/questions/27928/how-do-i-calculate-distance-between-two-latitude-longitude-points local $_; my ($t1, $p1, $t2, $p2) = map $_ / 180 * pi, @_; my $dt = $t2 - $t1; my $dp = $p2 - $p1; my $a = sin($dp / 2) * sin($dp / 2) + cos($p1) * cos($p2) * sin($dt / 2) * sin($dt / 2); 2 * atan2(sqrt($a), sqrt(1 - $a)); } sub edist { # Earth distance between two latitudes/longitudes, in km; up to 0.5% error my ($lat1, $lng1, $lat2, $lng2) = @_; 6371 * sdist $lng1, $lat1, $lng2, $lat2; } sub line_opposite { # Returns true if two points are on opposite sides of the line starting at # (x0, y0) and whose direction is (dx, dy). my ($x0, $y0, $dx, $dy, $x1, $y1, $x2, $y2) = @_; return (($x1 - $x0) * $dy - ($y1 - $y0) * $dx) * (($x2 - $x0) * $dy - ($y2 - $y0) * $dx) < 0; } sub evens(@) {local $_; @_[map $_ * 2, 0 .. $#_ >> 1]} sub odds(@) {local $_; @_[map $_ * 2 + 1, 0 .. 
$#_ >> 1]}

sub parse_wkt {
  my @rings = map [/([-0-9.]+)\s+([-0-9.]+)/g], split /\)\s*,\s*\(/, $_[0];
  { rings  => [map [map $_ + 0, @$_], @rings],
    ylimit => 1 + max(map max(@$_), @rings),
    bounds => [min(map evens(@$_), @rings), max(map evens(@$_), @rings),
               min(map odds(@$_), @rings),  max(map odds(@$_), @rings)],
  }
}

sub in_poly {
  # Returns true if a point resides in the given parsed polygon.
  my ($x, $y, $parsed) = @_;
  my $ylimit = $parsed->{ylimit};
  my @bounds = @{$parsed->{bounds}};
  return 0 if $x < $bounds[0] || $x > $bounds[1]
           || $y < $bounds[2] || $y > $bounds[3];
  my $hits = 0;
  for my $r (@{$parsed->{rings}}) {
    my ($lx, $ly) = @$r[0, 1];
    for (my $i = 2; $i < @$r; $i += 2) {
      my $cx = $$r[$i];
      my $cy = $$r[$i + 1];
      ++$hits if $lx <= $x && $x < $cx || $lx >= $x && $x > $cx
             and line_opposite $lx, $ly, $cx - $lx, $cy - $ly,
                               $x, $y, $x, $ylimit;
      $lx = $cx;
      $ly = $cy;
    }
  }
  $hits & 1;
}

sub rect_polar { (dist(@_), atan2($_[0], $_[1]) / pi * 180) }
sub polar_rect { ($_[0] * sin($_[1] / 180 * pi), $_[0] * cos($_[1] / 180 * pi)) }
sub degrees_radians { $_[0] / 180 * pi }
sub radians_degrees { $_[0] * 180 / pi }

# JSON support (if available)
our $json;
if (eval {require JSON}) {
  JSON->import;
  no warnings qw(uninitialized);
  $json = JSON->new->allow_nonref->utf8(1);
} elsif (eval {require JSON::PP}) {
  JSON::PP->import;
  no warnings qw(uninitialized);
  $json = JSON::PP->new->allow_nonref->utf8(1);
} else {
  print STDERR "note: no JSON support detected (try 'cpan install JSON')\n";
  print STDERR "nfu will soon have its own JSON parser rather than using ";
  print STDERR "a native library for this. Sorry for the inconvenience.\n";
}

# These are callable from evaled code
sub expand_filename_shorthands;
sub read_file {
  open my $fh, expand_filename_shorthands $_[0], 1;
  my $result = join '', <$fh>;
  close $fh;
  $result;
}

sub write_file {
  open my $fh, '>', $_[0];
  $fh->print($_[1]);
  close $fh;
  $_[0];
}

sub read_lines {
  local $_;
  open my $fh, expand_filename_shorthands $_[0], 1;
  my @result;
  chomp, push @result, $_ for <$fh>;
  close $fh;
  @result;
}

sub write_lines {
  local $_;
  my $filename = shift @_;
  open my $fh, '>', $filename;
  $fh->print($_, "\n") for @_;
  close $fh;
  $filename;
}

sub open_file { open my $fh, expand_filename_shorthands $_[0], 1; $fh; }

sub json_encode {$json->encode(@_)}
sub json_decode {$json->decode(@_)}

sub hadoop_counter {
  printf STDERR "reporter:counter:nfu,%s_%s,%d\n",
                $_[0] =~ y/,/_/r, $_[1] =~ y/,/_/r, $_[2] // 1;
}

sub F {my $l = shift @_; (split /\t/, $l)[@_]}

our @gh_alphabet = split //, '0123456789bcdefghjkmnpqrstuvwxyz';
our %gh_decode   = map(($gh_alphabet[$_], $_), 0..$#gh_alphabet);

sub gap_bits {
  my ($x) = @_;
  $x |= $x << 16; $x &= 0x0000ffff0000ffff;
  $x |= $x << 8;  $x &= 0x00ff00ff00ff00ff;
  $x |= $x << 4;  $x &= 0x0f0f0f0f0f0f0f0f;
  $x |= $x << 2;  $x &= 0x3333333333333333;
  return ($x | $x << 1) & 0x5555555555555555;
}

sub ungap_bits {
  my ($x) = @_;
  $x &= 0x5555555555555555;
  $x ^= $x >> 1;  $x &= 0x3333333333333333;
  $x ^= $x >> 2;  $x &= 0x0f0f0f0f0f0f0f0f;
  $x ^= $x >> 4;  $x &= 0x00ff00ff00ff00ff;
  $x ^= $x >> 8;  $x &= 0x0000ffff0000ffff;
  return ($x ^ $x >> 16) & 0x00000000ffffffff;
}

sub geohash_encode {
  local $_;
  my ($lat, $lng, $precision) = @_;
  $precision //= 12;
  my $bits = $precision > 0 ? $precision * 5 : -$precision;
  my $gh = (gap_bits(int(($lat + 90) / 180 * 0x40000000))
         |  gap_bits(int(($lng + 180) / 360 * 0x40000000)) << 1)
         >> 60 - $bits;
  $precision > 0
    ? join '', reverse map $gh_alphabet[$gh >> $_ * 5 & 31], 0 ..
$precision - 1 : $gh; } sub geohash_decode { local $_; my ($gh, $bits) = @_; unless (defined $bits) { # Decode gh from base-32 $bits = length($gh) * 5; my $n = 0; $n = $n << 5 | $gh_decode{lc $_} for split //, $gh; $gh = $n; } $gh <<= 60 - $bits; return (ungap_bits($gh) / 0x40000000 * 180 - 90, ungap_bits($gh >> 1) / 0x40000000 * 360 - 180); } # HTTP sub http_send { sprintf "HTTP/1.0 %s\nContent-Length: %d\n\n%s\n", $_[0], length($_[1]), $_[1] } sub httpok { sprintf "HTTP/1.0 200 OK\nContent-Length: %d\n\n%s\n", length($_[0]), $_[0] } sub chunk { sprintf "%x\r\n%s\r\n", length($_[0]), $_[0] } sub endchunk { chunk '' } # Function shorthands BEGIN { *dr = \°rees_radians; *rd = \&radians_degrees; *je = \&json_encode; *jd = \&json_decode; *hc = \&hadoop_counter; *rf = \&read_file; *rl = \&read_lines; *wf = \&write_file; *wl = \&write_lines; *of = \&open_file; *ghe = \&geohash_encode; *ghd = \&geohash_decode; } # File functions sub random128 {join '', map sprintf('%04x', rand 65536), 0 .. 8} sub hadoop_ls; sub hadoop_matched_partfiles; sub shell_quote; sub expand_sql_shorthands; sub expand_sqlite_db; sub expand_postgres_db; sub expand_filename_shorthands { # NB: we prepend a shell comment containing the original $f so anyone # downstream can get it back. This is currently used by Hadoop to reuse data # stored on HDFS if you write it on the command-line. (Otherwise nfu would # hadoop fs -text to download it, then re-upload to a tempfile.) my ($f, $always_make_a_command) = @_; my $result; my $original = ($f =~ s/^/#/mgr) . "\n"; no warnings 'newline'; if (-e $f || $f =~ s/^file://) { # It's really a filename, so push it onto @ARGV. If it's compressed, run it # through the appropriate decompressor first. my $piped = $f =~ s/^(.*\.gz)/cat '$1' | gzip -d/ri =~ s/^(.*\.bz2)/cat '$1' | bzip2 -d/ri =~ s/^(.*\.xz)/cat '$1' | xz -d/ri =~ s/^(.*\.lzo)/cat '$1' | lzop -d/ri; $result = $piped =~ /\|/ ? "$original$piped |" : $piped; } elsif ($f =~ /^https?:\/\//) { # Assume a URL and curl it $f = shell_quote $f; $result = "${original}curl $f |"; } elsif ($f =~ s/^sh://) { # Execute a command and capture stdout $result = "$original$f |"; } elsif ($f =~ /^s3:/) { # Use s3cmd and cat to stdout $result = "${original}s3cmd get '$f' - |"; } elsif ($f =~ s/^hdfs-ls:(?:\/\/)?//) { # Just list a directory. This is used by hdfs: below to provide # indirection. $result = "${original}$0 n:1 -m '" . q{map "hdfs:" . shell_quote($_), hadoop_ls %[q[} . $f . q{]%]} . "' |"; } elsif ($f =~ s/^hdfs:(?:\/\/)?//) { # Use Hadoop commands to read stuff. The command itself needs to be # deferred because the hdfs path might not exist when the pseudofile is # resolved. Indirected through $0 hdfs-ls:X because some HDFS directories # contain enough files that we'll get arg-list-too-long errors if we invoke # hadoop fs -text directly. This workaround lets us stream the output from # hadoop fs -ls. # # Redirecting errors to /dev/null because hadoop fs -text complains loudly # if you close its output stream, and this is really annoying. $f = shell_quote "hdfs-ls:$f"; $result = "${original}$0 $f | xargs hadoop fs -text 2>/dev/null |"; } elsif ($f =~ s/^hdfsjoin://) { $result = "${original}$0 " . shell_quote(map "hdfs:$_", hadoop_matched_partfiles $f, $ENV{map_input_file}) . 
" |"; } elsif ($f =~ s/^perl://) { # Evaluate a Perl expression $f =~ s/'/'"'"'/g; $result = "${original}perl -e 'print \$_, \"\\n\" for ($f)' |"; } elsif ($f =~ s/^n://) { $result = "${original}perl -e 'print \$_, \"\\n\" for (1..($f))' |"; } elsif ($f =~ s/^id://) { $result = "${original}echo " . shell_quote($f) . " |"; } elsif ($f =~ s/^sql://) { # Run a postgres or sqlite3 query, exporting results as TSV my ($db, $query) = split /:/, $f, 2; $query = expand_sql_shorthands $query; if ($db =~ s/^P//) { # postgres $db = expand_postgres_db $db; $query = shell_quote "COPY ($query) TO STDOUT WITH NULL AS ''"; $result = "${original}psql -c $query $db |"; } elsif ($db =~ s/^S//) { # sqlite3 $db = shell_quote expand_sqlite_db $db; $result = "${original}echo " . shell_quote(".mode tabs\n$query;") . "| sqlite3 $db |"; } else { die "unknown database prefix " . substr($db, 0, 1) . " for pseudofile $original (valid prefixes are P and S)"; } } elsif ($f =~ /(\w*@?[^:]+):(.*)$/) { # Access file over SSH. We need to make sure nfu is running on the remote # end, since the remote file might be gzipped or some such. Because the # remote machine might not have nfu, this involves us piping ourselves over # to that machine. $result = "${original}cat '$0' | ssh -C '$1' perl - '$2' |"; } else { return undef; } $always_make_a_command && $result !~ /\|/ ? "${original}cat '$result' |" : $result; } sub unexpand_filename_shorthands { my ($f) = @_; $f =~ /^#(.*)/ ? $1 : $f; } my %pseudofile_docs = ( 'file.gz' => 'decompress file with gzip -dc', 'file.bz2' => 'decompress file with bzip2 -dc', 'file.xz' => 'decompress file with xz -dc', 'file.lzo' => 'decompress file with lzop -dc', 'http[s]://url' => 'retrieve url with curl', 'sh:stuff' => 'run sh -c "stuff", take stdout', 's3://url' => 'access S3 using s3cmd', 'hdfs:path' => 'read HDFS file(s) with hadoop fs -text', 'hdfs-ls:path' => 'pseudofiles parsed from hadoop fs -ls', 'hdfsjoin:path' => 'mapside join pseudofile (a subset of hdfs:path)', 'sql:db:query' => 'results of query as TSV', 'perl:expr' => 'perl -e \'print "$_\n" for (expr)\'', 'n:number' => 'numbers from 1 to n, inclusive', 'id:X' => 'verbatim text X', 'user@host:x' => 'remote data access (x can be a pseudofile)', ); # Flags our $is_child = 0; our $verbose = 0; our $n_lines = 0; our $n_bytes = 0; our $start_time = undef; our $verbose_command = ''; our @verbose_args; our $verbose_command_formatted = undef; our $inside_hadoop_job = length $ENV{mapred_job_id}; our $verbose_interval = $inside_hadoop_job ? HADOOP_VERBOSE_INTERVAL : VERBOSE_INTERVAL; our $empirical_verbosity = 0; $verbose ||= $inside_hadoop_job; $verbose ||= length $ENV{NFU_ALWAYS_VERBOSE}; our $last_verbose_report = 0; our $verbose_row = 0; # Call it like this: # while (<>) { # be_verbose_as_appropriate length; # ... 
# } sub be_verbose_as_appropriate { return if $is_child; return unless $verbose; local $_; my ($record_length) = @_; $n_lines += !!$record_length; $n_bytes += $record_length; my $now = time; return unless $record_length == 0 || ($now - $last_verbose_report) * 1000 > $verbose_interval; $last_verbose_report = $now; $verbose_command_formatted //= join ' ', $verbose_command, @verbose_args; $start_time //= $now; my $runtime = $now - $start_time || 0.001; return if $runtime * 1000 < DELAY_BEFORE_VERBOSE; ++$empirical_verbosity; unless ($inside_hadoop_job) { # Print status updates straight to the terminal printf STDERR "\033[%d;1H\033[K%10dl %8.1fl/s %10dk %8.1fkB/s %s", $verbose_row, $n_lines, $n_lines / $runtime, $n_bytes / 1024, $n_bytes / 1024 / $runtime, substr($verbose_command_formatted, 0, 40); } else { # Use Hadoop-specific syntax to update job counters. Smaller units are # better because Hadoop counters are integers, so they'll suffer from # truncation for fractional quantities. $verbose_command_formatted =~ s/[,\n]/_/g; hc $verbose_command_formatted, @$_ for (['lines', $n_lines], ['runtime ms', $runtime * 1000], ['bytes', $n_bytes]); # Reset variables because Hadoop treats them as incremental $n_lines = 0; $n_bytes = 0; $start_time = $now; } } END { be_verbose_as_appropriate 0; print STDERR "\n" if $empirical_verbosity; } # This variable will keep track of any state accumulated from --use or --run # arguments. This is required for --pmap to work correctly. my @evaled_code; sub shell_quote {join ' ', map /[^-\/\w]/ ? "'" . s/(['\\])/'\\$1'/gr . "'" : length $_ ? $_ : "''", @_} sub quote_self {shell_quote $0, @_} my %explosions = ( a => '--average', A => '--aggregate', b => '--branch', c => '--count', C => '--uncount', D => '--drop', e => '--each', E => '--every', f => '--fields', F => '--fieldsplit', g => '--group', G => '--rgroup', h => '--hadoopc', H => '--hadoop', i => '--index', I => '--indexouter', j => '--join', J => '--joinouter', k => '--keep', K => '--remove', l => '--log', L => '--exp', m => '--map', M => '--pmap', n => '--number', N => '--ntiles', o => '--order', O => '--rorder', p => '--plot', P => '--poll', q => '--quant', Q => '--sql', r => '--read', R => '--buffer', s => '--sum', S => '--delta', T => '--take', # v => '--verbose' # handled during option parsing V => '--variance', w => '--with', z => '--intify', ); my %implosions; $implosions{$explosions{$_}} = $_ for keys %explosions; # Minimum number of required arguments for each function. Numeric arguments are # automatically forwarded, so are always optional. 
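# As an illustrative sketch (nothing here is executed), this is roughly how a
# short-option bundle is taken apart by explode() further down and then bound
# to arguments using these arities:
#
#   my $bundle = '-gcT10';
#   explode $bundle;          # => ('--group', '--count', '--take', '10')
#
# --take's arity is 0, but the parser still hands it the '10' because trailing
# numeric tokens that don't name existing files are forwarded as arguments.
# The values below are therefore minimums, not exact argument counts.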
my %arity = ( average => 0, aggregate => 1, branch => 1, count => 0, uncount => 0, delta => 0, drop => 0, each => 1, every => 1, fields => 0, fieldsplit => 1, fold => 1, group => 0, rgroup => 0, hadoop => 3, hadoopc => 4, index => 2, indexouter => 2, join => 2, joinouter => 2, keep => 1, log => 0, exp => 0, map => 1, pmap => 1, number => 0, ntiles => 1, order => 0, rorder => 0, plot => 1, poll => 2, read => 0, buffer => 1, sum => 0, quant => 1, remove => 1, sample => 1, take => 0, variance => 0, with => 1, intify => 0, # Commands with no shorthands append => 1, prepend => 1, tee => 1, duplicate => 2, partition => 2, splot => 1, sd => 0, mplot => 1, preview => 0, pipe => 1, entropy => 0, sql => 3, tcp => 1, http => 1, repeat => 2, octave => 1, numpy => 1, ); my %usages = ( average => 'window size (0 for full average) -- running average', aggregate => 'aggregator fn', branch => 'branch (takes a pattern map)', count => 'counts by first column value; like uniq -c', uncount => 'the opposite of --count; repeats each row N times', delta => 'value -> difference from last value', drop => 'number of records to drop', each => 'template; executes with {} set to each value', every => 'n (returns every nth row)', fields => 'string of digits, each a zero-indexed column selector', fieldsplit => 'regexp to use for splitting', fold => 'function that returns true when line should be folded', group => 'sorts ascending, takes optional column list', rgroup => 'sorts descending, takes optional column list', hadoop => 'hadoop streaming: outpath|.|@, mapper|:, reducer|:|_', hadoopc => 'hadoop streaming: ..., combiner|:|_, reducer|:|_', index => 'field index, unsorted pseudofile to join against', indexouter => 'field index, unsorted pseudofile to join against', join => 'field index, sorted pseudofile to join against', joinouter => 'field index, sorted pseudofile to join against', keep => 'row filter fn', log => 'optional base (default e)', exp => 'optional base (default e)', map => 'row map fn', pmap => 'row map fn (executed multiple times in parallel)', number => 'prepends line number to each line', ntiles => 'takes N, produces ntiles of numbers', order => 'sorts ascending by general numeric value', rorder => 'sorts descending by general numeric value', plot => 'gnuplot arguments', poll => 'interval in seconds, command whose output to collect', sum => 'value -> total += value', quant => 'number to round to', read => 'reads pseudofiles from the data stream', buffer => 'creates a pseudofile from the data stream', remove => 'inverted row filter fn', sample => 'row selection probability in [0, 1]', take => 'n to take first n, +n to take last n', variance => 'running variance', with => 'pseudofile to join column-wise onto input', intify => 'convert column to dense integers (linear space)', append => 'pseudofile; appends its contents to current stream', prepend => 'pseudofile; prepends its contents to current stream', tee => 'shell command; duplicates data to stdin of command', duplicate => 'two shell commands as separate arguments', partition => 'partition id fn, shell command (using {})', splot => 'gnuplot arguments', sd => 'running standard deviation', mplot => 'gnuplot arguments per column, separated by ;', preview => '', pipe => 'shell command to pipe through', entropy => 'running entropy of relative probabilities/frequencies', sql => 'create/query SQL table: db[:[+]table], schema|_, query|_', tcp => 'TCP server (emits fifo filenames)', http => 'HTTP adapter for TCP server output', repeat => 'repeat count, 
pseudofile to repeat', octave => 'pipe through octave; vector is called xs', numpy => 'pipe through numpy; vector is called xs', ); my %env_docs = ( NFU_SORT_BUFFER => 'default 64M; size of in-memory sort for -g and -o', NFU_SORT_PARALLEL => 'default 4; number of concurrent sorts to run', NFU_SORT_COMPRESS => 'default none; compression program for sort tempfiles', NFU_SORT_OPTIONS => 'override all sort options except column spec', NFU_ALWAYS_VERBOSE => 'if set, nfu will be verbose all the time', NFU_NO_PAGER => 'if set, nfu will not use "less" to preview stdout', NFU_PMAP_PARALLELISM => 'number of subprocesses for -M', NFU_MAX_FILEHANDLES => 'default 64; maximum #subprocesses for --partition', NFU_HADOOP_FILES => 'comma-separated files to include with streaming job', NFU_HADOOP_STREAMING => 'absolute location of hadoop-streaming.jar', NFU_HADOOP_OPTIONS => '-D options for hadoop streaming jobs', NFU_HADOOP_COMMAND => 'hadoop executable; e.g. hadoop jar, hadoop fs -ls', NFU_HADOOP_TMPDIR => 'default /tmp; temp dir for hadoop uploads', ); my %gnuplot_aliases = ( '%l' => ' with lines', '%d' => ' with dots', '%i' => ' with impulses', '%v' => ' with vectors ', '%u' => ' using ', '%t' => ' title ', '%p' => ' lc palette ', ); my %fieldsplit_shorthands = ( S => '\s+', W => '\W+', C => ',', ); sub expand_gnuplot_options { my @transformed_opts; for my $opt (@_) { $opt =~ s/$_/$gnuplot_aliases{$_}/g for keys %gnuplot_aliases; push @transformed_opts, $opt; } @transformed_opts; } my %sql_aliases = ( '%\*' => ' select * from ', '%c' => ' select count(1) from ', '%d' => ' select distinct * from ', '%g' => ' group by ', '%j' => ' inner join ', '%l' => ' outer left join ', '%r' => ' outer right join ', '%w' => ' where ', ); sub expand_sql_shorthands { my ($sql) = @_; $sql =~ s/$_/$sql_aliases{$_}/eg for keys %sql_aliases; $sql; } sub expand_sqlite_db { my $tempdir = tmpnam =~ s/\/[^\/]+$//r; return "$tempdir/nfu-$ENV{USER}-sqlite.db" if $_[0] eq '@'; return $_[0]; } sub expand_postgres_db { # Expands a DB descriptor into a series of properly-shellquoted options to # pass to the 'psql' command. my ($host, $user, $db) = ('localhost', $ENV{USER}, $ENV{USER}); if ($_[0] =~ m#^(?:([^\@/:]+)@)?([^\@/:]+)/([^/]+)$#) { # NB: don't try to change this to use $1, $2, etc as arguments to # shell_quote. Perl references arguments, so this will fail horribly. ($user, $host, $db) = ($1 // $ENV{USER}, $2 // 'localhost', $3); shell_quote '-U', $user, '-h', $host, '-d', $db; } elsif ($_[0] =~ m#^(\w+)@(\w+)$#) { # Simple DB connection; assume localhost ($user, $db) = ($1, $2); shell_quote '-U', $user, '-d', $db; } elsif ($_[0] =~ m#^[^\@:/]+$#) { # Really simple connection shell_quote '-d', $_[0]; } elsif ($_[0] eq '@') { # Really simple connection; use username as DB shell_quote '-d', $ENV{USER}; } else { die "not sure how to parse postgres DB '$_[0]'"; } } sub expand_eval_shorthands { my $code = $_[0]; my @pieces = split /%\[(.*?)%\]/, $code; for my $i (0..$#pieces) { unless ($i & 1) { $pieces[$i] =~ s/%(\d+)/\$_[$1]/g; 1 while $pieces[$i] =~ s/([a-zA-Z0-9_\)\}\]?\$]) \. ([\$_a-zA-Z](?:-[0-9\w?\$]|[0-9_\w?\$])*) /$1\->{'$2'}/x; } } join '', @pieces; } sub parse_join_options { my ($f1, $f2, $file) = @_ == 3 ? 
@_ : ($_[0], 0, $_[1]); ($f1 + 1, $f2 + 1, $file); } sub compile_eval_into_function { my ($code, $name) = @_; $code = expand_eval_shorthands $code; eval "sub {\n$code\n}" or die "failed to compile $name function: $@\n (code was $code)"; } sub stateless_unary_fn { my ($name, $f) = @_; my $arity = $arity{$name}; ($name, sub { my @columns = split //, (@_ > $arity ? shift : undef) // '0'; while (<>) { be_verbose_as_appropriate length; chomp; my @fs = split /\t/; $fs[$_] = $f->($fs[$_], @_) for @columns; print row(@fs), "\n"; } }); } sub stateful_unary_fn { my ($name, $setup, $f) = @_; my $arity = $arity{$name}; ($name, sub { my @columns = split //, (@_ > $arity ? shift : undef) // '0'; my %states; $states{$_} = $setup->(@_) for @columns; while (<>) { be_verbose_as_appropriate length; chomp; my @fs = split /\t/; $fs[$_] = $f->($fs[$_], $states{$_}, @_) for @columns; print row(@fs), "\n"; } }); } sub exec_with_stdin { open my $fh, '|' . shell_quote @_ or die "failed to exec @_"; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; } sub exec_with_diamond { if ($verbose || grep /\|/, @ARGV) { # Arguments are specified in filenames and involve processes, so use perl # to forward data. exec_with_stdin @_; } else { # Faster option: just exec the program in-place. This avoids a layer of # interprocess piping. Assume filenames follow arguments. exec @_, @ARGV or die "failed to exec @_ @ARGV"; } } sub sort_options { my ($column_spec) = @_; my @columns = split //, $column_spec // ''; my @options = exists $ENV{NFU_SORT_OPTIONS} ? split /\s+/, $ENV{NFU_SORT_OPTIONS} : ('-S', $ENV{NFU_SORT_BUFFER} || '64M', '--parallel=' . ($ENV{NFU_SORT_PARALLEL} || 4), $ENV{NFU_SORT_COMPRESS} ? ("--compress-program=$ENV{NFU_SORT_COMPRESS}") : ()); return @options, (@columns ? ('-t', "\t", map {('-k', sprintf "%d,%d", $_ + 1, $_ + 1)} @columns) : ()); } sub sort_cmd {join ' ', 'sort', sort_options, @_} sub fifo_for { my ($file, @transforms) = @_; my $fifo_name = tmpnam; mkfifo $fifo_name, 0700 or die "failed to create fifo: $!"; return $fifo_name if fork; my $command = expand_filename_shorthands($file, 1) . join '', map {"$_ |"} @transforms; open my $into_fifo, '>', $fifo_name or die "failed to open fifo $fifo_name for writing: $!"; open my $from_file, $command or die "failed to open file/command $command for reading: $!"; be_verbose_as_appropriate(length), $into_fifo->print($_) while <$from_file>; close $into_fifo; close $from_file; unlink $fifo_name or warn "failed to unlink temporary fifo $fifo_name: $!"; exit 0; } sub hadoop { # Generates a hadoop command string, quoting all args as necessary join ' ', $ENV{NFU_HADOOP_COMMAND} // "hadoop", @_[0, 1], shell_quote(@_ > 2 ? @_[2..$#_] : ()); } sub hadoop_ls { # Now get the output file listing. This is a huge mess because Hadoop is a # huge mess. my $ls_command = hadoop('fs', '-ls', @_); grep /\/[^_][^\/]*$/, map +(split " ", $_, 8)[7], grep !/^Found/, split /\n/, ''.qx/$ls_command/; } sub gcd { my ($a, $b) = @_; while ($b) { if ($b > $a) { ($a, $b) = ($b, $a) } else { $a %= $b } } $a; } sub hadoop_partfile_n { $_[0] =~ /[^0-9]([0-9]+)(?:\.[^\/]+)?$/ ? $1 : 0 } sub hadoop_partsort { sort {hadoop_partfile_n($a) <=> hadoop_partfile_n($b)} @_ } sub hadoop_matched_partfiles { my ($path, $partfile) = @_; my $partfile_dirname = $partfile =~ s/\/[^\/]+$//r; my @left_files; my @possibilities; # We won't be able to do anything until both sides of the join exist. 
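# (Worked example of the matching logic below, assuming Hadoop's default
# hash-modulus partitioning: if our side of the join has 4 partfiles and the
# other side has 6, then gcd(4, 6) == 2 and $left_redundancy == 6 / 2 == 3, so
# map task n reads the other side's partfiles n % 2, 2 + n % 2, and 4 + n % 2.
# Keys in our partfile n satisfy H(key) % 4 == n, hence H(key) % 2 == n % 2,
# which is exactly the set of right-side partfiles selected.)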
until (@left_files = hadoop_partsort hadoop_ls $partfile_dirname) { print STDERR "hadoop_matched_partfiles: waiting for input...\n"; hc qw/hadoop_matched_partfiles left_wait/; sleep 1; } until (@possibilities = hadoop_partsort hadoop_ls $path) { print STDERR "hadoop_matched_partfiles: waiting for join data...\n"; hc qw/hadoop_matched_partfiles right wait/; sleep 1; } my $n = hadoop_partfile_n $partfile; unless ($n < @left_files && $left_files[$n] eq $partfile) { my $partfile_n = $n; $n = 0; ++$n until $n >= @left_files || $partfile_n == hadoop_partfile_n $partfile; } die "hadoop_matched_partfiles couldn't find the index of the specified " . "partfile ($partfile) within the list of map inputs (@left_files)" if $n >= @left_files; # Assume Hadoop's default partitioning strategy using hashcode modulus. This # means that the Kth of N files contains all keys for which H(key) % N == K. my $reduction_factor = gcd scalar(@left_files), scalar(@possibilities); my $left_redundancy = @possibilities / $reduction_factor; my @files_to_read = @possibilities[map $_ * $reduction_factor + $n % $reduction_factor, 0 .. $left_redundancy - 1]; # Log some stuff to job counters so any problems are more evident. if ($verbose) { printf STDERR "hdfsjoin:$path [$partfile] (inferred partfile $n): %s\n", join(' ', @files_to_read); hc "hdfsjoin $path", "joins attempted", 1; hc "hdfsjoin $path", "left/right reads", $left_redundancy; hc "hdfsjoin $path", "overreads", $left_redundancy - 1; } @files_to_read; } sub hadoop_tempfile { my $randomness = random128; my $dir = $ENV{NFU_HADOOP_TMPDIR} // '/tmp'; "$dir/nfu-hadoop-$ENV{USER}-$randomness"; } sub find_hadoop_streaming { my @files = split /\n/, ''.qx|locate "*hadoop-streaming.jar*"|; die "failed to locate hadoop streaming jar automatically; " . "you should set NFU_HADOOP_STREAMING" unless @files; print STDERR "nfu found hadoop streaming jar: $files[0]\n"; $files[0]; } sub hadoop_into { my ($outfile, $mapper, $combiner, $reducer) = @_; my $streaming_jar = $ENV{NFU_HADOOP_STREAMING} // find_hadoop_streaming; # Various shorthands for common cases. Both mapper and reducer being the # identity function is used to repartition stuff, so we want this to be easy # to type and recognize. $mapper = $0 if $mapper eq ':'; $reducer = $0 if $reducer eq ':'; $reducer = 'NONE' if $reducer =~ /^[-_]$/; $combiner = $0 if $combiner eq ':'; $combiner = 'NONE' if $combiner =~ /^[-_]$/; $combiner = 'NONE' if $reducer eq 'NONE'; my @input_files; my @delete_afterwards; # Figure out where our input seems to be coming from. This is a little hacky, # but I think it's worthwhile for the flexibility we get. # # Hadoop jobs output their list of output partfile names because the data is # often too large. It's possible we'll get one of these as stdin, so each # line will look like "hdfs:/...". In that case, we want to use each as an # input file. Otherwise we'll want to upload stdin and all non-HDFS files to # HDFS and use those. # # It's actually a bit tricky to figure out which files started out as hdfs: # locations because by now they've all been filename alias-expanded. We need # to reverse-engineer the alias if we want to reuse stuff already on HDFS. 
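# For instance (illustrative only; the path is made up): a command-line
# argument 'hdfs:/data/clicks' was expanded during option parsing into
# something like
#
#   "#hdfs:/data/clicks\n$0 'hdfs-ls:/data/clicks' | xargs hadoop fs -text 2>/dev/null |"
#
# and unexpand_filename_shorthands() recovers 'hdfs:/data/clicks' from that
# leading shell-comment line, so the loop below can hand the HDFS path straight
# to -input rather than streaming it down and re-uploading it.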
  my @other_argv;
  while (@ARGV) {
    local $_ = unexpand_filename_shorthands shift @ARGV;
    if (s/^hdfs:(?:\/\/)?//) {
      push @input_files, '-input', $_;
    } else {
      push @other_argv, expand_filename_shorthands $_;
    }
  }
  @ARGV = @other_argv;

  my $line = undef;
  unless (-t STDIN) {
    chomp($line), push @input_files, '-input', $line
      while defined($line = <STDIN>) && $line =~ s/^hdfs://;
  }

  # At this point $line is either undefined or contains a line we shouldn't
  # have read from stdin (i.e. it's data). If the latter, prepend it to the
  # upload to HDFS.
  if (defined $line) {
    my $tempfile = hadoop_tempfile;
    open my $fh, "| " . hadoop('fs', '-put', '-', $tempfile) . ' 1>&2'
      or die "failed to open hadoop fs -put process for uploading: $!";
    print $fh $line;
    be_verbose_as_appropriate(length), print $fh $_ while <STDIN>;
    close $fh;
    push @input_files, '-input', $tempfile;
    push @delete_afterwards, $tempfile;
  }

  if (@other_argv) {
    my $tempfile = hadoop_tempfile;
    open my $fh, "| " . hadoop('fs', '-put', '-', $tempfile) . ' 1>&2'
      or die "failed to open hadoop fs -put process for uploading: $!";
    be_verbose_as_appropriate(length), print $fh $_ while <>;
    close $fh;
    push @input_files, '-input', $tempfile;
    push @delete_afterwards, $tempfile;
  }

  # Now all the input files are in place, so we can kick off the job.
  my $extra_args = $ENV{NFU_HADOOP_OPTIONS} // '';
  my @file_deps  = map {('-file', $_)}
                       ($0, split /,/, $ENV{NFU_HADOOP_FILES} // '');
  my $dirname    = $0 =~ s/\/nfu$/\//r;

  # We're uploading ourselves, so we'll need a current-directory reference to
  # nfu. When nfu quotes itself, it produces an absolute path instead; this
  # code rewrites those into ./nfu.
  my $transformed_mapper   = $mapper   =~ s|\Q$dirname\E|./|gr;
  my $transformed_combiner = $combiner =~ s|\Q$dirname\E|./|gr;
  my $transformed_reducer  = $reducer  =~ s|\Q$dirname\E|./|gr;

  # Write the mapper and reducer commands into files rather than passing them
  # straight to hadoop streaming. This bypasses two problems:
  #
  # 1. Hadoop streaming might chop long arguments sooner than bash.
  # 2. It word-splits differently from bash, which breaks shell_quote.
  my $mapper_file   = tmpnam;
  my $combiner_file = tmpnam;
  my $reducer_file  = tmpnam;

  open my $mapper_fh, '>', $mapper_file
    or die "failed to create tempfile $mapper_file for map job: $!";
  open my $combiner_fh, '>', $combiner_file
    or die "failed to create tempfile $combiner_file for combine job: $!";
  open my $reducer_fh, '>', $reducer_file
    or die "failed to create tempfile $reducer_file for reduce job: $!";

  print $mapper_fh   "#!/bin/bash\n$transformed_mapper\n";
  print $combiner_fh "#!/bin/bash\n$transformed_combiner\n";
  print $reducer_fh  "#!/bin/bash\n$transformed_reducer\n";

  close $mapper_fh;
  close $combiner_fh;
  close $reducer_fh;

  chmod 0755, $mapper_file;
  chmod 0755, $combiner_file;
  chmod 0755, $reducer_file;

  my $jobname = "nfu streaming ["
              . join(' ', grep $_ ne '-input', @input_files)
              . "]: map($transformed_mapper), "
              . "combine($transformed_combiner), "
              . "reduce($transformed_reducer), "
              . "options($extra_args)"
              . " > $outfile";

  my $hadoop_command =
    hadoop('jar',
           shell_quote($streaming_jar)
             . " -D mapred.job.name=" . shell_quote($jobname)
             . " $extra_args",
           @file_deps,
           @input_files,
           '-file', $mapper_file,
           '-file', $combiner_file,
           '-file', $reducer_file,
           '-output', $outfile,
           '-mapper', "./" . ($mapper_file =~ s/^.*\///r),
           $combiner ne 'NONE'
             ? ('-combiner', "./" . ($combiner_file =~ s/^.*\///r))
             : (),
           '-reducer', $reducer eq 'NONE'
             ? $reducer
             : "./" . ($reducer_file =~ s/^.*\///r));

  system $hadoop_command .
' 1>&2' and die "failed to execute hadoop command $hadoop_command: $!"; unlink $mapper_file; unlink $combiner_file; unlink $reducer_file; system hadoop('fs', '-rm', '-r', @delete_afterwards) . ' 1>&2' if @delete_afterwards; "hdfs:$outfile"; } sub sql_infer_column_type { # Try to figure out the right type for a column based on some values for it. # The possibilities are 'text', 'integer', or 'real'; this is roughly the set # of stuff supported by both postgres and sqlite. return 'integer' unless grep length && !/^-?[0-9]+$/, @_; return 'real' unless grep length && !/^-?[0-9]+$ | ^-?[0-9]+(?:\.[0-9]+)?(?:[eE][-+]?[0-9]+)?$ | ^-?[0-9]* \.[0-9]+ (?:[eE][-+]?[0-9]+)?$/x, @_; return 'text'; } sub sql_infer_schema { # Takes a list of TSV lines and generates a table schema that will store # them. my $n = max map scalar(split /\t/), @_; my @columns = map [], 1 .. $n; for (@_) { my @vs = split /\t/; push @{$columns[$_]}, $vs[$_] for 0 .. $#vs; } my @types = map sql_infer_column_type(@$_), @columns; join ', ', map sprintf("f%d %s", $_, $types[$_]), 0 .. $#columns; } sub sql_schema_and_buffer { # WARNING: this function modifies its argument my ($schema) = @_; my @read; if ($schema eq '_') { # Infer the schema, which involves reading some data up front. push @read, $_ while @read < SQL_INFER_PEEK_LINES and $diamond_has_data &&= defined($_ = <>); $schema = sql_infer_schema @read; } $_[0] = $schema; @read; } sub sql_parse_args { my ($dbt, $schema, $query) = @_; my @pieces = split /:/, $dbt; if (@pieces > 1) { my $table = pop @pieces; my $db = join ':', @pieces; ($db, $table, $schema, $query); } else { # Use a default table called 't' with an automatic index ($pieces[0], '+t', $schema, $query); } } sub write_buffer_and_stdin { my ($fh, @buffer) = @_; be_verbose_as_appropriate(length), $fh->print($_) for @buffer; be_verbose_as_appropriate(length), $fh->print($_) while $diamond_has_data &&= defined($_ = <>); } sub sql_first_column_index { # WARNING: this function modifies its first argument my ($table, $schema) = @_; my $column = $schema =~ s/\s.*$//r; my $index = $table =~ s/^\+// ? "CREATE INDEX $table$column ON $table($column);\n" : ''; $_[0] = $table; $index; } my %functions = ( read => sub { while (<>) { chomp; my $f = expand_filename_shorthands $_, 1; open my $fh, $f or die "failed to open pseudofile $_ ($f): $!"; be_verbose_as_appropriate(length), print while <$fh>; close $fh; } }, buffer => sub { my $f = $_[0] =~ /^[-_:]$/ ? tmpnam : $_[0]; open my $fh, '>', $f or die "failed to open buffer file $f: $!"; be_verbose_as_appropriate(length), $fh->print($_) while <>; close $fh; print $f, "\n"; }, branch => sub { my (@cases) = split /\n/, $_[0]; my %branches; my %branch_matchers; my @order; for (@cases) { my ($k, $v) = map unpack('u', $_), split /\t/; $v //= ''; open my $fh, "| $0 $v" or die "failed to open branch subprocess '$0 $v': $!"; push @order, $k; $branches{$k} = $fh; $branch_matchers{$k} = $k =~ s/^:// ? compile_eval_into_function $k, 'branch matcher function' : $k eq '_' ? sub { 1 } : sub { $_[0] eq $k }; } while (<>) { be_verbose_as_appropriate length; chomp; my @xs = split /\t/; my @matching = grep $branch_matchers{$_}->(@xs), @order; die "branch failed to match '$xs[0]' against any of ( @order )" unless @matching; $branches{$matching[0]}->print(row(@xs), "\n"); } close for values %branches; }, tcp => sub { my ($hostport) = @_; my ($host, $port) = $hostport =~ /:/ ? 
split /:/, $hostport : ('0.0.0.0', $hostport); socket my($serversock), PF_INET, SOCK_STREAM, getprotobyname 'tcp' or die "socket failed: $!"; setsockopt $serversock, SOL_SOCKET, SO_REUSEADDR, pack 'l', 1 or die "setsockopt failed: $!"; bind $serversock, sockaddr_in $port, INADDR_ANY or die "bind failed: $!"; listen $serversock, SOMAXCONN or die "listen failed: $!"; while (1) { # Fork twice per connection. This is egregious but necessary because the # open() call blocks on a FIFO until the other end is connected, and we # want the FIFO consumer to be at liberty to open them in either order # without creating a deadlock. my ($paddr, $client); unless ($paddr = accept $client, $serversock) { next if $!{EINTR}; die "accept failed: $!"; } my ($port, $iaddr) = sockaddr_in $paddr; my $iname = gethostbyaddr $iaddr, AF_INET; my $r = tmpnam; mkfifo $r, 0700 or die "failed to create FIFO $r: $!"; my $w = tmpnam; mkfifo $w, 0700 or die "failed to create FIFO $w: $!"; print join("\t", $r, $w, $iname, $port), "\n"; # Socket reads: write to the "reader" fifo unless (fork) { my $buf; sysopen my $fh, $r, O_WRONLY or die "failed to open $r for writing: $!"; syswrite $fh, $buf while sysread $client, $buf, 8192; close $fh; close $client; unlink $r; exit; } # Socket writes: read from the "writer" fifo unless (fork) { my $buf; sysopen my $fh, $w, O_RDONLY or die "failed to open $w for writing: $!"; syswrite $client, $buf while sysread $fh, $buf, 8192; close $fh; close $client; unlink $w; exit; } close $client; } }, http => sub { # Connect this to a --tcp thing to do HTTP stuff. In practice this just # means parsing out the headers and handing off parsed data to the command. my ($command) = @_; open my $fh, "| $command" or die "http failed to launch $command: $!"; select((select($fh), $| = 1)[0]); while (<>) { chomp; my ($in, $out, $port, $iaddr) = split /\t/; open my $indata, '<', $in or die "http failed to read socket $in: $!"; my $url; my %headers; while (<$indata>) { chomp; last if length($_) <= 1; $headers{$1} = $2 if defined $url && /^([^:]+):\s*(.*)$/; $url = $1 if !defined $url && /^[A-Z]+\s+(\S+)/; } close $indata; my $http_data = row($out, $url, je({%headers}), $port, $iaddr) . "\n"; $fh->print($http_data); } }, group => sub {exec_with_diamond 'sort', sort_options @_}, rgroup => sub {exec_with_diamond 'sort', '-r', sort_options @_}, order => sub {exec_with_diamond 'sort', '-g', sort_options @_}, rorder => sub {exec_with_diamond 'sort', '-rg', sort_options @_}, count => sub { # Same behavior as uniq -c, but delimits counts with \t; also takes an # optional series of columns to uniq by, rather than using the whole row. my @columns = split //, shift // ''; my $last; my @last; my $count = -1; while (<>) { be_verbose_as_appropriate length; chomp; my @xs = split /\t/; @xs = @xs[@columns] if @columns; $last = $_, @last = @xs unless ++$count; for (my $i = 0; $i < max scalar(@xs), scalar(@last); ++$i) { if (!defined $xs[$i] || !defined $last[$i] || $xs[$i] ne $last[$i]) { print "$count\t$last\n"; $count = 0; @last = @xs; $last = $_; last; } } } ++$count; print "$count\t$last\n" if defined $last; }, uncount => sub { while (<>) { be_verbose_as_appropriate length; my ($n, $line) = split /\t/, $_, 2; $line //= "\n"; print $line for 1..$n; } }, index => sub { # Inner join by appending joined fields to the end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file, sort_cmd "-t '\t' -k${f2}b,$f2"; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . 
"| join -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, indexouter => sub { # Outer left join by appending joined fields to the end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file, sort_cmd "-t '\t' -k ${f2}b,$f2"; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . "| join -a 1 -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, join => sub { # Inner join against sorted data by appending joined fields to the end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . "| join -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, joinouter => sub { # Outer left join against sorted data by appending joined fields to the # end. my ($f1, $f2, $join_file) = parse_join_options @_; my $sorted_index = fifo_for $join_file; my $command = sort_cmd "-t '\t' -k ${f1}b,$f1" . "| join -a 1 -t '\t' -1 $f1 -2 $f2 - '$sorted_index'"; open my $to_join, "| $command" or die "failed to exec $command: $!"; be_verbose_as_appropriate(length), print $to_join $_ while <>; close $to_join; }, with => sub { # Like 'paste'. Joins lines with \t. my ($f) = @_; open my $fh, expand_filename_shorthands $f, 1 or die "failed to open --with pseudofile $f: $!"; my ($part1, $part2); while (defined($part1 = <>) and defined($part2 = <$fh>)) { be_verbose_as_appropriate length($part1) + length($part2); chomp $part1; chomp $part2; print $part1, "\t", $part2, "\n"; } close $fh; }, repeat => sub { my ($n, $f) = @_; my $count = 0; while (!$n || $count++ < $n) { open my $fh, expand_filename_shorthands $f, 1 or die "failed to open --repeat pseudofile $f: $!"; be_verbose_as_appropriate(length), print while <$fh>; close $fh; } }, octave => sub { my ($commands) = @_; my $temp = tmpnam; open my $fh, '>', $temp or die $!; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; system 'octave', '-q', '--eval', "xs = load(\"$temp\");" . "unlink(\"$temp\");" . "save_precision(48);" . "$commands;" . "save -text $temp xs" and die "octave command failed"; open $fh, '<', $temp; /^\s*#/ or /^\s*$/ or print join("\t", map $_ eq "NA" ? $_ : 0 + $_, grep length, split /\s+/), "\n" while <$fh>; close $fh; unlink $temp; }, numpy => sub { my ($commands) = @_; my $temp = tmpnam; open my $fh, '>', $temp or die $!; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; # Fix up the indentation for cases like this: # $ nfu ... --numpy 'xs += 1 # xs *= 4' # no real indent here # # $ nfu ... --numpy 'if something: # xs += 1' # minor indent here (assume 2) my @lines = split /\n/, $commands; my @indents = map length(s/\S.*$//r), @lines; my $indent = @lines > 1 ? $indents[1] - $indents[0] : 0; # If we're expecting an indentation of some amount after the first line, we # need to be careful: we don't know how much the user decided to indent the # block, and if we get it wrong then Python will complain at the next # outdent. (If there's no outdent, then we can use anything.) 
$indent = min $indent - 1, @indents[2..$#indents] if $lines[0] =~ /:\s*(#.*)?$/ && @lines > 2; my $spaces = ' ' x $indent; $lines[$_] =~ s/^$spaces// for 1..$#lines; $commands = join "\n", @lines; system 'python', '-c', " import numpy as np xs = np.loadtxt(\"$temp\") $commands np.savetxt(\"$temp\", xs, delimiter=\"\\t\")" and die "numpy command failed"; open $fh, '<', $temp; /^\s*#/ or /^\s*$/ or print join("\t", map $_ eq "NA" ? $_ : 0 + $_, grep length, split /\s+/), "\n" while <$fh>; close $fh; unlink $temp; }, stateful_unary_fn('average', sub {my ($size, $n, $total) = ($_[0] // 0, 0, 0); [$size, $n, $total, []]}, sub { my ($x, $state) = @_; my ($size, $n, $total, $window) = @$state; $total += $x; ++$n; my $v = $total / ($n > $size && $size ? $size : $n); $total -= shift @$window if $size and push(@$window, $x) >= $size; $$state[1] = $n; $$state[2] = $total; $v; }), stateful_unary_fn('intify', sub {[{}, 0]}, sub { my ($x, $state) = @_; $state->[0]->{$x} //= $state->[1]++; }), aggregate => sub { my $f = compile_eval_into_function $_[0], 'aggregate function'; my @columns; while (my $line = <>) { be_verbose_as_appropriate length $line; chomp $line; my @fields = split /\t/, $line; # Two cases here. If the new record is compatible with the most recent # existing one, or there aren't any existing ones, then group it and # don't call the aggregator yet. # # If we see a change, then call the aggregator and empty out the group. # # Note that the aggregator function is called on columns, not rows. my $n = @columns && @{$columns[0]}; if (!$n or $fields[0] eq ${$columns[0]}[0]) { $columns[$_][$n] = $fields[$_] for 0 .. $#fields; } else { $_ = ${$columns[0]}[0]; print $_, "\n" for $f->(@columns); @columns = (); $columns[$_][0] = $fields[$_] for 0 .. $#fields; } } if (@columns) { $_ = ${$columns[0]}[0]; print $_, "\n" for $f->(@columns); } }, fold => sub { my $f = compile_eval_into_function $_[0], 'fold function'; my @saved; while (<>) { be_verbose_as_appropriate length; chomp; my $line = $_; if ($f->(split /\t/)) { push @saved, $line; } else { print row(@saved), "\n" if @saved; @saved = ($line); } } print row(@saved), "\n" if @saved; }, stateless_unary_fn('log', sub { my ($x, $base) = @_; my $log = log $x; $log /= log $base if defined $base; $log; }), stateless_unary_fn('exp', sub { my ($x, $base) = @_; defined $base ? $base ** $x : exp $x; }), stateless_unary_fn('quant', sub { my ($x, $quantum) = @_; round_to $x, $quantum; }), # Note: this needs to be stdin; otherwise "nfu -p %l filename" will fail # (since exec_with_diamond trieds to pass filename straight into gnuplot). plot => sub { exec_with_stdin 'gnuplot', '-e', 'plot "-" ' . join(' ', expand_gnuplot_options @_), '-persist'; }, splot => sub { exec_with_stdin 'gnuplot', '-e', 'splot "-" ' . join(' ', expand_gnuplot_options @_), '-persist'; }, mplot => sub { my @gnuplot_options = split /;/, join ' ', expand_gnuplot_options @_; my $fname = tmpnam; my $cols = 0; open my $fh, '>', $fname or die "failed to open tempfile for mplot: $!"; while (<>) { be_verbose_as_appropriate length; $cols = max $cols, 1 + scalar(my @xs = /\t/g); print $fh $_; } close $fh; # If we're requesting only one plot, assume the intent is to replicate # those settings across every observed column. my $plot_command = 'plot ' . join ',', @gnuplot_options > 1 ? 
map("\"$fname\" $_", @gnuplot_options) : map("\"$fname\" using $_ $gnuplot_options[0]", 1..$cols); system 'gnuplot', '-e', $plot_command, '-persist'; # HACK: the problem is that gnuplot internally forks a subprocess for the # plot window, which we won't be able to see from here (that I know of). If # we delete the file before that subprocess exits, then any zoom operations # will cause gnuplot to abruptly exit. # # I'm sure there's a better way to solve this, but for now this should do # the job for now. unless (fork) { setsid; close STDIN; close STDOUT; unless (fork) { sleep 3600; unlink $fname or die "failed to unlink $fname: $!"; } } }, poll => sub { my ($sleep, $command) = @_; die "usage: --poll sleep-amount 'command ...'" unless defined $sleep and defined $command; system($command), sleep $sleep while 1; }, stateful_unary_fn('delta', sub {[0]}, sub {my ($x, $state) = @_; my $v = $x - $$state[0]; $$state[0] = $x; $v}), stateful_unary_fn('sum', sub {[0]}, sub {my ($x, $state) = @_; $$state[0] += $x}), stateful_unary_fn('variance', sub {[0, 0, 0]}, sub {my ($x, $state) = @_; $$state[0] += $x; $$state[1] += $x * $x; $$state[2]++; my ($sx, $sx2, $count) = @$state; ($sx2 - ($sx * $sx / $count)) / ($count - 1 || 1)}), stateful_unary_fn('sd', sub {[0, 0, 0]}, sub {my ($x, $state) = @_; $$state[0] += $x; $$state[1] += $x * $x; $$state[2]++; my ($sx, $sx2, $count) = @$state; sqrt(($sx2 - ($sx * $sx / $count)) / ($count - 1 || 1))}), stateful_unary_fn('entropy', # state contains [$total, $entropy_so_far] and uses the following # associative combiner (where F(X) = frequency of X, unscaled probability): # # let t = F(A) + F(B) # H(A + B) = F(A)/t * (-log(F(A)/t) + H(A)) # + F(B)/t * (-log(F(B)/t) + H(B)) sub {[0, 0]}, sub {my ($x, $state) = @_; my ($f0, $h0) = @$state; my $f = $$state[0] += $x; my $p = $x / $f; my $p0 = $f0 / $f; $$state[1] = $p0 * (($p0 > 0 ? -log($p0) / LOG_2 : 0) + $h0) + $p * ($p > 0 ? -log($p) / LOG_2 : 0)}), take => sub { if ($_[0] =~ s/^\+//) { # Take last n, so we need a line queue my @q; my $i = 0; be_verbose_as_appropriate(length), $q[$i++ % $_[0]] = $_ while <>; print for @q[$i % $_[0] .. $#q]; print for @q[0 .. $i % $_[0] - 1]; } else { my $n = $_[0] // 1; while (<>) { be_verbose_as_appropriate length; last if --$n < 0; print; } } }, sample => sub { while (<>) { be_verbose_as_appropriate length; print if rand() < $_[0]; } }, drop => sub { my $n = $_[0] // 1; if ($n) { while (<>) { be_verbose_as_appropriate length; last if --$n <= 0; } } be_verbose_as_appropriate(length), print while <>; }, map => sub { my $f = compile_eval_into_function $_[0], 'map function'; while (<>) { be_verbose_as_appropriate length; chomp; print "$_\n" for $f->(split /\t/); } }, pmap => sub { my @fhs; my $wbits = ''; my $wout = ''; my $i = 0; for (1 .. $ENV{NFU_PMAP_PARALLELISM} // 16) { my $mapper = quote_self '--child', @evaled_code, '--map', $_[0]; open my $fh, "| $mapper" or die "failed to open child process $mapper: $!"; vec($wbits, fileno($fh), 1) = 1; push @fhs, $fh; } while (<>) { be_verbose_as_appropriate length; select undef, $wout = $wbits, undef, undef; ++$i until vec($wout, fileno $fhs[$i % @fhs], 1); syswrite $fhs[$i++ % @fhs], $_; } close for @fhs; }, keep => sub { my $f = $_[0] =~ /^\d+$/ ? eval "sub {" . join("&&", map "\$_[$_]", split //, $_[0]) . "}" : compile_eval_into_function $_[0], 'keep function'; while (<>) { my $line = $_; be_verbose_as_appropriate length; chomp; my @xs = split /\t/; print $line if $f->(@xs); } }, remove => sub { my $f = $_[0] =~ /^\d+$/ ? eval "sub {" . 
join("&&", map "\$_[$_]", split //, $_[0]) . "}" : compile_eval_into_function $_[0], 'remove function'; while (<>) { my $line = $_; be_verbose_as_appropriate length; chomp; my @xs = split /\t/; print $line unless $f->(@xs); } }, each => sub { my ($template) = @_; while (<>) { be_verbose_as_appropriate length; chomp; my $c = $template =~ s/\{\}/$_/gr; system $c and die "each: failed to run $c: $!"; } }, every => sub { my ($n) = @_; my $i = 0; while (<>) { be_verbose_as_appropriate length; print unless $i++ % $n; } }, fields => sub { my ($fields) = @_; my $everything = $fields =~ s/\.$//; my @fs = split //, $fields; $everything &&= 1 + max @fs; while (<>) { be_verbose_as_appropriate length; chomp; my @xs = split /\t/; my @ys = @xs[@fs]; push @ys, @xs[$everything .. $#xs] if $everything; print join("\t", map $_ // '', @ys), "\n"; } }, fieldsplit => sub { my $pattern = $implosions{$_[0]} // $_[0]; $pattern = $fieldsplit_shorthands{$pattern} // $pattern; my $delim = qr/$pattern/; while (<>) { be_verbose_as_appropriate length; chomp; print join("\t", split /$delim/), "\n"; } }, number => sub { my $n = 0; while (<>) { be_verbose_as_appropriate length; chomp; print row(++$n, $_), "\n"; } }, ntiles => sub { my ($n) = @_; my $line_count = 0; my $fifo = tmpnam; mkfifo $fifo, 0700 or die "failed to create fifo: $!"; open my $sorted, sort_cmd('-g', $fifo) . " |" or die "failed to create sort process: $!"; open my $fifo_fh, '>', $fifo or die "failed to write to fifo: $!"; # Push data into the sort process, keeping track of the number of lines. # We'll use this count later to use constant rather than linear space. while (<>) { ++$line_count; be_verbose_as_appropriate(length); $fifo_fh->print($_); } close $fifo_fh; unlink $fifo; # Ok, now grab the data for each of the N sampling points. Here's what # we're doing: # # 0 1 2 <- things we grab for 2-tiles # 1 2 3 4 5 6 7 8 9 a b c d <- data points # # 0 1 2 <- things we grab for 2-tiles # 1 2 3 4 5 6 7 8 9 a b c <- data points # # 0 1 2 3 4 <- quartiles # 1 2 3 4 5 6 7 <- data points # # To make this work, we just keep track of the previous data point as we're # reading. # Normal case my $i = 0; my $previous = undef; my $max_line = $line_count - 1; while (<$sorted>) { chomp; $previous = $_ unless defined $previous; my $break = int($i * $n / $max_line) / $n * $max_line; if ($i >= $break && $i - $break < 1) { # Take this row, performing a weighted average with the previous one: # # break # x V y # | | # ----|------------ # $w 1-$w # # We want x*(1-$w) + y*$w. 
my $w = $break - int $break || 1; my $v = $previous * (1 - $w) + $_ * $w; print $v, "\n"; } ++$i; $previous = $_; } close $sorted; }, prepend => sub { open my $fh, expand_filename_shorthands $_[0], 1 or die "failed to open --prepend pseudofile $_[0]: $!"; be_verbose_as_appropriate(length), print while <$fh>; close $fh; print while <>; }, append => sub { open my $fh, expand_filename_shorthands $_[0], 1 or die "failed to open --append pseudofile $_[0]: $!"; print while <>; be_verbose_as_appropriate(length), print while <$fh>; close $fh; }, pipe => sub { open my $fh, "| $_[0]" or die "failed to launch $_[0]: $!"; be_verbose_as_appropriate(length), print $fh $_ while <>; close $fh; }, tee => sub { open my $fh, "| $_[0]" or die "failed to launch $_[0]: $!"; $SIG{PIPE} = 'IGNORE'; while (<>) { be_verbose_as_appropriate length; $fh->print($_); print; } close $fh; }, duplicate => sub { open my $fh1, "| $_[0]" or die "failed to launch $_[0]: $!"; open my $fh2, "| $_[1]" or die "failed to launch $_[1]: $!"; # Important: keep going even if a subprocess rejects data. Otherwise things # like "nfu --duplicate ^T1 ^T+1" will produce truncated output. $SIG{PIPE} = 'IGNORE'; while (<>) { be_verbose_as_appropriate length; $fh1->print($_); $fh2->print($_); } close $fh1; close $fh2; }, partition => sub { my ($splitter, $cmd) = @_; my %fhs; my $f = compile_eval_into_function $splitter, 'partition function'; # Important: keep going even if a subprocess rejects data. Otherwise things # like "nfu --partition ... ^T10" will produce truncated output. $SIG{PIPE} = 'IGNORE'; my @open_partitions; while (<>) { be_verbose_as_appropriate length; my $line = $_; chomp(my $cline = $line); my $p = $f->(split /\t/, $cline); unless (exists $fhs{$p}) { my $cmdsub = $cmd =~ s/\{\}/$p/gr =~ s/\{\.(\.*)\}/\{$1\}/gr; open $fhs{$p}, "| $cmdsub" or die "failed to launch $cmdsub: $!"; push @open_partitions, $p; } $fhs{$p}->print($line); close($fhs{$p = shift @open_partitions}), delete $fhs{$p} while @open_partitions > ($ENV{NFU_MAX_FILEHANDLES} // 64); } close for values %fhs; }, hadoopc => sub { my ($outfile, $mapper, $combiner, $reducer) = @_; if ($outfile eq '.') { # Print output data to stdout, then delete outfile my $filename = hadoop_tempfile; my @partfiles = map s/^hdfs:(?:\/\/)?//r, hadoop_ls hadoop_into $filename, $mapper, $combiner, $reducer; open my $fh, "| xargs hadoop fs -text" or die "failed to execute xargs: $!"; $fh->print("$_\n") for @partfiles; close $fh; system hadoop('fs', '-rm', '-r', $filename) . ' 1>&2'; } else { $outfile = hadoop_tempfile if $outfile eq '@'; print hadoop_into($outfile, $mapper, $combiner, $reducer), "\n"; } }, hadoop => sub { my ($outfile, $mapper, $reducer) = @_; if ($outfile eq '.') { # Print output data to stdout, then delete outfile my $filename = hadoop_tempfile; my @partfiles = map s/^hdfs:(?:\/\/)?//r, hadoop_ls hadoop_into $filename, $mapper, 'NONE', $reducer; open my $fh, "| xargs hadoop fs -text" or die "failed to execute xargs: $!"; $fh->print("$_\n") for @partfiles; close $fh; system hadoop('fs', '-rm', '-r', $filename) . ' 1>&2'; } else { $outfile = hadoop_tempfile if $outfile eq '@'; print hadoop_into($outfile, $mapper, 'NONE', $reducer), "\n"; } }, sql => sub { my ($db, $table, $schema, $query) = sql_parse_args @_; my @read = sql_schema_and_buffer $schema; my $index = sql_first_column_index $table, $schema; $query = expand_sql_shorthands $query; if ($db =~ s/^P//) { # postgres my $edb = expand_postgres_db $db; my $q = $query eq '_' ? 
'' : "COPY ($query) TO STDOUT WITH NULL AS '';\n"; open my $fh, "| psql -c " . shell_quote("DROP TABLE IF EXISTS $table;\n" . "CREATE TABLE $table ($schema);\n" . "COPY $table FROM STDIN;\n" . $index . $q) . " $edb 1>&2" or die "psql: failed to open psql for table $table $!"; write_buffer_and_stdin $fh, @read; close $fh; print "sql:P$db:select * from $table\n" unless length $q; } elsif ($db =~ s/^S//) { # sqlite3 my $fifo_name = tmpnam; mkfifo $fifo_name, 0700 or die "failed to create fifo: $!"; my $child = fork; unless ($child) { system "echo " . shell_quote(".mode tabs\n" . "DROP TABLE IF EXISTS $table;\n" . "CREATE TABLE $table ($schema);\n" . ".import $fifo_name $table\n" . $index . ($query eq '_' ? '' : "$query;\n")) . "| sqlite3 " . shell_quote expand_sqlite_db $db; } else { open my $fh, '>', $fifo_name or die "failed to open fifo into sqlite: $!"; write_buffer_and_stdin $fh, @read; close $fh; unlink $fifo_name; waitpid $child, 0; print "sql:S$db:select * from $table\n" if $query eq '_'; } } else { die "unknown SQL prefix: " . substr($db, 0, 1) . " (valid prefixes are P and S)"; } }, preview => sub { $verbose = 0; # don't print over the pager my $have_less = !system 'which less > /dev/null'; my $have_more = !system 'which more > /dev/null'; my $less_program = $have_less ? 'less' : $have_more ? 'more' : 'cat'; exec_with_stdin $less_program; }, ); my %bracket_handlers = ( '' => sub {my $stuff = shell_quote @_; "".qx|$0 --quote $stuff| =~ s/\s*$//r}, '@' => sub {my $stuff = shell_quote @_; "sh:".(qx|$0 --quote $stuff| =~ s/\s*$//r)}, q => sub {shell_quote @_}, ); my %bracket_docs = ( '' => 'nfu as function: [ -gc ] == "$(nfu --quote -gc)"', '@' => 'nfu as data: @[ -gc foo ] == sh:"$(nfu --quote -gc foo)"', q => 'quote things: q[ foo bar ] == "foo bar"', ); # Print usage if the user clearly doesn't know what they're doing. if (@ARGV ? $ARGV[0] =~ /^-[h?]$/ || $ARGV[0] =~ /^--(usage|help)$/ : -t STDIN) { # Some checks for me to make sure I'm keeping the code well-maintained exists $functions{$_} or die "no function for $_" for keys %usages; exists $usages{$_} or die "no usage for $_" for keys %functions; exists $arity{$_} or die "no arity for $_" for keys %usages; exists $usages{$_ =~ s/--//r} or die "no usage for $_" for values %explosions, keys %usages; exists $bracket_docs{$_} or die "no bracket doc for $_" for keys %bracket_handlers; print STDERR "usage: nfu [prefix-commands...] [input-files...] commands...\n"; print STDERR "where each command is one of the following:\n\n"; my $len = 1 + max map length, keys %usages; my %short_lookup; $short_lookup{$explosions{$_} =~ s/^--//r} = $_ for keys %explosions; for my $cmd (sort keys %usages) { my $short = $short_lookup{$cmd}; $short = defined $short ? "-$short|" : ' '; printf STDERR " %s--%-${len}s(%d) %s\n", $short, $cmd, $arity{$cmd}, $usages{$cmd} ? $arity{$cmd} ? 
"<$usages{$cmd}>" : "-- $usages{$cmd}" : ''; } print STDERR "\nand prefix commands are:\n\n"; print STDERR " documentation (not used with normal commands):\n"; print STDERR " --explain \n"; print STDERR " --expand-pseudofile \n"; print STDERR " --expand-code \n"; print STDERR " --expand-gnuplot \n"; print STDERR " --expand-sql \n"; print STDERR "\n pipeline modifiers:\n"; print STDERR " --quote -- quotes args: eval \$(nfu --quote ...)\n"; print STDERR " --use \n"; print STDERR " --run \n"; print STDERR "\nargument bracket preprocessing:\n\n"; print STDERR " ^stuff -> [ -stuff ]\n\n"; my $bracket_max = max map length, keys %bracket_docs; printf STDERR " %${bracket_max}s[ ] %s\n", $_, $bracket_docs{$_} for sort keys %bracket_docs; my $pseudofile_len = 1 + max map length, keys %pseudofile_docs; print STDERR "\npseudofile patterns:\n\n"; printf STDERR " %-${pseudofile_len}s %s\n", $_, $pseudofile_docs{$_} for sort keys %pseudofile_docs; print STDERR "\ngnuplot expansions:\n\n"; printf STDERR " %2s -> '%s'\n", $_, $gnuplot_aliases{$_} for sort keys %gnuplot_aliases; print STDERR "\nSQL expansions:\n\n"; printf STDERR " %2s -> '%s'\n", $_, $sql_aliases{$_} for sort keys %sql_aliases; print STDERR "\ndatabase prefixes:\n\n"; printf STDERR " %s = %s\n", @$_ for ['P' => 'PostgreSQL'], ['S' => 'SQLite 3']; my $env_len = 1 + max map length, keys %env_docs; print STDERR "\nenvironment variables:\n\n"; printf STDERR " %-${env_len}s %s\n", $_, $env_docs{$_} for sort keys %env_docs; print STDERR "\n"; print STDERR "see https://github.com/spencertipping/nfu for documentation\n"; print STDERR "\n"; exit 1; } if (@ARGV && $ARGV[0] =~ /^--expand/) { my ($command, $x, @others) = @ARGV; if ($command =~ /-pseudofile$/) { print expand_filename_shorthands($x) // '', "\n"; } elsif ($command =~ /-code$/) { print expand_eval_shorthands($x), "\n"; } elsif ($command =~ /-gnuplot$/) { print expand_gnuplot_options($x), "\n"; } elsif ($command =~ /-sql$/) { print expand_sql_shorthands($x), "\n"; } else { print STDERR "unknown expansion command: $command\n"; exit 1; } exit 0; } my @args_to_parse; # Preprocess args to look for bracketed groups. These need to be collapsed into # single args, which must happen before we start assigning arguments to # commands. while (@ARGV) { my $x = shift @ARGV; last if $x eq '--'; if ($x =~ s/\[$//) { die "unknown bracket prefix: $x" unless exists $bracket_handlers{$x}; my @xs; my $depth = 1; while (@ARGV) { my $next = shift @ARGV; $depth-- if $next eq ']'; last unless $depth; push @xs, $next; $depth++ if $next =~ /\[$/; } unshift @ARGV, $bracket_handlers{$x}->(@xs); } elsif ($x =~ s/^\^//) { # Lift the command (as a short option) into a quoted nfu instance. $x = shell_quote $x; unshift @ARGV, ''.qx|$0 --quote -$x|; } elsif ($x =~ s/\{$//) { # Parse a branching map, which has the form { 'pattern' stuff... , # 'pattern' stuff... , ... }. We generate a quoted TSV of packed base-64 # values. die "unknown brace prefix: $x" if length $x; my @lines; my $key; my @value; my $depth = 1; while (@ARGV) { my $next = shift @ARGV; $depth-- if $next eq '}'; if (!$depth || defined $key && $next eq ',') { push @lines, row pack('u', $key), pack('u', shell_quote @value); @value = (); $key = undef; } elsif (defined $key) { push @value, $next; } else { $key = $next; } last unless $depth; $depth++ if $next =~ /\{$/; } unshift @ARGV, join "\n", @lines; } elsif ($x eq '%') { # Everything else is a variable binding of the form 'x=y'. Then go back # through and rewrite all %x to be y. 
    my %bindings = map split(/=/, $_, 2), @ARGV;
    my $names = join '|', keys %bindings;
    @ARGV = ();
    s#%($names)#$bindings{$1} // "%$1"#ge for @args_to_parse;
  } else {
    push @args_to_parse, $x;
  }
}

sub explode {
  return $_[0] unless $_[0] =~ s/^-([^-])/$1/;
  map {$explosions{$_} // $_} grep length, split /([-+.\d]*),?/, $_[0];
}

my %custom_env;
my @parsed;
my $quote_self = 0;
my $explain = 0;

while (@args_to_parse) {
  unshift @args_to_parse, explode shift @args_to_parse;
  (my $command = shift @args_to_parse) =~ s/^--//;

  if (defined(my $arity = $arity{$command})) {
    my @args;
    push @args, shift @args_to_parse
      while @args_to_parse
         && (--$arity >= 0
             || ! -e $args_to_parse[0] && $args_to_parse[0] =~ /^[-+]?\d+/);
    push @parsed, [$command, @args];
  } elsif ($command =~ /(\w+)=(.*)/) {
    $ENV{$1} = $custom_env{$1} = $2;
  } elsif ($command eq 'run') {
    my $x = shift @args_to_parse;
    push @evaled_code, '--run', $x;
    eval $x;
    die "failed to run '$x': $@" if $@;
  } elsif ($command eq 'use') {
    my $x = shift @args_to_parse;
    my $s = read_file $x;    # you can --use pseudofiles, woot!
    push @evaled_code, '--use', $x;
    eval $s;
    die "failed to use '$x': $@" if $@;
  } elsif ($command eq 'explain') {
    $explain = 1;
  } elsif ($command eq 'verbose' || $command eq 'v') {
    print STDERR "\033[2J" unless $quote_self;
    $verbose = 1;
  } elsif ($command eq 'child') {
    $is_child = 1;
  } elsif ($command eq 'quote') {
    $quote_self = 1;
  } else {
    if ($quote_self) {
      # Defer pseudofile resolution. This matters for things like intermediate
      # Hadoop outputs.
      push @ARGV, $command;
    } else {
      my $f = expand_filename_shorthands $command;
      die "nonexistent pseudofile: $command" unless defined $f;
      push @ARGV, $f;
    }
  }
}

if ($quote_self) {
  # Quote all other arguments so a shell will parse them correctly.
  print quote_self($verbose ? ('--verbose') : (),
                   map("$_=$custom_env{$_}", keys %custom_env),
                   @evaled_code,
                   map(("--$$_[0]", @$_[1..$#$_]), @parsed),
                   @ARGV), "\n";
  exit 0;
}

# Open output in an interactive previewer if...
push @parsed, ['preview'] if !$ENV{NFU_NO_PAGER}       # we can page
                          && (!-t STDIN || @ARGV)      # not interacting for input
                          && -t STDOUT;                # interacting for output

if ($explain) {
  # Explain what we would have done with the given command line.
  printf "file\t%s\n", $_ =~ s/#.*\n//gr for @ARGV;
  printf "--%s\t%s\n", ${$_}[0], join "\t", @{$_}[1 .. $#$_] for @parsed;
} elsif (@parsed) {
  my $reader = undef;

  # Note: the loop below uses pipe/fork/dup2 instead of a more idiomatic Open2
  # call. I don't have a good reason for this other than to figure out how the
  # low-level stuff worked.
  for (my $i = 0; $i < @parsed; ++$i) {
    my ($command, @args) = @{$parsed[$i]};

    # Here's where things get fun. The question right now is, "do we need to
    # fork, or can we run in-process?" -- i.e. are we in the middle, or at the
    # end? When we're in the middle, we want to redirect STDOUT to the pipe's
    # writer and fork; otherwise we run in-process and write directly to the
    # existing STDOUT.
    ++$verbose_row;
    if ($i < @parsed - 1) {
      # We're in the middle, so allocate a pipe and fork.
      pipe my($new_reader), my($writer);
      $verbose_command = $command;
      @verbose_args = @args;
      unless (fork) {
        # We're the child, so do STDOUT redirection.
        close $new_reader or die "failed to close pipe reader: $!";
        POSIX::close(0) or die "failed to close stdin" if defined $reader;
        dup2(fileno($reader), 0) or die "failed to dup input: $!"
          if defined $reader;
        POSIX::close(1);
        dup2(fileno($writer), 1) or die "failed to dup stdout: $!";
        close $reader or die "failed to close reader: $!" if defined $reader;
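        # At this point fd 0 is the previous stage's pipe (or the original
        # stdin for the first stage) and fd 1 is the pipe feeding the next
        # stage; the now-redundant Perl-level handles are closed before running
        # this stage's command in the child process.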
        close $writer or die "failed to close writer: $!";

        # The function here may never return.
        $functions{$command}->(@args);
        exit;
      } else {
        close $writer or die "failed to close pipe writer: $!";
        close $reader if defined $reader;
        $reader = $new_reader;
      }
    } else {
      # We've hit the end of the chain. Preserve stdout, redirect stdin from
      # current reader.
      POSIX::close(0) or die "failed to close stdin" if defined $reader;
      dup2(fileno($reader), 0) or die "failed to dup input: $!"
        if defined $reader;
      close $reader or die "failed to close reader: $!" if defined $reader;
      $verbose_command = $command;
      @verbose_args = @args;
      $functions{$command}->(@args);
    }

    # Prevent <> from reading files after the first iteration (this is such a
    # hack).
    @ARGV = ();
  }
} else {
  # Behave like cat, which is useful for auto-decompressing things.
  be_verbose_as_appropriate(length), print while <>;
}
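# End-of-pipeline note: each entry in @parsed becomes one stage.  Every stage
# except the last forks with its stdout wired into a pipe, and the final stage
# runs in-process against the original stdout (often the 'preview' pager added
# above).  As a sketch, something like "nfu somefile -gc" (borrowing the -gc
# example from the bracket docs) would be exploded into one stage per short
# option via %explosions and run as such a chain; the exact expansion of each
# short option is defined earlier in the script.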