Antonio Piccolboni
https://github.com/RevolutionAnalytics/{RHadoop, rhdfs, rhbase, rmr2, plyrmr}
Hadoop | RHadoop |
---|---|
hdfs | rhdfs |
hbase | rhbase |
mapreduce | rmr2, plyrmr |
library(lubridate)

# "HM" extends lubridate's Period (an unnamed element of `representation`
# is a superclass). It is used as a colClasses target so that HHMM clock
# times such as "1506" are parsed straight into periods (15H 6M 0S).
setClass(Class = "HM", representation = "Period")

# Coerce "HHMM" strings to HM. Leading zeros are missing in the raw data
# ("906" for 09:06, "45" for 00:45, "5" for 00:05), so every value is
# left-padded to four characters -- not only the 3-character ones -- before
# being handed to hm(); otherwise times before 01:00 would parse as NA.
setAs(
  "character", "HM",
  function(from) {
    pad <- strrep("0", pmax(0L, 4L - nchar(from)))
    hm(paste0(pad, from))
  })
# Column classes of the header-less airline CSV, in file order. Names double
# as column names (see names(col.classes) in the readers below), so the
# order matters. "HM" columns are HHMM clock times.
col.classes <- c(
  # date
  Year = "integer", Month = "integer", DayofMonth = "integer",
  DayOfWeek = "integer",
  # clock times
  DepTime = "HM", CRSDepTime = "HM", ArrTime = "HM", CRSArrTime = "HM",
  # flight identity
  UniqueCarrier = "factor", FlightNum = "integer", TailNum = "factor",
  # durations and delays
  ActualElapsedTime = "integer", CRSElapsedTime = "integer",
  AirTime = "integer", ArrDelay = "integer", DepDelay = "integer",
  # route
  Origin = "factor", Dest = "factor", Distance = "integer",
  TaxiIn = "integer", TaxiOut = "integer",
  # cancellation / diversion
  Cancelled = "integer", CancellationCode = "factor", Diverted = "integer",
  # delay breakdown
  CarrierDelay = "integer", WeatherDelay = "integer", NASDelay = "integer",
  SecurityDelay = "integer", LateAircraftDelay = "integer")
# Read the local sample of the airline data into memory. The file has no
# header, so names and classes come from col.classes. The path now matches
# the one used by every other reader in this script (no doubled slash).
read.table(
  file = "../RHadoop.data/airline/air.subs",
  sep = ",",
  col.names = names(col.classes),
  colClasses = col.classes)
Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
1 2008 1 3 4 15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S WN 2032 N271WN 204 210 183 20 26 LAS BNA 1588 6 15 0 0 18 0 0 0 2
2 2008 1 3 4 10H 30M 0S 10H 0M 0S 11H 42M 0S 11H 15M 0S WN 184 N473WN 72 75 62 27 30 LAX OAK 337 3 7 0 0 0 0 0 0 27
3 2008 1 3 4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S WN 1209 N653SW 230 215 203 14 -1 MCI SAN 1333 20 7 0 0 NA NA NA NA NA
4 2008 1 3 4 10H 57M 0S 10H 30M 0S 13H 47M 0S 13H 5M 0S WN 1656 N390SW 170 155 155 42 27 MCO BDL 1050 4 11 0 0 15 0 15 0 12
5 2008 1 3 4 19H 14M 0S 17H 55M 0S 21H 22M 0S 20H 5M 0S WN 1507 N334SW 128 130 113 77 79 MCO BWI 787 1 14 0 0 0 0 0 0 77
6 2008 1 3 4 6H 1M 0S 6H 0M 0S 8H 24M 0S 8H 5M 0S WN 2456 N255WN 83 65 61 19 1 MDW CLE 307 6 16 0 0 0 0 19 0 0
7 2008 1 3 4 11H 55M 0S 11H 50M 0S 14H 26M 0S 14H 35M 0S WN 1488 N461WN 91 105 79 -9 5 MDW IAD 577 6 6 0 0 NA NA NA NA NA
8 2008 1 3 4 12H 49M 0S 11H 20M 0S 13H 43M 0S 12H 35M 0S WN 521 N781WN 54 75 47 68 89 MSY BHM 321 2 5 0 0 60 0 0 0 8
9 2008 1 3 4 9H 57M 0S 9H 45M 0S 11H 1M 0S 10H 55M 0S WN 1343 N523SW 64 70 54 6 12 MSY HOU 303 3 7 0 0 NA NA NA NA NA
....
# Load rmr2 and select its "local" backend so the examples run in-process.
library("rmr2")
rmr.options(backend = "local")
NULL
# rmr2 CSV input format using the same parsing options as the read.table()
# call on the local sample, then read the sample through it.
air.in.format <- make.input.format(
  "csv",
  sep = ",",
  col.names = names(col.classes),
  colClasses = col.classes)
from.dfs(
  "../RHadoop.data/airline/air.subs",
  format = air.in.format)
$key
NULL
$val
Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
1 2008 1 3 4 15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S WN 2032 N271WN 204 210 183 20 26 LAS BNA 1588 6 15 0 0 18 0 0 0 2
2 2008 1 3 4 10H 30M 0S 10H 0M 0S 11H 42M 0S 11H 15M 0S WN 184 N473WN 72 75 62 27 30 LAX OAK 337 3 7 0 0 0 0 0 0 27
3 2008 1 3 4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S WN 1209 N653SW 230 215 203 14 -1 MCI SAN 1333 20 7 0 0 NA NA NA NA NA
4 2008 1 3 4 10H 57M 0S 10H 30M 0S 13H 47M 0S 13H 5M 0S WN 1656 N390SW 170 155 155 42 27 MCO BDL 1050 4 11 0 0 15 0 15 0 12
5 2008 1 3 4 19H 14M 0S 17H 55M 0S 21H 22M 0S 20H 5M 0S WN 1507 N334SW 128 130 113 77 79 MCO BWI 787 1 14 0 0 0 0 0 0 77
....
# A job with no map or reduce: copies the CSV input into rmr2's own format.
# Printing the result shows a handle to the output, not the data itself.
air.subs <- mapreduce(
  input = "../RHadoop.data/airline/air.subs",
  input.format = air.in.format)
