RHadoop Tutorial

Antonio Piccolboni

RHadoop

Statistical tools

Hadoop growth vs Android

R + Hadoop = RHadoop

Google Group

Github

GitHub repositories: https://github.com/RevolutionAnalytics/{RHadoop, rhdfs, rhbase, rmr2, plyrmr}

Hadoop RHadoop
hdfs rhdfs
hbase rhbase
mapreduce rmr2

plyrmr

rmr2

mapreduce model

# Read the airline sample with typed columns; the four clock-time columns use the "HM" class defined here.
library(lubridate)
# "HM" extends the Period class; it is used below as a colClasses entry.
setClass(Class = "HM", representation = "Period")
setAs(
    "character", 
    "HM", 
    function(from) {
        # Three-character values get a leading "0" before being passed to hm().
        zero.pad = ifelse(nchar(from) == 3, "0", "")
        hm(paste0(zero.pad, from))})
# Column name -> class; the names are also used as col.names by the readers below.
col.classes = 
    c(
        Year = "integer", 
        Month = "integer", 
        DayofMonth = "integer", 
        DayOfWeek = "integer", 
        DepTime = "HM", 
        CRSDepTime = "HM", 
        ArrTime = "HM", 
        CRSArrTime = "HM", 
        UniqueCarrier = "factor", 
        FlightNum = "integer", 
        TailNum = "factor", 
        ActualElapsedTime = "integer", 
        CRSElapsedTime = "integer", 
        AirTime = "integer", 
        ArrDelay = "integer", 
        DepDelay = "integer", 
        Origin = "factor", 
        Dest = "factor", 
        Distance = "integer", 
        TaxiIn = "integer", 
        TaxiOut = "integer", 
        Cancelled = "integer", 
        CancellationCode = "factor", 
        Diverted = "integer", 
        CarrierDelay = "integer", 
        WeatherDelay = "integer", 
        NASDelay = "integer", 
        SecurityDelay = "integer", 
        LateAircraftDelay = "integer")
read.table(
    file = "../RHadoop.data/airline//air.subs", 
    sep = ",", 
    col.names = names(col.classes), 
    colClasses = col.classes)
      Year Month DayofMonth DayOfWeek    DepTime CRSDepTime    ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
1     2008     1          3         4  15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S            WN      2032  N271WN               204            210     183       20       26    LAS  BNA     1588      6      15         0                         0           18            0        0             0                 2
2     2008     1          3         4 10H 30M 0S  10H 0M 0S 11H 42M 0S 11H 15M 0S            WN       184  N473WN                72             75      62       27       30    LAX  OAK      337      3       7         0                         0            0            0        0             0                27
3     2008     1          3         4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S            WN      1209  N653SW               230            215     203       14       -1    MCI  SAN     1333     20       7         0                         0           NA           NA       NA            NA                NA
4     2008     1          3         4 10H 57M 0S 10H 30M 0S 13H 47M 0S  13H 5M 0S            WN      1656  N390SW               170            155     155       42       27    MCO  BDL     1050      4      11         0                         0           15            0       15             0                12
5     2008     1          3         4 19H 14M 0S 17H 55M 0S 21H 22M 0S  20H 5M 0S            WN      1507  N334SW               128            130     113       77       79    MCO  BWI      787      1      14         0                         0            0            0        0             0                77
6     2008     1          3         4   6H 1M 0S   6H 0M 0S  8H 24M 0S   8H 5M 0S            WN      2456  N255WN                83             65      61       19        1    MDW  CLE      307      6      16         0                         0            0            0       19             0                 0
7     2008     1          3         4 11H 55M 0S 11H 50M 0S 14H 26M 0S 14H 35M 0S            WN      1488  N461WN                91            105      79       -9        5    MDW  IAD      577      6       6         0                         0           NA           NA       NA            NA                NA
8     2008     1          3         4 12H 49M 0S 11H 20M 0S 13H 43M 0S 12H 35M 0S            WN       521  N781WN                54             75      47       68       89    MSY  BHM      321      2       5         0                         0           60            0        0             0                 8
9     2008     1          3         4  9H 57M 0S  9H 45M 0S  11H 1M 0S 10H 55M 0S            WN      1343  N523SW                64             70      54        6       12    MSY  HOU      303      3       7         0                         0           NA           NA       NA            NA                NA
....
# Attach rmr2 and select its "local" backend for the jobs that follow.
library(rmr2)
rmr.options(backend = "local")
NULL
# Input format for the airline file: comma-separated text whose column names
# and classes come from col.classes.
air.in.format = 
    make.input.format(
        "csv", sep = ",", col.names = names(col.classes), colClasses = col.classes)
# Read the file through rmr2 with that format and print the result.
from.dfs("../RHadoop.data/airline/air.subs", format = air.in.format)
$key
NULL

$val
        Year Month DayofMonth DayOfWeek    DepTime CRSDepTime    ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
1       2008     1          3         4  15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S            WN      2032  N271WN               204            210     183       20       26    LAS  BNA     1588      6      15         0                         0           18            0        0             0                 2
2       2008     1          3         4 10H 30M 0S  10H 0M 0S 11H 42M 0S 11H 15M 0S            WN       184  N473WN                72             75      62       27       30    LAX  OAK      337      3       7         0                         0            0            0        0             0                27
3       2008     1          3         4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S            WN      1209  N653SW               230            215     203       14       -1    MCI  SAN     1333     20       7         0                         0           NA           NA       NA            NA                NA
4       2008     1          3         4 10H 57M 0S 10H 30M 0S 13H 47M 0S  13H 5M 0S            WN      1656  N390SW               170            155     155       42       27    MCO  BDL     1050      4      11         0                         0           15            0       15             0                12
5       2008     1          3         4 19H 14M 0S 17H 55M 0S 21H 22M 0S  20H 5M 0S            WN      1507  N334SW               128            130     113       77       79    MCO  BWI      787      1      14         0                         0            0            0        0             0                77
....
# A mapreduce call given only an input and its format (no map, no reduce).
# The returned object is stored and then printed.
air.subs = 
    mapreduce(input = "../RHadoop.data/airline/air.subs", input.format = air.in.format)
air.subs
function () 
{
    fname
}
<bytecode: 0x104ceef40>
<environment: 0x104ced040>
# Bring the job result into the R session (a list with $key and $val) and print it.
air.df = from.dfs(air.subs)
air.df
$key
NULL

