# Original from jseidman, wrapped in a list() for rmr-1.2's new I/O model.
# The format function splits one text line on commas and emits the resulting
# character vector as the value of a key/value pair whose key is NULL.
csvtextinputformat <- list(
  mode = 'text',
  format = function(line) {
    keyval(NULL, unlist(strsplit(line, "\\,")))
  },
  streaming.format = NULL
)

#
# based on jseidman's csvtextinputformat
#   1. added field names for better code readability (esp. in mapper)
#   2. use make.input.format() to wrap for rmr-1.2's new I/O model
#
asa.csvtextinputformat <- make.input.format(
  format = function(line) {
    field.names <- c('Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime',
                     'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum',
                     'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay',
                     'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut',
                     'Cancelled', 'CancellationCode', 'Diverted', 'CarrierDelay',
                     'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay')

    values <- unlist(strsplit(line, "\\,"))

    # strsplit() drops a trailing empty field, and blank or truncated lines
    # yield fewer values than there are field names; assigning a longer names
    # vector would then raise an error. Pad short records with NA instead so
    # the mapper can skip them. Longer records are left untouched.
    if (length(values) < length(field.names)) {
      length(values) <- length(field.names)
    }
    names(values) <- field.names

    keyval(NULL, values)
  }
)

#
# the mapper gets a key and a value vector generated by the formatter
# in our case, the key is NULL and all the field values come in as a vector
#
# Arguments:
#   key  ignored (the formatter emits NULL keys)
#   val  named character vector holding the fields of one input line
# Returns:
#   keyval(c(Year, market), c(CRSElapsedTime, ActualElapsedTime, AirTime))
#   for flights that were neither cancelled nor diverted; NULL (invisibly)
#   for header lines, cancellations, and diversions.
#
mapper.year.market.enroute_time <- function(key, val) {

  # Skip header lines first. The scalar && short-circuits, so the numeric
  # conversions below are never attempted on the header's text fields
  # (which would only produce coercion warnings).
  keep <- !identical(as.character(val['Year']), 'Year') &&
    identical(as.numeric(val['Cancelled']), 0) &&
    identical(as.numeric(val['Diverted']), 0)

  if (!keep) {
    return(invisible(NULL))
  }

  # We don't care about direction of travel, so construct 'market'
  # with airports ordered alphabetically
  # (e.g., LAX to JFK becomes 'JFK-LAX')
  if (val['Origin'] < val['Dest']) {
    market <- paste(val['Origin'], val['Dest'], sep = '-')
  } else {
    market <- paste(val['Dest'], val['Origin'], sep = '-')
  }

  # key consists of year, market
  output.key <- c(val['Year'], market)

  # output gate-to-gate elapsed times (CRS and actual) + time in air
  output.val <- val[c('CRSElapsedTime', 'ActualElapsedTime', 'AirTime')]

  keyval(output.key, output.val)
}

#
# the reducer gets all the values for a given key
# the values (which may be multi-valued as here) come in the form of a list()
#
# Summarise all enroute-time records collected for one key.
#
# Arguments:
#   key       the key shared by every element of val.list; passed through
#             unchanged as the output key
#   val.list  list of length-3 vectors, each convertible with as.numeric(),
#             in the order (scheduled/CRS elapsed time, actual elapsed time,
#             time in air)
# Returns:
#   keyval(key, c(number of records, mean CRS elapsed, mean actual elapsed,
#   mean air time)); means ignore NA values.
#
reducer.year.market.enroute_time <- function(key, val.list) {

  # Position of each field inside a value vector. The first slot is the
  # scheduled (CRS) time and the second the actual time.
  fields <- c(crs = 1L, actual = 2L, air = 3L)

  # val.list is a list of row vectors; convert each row to numeric, padding
  # short rows with NA so every row has exactly one slot per field.
  rows <- lapply(val.list, function(v) as.numeric(v)[seq_along(fields)])

  # Mean of a single field across all rows, ignoring missing values.
  field.mean <- function(i) {
    mean(vapply(rows, function(r) r[[i]], numeric(1)), na.rm = TRUE)
  }

  output.key <- key
  output.val <- c(length(rows),
                  field.mean(fields[['crs']]),
                  field.mean(fields[['actual']]),
                  field.mean(fields[['air']]))

  keyval(output.key, output.val)
}