air.subs
function ()
{
fname
}
<bytecode: 0x104ceef40>
<environment: 0x104ced040>
# Pull the job output back into memory (a key/value list) and print it.
(air.df <- from.dfs(air.subs))
$key
NULL
$val
Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
1 2008 1 3 4 15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S WN 2032 N271WN 204 210 183 20 26 LAS BNA 1588 6 15 0 0 18 0 0 0 2
2 2008 1 3 4 10H 30M 0S 10H 0M 0S 11H 42M 0S 11H 15M 0S WN 184 N473WN 72 75 62 27 30 LAX OAK 337 3 7 0 0 0 0 0 0 27
3 2008 1 3 4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S WN 1209 N653SW 230 215 203 14 -1 MCI SAN 1333 20 7 0 0 NA NA NA NA NA
4 2008 1 3 4 10H 57M 0S 10H 30M 0S 13H 47M 0S 13H 5M 0S WN 1656 N390SW 170 155 155 42 27 MCO BDL 1050 4 11 0 0 15 0 15 0 12
5 2008 1 3 4 19H 14M 0S 17H 55M 0S 21H 22M 0S 20H 5M 0S WN 1507 N334SW 128 130 113 77 79 MCO BWI 787 1 14 0 0 0 0 0 0 77
....
# Columns used in the rest of the analysis.
important.cols <- c(
  "DayOfWeek", "DepTime", "UniqueCarrier", "FlightNum", "AirTime",
  "ArrDelay", "Origin", "Dest", "Distance")
# Keep only the values (the keys are NULL) and project onto those columns.
air.df <- values(air.df)
air.df <- subset(air.df, select = important.cols)
air.df
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance
1 4 15H 6M 0S WN 2032 183 20 LAS BNA 1588
2 4 10H 30M 0S WN 184 62 27 LAX OAK 337
3 4 16H 54M 0S WN 1209 203 14 MCI SAN 1333
4 4 10H 57M 0S WN 1656 155 42 MCO BDL 1050
5 4 19H 14M 0S WN 1507 113 77 MCO BWI 787
6 4 6H 1M 0S WN 2456 61 19 MDW CLE 307
7 4 11H 55M 0S WN 1488 79 -9 MDW IAD 577
8 4 12H 49M 0S WN 521 47 68 MSY BHM 321
9 4 9H 57M 0S WN 1343 54 6 MSY HOU 303
....
# The same projection, applied chunk by chunk in a map-only job.
air.subs <- mapreduce(
  air.subs,
  map = function(k, v) {
    subset(v, select = important.cols)
  })
from.dfs(air.subs)
$key
NULL
$val
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance
1 4 15H 6M 0S WN 2032 183 20 LAS BNA 1588
2 4 10H 30M 0S WN 184 62 27 LAX OAK 337
3 4 16H 54M 0S WN 1209 203 14 MCI SAN 1333
4 4 10H 57M 0S WN 1656 155 42 MCO BDL 1050
5 4 19H 14M 0S WN 1507 113 77 MCO BWI 787
....
# Keep only rows with a valid day of week, in memory and as a map-only job.
# The condition must use the element-wise `&`: `&&` inspects a single
# element (and is an error on longer vectors since R 4.3), so the filter
# would keep either every row or none of them.
air.df <- subset(air.df, DayOfWeek >= 1 & DayOfWeek <= 7)
air.subs <- mapreduce(
  air.subs,
  map = function(k, v) subset(v, DayOfWeek >= 1 & DayOfWeek <= 7))
from.dfs(air.subs)
$key
NULL
$val
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance
1 4 15H 6M 0S WN 2032 183 20 LAS BNA 1588
2 4 10H 30M 0S WN 184 62 27 LAX OAK 337
3 4 16H 54M 0S WN 1209 203 14 MCI SAN 1333
4 4 10H 57M 0S WN 1656 155 42 MCO BDL 1050
5 4 19H 14M 0S WN 1507 113 77 MCO BWI 787
....
# Mean arrival delay per day of week. Map emits (DayOfWeek, ArrDelay)
# pairs. Reduce averages one day's delays, ignoring NAs, and returns the
# day next to the mean because the returned value carries no key.
air.summary <- mapreduce(
  air.subs,
  map = function(k, v) {
    keyval(v$DayOfWeek, v$ArrDelay)
  },
  reduce = function(k, v) {
    cbind(day = k, mean = mean(v, na.rm = TRUE))
  })
from.dfs(air.summary)
$key
NULL
$val
day mean
[1,] 4 8.138
[2,] 5 11.595
[3,] 6 5.338
[4,] 7 9.530
[5,] 1 8.194
....
# Combiner-friendly per-day summary: emit (count, arrdelay) rows for
# non-missing delays and sum them column-wise. Sums compose, so the same
# reducer can also run as combiner (combine = TRUE); the mean is then
# arrdelay / count.
air.summary <- mapreduce(
  air.subs,
  map = function(k, v) {
    delayed <- v[!is.na(v$ArrDelay), ]
    keyval(
      delayed$DayOfWeek,
      cbind(count = 1, arrdelay = delayed$ArrDelay))
  },
  reduce = function(k, v) keyval(k, t(colSums(v))),
  combine = TRUE)
