Summary

This post shows a simple, minimal example of using the R language with the Apache Hive data warehouse.

Introduction

Hive supports user defined aggregation functions (UDAFs) through external programs that read from stdin and write to stdout. Hive sends each row of a table to the program via stdin. The program may process the stream any way it wants, and then writes lines representing rows to stdout. Hive assembles those lines into a table in the database. The model is flexible: the program can write as many or as few rows as it wants.
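
As a minimal illustration of that contract (a sketch, not part of the pipeline built below), an identity transform simply copies its input rows through unchanged:

#!/usr/bin/env Rscript

# Identity transform: every line read from stdin
# comes back out on stdout as a row for Hive
writeLines(readLines(file("stdin")), stdout())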

The CLUSTER BY clause in the SQL below means that the stream Hive sends to stdin will first be sorted on the value of a column. Each unique value of that column is then processed by exactly one instance of the R script, so we can process an entire group at a time. Processing large groups, i.e. thousands of rows at once, lets us write more efficient R code than processing one row at a time. A follow-up post explains this idea further.
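
In Hive, CLUSTER BY on a column is shorthand for distributing and sorting on that same column, so these two queries are equivalent:

SELECT userid FROM u_data CLUSTER BY userid
;

SELECT userid FROM u_data DISTRIBUTE BY userid SORT BY userid
;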

To run the code in this post, load the u_data table of movie ratings by following the Hive documentation.

Goal

I’m going to use R to compute the equivalent of this SQL:

SELECT userid, COUNT(*) as n
FROM u_data
GROUP BY userid
;

It’s nice to first try something that SQL can do, because then we can verify the answer. Of course the whole point of using R or any other program is to go beyond what SQL can do.

R script

The following R script essentially duplicates R’s table() function from the base package. I wrote it this way to emphasize the split and apply operations.

Here’s the udaf.R file:

#!/usr/bin/env Rscript

# NOT stdin() - that's different ;)
infile = file("stdin")

# Load every row Hive sends into memory as one data frame
tbl = read.table(infile)

# Split the rows into one data frame per value of the first column
s = split(tbl, tbl[, 1])

# Count the rows in each group
ans = lapply(s, function(ss){
    data.frame(ss[1, 1], nrow(ss))
})

ans = do.call(rbind, ans)

# Tab separated lines on stdout become rows in the output table
write.table(ans, stdout(), sep = "\t"
            , col.names = FALSE, row.names = FALSE)
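
For comparison, a more compact version could lean on table() directly. Here is a sketch (hypothetical, not the version used in this post) that should produce the same tab separated output:

#!/usr/bin/env Rscript

infile = file("stdin")

# table() computes the count for each userid in one call
counts = table(read.table(infile)[, 1])

ans = data.frame(names(counts), as.vector(counts))
write.table(ans, stdout(), sep = "\t", quote = FALSE
            , col.names = FALSE, row.names = FALSE)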

Either way, the script loads everything from stdin straight into memory. It does not stream, so it will fail for large enough data: a single R process may not be able to hold all of the rows it receives in memory. Please see the follow-up post for how to do this properly using streams in R.
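
For a taste of the streaming idea, the core pattern is to read fixed-size chunks from an open connection rather than the whole input at once. A rough sketch, assuming a chunk size of 10,000 lines:

con = file("stdin")
open(con)
while(length(chunk <- readLines(con, n = 10000)) > 0){
    # Process one chunk of lines at a time here.
    # A correct version must also handle groups whose
    # rows span two consecutive chunks.
}
close(con)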

Development

Before running it in Hive, it’s a good idea to make sure the R script runs locally. Test it on the actual data by downloading a small part of it from Hive or HDFS. Then, if the file is called little.txt, you can run it from the command line:

$ cat little.txt | Rscript udaf.R
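
One way to produce little.txt in the first place, assuming the hive command line client is available, is to select a small sample:

$ hive -e 'SELECT userid FROM u_data LIMIT 1000' > little.txt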

If you see the table you expected printed to stdout, then it worked. Be happy. You can now run the udaf.sql script and compare the results to the initial query:

-- udaf.sql
DROP TABLE udaf
;

CREATE TABLE udaf (
  userid INT,
  count INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
;

ADD FILE udaf.R
;

INSERT OVERWRITE TABLE udaf
SELECT
TRANSFORM (userid)
USING "Rscript udaf.R"
AS (userid, count)
FROM (
    SELECT userid
    FROM u_data 
    CLUSTER BY userid
) AS tmp
;

SELECT *
FROM udaf
ORDER BY count DESC
LIMIT 10
;
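
Because the goal query is plain SQL, we can also check the transform’s output directly. One way (an empty result means the two approaches agree):

-- Rows where the R output disagrees with GROUP BY; expect none
SELECT g.userid, g.n, u.count
FROM (
    SELECT userid, COUNT(*) AS n
    FROM u_data
    GROUP BY userid
) g
JOIN udaf u ON (g.userid = u.userid)
WHERE g.n <> u.count
;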

Debugging

If you don’t have R installed on all the Hadoop worker nodes then you’ll see an error like this:

Caused by: java.io.IOException: Cannot run program "Rscript": error=2, No
such file or directory

The solution is simple: install R on all the worker nodes.

Scripts will fail. After unintentionally writing a few with bugs, I wrote some very minimal scripts with bugs purposely injected in various locations and then reviewed the logs. Somewhere in the thousands of lines of boilerplate you may find the root cause.
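
For example, a script that does nothing but fail (a hypothetical fail.R, just for exploring the logs) makes the traceback easy to locate:

#!/usr/bin/env Rscript

# fail.R - fails immediately so we can see where the
# error message lands in the Hadoop logs
stop("deliberate failure for testing the logs")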

When the R script fails, we need to look at the traceback to learn something about why it failed. A couple of blog post tutorials catch the error in the script and write the traceback into the output table. That technique works with R, but it mixes data with error logs. The system already logs stderr, so we just need to find where to read those logs. Moreover, we can write to stderr for logging purposes in a regular script that doesn’t fail:

# Helpful when grepping through the logs
writeLines("\n\n\nBEGIN R SCRIPT", stderr())

When I run the Hive query, I see a line like this indicating the YARN application ID:

Status: Running (Executing on YARN cluster with App id application_1480440170646_0140)

We can use this ID to inspect the logs:

$ yarn logs -applicationId application_1480440170646_0140 -log_files stderr | less
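
The marker written earlier makes the relevant part of the logs easy to find, for example:

$ yarn logs -applicationId application_1480440170646_0140 -log_files stderr \
    | grep -A 20 "BEGIN R SCRIPT"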

Conclusion

The internet has a few examples of applying UDAFs with Python, which inspired me to try the same thing with R. R works well for this because a data frame is essentially the same thing as a database table.