$val
        Year Month DayofMonth DayOfWeek    DepTime CRSDepTime    ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay
1       2008     1          3         4  15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S            WN      2032  N271WN               204            210     183       20       26    LAS  BNA     1588      6      15         0                         0           18            0        0             0                 2
2       2008     1          3         4 10H 30M 0S  10H 0M 0S 11H 42M 0S 11H 15M 0S            WN       184  N473WN                72             75      62       27       30    LAX  OAK      337      3       7         0                         0            0            0        0             0                27
3       2008     1          3         4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S            WN      1209  N653SW               230            215     203       14       -1    MCI  SAN     1333     20       7         0                         0           NA           NA       NA            NA                NA
4       2008     1          3         4 10H 57M 0S 10H 30M 0S 13H 47M 0S  13H 5M 0S            WN      1656  N390SW               170            155     155       42       27    MCO  BDL     1050      4      11         0                         0           15            0       15             0                12
5       2008     1          3         4 19H 14M 0S 17H 55M 0S 21H 22M 0S  20H 5M 0S            WN      1507  N334SW               128            130     113       77       79    MCO  BWI      787      1      14         0                         0            0            0        0             0                77
....
# Columns kept for the rest of the analysis.
important.cols = 
    c(
        "DayOfWeek", "DepTime", "UniqueCarrier", 
        "FlightNum", "AirTime", "ArrDelay", 
        "Origin", "Dest", "Distance")
# Take the value part of the key/value pair and narrow it to those columns.
air.df = subset(values(air.df), select = important.cols)
air.df 
        DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance
1               4  15H 6M 0S            WN      2032     183       20    LAS  BNA     1588
2               4 10H 30M 0S            WN       184      62       27    LAX  OAK      337
3               4 16H 54M 0S            WN      1209     203       14    MCI  SAN     1333
4               4 10H 57M 0S            WN      1656     155       42    MCO  BDL     1050
5               4 19H 14M 0S            WN      1507     113       77    MCO  BWI      787
6               4   6H 1M 0S            WN      2456      61       19    MDW  CLE      307
7               4 11H 55M 0S            WN      1488      79       -9    MDW  IAD      577
8               4 12H 49M 0S            WN       521      47       68    MSY  BHM      321
9               4  9H 57M 0S            WN      1343      54        6    MSY  HOU      303
....
# The same column selection as a map-only job: every chunk of values is
# narrowed to important.cols.
air.subs = 
    mapreduce(
        input = air.subs, 
        map = function(k, v) {
            subset(v, select = important.cols)})
from.dfs(air.subs)
$key
NULL

$val
        DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance
1               4  15H 6M 0S            WN      2032     183       20    LAS  BNA     1588
2               4 10H 30M 0S            WN       184      62       27    LAX  OAK      337
3               4 16H 54M 0S            WN      1209     203       14    MCI  SAN     1333
4               4 10H 57M 0S            WN      1656     155       42    MCO  BDL     1050
5               4 19H 14M 0S            WN      1507     113       77    MCO  BWI      787
....
# Keep the rows whose DayOfWeek lies in 1..7. The condition is a per-row mask,
# so it needs the elementwise `&`; `&&` is the scalar operator (and signals an
# error on inputs longer than one since R 4.3).
air.df = subset(air.df, DayOfWeek >= 1 & DayOfWeek <= 7)
# The same filter as a map-only job over the rmr2 data set.
air.subs = 
    mapreduce(
        air.subs, 
        map = 
            function(k, v)  
                subset(v, DayOfWeek >= 1 & DayOfWeek <= 7))
from.dfs(air.subs)
$key
NULL

$val
        DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance
1               4  15H 6M 0S            WN      2032     183       20    LAS  BNA     1588
2               4 10H 30M 0S            WN       184      62       27    LAX  OAK      337
3               4 16H 54M 0S            WN      1209     203       14    MCI  SAN     1333
4               4 10H 57M 0S            WN      1656     155       42    MCO  BDL     1050
5               4 19H 14M 0S            WN      1507     113       77    MCO  BWI      787
....
# Mean arrival delay per day of week: the map keys every delay by its weekday,
# the reduce turns each group into a one-row (day, mean) matrix.
air.summary = 
    mapreduce(
        input = air.subs, 
        map = function(k, v) keyval(v$DayOfWeek, v$ArrDelay), 
        reduce = 
            function(k, v) {
                avg.delay = mean(v, na.rm = TRUE)
                cbind(day = k, mean = avg.delay)})
from.dfs(air.summary)
$key
NULL

$val
     day   mean
[1,]   4  8.138
[2,]   5 11.595
[3,]   6  5.338
[4,]   7  9.530
[5,]   1  8.194
....
# Per-weekday count and total of the known arrival delays. The map drops rows
# with a missing ArrDelay and emits (1, delay) rows keyed by weekday; the
# reduce adds the columns up. The job is run with combine = TRUE.
air.summary = 
    mapreduce(
        input = air.subs, 
        map = 
            function(k, v) {
                known = v[!is.na(v$ArrDelay), ]
                keyval(
                    known$DayOfWeek, 
                    cbind(count = 1, arrdelay = known$ArrDelay))}, 
        reduce = function(k, v) keyval(k, t(colSums(v))), 
        combine = TRUE)
from.dfs(air.summary)
$key
[1] 5 6 7 1 2 3 4

$val
     count arrdelay
[1,] 10252   118870
[2,]  8548    45628
[3,]  9787    93266
[4,] 10379    85047
[5,] 10325    79858
....

Wordcount

# Count the occurrences of each token in a text input with one mapreduce job.
#
# input:   passed to mapreduce as `input`
# output:  passed to mapreduce as `output` (NULL by default)
# pattern: split pattern given to strsplit to break lines into tokens
#
# wc.map and wc.reduce are defined inside wordcount so that wc.map can see
# `pattern` through its enclosing environment.
wordcount = 
  function(
    input, 
    output = NULL, 
    pattern = " "){
    # Emit (token, 1) for every token found in the chunk of lines.
    wc.map = 
      function(., lines) {
        keyval(
          unlist(
            strsplit(
              x = lines,
              split = pattern)),
          1)}
    # Add up the partial counts collected for one token.
    wc.reduce =
      function(word, counts ) {
        keyval(word, sum(counts))}
    mapreduce(
      input = input,
      output = output,
      map = wc.map,
      reduce = wc.reduce,
      combine = TRUE)}