from.dfs(air.summary)
$key
[1] 5 6 7 1 2 3 4
$val
count arrdelay
[1,] 10252 118870
[2,] 8548 45628
[3,] 9787 93266
[4,] 10379 85047
[5,] 10325 79858
....
# Word count with rmr2.
#
# input   : rmr2 input holding text lines.
# output  : passed through to mapreduce() (NULL by default).
# pattern : separator given to strsplit() to break lines into words.
#
# R scoping is lexical: a top-level wc.map cannot see wordcount()'s
# `pattern` argument. So the map is a closure that passes the pattern to
# wc.map explicitly.
wordcount <- function(input, output = NULL, pattern = " ") {
  force(pattern)
  mapreduce(
    input = input,
    output = output,
    map = function(k, lines) wc.map(k, lines, pattern = pattern),
    reduce = wc.reduce,
    combine = TRUE)
}

# Map: split every line on `pattern` and emit (word, 1) for each word.
wc.map <- function(., lines, pattern = " ") {
  keyval(unlist(strsplit(x = lines, split = pattern)), 1)
}

# Reduce (also the combiner): total the counts seen for one word.
wc.reduce <- function(word, counts) {
  keyval(word, sum(counts))
}
# Load the Bundesliga results data set and plyr (count, arrange, ddply).
suppressMessages(library("vcd"))
library("plyr")
data("Bundesliga")
summary(Bundesliga)
HomeTeam AwayTeam HomeGoals AwayGoals Round Year Date
Hamburger SV : 780 Hamburger SV : 780 Min. : 0.0 Min. :0.00 Min. : 1.0 Min. :1963 Min. :1963-08-24 07:30:00
Werder Bremen : 763 Werder Bremen : 763 1st Qu.: 1.0 1st Qu.:0.00 1st Qu.: 9.0 1st Qu.:1974 1st Qu.:1975-05-20 04:52:30
Bayern Muenchen : 750 Bayern Muenchen : 750 Median : 2.0 Median :1.00 Median :17.0 Median :1986 Median :1986-11-01 06:30:00
VfB Stuttgart : 746 VfB Stuttgart : 746 Mean : 1.9 Mean :1.19 Mean :17.5 Mean :1986 Mean :1986-09-27 10:23:15
1. FC Kaiserslautern: 712 1. FC Kaiserslautern: 712 3rd Qu.: 3.0 3rd Qu.:2.00 3rd Qu.:26.0 3rd Qu.:1997 3rd Qu.:1997-12-13 06:30:00
Borussia Dortmund : 712 Borussia Dortmund : 712 Max. :12.0 Max. :9.00 Max. :38.0 Max. :2008 Max. :2009-05-23 06:30:00
(Other) :9555 (Other) :9555 NA's :2
# Abbreviate team names, sort by matchup, and take rows 1, 3, ..., 99 as a
# first sample of matchups.
Bundesliga$HomeTeam <- abbreviate(Bundesliga$HomeTeam)
Bundesliga$AwayTeam <- abbreviate(Bundesliga$AwayTeam)
Bundesliga <- arrange(Bundesliga, HomeTeam, AwayTeam)
matchups <- Bundesliga[seq(1, 99, by = 2), c("HomeTeam", "AwayTeam")]
matchups
HomeTeam AwayTeam
1 1.FCKl 1.FCKs
3 1.FCKl 1.FCKs
5 1.FCKl 1.FCKs
7 1.FCKl 1.FCKs
9 1.FCKl 1.FCKs
11 1.FCKl 1.FCKs
13 1.FCKl 1.FCKs
15 1.FCKl 1.FCKs
17 1.FCKl 1.FCKs
....
count(df = matchups)  # frequency of each (HomeTeam, AwayTeam) pair
HomeTeam AwayTeam freq
1 1.FCKl 1.FCKs 19
2 1.FCKl 1.FN 11
3 1.FCKl 1.FS 3
4 1.FCKl 1FM0 1
5 1.FCKl AlmA 1
6 1.FCKl ArmB 6
7 1.FCKl B-9B 1
8 1.FCKl BrsD 8
# Second sample: rows 2, 4, ..., 100.
matchups1 <- Bundesliga[seq(2, 100, by = 2), c("HomeTeam", "AwayTeam")]
matchups1
HomeTeam AwayTeam
2 1.FCKl 1.FCKs
4 1.FCKl 1.FCKs
6 1.FCKl 1.FCKs
8 1.FCKl 1.FCKs
10 1.FCKl 1.FCKs
12 1.FCKl 1.FCKs
14 1.FCKl 1.FCKs
16 1.FCKl 1.FCKs
18 1.FCKl 1.FCKs
....
count(df = matchups1)  # frequency of each pair in the second sample
HomeTeam AwayTeam freq
1 1.FCKl 1.FCKs 19
2 1.FCKl 1.FN 11
3 1.FCKl 1.FS 2
4 1.FCKl 189H 1
5 1.FCKl AlmA 2
6 1.FCKl ArmB 6
7 1.FCKl BrsD 9
do.call(rbind, list(matchups, matchups1))  # both samples stacked
HomeTeam AwayTeam
1 1.FCKl 1.FCKs
3 1.FCKl 1.FCKs
5 1.FCKl 1.FCKs
7 1.FCKl 1.FCKs
9 1.FCKl 1.FCKs
11 1.FCKl 1.FCKs
13 1.FCKl 1.FCKs
15 1.FCKl 1.FCKs
17 1.FCKl 1.FCKs
....
count(df = rbind(matchups, matchups1))  # counts over the union of samples
HomeTeam AwayTeam freq
1 1.FCKl 1.FCKs 38
2 1.FCKl 1.FN 22
3 1.FCKl 1.FS 5
4 1.FCKl 189H 1
5 1.FCKl 1FM0 1
6 1.FCKl AlmA 3
7 1.FCKl ArmB 12
8 1.FCKl B-9B 1
9 1.FCKl BrsD 17
....
# Stack the two per-sample count tables without merging them: a matchup
# present in both samples appears twice.
raw.combination <- rbind(count(matchups), count(matchups1))
arrange(raw.combination, HomeTeam, AwayTeam)
HomeTeam AwayTeam freq
1 1.FCKl 1.FCKs 19
2 1.FCKl 1.FCKs 19
3 1.FCKl 1.FN 11
4 1.FCKl 1.FN 11
5 1.FCKl 1.FS 3
6 1.FCKl 1.FS 2
7 1.FCKl 189H 1
8 1.FCKl 1FM0 1
9 1.FCKl AlmA 1
....
# Merge the partial counts: add up freq within each matchup.
ddply(
  .data = raw.combination,
  .variables = c("HomeTeam", "AwayTeam"),
  .fun = function(x) c(freq = sum(x$freq)))
