Accessing Avro Data Files From Spark SQL Applications
Spark SQL supports loading and saving DataFrames from and to a variety of data sources. With the spark-avro library, you can process data encoded in the Avro format using Spark.
The spark-avro library supports most conversions between Spark SQL and Avro records, making Avro a first-class citizen in Spark. The library automatically performs the schema conversion. Spark SQL reads the data and converts it to Spark's internal representation; the Avro conversion is performed only during reading and writing data.
By default, when pointed at a directory, read methods silently skip any files that do not have the .avro extension. To include all files, set the avro.mapred.ignore.inputs.without.extension property to false. See Configuring Spark Applications.
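One way to set this property from application code is shown in the following sketch, which assumes an existing SparkContext named sc:
// Also read input files that do not have the .avro extension.
sc.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false")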
Continue reading:
- Writing Compressed Data Files
- Accessing Partitioned Data Files
- Specifying Record Name and Namespace
- Spark SQL
- Avro to Spark SQL Conversion
- Spark SQL to Avro Conversion
- Limitations
- API Examples
Writing Compressed Data Files
To set the compression codec used when writing Avro files, configure the spark.sql.avro.compression.codec property:
sqlContext.setConf("spark.sql.avro.compression.codec", "codec")
The supported codec values are uncompressed, snappy, and deflate. When using deflate, you can also set the compression level with the spark.sql.avro.deflate.level property.
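For example, a minimal sketch (the input and output paths are placeholders) that writes snappy-compressed records:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// Use snappy compression for subsequent Avro writes.
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
sqlContext.read.avro("input dir").write.avro("output dir")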
Accessing Partitioned Data Files
The spark-avro library supports writing and reading partitioned data. As you do when writing Parquet, pass the columns by which to partition to the writer. For examples, see Writing Partitioned Data and Reading Partitioned Data.
Specifying Record Name and Namespace
Specify the record name and namespace to use when writing to disk by passing recordName and recordNamespace as optional parameters. For an example, see Specifying a Record Name.
Spark SQL
sqlContext.sql("CREATE TEMPORARY TABLE table_name USING com.databricks.spark.avro OPTIONS (path "input dir")) df = sqlContext.sql("SELECT * FROM table_name")
Avro to Spark SQL Conversion
The spark-avro library supports conversion for all Avro data types:
- boolean -> BooleanType
- int -> IntegerType
- long -> LongType
- float -> FloatType
- double -> DoubleType
- bytes -> BinaryType
- string -> StringType
- record -> StructType
- enum -> StringType
- array -> ArrayType
- map -> MapType
- fixed -> BinaryType
- union(int, long) -> LongType
- union(float, double) -> DoubleType
- union(any, null) -> any
All doc, aliases, and other Avro schema attributes are stripped when the data is loaded into Spark.
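For illustration, a minimal sketch (the input path is a placeholder and the field types mentioned in the comment are hypothetical) that loads Avro data and prints the Spark SQL schema produced by this mapping:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// Per the mapping above, an Avro enum field is reported as string and
// a fixed field as binary in the printed schema.
val df = sqlContext.read.avro("input dir")
df.printSchema()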
Spark SQL to Avro Conversion
- BooleanType -> boolean
- IntegerType -> int
- LongType -> long
- FloatType -> float
- DoubleType -> double
- BinaryType -> bytes
- StringType -> string
- StructType -> record
- ArrayType -> array
- MapType -> map
- ByteType -> int
- ShortType -> int
- DecimalType -> string
- TimestampType -> long
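These narrowing conversions can be observed with a round trip. The following sketch (paths and column names are illustrative) writes ByteType and TimestampType columns and reads them back as IntegerType and LongType:
import java.sql.Timestamp
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// ByteType is written as Avro int and TimestampType as Avro long,
// so they read back as IntegerType and LongType.
val df = Seq((1.toByte, new Timestamp(0L))).toDF("small", "ts")
df.write.avro("roundtrip dir")
sqlContext.read.avro("roundtrip dir").printSchema()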
Limitations
- Enumerated types are erased - Avro enumerated types become strings when they are read into Spark because Spark does not support enumerated types.
- Unions on output - Spark writes everything as unions of the given type along with a null option; see the sketch following this list.
- Avro schema changes - Spark reads everything into an internal representation. Even if you just read and then write the data, the schema for the output is different.
- Spark schema reordering - When writing to disk, Spark reorders the elements in its schema so that the columns being partitioned on come last. For an example, see Writing Partitioned Data.
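The union behavior can be seen in a simple round trip (paths and column names are illustrative): after writing and re-reading, every column reports nullable = true because each field was written as a union with null:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// The id column starts out non-nullable, but after the round trip both
// columns are nullable because they were written as unions with null.
val df = Seq((1, "a")).toDF("id", "name")
df.write.avro("union demo dir")
sqlContext.read.avro("union demo dir").printSchema()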
API Examples
This section provides examples of using the spark-avro API in all supported languages.
Scala Examples
The easiest way to work with Avro data files in Spark applications is by using the DataFrame API. The spark-avro library includes avro methods in SQLContext for reading and writing Avro files:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read.avro("input dir")
df.filter("age > 5").write.avro("output dir")
You can also specify "com.databricks.spark.avro" in the format method:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")
df.filter("age > 5").write.format("com.databricks.spark.avro").save("output dir")
The following example writes deflate compressed records:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// configuration to use deflate compression
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

val df = sqlContext.read.avro("input dir")

// writes out compressed Avro records
df.write.avro("output dir")
Writing Partitioned Data
To write partitioned data, pass the columns to partition on to partitionBy:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = Seq(
  (2012, 8, "Batman", 9.8),
  (2012, 8, "Hero", 8.7),
  (2012, 7, "Robot", 5.5),
  (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("output dir")
This produces a directory layout such as the following:
-rw-r--r--   3 hdfs supergroup    0 2015-11-03 14:58 /tmp/output/_SUCCESS
drwxr-xr-x   - hdfs supergroup    0 2015-11-03 14:58 /tmp/output/year=2011
drwxr-xr-x   - hdfs supergroup    0 2015-11-03 14:58 /tmp/output/year=2011/month=7
-rw-r--r--   3 hdfs supergroup  229 2015-11-03 14:58 /tmp/output/year=2011/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup    0 2015-11-03 14:58 /tmp/output/year=2012
drwxr-xr-x   - hdfs supergroup    0 2015-11-03 14:58 /tmp/output/year=2012/month=7
-rw-r--r--   3 hdfs supergroup  231 2015-11-03 14:58 /tmp/output/year=2012/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup    0 2015-11-03 14:58 /tmp/output/year=2012/month=8
-rw-r--r--   3 hdfs supergroup  246 2015-11-03 14:58 /tmp/output/year=2012/month=8/part-r-00000-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
Reading Partitioned Data
To read the partitioned data back, load the top-level directory and filter on the partition columns:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.avro("input dir")
df.printSchema()
df.filter("year = 2011").collect().foreach(println)
The schema shows the partition columns appended at the end, and the filter returns the matching row:
root
 |-- title: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

[Git,2.0,2011,7]
Specifying a Record Name
To specify the record name and namespace to use in the written Avro schema, pass recordName and recordNamespace as options:
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.avro("input dir")

val name = "AvroTest"
val namespace = "com.cloudera.spark"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).avro("output dir")
Java Example
import org.apache.spark.sql.*;

SQLContext sqlContext = new SQLContext(sc);

// Creates a DataFrame from a file
DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load("input dir");

// Saves the subset of the Avro records read in
df.filter("age > 5").write().format("com.databricks.spark.avro").save("output dir");
Python Example
# Creates a DataFrame from a directory
df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")

# Saves the subset of the Avro records read in
df.where("age > 5").write.format("com.databricks.spark.avro").save("output dir")