Contingency tables

# Attach vcd (start-up messages suppressed) and plyr, then load the Bundesliga
# data set and print a per-column summary.
suppressMessages(library(vcd))
library(plyr)
data(Bundesliga)
summary(Bundesliga)
                 HomeTeam                    AwayTeam      HomeGoals      AwayGoals        Round           Year           Date                    
 Hamburger SV        : 780   Hamburger SV        : 780   Min.   : 0.0   Min.   :0.00   Min.   : 1.0   Min.   :1963   Min.   :1963-08-24 07:30:00  
 Werder Bremen       : 763   Werder Bremen       : 763   1st Qu.: 1.0   1st Qu.:0.00   1st Qu.: 9.0   1st Qu.:1974   1st Qu.:1975-05-20 04:52:30  
 Bayern Muenchen     : 750   Bayern Muenchen     : 750   Median : 2.0   Median :1.00   Median :17.0   Median :1986   Median :1986-11-01 06:30:00  
 VfB Stuttgart       : 746   VfB Stuttgart       : 746   Mean   : 1.9   Mean   :1.19   Mean   :17.5   Mean   :1986   Mean   :1986-09-27 10:23:15  
 1. FC Kaiserslautern: 712   1. FC Kaiserslautern: 712   3rd Qu.: 3.0   3rd Qu.:2.00   3rd Qu.:26.0   3rd Qu.:1997   3rd Qu.:1997-12-13 06:30:00  
 Borussia Dortmund   : 712   Borussia Dortmund   : 712   Max.   :12.0   Max.   :9.00   Max.   :38.0   Max.   :2008   Max.   :2009-05-23 06:30:00  
 (Other)             :9555   (Other)             :9555                                                               NA's   :2                    
# Shorten the team names and sort the fixtures by home team, then away team.
Bundesliga$HomeTeam = abbreviate(Bundesliga$HomeTeam)
Bundesliga$AwayTeam = abbreviate(Bundesliga$AwayTeam)
Bundesliga = arrange(Bundesliga, HomeTeam, AwayTeam)
# First sample: rows 1, 3, ..., 99 of the sorted table, team columns only.
matchups = 
  Bundesliga[
    seq(1, 99, by = 2), 
    c("HomeTeam", "AwayTeam")]
matchups
   HomeTeam AwayTeam
1    1.FCKl   1.FCKs
3    1.FCKl   1.FCKs
5    1.FCKl   1.FCKs
7    1.FCKl   1.FCKs
9    1.FCKl   1.FCKs
11   1.FCKl   1.FCKs
13   1.FCKl   1.FCKs
15   1.FCKl   1.FCKs
17   1.FCKl   1.FCKs
....
# Frequency of each (HomeTeam, AwayTeam) pair in the first sample.
count(matchups)
  HomeTeam AwayTeam freq
1   1.FCKl   1.FCKs   19
2   1.FCKl     1.FN   11
3   1.FCKl     1.FS    3
4   1.FCKl     1FM0    1
5   1.FCKl     AlmA    1
6   1.FCKl     ArmB    6
7   1.FCKl     B-9B    1
8   1.FCKl     BrsD    8
# Second sample: rows 2, 4, ..., 100 of the sorted table, team columns only.
matchups1 = 
  Bundesliga[
    seq(2, 100, by = 2), 
    c("HomeTeam", "AwayTeam")]
matchups1
    HomeTeam AwayTeam
2     1.FCKl   1.FCKs
4     1.FCKl   1.FCKs
6     1.FCKl   1.FCKs
8     1.FCKl   1.FCKs
10    1.FCKl   1.FCKs
12    1.FCKl   1.FCKs
14    1.FCKl   1.FCKs
16    1.FCKl   1.FCKs
18    1.FCKl   1.FCKs
....
# Frequency of each (HomeTeam, AwayTeam) pair in the second sample.
count(matchups1)
  HomeTeam AwayTeam freq
1   1.FCKl   1.FCKs   19
2   1.FCKl     1.FN   11
3   1.FCKl     1.FS    2
4   1.FCKl     189H    1
5   1.FCKl     AlmA    2
6   1.FCKl     ArmB    6
7   1.FCKl     BrsD    9
# Stack the two samples row-wise.
rbind(matchups, matchups1) 
    HomeTeam AwayTeam
1     1.FCKl   1.FCKs
3     1.FCKl   1.FCKs
5     1.FCKl   1.FCKs
7     1.FCKl   1.FCKs
9     1.FCKl   1.FCKs
11    1.FCKl   1.FCKs
13    1.FCKl   1.FCKs
15    1.FCKl   1.FCKs
17    1.FCKl   1.FCKs
....
# Pair frequencies computed on the stacked samples.
count(rbind(matchups, matchups1)) 
  HomeTeam AwayTeam freq
1   1.FCKl   1.FCKs   38
2   1.FCKl     1.FN   22
3   1.FCKl     1.FS    5
4   1.FCKl     189H    1
5   1.FCKl     1FM0    1
6   1.FCKl     AlmA    3
7   1.FCKl     ArmB   12
8   1.FCKl     B-9B    1
9   1.FCKl     BrsD   17
....
# Stack the two separately computed frequency tables; a pair that occurs in
# both samples now takes up two rows, as the sorted print-out shows.
raw.combination = rbind(count(matchups), count(matchups1))
arrange(raw.combination, HomeTeam, AwayTeam)
   HomeTeam AwayTeam freq
1    1.FCKl   1.FCKs   19
2    1.FCKl   1.FCKs   19
3    1.FCKl     1.FN   11
4    1.FCKl     1.FN   11
5    1.FCKl     1.FS    3
6    1.FCKl     1.FS    2
7    1.FCKl     189H    1
8    1.FCKl     1FM0    1
9    1.FCKl     AlmA    1
....
# Collapse duplicated pairs by summing their freq values.
ddply(
    .data = raw.combination, 
    .variables = c("HomeTeam", "AwayTeam"), 
    .fun = function(pair.rows) c(freq = sum(pair.rows$freq)))
  HomeTeam AwayTeam freq