HomeTeam AwayTeam freq
1 1.FCKl 1.FCKs 38
2 1.FCKl 1.FN 22
3 1.FCKl 1.FS 5
4 1.FCKl 189H 1
5 1.FCKl 1FM0 1
6 1.FCKl AlmA 3
7 1.FCKl ArmB 12
8 1.FCKl B-9B 1
9 1.FCKl BrsD 17
....
# Collapse a count table -- key columns followed by a final `freq` column --
# by summing freq within each distinct combination of the key columns.
shrink <- function(ct) {
  key.cols <- head(names(ct), -1)
  ddply(ct, key.cols, .fun = function(x) c(freq = sum(x$freq)))
}
shrink(raw.combination)
HomeTeam AwayTeam freq
1 1.FCKl 1.FCKs 38
2 1.FCKl 1.FN 22
3 1.FCKl 1.FS 5
4 1.FCKl 189H 1
5 1.FCKl 1FM0 1
6 1.FCKl AlmA 3
7 1.FCKl ArmB 12
8 1.FCKl B-9B 1
9 1.FCKl BrsD 17
....
# Count matchups over the whole data set with MapReduce. Each map call
# counts its chunk; reduce (also run as combiner) merges partial count
# tables with shrink(). The constant key 1 sends everything to one reduce.
# `TRUE` is spelled out because `T` is an ordinary, reassignable variable.
matchups.big <- to.dfs(Bundesliga[, c("HomeTeam", "AwayTeam")])
map <- function(k, v) keyval(1, count(v))
reduce <- function(k, vv) keyval(1, shrink(vv))
values(
  from.dfs(
    mapreduce(
      matchups.big,
      map = map,
      reduce = reduce,
      combine = TRUE)))
HomeTeam AwayTeam freq
1 1.FCKl 1.FCKs 38
2 1.FCKl 1.FN 22
3 1.FCKl 1.FS 5
4 1.FCKl 189H 1
5 1.FCKl 1FM0 1
6 1.FCKl AlmA 3
7 1.FCKl ArmB 12
8 1.FCKl B-9B 1
9 1.FCKl BrsD 36
....
# Map variant: key each partial count by its key columns (all columns of the
# count table except the trailing freq column).
map <- function(k, v) {
  ct <- count(v)
  key.cols <- seq_len(ncol(ct) - 1)
  keyval(ct[, key.cols], ct)
}
# Map variant with a bounded key space: the key columns of each partial
# count are converted to numbers and reduced modulo N.
#
# The key columns are all columns of `ct` except its last one (freq).
# Indexing with -ncol(v) would drop the last *key* column and keep freq in
# the key. drop = FALSE keeps a single key column as a data frame, so
# apply() still receives a 2-D object.
# NOTE(review): as.numeric() yields NA on character keys such as the
# abbreviated team names; only meaningful for numeric-like key columns.
N <- 10
map <- function(k, v) {
  ct <- count(v)
  key.cols <- ct[, -ncol(ct), drop = FALSE]
  keyval(apply(key.cols, 2, as.numeric) %% N, ct)
}
library(bitops)
# Map variant that also works for character keys: each key column is hashed
# with cksum() and reduced modulo N, giving at most N buckets per column.
#
# The key columns are every column of `ct` except the final freq column.
# Indexing with -ncol(v) would drop the last key column instead.
# drop = FALSE keeps a single key column 2-D for apply().
N <- 10
map <- function(k, v) {
  ct <- count(v)
  key.cols <- ct[, -ncol(ct), drop = FALSE]
  keyval(apply(key.cols, 2, cksum) %% N, ct)
}
library("cluster")  # provides clara(), used by subclara()
# Pull the element named `a.name` out of every list in `ll`; returns a list
# with the same length and names as `ll`.
napply <- function(ll, a.name) {
  lapply(ll, `[[`, a.name)
}
# Generic two-level clustering with rmr2.
#
# data       : rmr2 input holding the points.
# subcluster : function(chunk) returning a summary of one chunk.
# merge      : function(list of summaries) returning one merged summary.
#
# Every map call emits its chunk summary under the constant key 1, so all
# summaries meet in a single reduce call. With combine = TRUE the reducer
# also runs as combiner, so `merge` must accept its own output.
cluster.mr <- function(data, subcluster, merge) {
  mapreduce(
    data,
    map = function(., data.chunk) keyval(1, list(subcluster(data.chunk))),
    combine = TRUE,
    reduce = function(., clusterings) keyval(1, list(merge(clusterings))))
}
# Cluster one chunk with clara() and return a compact summary:
#   size    - number of rows in the chunk,
#   sample  - the rows clara() drew as its sample,
#   medoids - the medoids clara() found.
# keep.data = FALSE keeps clara() from storing a copy of the chunk.
# drop = FALSE keeps `sample` 2-D even for single-column data; the merge
# step calls nrow() on it and rbind()s samples together.
subclara <- function(data, n.centers) {
  clust <- clara(data, n.centers, keep.data = FALSE)
  list(
    size = nrow(data),
    sample = data[clust$sample, , drop = FALSE],
    medoids = clust$medoids)
}
# Merge per-chunk clara summaries (lists with size, sample, medoids) into
# one summary of the same shape.
#
# Each chunk's sample is resampled with replacement to
# round(nrow(sample) * size.ratio) rows. The resampled rows are stacked
# and re-clustered with subclara(), and size is set to the total number of
# points across all chunks.
#
# Rows are selected with x$sample[rows, , drop = FALSE]. A single index,
# x$sample[rows], picks matrix cells or data-frame columns, not rows.
# NOTE(review): size.ratio is the same for every chunk, so chunks are not
# weighted by their size; confirm whether x$size / min(sizes) was intended.
merge.clara <- function(clusterings, n.centers) {
  sizes <- unlist(napply(clusterings, "size"))
  total.size <- sum(sizes)
  size.range <- range(sizes)
  size.ratio <- max(size.range) / min(size.range)
  resample <- function(x) {
    n <- nrow(x$sample)
    rows <- sample(seq_len(n), round(n * size.ratio), replace = TRUE)
    x$sample[rows, , drop = FALSE]
  }
  clust <- subclara(do.call(rbind, lapply(clusterings, resample)), n.centers)
  clust$size <- total.size
  clust
}
# Distributed clara: summarize each chunk of `data` with subclara() and
# merge the summaries with merge.clara(). Returns the merged summary (a
# list with size, sample and medoids).
#
# Plain closures replace Curry(): Curry() is not base R, and no package
# providing it is loaded in this script. force() evaluates n.centers now,
# so the closures do not carry an unevaluated promise.
clara.mr <- function(data, n.centers) {
  force(n.centers)
  result <- from.dfs(
    cluster.mr(
      data,
      function(chunk) subclara(chunk, n.centers = n.centers),
      function(clusterings) merge.clara(clusterings, n.centers = n.centers)))
  values(result)[[1]]
}
\[ \mathbf{X} \mathbf{b} = \mathbf{y} \]
# Least squares via the normal equations (X'X) b = X'y. crossprod() forms
# t(X) %*% X and t(X) %*% y without materializing t(X).
solve(crossprod(X), crossprod(X, y))
\[ \newcommand{\X}{\mathbf{X}} \] \[ (\X_1^t, \X_2^t, \ldots, \X_n^t) \left( \begin{array}{c} \X_1 \\ \X_2 \\ \vdots \\ \X_n \end{array}\right) = \sum_i \X_i^t \X_i \]
# Reducer/combiner: add up all matrices received for a key and emit the
# total under key 1.
Sum <- function(., YY) {
  total <- Reduce(`+`, YY)
  keyval(1, list(total))
}
# Distributed normal equations. Each row of X.index is a row of X prefixed
# with its row number, so a map call receives a block Xi of rows:
#   X'X = sum over blocks of Xi' Xi
#   X'y = sum over blocks of Xi' yi   (yi looked up in y by row number)
# drop = FALSE keeps one-row blocks as matrices. Without it Xi[, -1]
# becomes a vector and the per-block product has the wrong shape.
XtX <- values(
  from.dfs(
    mapreduce(
      input = X.index,
      map = function(., Xi) {
        Xi <- Xi[, -1, drop = FALSE]
        keyval(1, list(crossprod(Xi)))
      },
      reduce = Sum,
      combine = TRUE)))[[1]]
