The phoenix-spark plugin extends Phoenix’s MapReduce support to allow Spark to load Phoenix tables as DataFrames, and enables persisting them back to Phoenix.

Prerequisites

  • Phoenix 4.4.0+
  • Spark 1.3.1+ (prebuilt with Hadoop 2.4 recommended)

Why not JDBC?

Although Spark supports connecting directly to JDBC databases, it’s only able to parallelize queries by partitioning on a numeric column. It also requires a known lower bound, upper bound and partition count in order to create split queries.

In contrast, the phoenix-spark integration is able to leverage the underlying splits provided by Phoenix in order to retrieve and save data across multiple workers. All that’s required is a database URL and a table name. Optional SELECT columns can be given, as well as pushdown predicates for efficient filtering.

The choice of method for accessing Phoenix comes down to each specific use case.
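
To make the contrast concrete, here is a minimal sketch of the two read paths in Scala. The JDBC URL, bounds and partition count are illustrative placeholders; the Phoenix options mirror the examples later on this page.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-vs-phoenix").master("local").getOrCreate()

// Plain Spark JDBC: parallel reads require a numeric partition column
// plus explicit lower/upper bounds and a partition count.
val jdbcDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:phoenix:phoenix-server:2181")  // placeholder connection URL
  .option("dbtable", "TABLE1")
  .option("partitionColumn", "ID")
  .option("lowerBound", "0")
  .option("upperBound", "1000000")
  .option("numPartitions", "10")
  .load()

// phoenix-spark: parallelism comes from the underlying Phoenix splits,
// so only a table name and ZooKeeper URL are needed.
val phoenixDf = spark.read
  .format("phoenix")
  .option("table", "TABLE1")
  .option("zkUrl", "phoenix-server:2181")
  .load()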

Spark setup

  • To ensure that all requisite Phoenix / HBase platform dependencies are available on the classpath for the Spark executors and drivers, set both ‘spark.executor.extraClassPath’ and ‘spark.driver.extraClassPath’ in spark-defaults.conf to include the ‘phoenix-<version>-client.jar’ (a spark-defaults.conf example appears at the end of this section)

  • Note that for Phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’.

  • As of Phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against Spark 2.x. If compatibility with Spark 1.x is needed, you must compile Phoenix with the spark16 maven profile.

  • To help your IDE, you can add the following provided dependency to your build:

<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-spark</artifactId>
  <version>${phoenix.version}</version>
  <scope>provided</scope>
</dependency>
  • As of Phoenix 4.15.0, the connectors project will be separated from the main phoenix project (see phoenix-connectors) and will have its own releases. You can add the following dependency in your project:
<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-spark</artifactId>
  <version>${phoenix.connectors.version}</version>
</dependency>

The first released connectors jar is connectors-1.0.0 (replace phoenix.connectors.version above with this version).
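
For example, the classpath entries described in the first bullet above would look roughly like this in spark-defaults.conf (the /opt/phoenix path is a placeholder for wherever the client jar lives on your nodes):

spark.executor.extraClassPath=/opt/phoenix/phoenix-<version>-client.jar
spark.driver.extraClassPath=/opt/phoenix/phoenix-<version>-client.jar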

Reading Phoenix Tables

Given a Phoenix table with the following DDL and DML:

CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');

Load as a DataFrame using the DataSourceV2 API

Scala example:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// Load data from TABLE1
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "TABLE1", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

df.filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show

Java example:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkRead {
    
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);
        
        // Load data from TABLE1
        Dataset<Row> df = sqlContext
            .read()
            .format("phoenix")
            .option("table", "TABLE1")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .load();
        df.createOrReplaceTempView("TABLE1");
    
        df = sqlContext.sql("SELECT * FROM TABLE1 WHERE COL1='test_row_1' AND ID=1L");
        df.show();
        jsc.stop();
    }
}

Saving to Phoenix

Save DataFrames to Phoenix using DataSourceV2

The save method on DataFrame allows passing in a data source type. You can use phoenix for DataSourceV2 and must also pass in table and zkUrl parameters to specify which table and server to persist the DataFrame to. The column names are derived from the DataFrame’s schema field names, and must match the Phoenix column names.

The save method also takes a SaveMode option, for which only SaveMode.Overwrite is supported.

Given two Phoenix tables with the following DDL:

CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

you can load from an input table and save to an output table as a DataFrame as follows in Scala:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SparkSession, SaveMode}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()
  
// Load INPUT_TABLE
val df = spark.sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "INPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .load

// Save to OUTPUT_TABLE
df.write
  .format("phoenix")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .save()

Java example:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SQLContext;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkWriteFromInputTable {
    
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);
        
        // Load INPUT_TABLE
        Dataset<Row> df = sqlContext
            .read()
            .format("phoenix")
            .option("table", "INPUT_TABLE")
            .option(ZOOKEEPER_URL, "phoenix-server:2181")
            .load();
        
        // Save to OUTPUT_TABLE
        df.write()
          .format("phoenix")
          .mode(SaveMode.Overwrite)
          .option("table", "OUTPUT_TABLE")
          .option(ZOOKEEPER_URL, "phoenix-server:2181")
          .save();
        jsc.stop();
    }
}

Save from an external RDD with a schema to a Phoenix table

Just like the previous example, you can pass in the data source type as phoenix and specify the table and zkUrl parameters indicating which table and server to persist the DataFrame to.

Note that the schema of the RDD must match its column data, and both must match the schema of the Phoenix table that you save to.

Given an output Phoenix table with the following DDL:

CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

you can save a DataFrame built from an RDD as follows in Scala:

import org.apache.spark.SparkContext
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, StructField}
import org.apache.spark.sql.{Row, SQLContext, SparkSession, SaveMode}
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()
  
val dataSet = List(Row(1L, "1", 1), Row(2L, "2", 2), Row(3L, "3", 3))

val schema = StructType(
  Seq(StructField("ID", LongType, nullable = false),
    StructField("COL1", StringType),
    StructField("COL2", IntegerType)))

val rowRDD = spark.sparkContext.parallelize(dataSet)

// Apply the schema to the RDD.
val df = spark.sqlContext.createDataFrame(rowRDD, schema)

df.write
  .format("phoenix")
  .options(Map("table" -> "OUTPUT_TABLE", PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .mode(SaveMode.Overwrite)
  .save()

Java example:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

import static org.apache.phoenix.spark.datasource.v2.PhoenixDataSource.ZOOKEEPER_URL;

public class PhoenixSparkWriteFromRDDWithSchema {
 
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("phoenix-test");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);
        SparkSession spark = sqlContext.sparkSession();
        Dataset<Row> df;
  
        // Generate the schema based on the fields
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("ID", DataTypes.LongType, false));
        fields.add(DataTypes.createStructField("COL1", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("COL2", DataTypes.IntegerType, true));
        StructType schema = DataTypes.createStructType(fields);
  
        // Generate the rows with the same exact schema
        List<Row> rows = new ArrayList<>();
        for (int i = 1; i < 4; i++) {
            rows.add(RowFactory.create(Long.valueOf(i), String.valueOf(i), i));
        }
  
        // Create a DataFrame from the rows and the specified schema
        df = spark.createDataFrame(rows, schema);
        df.write()
            .format("phoenix")
            .mode(SaveMode.Overwrite)
            .option("table", "OUTPUT_TABLE")
            .option(ZOOKEEPER_URL,  "phoenix-server:2181")
            .save();
  
        jsc.stop();
    }
}

PySpark

With Spark’s DataFrame support, you can also use pyspark to read and write from Phoenix tables.

Load a DataFrame

Given a table TABLE1 and a Zookeeper URL of phoenix-server:2181, you can load the table as a DataFrame using the following Python code in pyspark:

df = sqlContext.read \
  .format("phoenix") \
  .option("table", "TABLE1") \
  .option("zkUrl", "phoenix-server:2181") \
  .load()

Save a DataFrame

Given the same table and Zookeeper URL as above, you can save a DataFrame to a Phoenix table using the following code:

df.write \
  .format("phoenix") \
  .mode("overwrite") \
  .option("table", "TABLE1") \
  .option("zkUrl", "phoenix-server:2181") \
  .save()

Notes

  • If you want to use DataSourceV1, you can use source type "org.apache.phoenix.spark" instead of "phoenix"; however, this is deprecated as of connectors-1.0.0 (see the sketch at the end of these notes).
  • The (deprecated) functions phoenixTableAsDataFrame, phoenixTableAsRDD and saveToPhoenix all support optionally specifying a conf Hadoop configuration parameter with custom Phoenix client settings, as well as an optional zkUrl parameter for the Phoenix connection URL.
  • If zkUrl isn’t specified, it’s assumed that the “hbase.zookeeper.quorum” property has been set in the conf parameter. Similarly, if no configuration is passed in, zkUrl must be specified.
  • As of PHOENIX-5197, you can pass configurations from the driver to the executors as a comma-separated list against the key phoenixConfigs (i.e. PhoenixDataSource.PHOENIX_CONFIGS), for example:
val df = spark
  .sqlContext
  .read
  .format("phoenix")
  .options(Map("table" -> "Table1", "zkUrl" -> "phoenix-server:2181",
    "phoenixConfigs" -> "hbase.client.retries.number=10,hbase.client.pause=10000"))
  .load

This list of properties is parsed and populated into a properties map which is passed to DriverManager.getConnection(connString, propsMap). Note that the same property values will be used for both the driver and all executors and these configurations are used each time a connection is made (both on the driver and executors).
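
For reference, the DataSourceV1 form mentioned in the first note above is sketched below; it takes the same table and zkUrl options as the DataSourceV2 examples and is deprecated as of connectors-1.0.0.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("phoenix-test")
  .master("local")
  .getOrCreate()

// DataSourceV1: the fully qualified class name is used as the format string
val df = spark.sqlContext
  .read
  .format("org.apache.phoenix.spark")
  .options(Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181"))
  .load

df.show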

Limitations

  • Basic support for column and predicate pushdown using the Data Source API (see the sketch after this list)
  • The Data Source API does not support passing custom Phoenix settings in configuration; you must create the DataFrame or RDD directly if you need fine-grained configuration.
  • No support for aggregate or distinct queries as explained in our Map Reduce Integration documentation.
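
One way to see what is actually pushed down for a given query is to inspect Spark’s physical plan. df.explain() is standard Spark; the exact plan text (pruned columns, pushed filters) depends on the Spark and connector versions, so treat this as a rough check rather than a guarantee.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pushdown-check").master("local").getOrCreate()

val df = spark.read
  .format("phoenix")
  .option("table", "TABLE1")
  .option("zkUrl", "phoenix-server:2181")
  .load()

// Look for the pruned column list and pushed filters in the printed plan
df.filter(df("COL1") === "test_row_1")
  .select(df("ID"))
  .explain(true)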

PageRank example

This example makes use of the Enron email data set, provided by the Stanford Network Analysis Project, and executes the GraphX implementation of PageRank on it to find interesting entities. It then saves the results back to Phoenix.

  1. Download and extract the file enron.csv.gz

  2. Create the necessary Phoenix schema

    CREATE TABLE EMAIL_ENRON(MAIL_FROM BIGINT NOT NULL, MAIL_TO BIGINT NOT NULL CONSTRAINT pk PRIMARY KEY(MAIL_FROM, MAIL_TO));
    CREATE TABLE EMAIL_ENRON_PAGERANK(ID BIGINT NOT NULL, RANK DOUBLE CONSTRAINT pk PRIMARY KEY(ID));
    
  3. Load the email data into Phoenix (assuming localhost for the Zookeeper Quorum URL)

    gunzip /tmp/enron.csv.gz
    cd /path/to/phoenix/bin
    ./psql.py -t EMAIL_ENRON localhost /tmp/enron.csv
    
  4. In spark-shell, with the phoenix-client in the Spark driver classpath, run the following:

    import org.apache.spark.graphx._
    import org.apache.phoenix.spark._
    val rdd = sc.phoenixTableAsRDD("EMAIL_ENRON", Seq("MAIL_FROM", "MAIL_TO"), zkUrl=Some("localhost"))           // load from phoenix
    val rawEdges = rdd.map{ e => (e("MAIL_FROM").asInstanceOf[VertexId], e("MAIL_TO").asInstanceOf[VertexId]) }   // map to vertexids
    val graph = Graph.fromEdgeTuples(rawEdges, 1.0)                                                               // create a graph
    val pr = graph.pageRank(0.001)                                                                                // run pagerank
    pr.vertices.saveToPhoenix("EMAIL_ENRON_PAGERANK", Seq("ID", "RANK"), zkUrl = Some("localhost"))               // save to phoenix
    
  5. Query the top ranked entities in SQL

    SELECT * FROM EMAIL_ENRON_PAGERANK ORDER BY RANK DESC LIMIT 5;
    +------------------------------------------+------------------------------------------+
    |                    ID                    |                   RANK                   |
    +------------------------------------------+------------------------------------------+
    | 5038                                     | 497.2989872977676                        |
    | 273                                      | 117.18141799210386                       |
    | 140                                      | 108.63091596789913                       |
    | 458                                      | 107.2728800448782                        |
    | 588                                      | 106.11840798585399                       |
    +------------------------------------------+------------------------------------------+
    

Deprecated Usages

Load as a DataFrame directly using a Configuration object

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = configuration
)

df.show

Load as an RDD, using a Zookeeper URL

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._
import org.apache.spark.rdd.RDD

val sc = new SparkContext("local", "phoenix-test")

// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

rdd.count()

val firstId = rdd.first()("ID").asInstanceOf[Long]
val firstCol = rdd.first()("COL1").asInstanceOf[String]

Saving RDDs to Phoenix

saveToPhoenix is an implicit method on RDD[Product], i.e. an RDD of tuples. The data types must correspond to the Java types Phoenix supports (see language/datatypes.html).

Given a Phoenix table with the following DDL:

CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

import org.apache.spark.SparkContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))

sc
  .parallelize(dataSet)
  .saveToPhoenix(
    "OUTPUT_TEST_TABLE",
    Seq("ID","COL1","COL2"),
    zkUrl = Some("phoenix-server:2181")
  )