1   1.FCKl   1.FCKs   38
2   1.FCKl     1.FN   22
3   1.FCKl     1.FS    5
4   1.FCKl     189H    1
5   1.FCKl     1FM0    1
6   1.FCKl     AlmA    3
7   1.FCKl     ArmB   12
8   1.FCKl     B-9B    1
9   1.FCKl     BrsD   17
....
# Merge the rows of a frequency table that share the same category values.
# ct: a data frame whose last column is the count and is named freq; all other
#     columns are used as grouping variables.
# Returns one row per distinct combination with the summed freq.
shrink =  
  function(ct) {
    group.cols = names(ct)[-ncol(ct)]
    ddply(
      ct, 
      group.cols, 
      .fun = function(grp) c(freq = sum(grp$freq)))}
shrink(raw.combination)
  HomeTeam AwayTeam freq
1   1.FCKl   1.FCKs   38
2   1.FCKl     1.FN   22
3   1.FCKl     1.FS    5
4   1.FCKl     189H    1
5   1.FCKl     1FM0    1
6   1.FCKl     AlmA    3
7   1.FCKl     ArmB   12
8   1.FCKl     B-9B    1
9   1.FCKl     BrsD   17
....
# Hand the complete fixture list to rmr2.
matchups.big = to.dfs(Bundesliga[, c("HomeTeam", "AwayTeam")])  
# Map: tabulate the pairs of one chunk. The constant key 1 puts every partial
# table into the same group.
map = 
  function(k, v) 
    keyval(
      1, 
      count(v))
# Reduce: merge the partial tables by summing the freq of identical pairs.
reduce = 
  function(k, vv) 
    keyval(
      1, 
      shrink(vv))
# Run the job and print the value part of the result. TRUE is spelled out
# because T is an ordinary variable that can be reassigned.
values(
  from.dfs(
    mapreduce( 
      matchups.big, 
      map = map, 
      reduce = reduce, 
      combine = TRUE)))
     HomeTeam AwayTeam freq
1      1.FCKl   1.FCKs   38
2      1.FCKl     1.FN   22
3      1.FCKl     1.FS    5
4      1.FCKl     189H    1
5      1.FCKl     1FM0    1
6      1.FCKl     AlmA    3
7      1.FCKl     ArmB   12
8      1.FCKl     B-9B    1
9      1.FCKl     BrsD   36
....
# Three successive versions of the map function. Each one tabulates its chunk
# with count() and differs only in how the keys are derived from the category
# columns. The category columns are all columns of ct except the last one,
# freq, so the column to drop is ncol(ct), not ncol(v).

# Version 1: key each row of the partial table by its category columns.
map = 
  function(k, v) {
    ct = count(v)
    keyval(ct[, -ncol(ct)], ct)}
# Version 2: reduce the category columns to numbers modulo N.
N = 10
map = 
  function(k, v) {
    ct = count(v)
    keyval(
      apply(
        # drop = FALSE keeps a data frame even when only one category column remains
        ct[, -ncol(ct), drop = FALSE], 
        2, 
        as.numeric)%%N, 
      ct)}
# Version 3: as version 2, but with bitops::cksum in place of as.numeric.
library(bitops)
N = 10
map = 
  function(k, v) {
    ct = count(v)
    keyval(
      apply(
        ct[, -ncol(ct), drop = FALSE], 
        2, 
        cksum)%%N, 
      ct)}

Recursive Clara

library(cluster)
# Extract the element called a.name from every member of the list ll.
# Returns a list of the same length as ll.
napply = function(ll, a.name) lapply(ll, `[[`, a.name)
# Generic map/reduce clustering skeleton.
#
# data:       input handed to mapreduce
# subcluster: function applied to each chunk of data in the map phase
# merge:      function applied to the list of clusterings collected in the
#             reduce phase
#
# Each result is wrapped in list() and emitted under the constant key 1, so
# all partial clusterings fall into one group. TRUE is spelled out because T
# is an ordinary variable that can be reassigned.
cluster.mr = 
    function(data, subcluster, merge) 
        mapreduce(
            data,
            map = 
                function(., data.chunk) 
                    keyval(1, list(subcluster(data.chunk))),
            combine = TRUE,
            reduce = 
                function(., clusterings)
                    keyval(1, list(merge(clusterings))))
# Cluster one chunk of data with clara() and keep what merging needs later.
#
# data:      the observations to cluster
# n.centers: number of clusters requested from clara
#
# Returns a list with
#   size    - number of rows in data
#   sample  - the rows of data that clara reports in clust$sample
#   medoids - clust$medoids
subclara = 
    function(data, n.centers) {
        clust = 
            clara(
                data, 
                n.centers, 
                keep.data = FALSE)
        list(
            size = nrow(data),
            # drop = FALSE keeps the rectangular shape even for one-column data
            sample = data[clust$sample, , drop = FALSE],
            medoids = clust$medoids)}
# Merge several partial clusterings (as returned by subclara) into one.
#
# clusterings: list of lists with elements size, sample and medoids
# n.centers:   number of clusters for the merged clustering
#
# The samples of all clusterings are resampled with replacement, stacked with
# rbind and clustered again with subclara. The size of the result is set to
# the sum of the input sizes.
merge.clara =
    function(clusterings, n.centers){
        sizes = unlist(napply(clusterings, 'size'))
        total.size = sum(sizes)
        size.range = range(sizes)
        # NOTE(review): this ratio is the same for every clustering, so all
        # samples are inflated by one common factor -- confirm this is the
        # intended weighting.
        size.ratio = max(size.range)/min(size.range)
        resample = 
            function(x) {
                picked = 
                    sample(
                        seq_len(nrow(x$sample)), 
                        round(nrow(x$sample) * size.ratio),
                        replace = TRUE)
                # Row selection: the trailing comma is required, otherwise the
                # index addresses single elements (matrix) or columns (data frame).
                x$sample[picked, , drop = FALSE]}
        clust = 
            subclara(
                do.call(
                    rbind, 
                    lapply(
                        clusterings, 
                        resample)),
                n.centers)
        clust$size = total.size
        clust}        