Xty <- values(
  from.dfs(
    mapreduce(
      input = X.index,
      map = function(., Xi) {
        yi <- y[Xi[, 1], , drop = FALSE]
        Xi <- Xi[, -1, drop = FALSE]
        keyval(1, list(crossprod(Xi, yi)))
      },
      reduce = Sum,
      combine = TRUE)))[[1]]
solve(XtX, Xty)
# Toy regression data: a 200 x 10 standard-normal design matrix, a row-
# numbered copy of it stored with to.dfs(), and a 200 x 1 response.
X <- matrix(rnorm(2000), ncol = 10)
X.index <- to.dfs(cbind(seq_len(nrow(X)), X))
y <- as.matrix(rnorm(nrow(X)))
library("randomForest")
# Poisson bagging parameters: num.models resamples are drawn per chunk, and
# in each one every row is repeated rpois(1, frac.per.model) times.
num.models <- 50
frac.per.model <- 0.1
# Column names of the bulldozer training CSV, in file order (passed as
# col.names to the input format). Only some columns get explicit classes
# in bulldozer.input.format.
column.names <- c("MachineID", "SalePrice", "ModelID.x", "datasource", "auctioneerID",
"YearMade", "MachineHoursCurrentMeter", "UsageBand", "saledate",
"fiModelDesc.x", "fiBaseModel.x", "fiSecondaryDesc.x", "fiModelSeries.x",
"fiModelDescriptor.x", "ProductSize", "fiProductClassDesc.x",
"state", "ProductGroup.x", "ProductGroupDesc.x", "Drive_System",
"Enclosure", "Forks", "Pad_Type", "Ride_Control", "Stick", "Transmission",
"Turbocharged", "Blade_Extension", "Blade_Width", "Enclosure_Type",
"Engine_Horsepower", "Hydraulics", "Pushblock", "Ripper", "Scarifier",
"Tip_Control", "Tire_Size", "Coupler", "Coupler_System", "Grouser_Tracks",
"Hydraulics_Flow", "Track_Type", "Undercarriage_Pad_Width", "Stick_Length",
"Thumb", "Pattern_Changer", "Grouser_Type", "Backhoe_Mounting",
"Blade_Type", "Travel_Controls", "Differential_Type", "Steering_Controls",
"saledatenumeric", "ageAtSale", "saleYear", "saleMonth", "saleDay",
"saleWeekday", "MedianModelPrice", "ModelCount", "ModelID.y",
"fiModelDesc.y", "fiBaseModel.y", "fiSecondaryDesc.y", "fiModelSeries.y",
"fiModelDescriptor.y", "fiProductClassDesc.y", "ProductGroup.y",
"ProductGroupDesc.y", "MfgYear", "fiManufacturerID", "fiManufacturerDesc",
"PrimarySizeBasis", "PrimaryLower", "PrimaryUpper")
# Formula fitted by every forest in fit.trees(): SalePrice regressed on a
# subset of the columns above.
model.formula <- SalePrice ~ datasource + auctioneerID + YearMade + saledatenumeric + ProductSize +
ProductGroupDesc.x + Enclosure + Hydraulics + ageAtSale + saleYear +
saleMonth + saleDay + saleWeekday + MedianModelPrice + ModelCount +
MfgYear
# rmr2 CSV input format for the bulldozer data. MachineID is skipped
# ("NULL" class); the other listed columns get explicit classes, and the
# rest are left to the parser's defaults.
bulldozer.input.format <- make.input.format(
  "csv",
  sep = ",",
  quote = "\"",
  row.names = NULL,
  col.names = column.names,
  fill = TRUE,
  na.strings = c("NA"),
  colClasses = c(
    MachineID = "NULL",
    SalePrice = "numeric",
    YearMade = "numeric",
    MachineHoursCurrentMeter = "numeric",
    ageAtSale = "numeric",
    saleYear = "numeric",
    ModelCount = "numeric",
    MfgYear = "numeric",
    ModelID.x = "factor",
    ModelID.y = "factor",
    fiManufacturerID = "factor",
    datasource = "factor",
    auctioneerID = "factor",
    saledatenumeric = "numeric",
    saleDay = "factor",
    Stick_Length = "numeric"))
