Phoenix Map Reduce
Phoenix provides support for reading from and writing to Phoenix tables from within MapReduce jobs. The framework provides custom InputFormat and OutputFormat classes, PhoenixInputFormat and PhoenixOutputFormat.
PhoenixMapReduceUtil provides several utility methods to set the input and output configuration parameters on the job.
When a Phoenix table is the source for the MapReduce job, we can either provide a SELECT query or pass a table name and the specific columns to import. To retrieve data from the table within the mapper class, we need a class that implements DBWritable and pass it as an argument to the PhoenixMapReduceUtil.setInput method. The custom DBWritable class provides an implementation of readFields(ResultSet rs) that lets us retrieve the columns for each row. This custom DBWritable class forms the input value to the mapper class.
Note: The SELECT query must not perform any aggregation or use DISTINCT, as these are not supported by our map-reduce integration.
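As a rough sketch, the two input styles look roughly like the snippet below; MyDBWritable and the table/column names are placeholders, and the exact setInput overloads can vary slightly between Phoenix versions, so check PhoenixMapReduceUtil in your release.

// Input driven by an explicit SELECT query.
PhoenixMapReduceUtil.setInput(job, MyDBWritable.class, "MY_TABLE",
    "SELECT ID, NAME FROM MY_TABLE");

// Input driven by a table name, an optional WHERE condition (null = none), and column names.
PhoenixMapReduceUtil.setInput(job, MyDBWritable.class, "MY_TABLE",
    null, "ID", "NAME");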
Similarly, when writing to a Phoenix table, we use the PhoenixMapReduceUtil.setOutput method to set the output table and the columns.
Note: Phoenix internally builds the UPSERT query for you.
The output key class for the job should always be NullWritable, and the output value class should be the custom DBWritable class, which implements the write(PreparedStatement pstmt) method.
Let’s dive into an example where we have a table, STOCK, that holds the quarterly recordings for each stock and year in a double array, and we would like to find the maximum price of each stock across all years. We will store the output in STOCK_STATS, another Phoenix table.
Note: you can also configure a job to read from HDFS and load the data into a Phoenix table.
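Only the Phoenix-table-as-source direction is worked through below, but the reverse direction might look like the following minimal sketch: a map-only job that reads text files from HDFS and writes to a Phoenix table through PhoenixOutputFormat. The table, columns, path, mapper, and writable names here are all illustrative.

final Configuration configuration = HBaseConfiguration.create();
final Job job = Job.getInstance(configuration, "hdfs-to-phoenix");
// Read plain text lines from an HDFS directory (hypothetical path).
FileInputFormat.addInputPath(job, new Path("/data/input"));
// Target Phoenix table and columns (hypothetical).
PhoenixMapReduceUtil.setOutput(job, "MY_TABLE", "ID,NAME");
job.setOutputFormatClass(PhoenixOutputFormat.class);
job.setMapperClass(LineToPhoenixMapper.class);  // hypothetical mapper parsing each line
job.setNumReduceTasks(0);                       // map-only load
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(LineWritable.class);    // DBWritable whose write(PreparedStatement) binds ID and NAME
job.waitForCompletion(true);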
a) stock
CREATE TABLE IF NOT EXISTS STOCK (
    STOCK_NAME VARCHAR NOT NULL,
    RECORDING_YEAR INTEGER NOT NULL,
    RECORDINGS_QUARTER DOUBLE ARRAY[]
    CONSTRAINT pk PRIMARY KEY (STOCK_NAME, RECORDING_YEAR));
b) stock_stats
CREATE TABLE IF NOT EXISTS STOCK_STATS (
    STOCK_NAME VARCHAR NOT NULL,
    MAX_RECORDING DOUBLE
    CONSTRAINT pk PRIMARY KEY (STOCK_NAME));
Sample Data
UPSERT into STOCK values ('AAPL',2009,ARRAY[85.88,91.04,88.5,90.3]);
UPSERT into STOCK values ('AAPL',2008,ARRAY[199.27,200.26,192.55,194.84]);
UPSERT into STOCK values ('AAPL',2007,ARRAY[86.29,86.58,81.90,83.80]);
UPSERT into STOCK values ('CSCO',2009,ARRAY[16.41,17.00,16.25,16.96]);
UPSERT into STOCK values ('CSCO',2008,ARRAY[27.00,27.30,26.21,26.54]);
UPSERT into STOCK values ('CSCO',2007,ARRAY[27.46,27.98,27.33,27.73]);
UPSERT into STOCK values ('CSCO',2006,ARRAY[17.21,17.49,17.18,17.45]);
UPSERT into STOCK values ('GOOG',2009,ARRAY[308.60,321.82,305.50,321.32]);
UPSERT into STOCK values ('GOOG',2008,ARRAY[692.87,697.37,677.73,685.19]);
UPSERT into STOCK values ('GOOG',2007,ARRAY[466.00,476.66,461.11,467.59]);
UPSERT into STOCK values ('GOOG',2006,ARRAY[422.52,435.67,418.22,435.23]);
UPSERT into STOCK values ('MSFT',2009,ARRAY[19.53,20.40,19.37,20.33]);
UPSERT into STOCK values ('MSFT',2008,ARRAY[35.79,35.96,35.00,35.22]);
UPSERT into STOCK values ('MSFT',2007,ARRAY[29.91,30.25,29.40,29.86]);
UPSERT into STOCK values ('MSFT',2006,ARRAY[26.25,27.00,26.10,26.84]);
UPSERT into STOCK values ('YHOO',2009,ARRAY[12.17,12.85,12.12,12.85]);
UPSERT into STOCK values ('YHOO',2008,ARRAY[23.80,24.15,23.60,23.72]);
UPSERT into STOCK values ('YHOO',2007,ARRAY[25.85,26.26,25.26,25.61]);
UPSERT into STOCK values ('YHOO',2006,ARRAY[39.69,41.22,38.79,40.91]);
Below is a simple job configuration.
Job Configuration
final Configuration configuration = HBaseConfiguration.create();
final Job job = Job.getInstance(configuration, "phoenix-mr-job");

// We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";

// StockWritable is the DBWritable class that enables us to process the Result of the above query
PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK", selectQuery);

// Set the target Phoenix table and the columns
PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

job.setMapperClass(StockMapper.class);
job.setReducerClass(StockReducer.class);
job.setOutputFormatClass(PhoenixOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(StockWritable.class);
TableMapReduceUtil.addDependencyJars(job);
job.waitForCompletion(true);
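The configuration above is typically wrapped in a driver class. A minimal sketch, assuming the job is launched through Hadoop's ToolRunner (the class and package names are illustrative, and imports from org.apache.hadoop.conf, org.apache.hadoop.util, and org.apache.hadoop.hbase are omitted for brevity):

public class StockStatsJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        final Configuration configuration = HBaseConfiguration.create(getConf());
        final Job job = Job.getInstance(configuration, "phoenix-mr-job");
        // ... same input/output/mapper/reducer setup as shown above ...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new StockStatsJob(), args));
    }
}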
StockWritable
public class StockWritable implements DBWritable, Writable {

    private String stockName;
    private int year;
    private double[] recordings;
    private double maxPrice;

    @Override
    public void readFields(DataInput input) throws IOException {
        // No-op: in this job StockWritable is never shuffled between map and reduce,
        // so the Writable serialization methods are not exercised.
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // No-op: see readFields(DataInput) above.
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        // Populate the fields from one row of the SELECT query.
        stockName = rs.getString("STOCK_NAME");
        year = rs.getInt("RECORDING_YEAR");
        final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
        recordings = (double[]) recordingsArray.getArray();
    }

    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        // Bind the parameters of the generated UPSERT, in the column order
        // passed to PhoenixMapReduceUtil.setOutput: STOCK_NAME, MAX_RECORDING.
        pstmt.setString(1, stockName);
        pstmt.setDouble(2, maxPrice);
    }

    // getters / setters for the fields
    ...
    ...
Stock Mapper
public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

    private Text stock = new Text();
    private DoubleWritable price = new DoubleWritable();

    @Override
    protected void map(NullWritable key, StockWritable stockWritable, Context context)
            throws IOException, InterruptedException {
        double[] recordings = stockWritable.getRecordings();
        final String stockName = stockWritable.getStockName();
        double maxPrice = Double.MIN_VALUE;
        for (double recording : recordings) {
            if (maxPrice < recording) {
                maxPrice = recording;
            }
        }
        stock.set(stockName);
        price.set(maxPrice);
        context.write(stock, price);
    }
}
Stock Reducer
public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context)
            throws IOException, InterruptedException {
        double maxPrice = Double.MIN_VALUE;
        for (DoubleWritable recording : recordings) {
            if (maxPrice < recording.get()) {
                maxPrice = recording.get();
            }
        }
        final StockWritable stock = new StockWritable();
        stock.setStockName(key.toString());
        stock.setMaxPrice(maxPrice);
        context.write(NullWritable.get(), stock);
    }
}
Packaging & Running
- Ensure phoenix-[version]-client.jar is in the classpath of your MapReduce job jar.
- To run the job, use the hadoop jar command with the necessary arguments.
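For example, if the job and its dependencies are packaged into a jar named stock-stats-job.jar with a main class such as org.example.StockStatsJob (both names are illustrative), it could be launched with hadoop jar stock-stats-job.jar org.example.StockStatsJob. Once the job completes, the results can be verified by querying the STOCK_STATS table (e.g. SELECT * FROM STOCK_STATS) from sqlline.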