# Cluster a data set with cluster.mr, using subclara on every chunk and
# merge.clara to combine the partial clusterings.
#
# data:      input handed to cluster.mr
# n.centers: number of clusters, passed to both subclara and merge.clara
#
# Returns the first element of the values of the job result.
clara.mr = 
    function(data, n.centers) {
        # Evaluate n.centers now so the closures below capture its value.
        force(n.centers)
        # Plain closures stand in for Curry(), which no library() call in this
        # file provides.
        subcluster = 
            function(data.chunk) 
                subclara(data.chunk, n.centers = n.centers)
        merge = 
            function(clusterings) 
                merge.clara(clusterings, n.centers = n.centers)
        values(
            from.dfs(
                cluster.mr(
                    data, 
                    subcluster, 
                    merge)))[[1]]}

Linear Least Squares

\[ \mathbf{X b = y} \]

# In-memory solution: solve (X'X) b = X'y for b.
solve(t(X)%*%X, t(X)%*%y)

\[ \newcommand{\X}{\mathbf{X}} \] \[ (\X_1^t, \X_2^t, \ldots, \X_n^t) \left( \begin{array}{c} \X_1 \\ \X_2 \\ \ldots \\ \X_n \end{array}\right) = \sum_i \X_i^t \X_i \]

# Test problem. It is created before the jobs because both jobs read X.index
# and the Xty map also reads y.
X = matrix(rnorm(2000), ncol = 10)
# Column 1 carries the row number so that a chunk can look up its rows of y.
X.index = to.dfs(cbind(seq_len(nrow(X)), X))
y = as.matrix(rnorm(200))
# Reducer and combiner: add up a list of equally sized matrices and emit the
# total under key 1.
Sum = 
  function(., YY) 
    keyval(1, list(Reduce('+', YY)))
# X'X as the sum of the per-chunk products Xi' Xi.
XtX = 
  values(
    from.dfs(
      mapreduce(
        input = X.index,
        map = 
          function(., Xi) {
            # Remove the row-number column; drop = FALSE keeps a one-row
            # chunk as a matrix so the product is still ncol x ncol.
            Xi = Xi[, -1, drop = FALSE]
            keyval(1, list(t(Xi) %*% Xi))},
        reduce = Sum,
        combine = TRUE)))[[1]]
# X'y as the sum of the per-chunk products Xi' yi.
Xty = 
  values(
    from.dfs(
      mapreduce(
        input = X.index,
        map = function(., Xi) {
          # Rows of y that belong to this chunk, found via the row numbers.
          yi = y[Xi[, 1], , drop = FALSE]
          Xi = Xi[, -1, drop = FALSE]
          keyval(1, list(t(Xi) %*% yi))},
        reduce = Sum,
        combine = TRUE)))[[1]]
# Solve the normal equations with the two aggregated products.
solve(XtX, Xty)

Random Forests

library(randomForest)

# Poisson rate used by poisson.subsample when drawing how often each row is
# repeated in one model's subsample.
frac.per.model <- 0.1
# Number of subsamples (one per key) generated by poisson.subsample.
num.models <- 50

# Column names of the training file, passed as col.names to the input format.
column.names <- c("MachineID", "SalePrice", "ModelID.x", "datasource", "auctioneerID", 
                  "YearMade", "MachineHoursCurrentMeter", "UsageBand", "saledate", 
                  "fiModelDesc.x", "fiBaseModel.x", "fiSecondaryDesc.x", "fiModelSeries.x", 
                  "fiModelDescriptor.x", "ProductSize", "fiProductClassDesc.x", 
                  "state", "ProductGroup.x", "ProductGroupDesc.x", "Drive_System", 
                  "Enclosure", "Forks", "Pad_Type", "Ride_Control", "Stick", "Transmission", 
                  "Turbocharged", "Blade_Extension", "Blade_Width", "Enclosure_Type", 
                  "Engine_Horsepower", "Hydraulics", "Pushblock", "Ripper", "Scarifier", 
                  "Tip_Control", "Tire_Size", "Coupler", "Coupler_System", "Grouser_Tracks", 
                  "Hydraulics_Flow", "Track_Type", "Undercarriage_Pad_Width", "Stick_Length", 
                  "Thumb", "Pattern_Changer", "Grouser_Type", "Backhoe_Mounting", 
                  "Blade_Type", "Travel_Controls", "Differential_Type", "Steering_Controls", 
                  "saledatenumeric", "ageAtSale", "saleYear", "saleMonth", "saleDay", 
                  "saleWeekday", "MedianModelPrice", "ModelCount", "ModelID.y", 
                  "fiModelDesc.y", "fiBaseModel.y", "fiSecondaryDesc.y", "fiModelSeries.y", 
                  "fiModelDescriptor.y", "fiProductClassDesc.y", "ProductGroup.y", 
                  "ProductGroupDesc.y", "MfgYear", "fiManufacturerID", "fiManufacturerDesc", 
                  "PrimarySizeBasis", "PrimaryLower", "PrimaryUpper")
# Model fitted by fit.trees: SalePrice as response, sixteen of the columns above as predictors.
model.formula <- SalePrice ~ datasource + auctioneerID + YearMade + saledatenumeric + ProductSize +
                               ProductGroupDesc.x + Enclosure + Hydraulics + ageAtSale + saleYear +
                               saleMonth + saleDay + saleWeekday + MedianModelPrice + ModelCount +
                               MfgYear
# Input format for the training file: quoted, comma-separated text with the
# column names from column.names. Classes are given by name for a subset of
# the columns only.
# NOTE(review): the "NULL" class for MachineID presumably skips that column,
# as it does in read.table -- confirm for rmr2's csv reader.
bulldozer.input.format = 
    make.input.format(
        "csv", 
        sep = ",", 
        quote = "\"", 
        row.names = NULL, 
        col.names = column.names, 
        fill = TRUE, 
        na.strings = c("NA"), 
        colClasses = 
            c(
                MachineID = "NULL", 
                SalePrice = "numeric", 
                YearMade = "numeric", 
                MachineHoursCurrentMeter = "numeric", 
                ageAtSale = "numeric", 
                saleYear = "numeric", 
                ModelCount = "numeric", 
                MfgYear = "numeric", 
                ModelID.x = "factor", 
                ModelID.y = "factor", 
                fiManufacturerID = "factor", 
                datasource = "factor", 
                auctioneerID = "factor", 
                saledatenumeric = "numeric", 
                saleDay = "factor", 
                Stick_Length = "numeric"))