# Map for Poisson bagging: for every model id i in 1..num.models, emit
# (i, resample), where each row of the chunk is repeated
# rpois(1, frac.per.model) times. Reads the globals num.models and
# frac.per.model.
#
# seq_len() makes an empty chunk produce empty resamples; 1:0 would be
# c(1, 0) and break rep(). drop = FALSE keeps one-column chunks as data
# frames.
poisson.subsample <- function(k, input) {
  n.rows <- nrow(input)
  generate.sample <- function(i) {
    draws <- rpois(n = n.rows, lambda = frac.per.model)
    indices <- rep(seq_len(n.rows), draws)
    keyval(i, input[indices, , drop = FALSE])
  }
  c.keyval(lapply(seq_len(num.models), generate.sample))
}
# Reduce: fit a 10-tree random forest on the rows routed to model id k,
# imputing missing predictors with na.roughfix.
fit.trees <- function(k, v) {
  forest.fit <- randomForest(
    formula = model.formula,
    data = v,
    na.action = na.roughfix,
    ntree = 10,
    do.trace = FALSE)
  keyval(k, list(forest = forest.fit))
}
# Train the ensemble: map draws the Poisson resamples and reduce fits one
# forest per model id; the forests are written to /poisson/output.
mapreduce(
  input = "/poisson/training.csv",
  input.format = bulldozer.input.format,
  map = poisson.subsample,
  reduce = fit.trees,
  output = "/poisson/output")
# Load the per-model forests and merge them into one forest.
# randomForest::combine is named explicitly because other packages
# (e.g. dplyr, gridExtra) export a different combine() that can mask it.
raw.forests <- values(from.dfs("/poisson/output"))
forest <- do.call(randomForest::combine, raw.forests)
# Flag flights whose ArrDelay exceeds 15, and print the result.
(air.df <- transform(air.df, Late = ArrDelay > 15))
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
1 4 15H 6M 0S WN 2032 183 20 LAS BNA 1588 TRUE
2 4 10H 30M 0S WN 184 62 27 LAX OAK 337 TRUE
3 4 16H 54M 0S WN 1209 203 14 MCI SAN 1333 FALSE
4 4 10H 57M 0S WN 1656 155 42 MCO BDL 1050 TRUE
5 4 19H 14M 0S WN 1507 113 77 MCO BWI 787 TRUE
6 4 6H 1M 0S WN 2456 61 19 MDW CLE 307 TRUE
7 4 11H 55M 0S WN 1488 79 -9 MDW IAD 577 FALSE
8 4 12H 49M 0S WN 521 47 68 MSY BHM 321 TRUE
9 4 9H 57M 0S WN 1343 54 6 MSY HOU 303 FALSE
....
# plyrmr version of the same step. The computation is only described here;
# it runs when output() or as.data.frame() is called on the object.
library(plyrmr)
air.subs <- transform(
  input("../RHadoop.data/airline/air.subs", format = air.in.format),
  Late = ArrDelay > 15)
