# original from jseidman, wrapped in a list() for rmr-1.2's new I/O model
# text-mode input format: the line is split on commas and the resulting
# character vector is emitted as a value with a NULL key
csvtextinputformat = list(
	mode = 'text',
	format = function(line) {
		fields = strsplit(line, "\\,")
		keyval(NULL, unlist(fields))
	},
	streaming.format = NULL
)


#
# based on jseidman's csvtextinputformat
#  1. added field names for better code readability (esp. in mapper)
#  2. use make.input.format() to wrap for rmr-1.2's new I/O model
#  3. records with fewer fields than names are padded with NA instead of
#     raising an error
#
# the format function takes one line of comma-separated text and returns
# keyval(NULL, values), where values is a character vector named by field
#
asa.csvtextinputformat = make.input.format( format = function(line) {

	field.names = c('Year','Month','DayofMonth','DayOfWeek','DepTime','CRSDepTime',
					'ArrTime','CRSArrTime','UniqueCarrier','FlightNum','TailNum',
					'ActualElapsedTime','CRSElapsedTime','AirTime','ArrDelay',
					'DepDelay','Origin','Dest','Distance','TaxiIn','TaxiOut',
					'Cancelled','CancellationCode','Diverted','CarrierDelay',
					'WeatherDelay','NASDelay','SecurityDelay','LateAircraftDelay')

	values = unlist( strsplit(line, "\\,") )

	# strsplit() drops a trailing empty field and yields nothing at all for an
	# empty line; assigning more names than there are values is an error, so
	# pad short records with NA to keep one bad line from aborting the job
	n.missing = length(field.names) - length(values)
	if (n.missing > 0)
		values = c(values, rep(NA_character_, n.missing))

	names(values) = field.names

	keyval(NULL, values)
} )

#
# the mapper gets a key and a value vector generated by the formatter
# in our case, the key is NULL and all the field values come in as a vector
# whose elements are looked up by field name (e.g. val['Year'])
#
# emits key   = c(year, market)
#       value = c(CRSElapsedTime, ActualElapsedTime, AirTime), in that order
# header lines, cancelled flights and diverted flights emit nothing
# (the function returns NULL for them)
#
mapper.year.market.enroute_time = function(key, val) {

	# Skip header lines, cancellations, and diversions.
	# && short-circuits, so the numeric coercions below are never attempted
	# on a header line, where they would only produce coercion warnings.
	keep = !identical(as.character(val['Year']), 'Year') &&
		identical(as.numeric(val['Cancelled']), 0) &&
		identical(as.numeric(val['Diverted']), 0)

	if (!keep)
		return( invisible(NULL) )

	# We don't care about direction of travel, so construct 'market'
	# with airports ordered alphabetically
	# (e.g., LAX to JFK becomes 'JFK-LAX')
	if (val['Origin'] < val['Dest'])
		market = paste(val['Origin'], val['Dest'], sep='-')
	else
		market = paste(val['Dest'], val['Origin'], sep='-')

	# key consists of year, market
	output.key = c(val['Year'], market)

	# output gate-to-gate elapsed times (CRS and actual) + time in air
	output.val = c(val['CRSElapsedTime'], val['ActualElapsedTime'], val['AirTime'])

	keyval(output.key, output.val)
}


#
# the reducer gets all the values for a given key
# the values (which may be multi-valued as here) come in the form of a list()
#
# each value is expected in the order the mapper emits it:
# (CRS elapsed, actual elapsed, air time)
#
# emits the key unchanged with value
#   c(number of values, mean CRS elapsed, mean actual elapsed, mean air time)
# means ignore NAs
#
reducer.year.market.enroute_time = function(key, val.list) {

	# val.list is a list of row vectors; coerce each to numeric and stack
	# them into a matrix with one row per value (base R only, so no package
	# has to be loaded on every reducer call)
	rows = lapply(val.list, as.numeric)
	if (length(rows) > 0)
		times = do.call(rbind, rows)
	else
		times = matrix(numeric(0), nrow = 0, ncol = 3)

	# column labels follow the order of the mapper's output value
	colnames(times) = c('crs','actual','air')

	output.key = key
	output.val = c( nrow(times), mean(times[, 'crs'], na.rm=TRUE),
								 mean(times[, 'actual'], na.rm=TRUE),
								 mean(times[, 'air'], na.rm=TRUE) )

	keyval(output.key, output.val)
}