# Map function: build num.models random subsamples of one input chunk.
#
# k:     key of the chunk (not used)
# input: the rows of the chunk
#
# For model i every row is repeated a Poisson(frac.per.model) number of times
# (possibly zero) and the resulting rows are emitted under key i. The
# per-model key/value pairs are joined with c.keyval.
poisson.subsample <- function(k, input) {
  n.rows <- nrow(input)
  generate.sample <- function(i) {
    draws <- rpois(n = n.rows, lambda = frac.per.model)
    # seq_len keeps the index vector empty for an empty chunk; 1:0 would be c(1, 0)
    indices <- rep(seq_len(n.rows), draws)
    keyval(i, input[indices, ])
  }
  c.keyval(lapply(seq_len(num.models), generate.sample))
}
# Reduce function: fit a 10-tree random forest to the rows collected under one
# key and emit it, wrapped in a list, under the same key.
fit.trees <- function(k, v) {
  keyval(
    k, 
    list(
      forest = 
        randomForest(
          formula = model.formula, 
          data = v, 
          na.action = na.roughfix, 
          ntree = 10, 
          do.trace = FALSE)))
}
# Run the subsample-and-fit job, writing the per-model forests to /poisson/output.
mapreduce(
    input = "/poisson/training.csv", 
    input.format = bulldozer.input.format, 
    map = poisson.subsample, 
    reduce = fit.trees, 
    output = "/poisson/output")

# Read the fitted forests back and merge them with combine().
raw.forests <- values(from.dfs("/poisson/output"))
forest <- do.call(combine, raw.forests)

plyrmr

# Add a logical column Late that is TRUE where ArrDelay exceeds 15, then print.
air.df = transform(air.df, Late = ArrDelay > 15)
air.df
        DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance  Late
1               4  15H 6M 0S            WN      2032     183       20    LAS  BNA     1588  TRUE
2               4 10H 30M 0S            WN       184      62       27    LAX  OAK      337  TRUE
3               4 16H 54M 0S            WN      1209     203       14    MCI  SAN     1333 FALSE
4               4 10H 57M 0S            WN      1656     155       42    MCO  BDL     1050  TRUE
5               4 19H 14M 0S            WN      1507     113       77    MCO  BWI      787  TRUE
6               4   6H 1M 0S            WN      2456      61       19    MDW  CLE      307  TRUE
7               4 11H 55M 0S            WN      1488      79       -9    MDW  IAD      577 FALSE
8               4 12H 49M 0S            WN       521      47       68    MSY  BHM      321  TRUE
9               4  9H 57M 0S            WN      1343      54        6    MSY  HOU      303 FALSE
....
# The same transform applied to the airline file through plyrmr's input().
# Printing the result shows plyrmr's message that computation is delayed.
library(plyrmr)
air.subs = 
    transform(
        input("../RHadoop.data/airline/air.subs", format = air.in.format), 
        Late = ArrDelay > 15)
air.subs
[1] "Got it! To generate results call the functions output or as.data.frame on this object. Computation has been delayed at least in part."
# Materialise the plyrmr object as an in-memory data frame and print it.
air.df = as.data.frame(air.subs)
air.df 
        Year Month DayofMonth DayOfWeek    DepTime CRSDepTime    ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled CancellationCode Diverted CarrierDelay WeatherDelay NASDelay SecurityDelay LateAircraftDelay  Late
1       2008     1          3         4  15H 6M 0S 14H 40M 0S 20H 30M 0S 20H 10M 0S            WN      2032  N271WN               204            210     183       20       26    LAS  BNA     1588      6      15         0                         0           18            0        0             0                 2  TRUE
2       2008     1          3         4 10H 30M 0S  10H 0M 0S 11H 42M 0S 11H 15M 0S            WN       184  N473WN                72             75      62       27       30    LAX  OAK      337      3       7         0                         0            0            0        0             0                27  TRUE
3       2008     1          3         4 16H 54M 0S 16H 55M 0S 18H 44M 0S 18H 30M 0S            WN      1209  N653SW               230            215     203       14       -1    MCI  SAN     1333     20       7         0                         0           NA           NA       NA            NA                NA FALSE
4       2008     1          3         4 10H 57M 0S 10H 30M 0S 13H 47M 0S  13H 5M 0S            WN      1656  N390SW               170            155     155       42       27    MCO  BDL     1050      4      11         0                         0           15            0       15             0                12  TRUE
5       2008     1          3         4 19H 14M 0S 17H 55M 0S 21H 22M 0S  20H 5M 0S            WN      1507  N334SW               128            130     113       77       79    MCO  BWI      787      1      14         0                         0            0            0        0             0                77  TRUE
6       2008     1          3         4   6H 1M 0S   6H 0M 0S  8H 24M 0S   8H 5M 0S            WN      2456  N255WN                83             65      61       19        1    MDW  CLE      307      6      16         0                         0            0            0       19             0                 0  TRUE
7       2008     1          3         4 11H 55M 0S 11H 50M 0S 14H 26M 0S 14H 35M 0S            WN      1488  N461WN                91            105      79       -9        5    MDW  IAD      577      6       6         0                         0           NA           NA       NA            NA                NA FALSE
8       2008     1          3         4 12H 49M 0S 11H 20M 0S 13H 43M 0S 12H 35M 0S            WN       521  N781WN                54             75      47       68       89    MSY  BHM      321      2       5         0                         0           60            0        0             0                 8  TRUE
9       2008     1          3         4  9H 57M 0S  9H 45M 0S  11H 1M 0S 10H 55M 0S            WN      1343  N523SW                64             70      54        6       12    MSY  HOU      303      3       7         0                         0           NA           NA       NA            NA                NA FALSE
....
# Write the result to /tmp/air.subs.out instead of bringing it into memory.
output(air.subs, "/tmp/air.subs.out")
[1] "Big Data object:"  "/tmp/air.subs.out" "native"           
# Keep ten columns of the in-memory data frame and print it.
air.df = 
    select(
        air.df, 
        DayOfWeek, DepTime, UniqueCarrier, FlightNum, AirTime, 
        ArrDelay, Origin, Dest, Distance, Late)
air.df 
      DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance  Late