air.subs
[1] "Got it! To generate results call the functions output or as.data.frame on this object. Computation has been delayed at least in part."
# Run the delayed computation and bring the result into memory.
(air.df <- as.data.frame(air.subs))
Year Month DayofMonth DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay Late
1 2008 1 3 4 15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S WN 2032 N271WN 204 210 183 20 26 LAS BNA 1588 6 15 0 0 18 0 0 0 2 TRUE
2 2008 1 3 4 10H 30M 0S 10H 0M 0S 11H 42M 0S 11H 15M 0S WN 184 N473WN 72 75 62 27 30 LAX OAK 337 3 7 0 0 0 0 0 0 27 TRUE
3 2008 1 3 4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S WN 1209 N653SW 230 215 203 14 -1 MCI SAN 1333 20 7 0 0 NA NA NA NA NA FALSE
4 2008 1 3 4 10H 57M 0S 10H 30M 0S 13H 47M 0S 13H 5M 0S WN 1656 N390SW 170 155 155 42 27 MCO BDL 1050 4 11 0 0 15 0 15 0 12 TRUE
5 2008 1 3 4 19H 14M 0S 17H 55M 0S 21H 22M 0S 20H 5M 0S WN 1507 N334SW 128 130 113 77 79 MCO BWI 787 1 14 0 0 0 0 0 0 77 TRUE
6 2008 1 3 4 6H 1M 0S 6H 0M 0S 8H 24M 0S 8H 5M 0S WN 2456 N255WN 83 65 61 19 1 MDW CLE 307 6 16 0 0 0 0 19 0 0 TRUE
7 2008 1 3 4 11H 55M 0S 11H 50M 0S 14H 26M 0S 14H 35M 0S WN 1488 N461WN 91 105 79 -9 5 MDW IAD 577 6 6 0 0 NA NA NA NA NA FALSE
8 2008 1 3 4 12H 49M 0S 11H 20M 0S 13H 43M 0S 12H 35M 0S WN 521 N781WN 54 75 47 68 89 MSY BHM 321 2 5 0 0 60 0 0 0 8 TRUE
9 2008 1 3 4 9H 57M 0S 9H 45M 0S 11H 1M 0S 10H 55M 0S WN 1343 N523SW 64 70 54 6 12 MSY HOU 303 3 7 0 0 NA NA NA NA NA FALSE
....
output(air.subs, path = "/tmp/air.subs.out")  # run and store the result
[1] "Big Data object:" "/tmp/air.subs.out" "native"
# plyrmr's select() on an in-memory data frame: keep only these columns.
(air.df <- select(
  air.df,
  DayOfWeek, DepTime, UniqueCarrier, FlightNum, AirTime,
  ArrDelay, Origin, Dest, Distance, Late))
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
1 4 15H 6M 0S WN 2032 183 20 LAS BNA 1588 TRUE
2 4 10H 30M 0S WN 184 62 27 LAX OAK 337 TRUE
3 4 16H 54M 0S WN 1209 203 14 MCI SAN 1333 FALSE
4 4 10H 57M 0S WN 1656 155 42 MCO BDL 1050 TRUE
5 4 19H 14M 0S WN 1507 113 77 MCO BWI 787 TRUE
6 4 6H 1M 0S WN 2456 61 19 MDW CLE 307 TRUE
7 4 11H 55M 0S WN 1488 79 -9 MDW IAD 577 FALSE
8 4 12H 49M 0S WN 521 47 68 MSY BHM 321 TRUE
9 4 9H 57M 0S WN 1343 54 6 MSY HOU 303 FALSE
....
# The same projection on the big-data object, then collect it.
air.subs <- select(
  air.subs,
  DayOfWeek, DepTime, UniqueCarrier, FlightNum, AirTime,
  ArrDelay, Origin, Dest, Distance, Late)
as.data.frame(air.subs)
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
1 4 15H 6M 0S WN 2032 183 20 LAS BNA 1588 TRUE
2 4 10H 30M 0S WN 184 62 27 LAX OAK 337 TRUE
3 4 16H 54M 0S WN 1209 203 14 MCI SAN 1333 FALSE
4 4 10H 57M 0S WN 1656 155 42 MCO BDL 1050 TRUE
5 4 19H 14M 0S WN 1507 113 77 MCO BWI 787 TRUE
6 4 6H 1M 0S WN 2456 61 19 MDW CLE 307 TRUE
7 4 11H 55M 0S WN 1488 79 -9 MDW IAD 577 FALSE
8 4 12H 49M 0S WN 521 47 68 MSY BHM 321 TRUE
9 4 9H 57M 0S WN 1343 54 6 MSY HOU 303 FALSE
....
# Keep rows with a valid day of week, in memory and on the big-data object.
# The element-wise `&` is required: `&&` uses a single element (and is an
# error on longer vectors since R 4.3), so the filter would keep all rows
# or none.
air.df <- where(air.df, DayOfWeek >= 1 & DayOfWeek <= 7)
air.subs <- where(air.subs, DayOfWeek >= 1 & DayOfWeek <= 7)
as.data.frame(air.subs)
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
1 4 15H 6M 0S WN 2032 183 20 LAS BNA 1588 TRUE
2 4 10H 30M 0S WN 184 62 27 LAX OAK 337 TRUE
3 4 16H 54M 0S WN 1209 203 14 MCI SAN 1333 FALSE
4 4 10H 57M 0S WN 1656 155 42 MCO BDL 1050 TRUE
5 4 19H 14M 0S WN 1507 113 77 MCO BWI 787 TRUE
6 4 6H 1M 0S WN 2456 61 19 MDW CLE 307 TRUE
7 4 11H 55M 0S WN 1488 79 -9 MDW IAD 577 FALSE
8 4 12H 49M 0S WN 521 47 68 MSY BHM 321 TRUE
9 4 9H 57M 0S WN 1343 54 6 MSY HOU 303 FALSE
....
# Mean arrival delay per day of week with plyrmr: group by day, then
# evaluate the summary expression within each group.
air.summary <- select(
  group(air.subs, DayOfWeek),
  mean = mean(ArrDelay, na.rm = TRUE))
as.data.frame(air.summary)
DayOfWeek mean
1.1 5 11.595
1.2 6 5.338
1.3 7 9.530
1.4 1 8.194
1.5 2 7.734
1.6 3 6.674
1 4 8.138
# rmr2 equivalent of the grouped mean above. The reducer returns
# keyval(k, ...) so each mean stays paired with its DayOfWeek. A reducer
# that returns a bare value yields a NULL key, losing the day.
mapreduce(
  air.subs,
  map = function(k, v) keyval(v$DayOfWeek, v$ArrDelay),
  reduce = function(k, v) keyval(k, mean(v, na.rm = TRUE)))
# Per-day sum and count of non-missing arrival delays, grouped with
# .recursive = TRUE. Sums and counts compose across partial results, unlike
# a mean.
air.summary <- select(
  group(select(air.subs, ArrDelay, DayOfWeek), DayOfWeek, .recursive = TRUE),
  sum = sum(ArrDelay, na.rm = TRUE),
  count = sum(!is.na(ArrDelay)))
as.data.frame(air.summary)
DayOfWeek sum count
1.1 5 118870 10252
1.2 6 45628 8548
1.3 7 93266 9787
1.4 1 85047 10379
1.5 2 79858 10325
1.6 3 69394 10398
1 4 83335 10240
origin | functions |
---|---|
base | transform , subset |
plyr | mutate , summarize |
reshape2 | melt , dcast |
new | select , where , do |
SQL dplyr | group , group.f , gather , ungroup , moving.window |
other | top.k , bottom.k , quantile.cols , count.cols |
# Top 3 rows ordered by ArrDelay, then by -AirTime.
as.data.frame(top.k(air.subs, 3, ArrDelay, -AirTime))
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
2845 4 7H 24M 0S NW 1699 132 2453 CLT MSP 930 TRUE
510 7 9H 39M 0S NW 1639 110 1249 EGE MSP 788 TRUE
8738 5 7H 24M 0S MQ 3784 38 1000 FSM DFW 228 TRUE
# The same top-3 selection, computed separately for each day of week.
as.data.frame(
  top.k(group(air.subs, DayOfWeek), 3, ArrDelay, -AirTime))
DayOfWeek DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
2845.2 4 7H 24M 0S NW 1699 132 2453 CLT MSP 930 TRUE
2250 4 20H 43M 0S OO 5804 95 495 FSD ORD 462 TRUE
8842.4 4 23H 38M 0S B6 616 143 482 JAX JFK 829 TRUE
8738.5 5 7H 24M 0S MQ 3784 38 1000 FSM DFW 228 TRUE
5212.3 5 17H 25M 0S NW 1557 55 664 SYR DTW 374 TRUE
7598.4 5 16H 47M 0S MQ 4831 94 653 SDF LGA 658 TRUE
8868.5 6 22H 36M 0S MQ 4382 52 954 CLE ORD 316 TRUE
4442.6 6 16H 55M 0S MQ 4779 73 445 DCA BOS 399 TRUE
8978.2 6 15H 39M 0S NW 1498 52 439 SDF DTW 306 TRUE
510.1 7 9H 39M 0S NW 1639 110 1249 EGE MSP 788 TRUE
8567.4 7 14H 30M 0S AA 2063 135 500 LAS DFW 1055 TRUE
9985.6 7 15H 55M 0S NW 218 170 486 PDX MSP 1426 TRUE
4512.6 1 6H 10M 0S NW 308 194 860 LAX MSP 1536 TRUE
4175.6 1 6H 59M 0S MQ 3504 44 750 LBB DFW 282 TRUE
3489 1 16H 50M 0S EV 4781 106 657 FNT ATL 644 TRUE
....
as.data.frame(x = quantile.cols(air.subs))  # quantiles of numeric columns
DayOfWeek DepTime FlightNum AirTime ArrDelay Distance
1 1 0 1.999 15.0 -47.18 49.0
2 2 0 623.500 56.0 -10.00 325.0
3 4 0 1566.266 86.0 -2.00 580.5
4 6 0 3521.495 132.0 12.00 952.5
5 7 0 7812.002 453.4 385.77 3904.0
as.data.frame(x = quantile.cols(group(air.subs, DayOfWeek)))  # per day
DayOfWeek DepTime FlightNum AirTime ArrDelay Distance
1.1 5 0 1.253 14.00 -47.00 49.00
2.1 5 0 623.500 56.00 -9.00 321.00
3.1 5 0 1558.000 85.50 0.00 577.00
4.1 5 0 3477.997 133.00 15.00 954.75
5.1 5 0 7820.747 460.37 452.49 3924.34
1.2 6 0 1.000 15.00 -47.73 49.00
2.2 6 0 642.461 57.00 -12.00 336.00
3.2 6 0 1558.500 89.00 -4.00 599.00
4.2 6 0 3534.750 136.00 9.00 991.25
5.2 6 0 7803.000 445.72 356.81 3947.21
1.3 7 0 2.011 14.00 -47.11 49.08
2.3 7 0 625.500 56.00 -10.00 328.75
3.3 7 0 1577.000 86.50 -1.00 588.00
4.3 7 0 3534.250 133.25 13.00 967.00
5.3 7 0 7810.879 469.39 413.39 3972.00
....
# One linear model of ArrDelay on DepTime, AirTime and Distance per day of
# week; list() wraps each fitted model so it fits in a single cell.
as.data.frame(
  select(
    group(
      select(air.subs, ArrDelay, DayOfWeek, DepTime, AirTime, Distance),
      DayOfWeek),
    model = list(lm(ArrDelay ~ DepTime + AirTime + Distance))))
DayOfWeek model
1.1 5 c(1.8499....
1.2 6 c(-3.422....
1.3 7 c(0.9583....
1.4 1 c(-1.693....
1.5 2 c(-3.172....
1.6 3 c(-2.453....
1 4 c(-0.869....
# The same per-day regression, built step by step.
x <- select(air.subs, ArrDelay, DayOfWeek, DepTime, AirTime, Distance)
x <- group(x, DayOfWeek)
x <- select(
  x,
  model = list(lm(ArrDelay ~ DepTime + AirTime + Distance)))
as.data.frame(x)
DayOfWeek model
1.1 5 c(1.8499....
1.2 6 c(-3.422....
1.3 7 c(0.9583....
1.4 1 c(-1.693....
1.5 2 c(-3.172....
1.6 3 c(-2.453....
1 4 c(-0.869....
# Element-wise test lower <= x <= upper, meant for row filters.
# Uses `&`, not `&&`: `&&` looks at a single element (and is an error for
# longer vectors since R 4.3), so it cannot filter whole columns.
between <- function(x, lower, upper) {
  x >= lower & x <= upper
}
# Full pipeline on the complete airline file: project the useful columns,
# add Late, keep only rows whose values fall in the listed ranges, and write
# the result to JR.out. Range checks are combined with the element-wise
# `&`; `&&` would use a single element (an error on longer vectors since
# R 4.3).
output(
  where(
    select(
      input("../RHadoop.data/airline/airline.csv", format = air.in.format),
      DayOfWeek, DepTime, UniqueCarrier, FlightNum, AirTime, ArrDelay,
      Origin, Dest, Distance,
      Late = ArrDelay > 15),
    between(DayOfWeek, 1, 7) &
      between(DepTime, hm("0000"), hm("2359")) &
      between(AirTime, 0, 600) &
      between(ArrDelay, 60, 700) &
      between(Distance, 0, 3100)),
  path = "JR.out")