1             4  15H 6M 0S            WN      2032     183       20    LAS  BNA     1588  TRUE
2             4 10H 30M 0S            WN       184      62       27    LAX  OAK      337  TRUE
3             4 16H 54M 0S            WN      1209     203       14    MCI  SAN     1333 FALSE
4             4 10H 57M 0S            WN      1656     155       42    MCO  BDL     1050  TRUE
5             4 19H 14M 0S            WN      1507     113       77    MCO  BWI      787  TRUE
6             4   6H 1M 0S            WN      2456      61       19    MDW  CLE      307  TRUE
7             4 11H 55M 0S            WN      1488      79       -9    MDW  IAD      577 FALSE
8             4 12H 49M 0S            WN       521      47       68    MSY  BHM      321  TRUE
9             4  9H 57M 0S            WN      1343      54        6    MSY  HOU      303 FALSE
....
# The same column selection on the plyrmr object, materialised for display.
air.subs = 
    select(
        air.subs, 
        DayOfWeek, DepTime, UniqueCarrier, FlightNum, AirTime, 
        ArrDelay, Origin, Dest, Distance, Late)
as.data.frame(air.subs)
        DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance  Late
1               4  15H 6M 0S            WN      2032     183       20    LAS  BNA     1588  TRUE
2               4 10H 30M 0S            WN       184      62       27    LAX  OAK      337  TRUE
3               4 16H 54M 0S            WN      1209     203       14    MCI  SAN     1333 FALSE
4               4 10H 57M 0S            WN      1656     155       42    MCO  BDL     1050  TRUE
5               4 19H 14M 0S            WN      1507     113       77    MCO  BWI      787  TRUE
6               4   6H 1M 0S            WN      2456      61       19    MDW  CLE      307  TRUE
7               4 11H 55M 0S            WN      1488      79       -9    MDW  IAD      577 FALSE
8               4 12H 49M 0S            WN       521      47       68    MSY  BHM      321  TRUE
9               4  9H 57M 0S            WN      1343      54        6    MSY  HOU      303 FALSE
....
# Keep the rows whose DayOfWeek lies in 1..7, first on the data frame, then on
# the plyrmr object. The condition is a per-row mask, so it needs the
# elementwise `&`; `&&` is the scalar operator (and signals an error on inputs
# longer than one since R 4.3).
air.df = where(air.df, DayOfWeek >= 1 & DayOfWeek <= 7)
air.subs = where(air.subs, DayOfWeek >= 1 & DayOfWeek <= 7)
as.data.frame(air.subs)
        DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance  Late
1               4  15H 6M 0S            WN      2032     183       20    LAS  BNA     1588  TRUE
2               4 10H 30M 0S            WN       184      62       27    LAX  OAK      337  TRUE
3               4 16H 54M 0S            WN      1209     203       14    MCI  SAN     1333 FALSE
4               4 10H 57M 0S            WN      1656     155       42    MCO  BDL     1050  TRUE
5               4 19H 14M 0S            WN      1507     113       77    MCO  BWI      787  TRUE
6               4   6H 1M 0S            WN      2456      61       19    MDW  CLE      307  TRUE
7               4 11H 55M 0S            WN      1488      79       -9    MDW  IAD      577 FALSE
8               4 12H 49M 0S            WN       521      47       68    MSY  BHM      321  TRUE
9               4  9H 57M 0S            WN      1343      54        6    MSY  HOU      303 FALSE
....
# Mean arrival delay per DayOfWeek group; missing delays are skipped.
air.summary = select(
    group(air.subs, DayOfWeek),
    mean = mean(ArrDelay, na.rm = TRUE))
# Convert the summary to a data frame; at top level the result is printed.
as.data.frame(air.summary)
    DayOfWeek   mean
1.1         5 11.595
1.2         6  5.338
1.3         7  9.530
1.4         1  8.194
1.5         2  7.734
1.6         3  6.674
1           4  8.138
# rmr2 formulation of the per-day mean arrival delay.
#   map:    emits DayOfWeek as the key and ArrDelay as the value.
#   reduce: averages the delays received for one key, skipping NAs.
# The reduce result is wrapped in keyval() so each mean stays paired with its
# DayOfWeek key; rmr2 documents a non-keyval return value as being treated
# like keyval(NULL, x), which would drop the key.
mapreduce(
    air.subs, 
    map = 
        function(k, v)
            keyval(v$DayOfWeek, v$ArrDelay), 
    reduce = 
        function(k, v)
            keyval(k, mean(v, na.rm = TRUE)))
# Per-DayOfWeek sum of arrival delays and count of non-missing delays.
# Only ArrDelay and DayOfWeek are kept before grouping.
# NOTE(review): `.recursive = TRUE` is passed through to group() unchanged;
# confirm its exact meaning against the plyrmr documentation.
air.summary = select(
    group(
        select(air.subs, ArrDelay, DayOfWeek),
        DayOfWeek,
        .recursive = TRUE),
    sum = sum(ArrDelay, na.rm = TRUE),
    count = sum(!is.na(ArrDelay)))
# Convert the summary to a data frame; at top level the result is printed.
as.data.frame(air.summary)
    DayOfWeek    sum count
1.1         5 118870 10252
1.2         6  45628  8548
1.3         7  93266  9787
1.4         1  85047 10379
1.5         2  79858 10325
1.6         3  69394 10398
1           4  83335 10240
origin functions
base transform, subset
plyr mutate, summarize
reshape2 melt, dcast
new select, where, do
SQL dplyr group, group.f, gather, ungroup, moving.window
other top.k, bottom.k, quantile.cols, count.cols

union, intersect, sample

Better than SQL

# Three top rows of air.subs ranked on ArrDelay, then on -AirTime,
# shown as a data frame.
as.data.frame(top.k(air.subs, 3, ArrDelay, -AirTime))
     DayOfWeek   DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
2845         4 7H 24M 0S            NW      1699     132     2453    CLT  MSP      930 TRUE
510          7 9H 39M 0S            NW      1639     110     1249    EGE  MSP      788 TRUE
8738         5 7H 24M 0S            MQ      3784      38     1000    FSM  DFW      228 TRUE
# Same ranking as the ungrouped top.k call, but applied to air.subs grouped
# by DayOfWeek, so three rows are returned for each day.
as.data.frame(
    top.k(group(air.subs, DayOfWeek), 3, ArrDelay, -AirTime))
       DayOfWeek    DepTime UniqueCarrier FlightNum AirTime ArrDelay Origin Dest Distance Late
2845.2         4  7H 24M 0S            NW      1699     132     2453    CLT  MSP      930 TRUE
2250           4 20H 43M 0S            OO      5804      95      495    FSD  ORD      462 TRUE
8842.4         4 23H 38M 0S            B6       616     143      482    JAX  JFK      829 TRUE
8738.5         5  7H 24M 0S            MQ      3784      38     1000    FSM  DFW      228 TRUE
5212.3         5 17H 25M 0S            NW      1557      55      664    SYR  DTW      374 TRUE
7598.4         5 16H 47M 0S            MQ      4831      94      653    SDF  LGA      658 TRUE
8868.5         6 22H 36M 0S            MQ      4382      52      954    CLE  ORD      316 TRUE
4442.6         6 16H 55M 0S            MQ      4779      73      445    DCA  BOS      399 TRUE
8978.2         6 15H 39M 0S            NW      1498      52      439    SDF  DTW      306 TRUE
510.1          7  9H 39M 0S            NW      1639     110     1249    EGE  MSP      788 TRUE
8567.4         7 14H 30M 0S            AA      2063     135      500    LAS  DFW     1055 TRUE
9985.6         7 15H 55M 0S            NW       218     170      486    PDX  MSP     1426 TRUE
4512.6         1  6H 10M 0S            NW       308     194      860    LAX  MSP     1536 TRUE
4175.6         1  6H 59M 0S            MQ      3504      44      750    LBB  DFW      282 TRUE
3489           1 16H 50M 0S            EV      4781     106      657    FNT  ATL      644 TRUE
....
# Column-wise quantiles of air.subs, shown as a data frame.
as.data.frame(
    quantile.cols(air.subs))
  DayOfWeek DepTime FlightNum AirTime ArrDelay Distance
1         1       0     1.999    15.0   -47.18     49.0
2         2       0   623.500    56.0   -10.00    325.0
3         4       0  1566.266    86.0    -2.00    580.5
4         6       0  3521.495   132.0    12.00    952.5
5         7       0  7812.002   453.4   385.77   3904.0
# Column-wise quantiles computed separately for each DayOfWeek group.
as.data.frame(
    quantile.cols(
        group(air.subs, DayOfWeek)))
    DayOfWeek DepTime FlightNum AirTime ArrDelay Distance
1.1         5       0     1.253   14.00   -47.00    49.00
2.1         5       0   623.500   56.00    -9.00   321.00
3.1         5       0  1558.000   85.50     0.00   577.00
4.1         5       0  3477.997  133.00    15.00   954.75
5.1         5       0  7820.747  460.37   452.49  3924.34
1.2         6       0     1.000   15.00   -47.73    49.00
2.2         6       0   642.461   57.00   -12.00   336.00
3.2         6       0  1558.500   89.00    -4.00   599.00
4.2         6       0  3534.750  136.00     9.00   991.25
5.2         6       0  7803.000  445.72   356.81  3947.21
1.3         7       0     2.011   14.00   -47.11    49.08
2.3         7       0   625.500   56.00   -10.00   328.75
3.3         7       0  1577.000   86.50    -1.00   588.00
4.3         7       0  3534.250  133.25    13.00   967.00
5.3         7       0  7810.879  469.39   413.39  3972.00
....
# Fit one linear model per DayOfWeek: ArrDelay regressed on DepTime, AirTime
# and Distance. Only the columns the model needs are kept before grouping.
# The fitted model is wrapped in list() so it is stored in a single `model`
# column.
as.data.frame(
    select(
        group(
            select(air.subs, ArrDelay, DayOfWeek, DepTime, AirTime, Distance),
            DayOfWeek),
        model = list(lm(ArrDelay ~ DepTime + AirTime + Distance))))
    DayOfWeek        model
1.1         5 c(1.8499....
1.2         6 c(-3.422....
1.3         7 c(0.9583....
1.4         1 c(-1.693....
1.5         2 c(-3.172....
1.6         3 c(-2.453....
1           4 c(-0.869....
# Build the per-DayOfWeek linear models in a single expression:
#   inner select - keep the columns the model needs,
#   group        - one group per DayOfWeek,
#   outer select - fit ArrDelay ~ DepTime + AirTime + Distance in each group,
#                  wrapped in list() and stored in the `model` column.
x =
    select(
        group(
            select(air.subs, ArrDelay, DayOfWeek, DepTime, AirTime, Distance),
            DayOfWeek),
        model = list(lm(ArrDelay ~ DepTime + AirTime + Distance)))

# Convert to a data frame; at top level the result is printed.
as.data.frame(x)
    DayOfWeek        model
1.1         5 c(1.8499....
1.2         6 c(-3.422....
1.3         7 c(0.9583....
1.4         1 c(-1.693....
1.5         2 c(-3.172....
1.6         3 c(-2.453....
1           4 c(-0.869....
# Elementwise closed-interval test.
#
# Args:
#   x:     vector of values to test.
#   lower: lower bound (inclusive).
#   upper: upper bound (inclusive).
# Returns:
#   A logical vector with one entry per element of `x`, TRUE where
#   lower <= x <= upper.
#
# The comparisons are combined with `&` (elementwise). The scalar `&&` would
# only consider the first element on older R versions and is an error for
# length > 1 operands since R 4.3.
between = 
    function(x, lower, upper)
        x >= lower & x <= upper
# Full pipeline on the complete airline CSV:
#   input  - read the file using air.in.format,
#   select - keep the analysis columns and add Late = ArrDelay > 15,
#   where  - keep rows whose values pass every range check,
#   output - write the result under path "JR.out".
# The range checks are combined with the elementwise `&` so the filter yields
# one logical value per row; the scalar `&&` is not a row-wise operator (and
# is an error for length > 1 operands since R 4.3).
# NOTE(review): the ArrDelay lower bound of 60 means every retained row has
# Late == TRUE (ArrDelay > 15); confirm whether a negative bound was intended.
output(
    where(
        select(
            input(
                "../RHadoop.data/airline/airline.csv", 
                format = air.in.format), 
            DayOfWeek, DepTime, UniqueCarrier, 
            FlightNum, AirTime, ArrDelay, Origin, Dest, 
            Distance, Late = ArrDelay > 15),
        between(DayOfWeek, 1, 7) &
            between(DepTime, hm("0000"), hm("2359")) &
            between(AirTime, 0, 600) &
            between(ArrDelay, 60, 700) &
            between(Distance, 0, 3100)),
    path = "JR.out")