Using Spark SQL
Spark SQL lets you query structured data inside Spark programs using either SQL or the DataFrame API.
The entry point to all Spark SQL functionality is the SQLContext class or one of its descendants. You create a SQLContext from a SparkContext. With an SQLContext, you can create a DataFrame from an RDD, a Hive table, or a data source.
To work with data stored in Hive from Spark applications, construct a HiveContext, which inherits from SQLContext. With a HiveContext, you can access tables in the Hive Metastore and write queries using HiveQL. If you use spark-shell, a HiveContext is already created for you and is available as the sqlContext variable. To access Hive tables you must also perform the steps in Accessing Hive from Spark.
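For example, in a PySpark application you might construct these contexts as follows (a minimal sketch; the application name is a placeholder):
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext

# Build a SparkContext from a SparkConf (the application name is arbitrary).
conf = SparkConf().setAppName("Spark SQL example")
sc = SparkContext(conf=conf)

# SQLContext provides the core Spark SQL functionality.
sqlContext = SQLContext(sc)

# HiveContext extends SQLContext with access to the Hive Metastore and HiveQL.
hiveContext = HiveContext(sc)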
For detailed information on Spark SQL, see the Spark SQL and DataFrame Guide.
Querying Files Into a DataFrame
You can use SQL to directly read JSON or Parquet files into a DataFrame:
- JSON
df = sqlContext.sql("SELECT * FROM json.`input dir`")
- Parquet
df = sqlContext.sql("SELECT * FROM parquet.`input dir`")
Spark SQL Example
- At the command line, copy the Hue sample_07 and sample_08 CSV files to HDFS:
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_07.csv /user/hdfs
$ hdfs dfs -put HUE_HOME/apps/beeswax/data/sample_08.csv /user/hdfs
where HUE_HOME defaults to /opt/cloudera/parcels/CDH/lib/hue (parcel installation) or /usr/lib/hue (package installation).
- Start spark-shell:
$ spark-shell
- Create Hive tables sample_07 and sample_08:
scala> sqlContext.sql("CREATE TABLE sample_07 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile") scala> sqlContext.sql("CREATE TABLE sample_08 (code string,description string,total_emp int,salary int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TextFile")
- In Beeline, show the Hive tables:
0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
|  tab_name  |
+------------+--+
| sample_07  |
| sample_08  |
+------------+--+
- Load the data in the CSV files into the tables:
scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_07.csv' OVERWRITE INTO TABLE sample_07") scala> sqlContext.sql("LOAD DATA INPATH '/user/hdfs/sample_08.csv' OVERWRITE INTO TABLE sample_08")
- Create DataFrames containing the contents of the sample_07 and sample_08 tables:
scala> val df_07 = sqlContext.sql("SELECT * from sample_07")
scala> val df_08 = sqlContext.sql("SELECT * from sample_08")
- Show all rows in df_07 with salary greater than 150,000:
scala> df_07.filter(df_07("salary") > 150000).show()
The output should be:
+-------+--------------------+---------+------+
|   code|         description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011|    Chief executives|   299160|151370|
|29-1022|Oral and maxillof...|     5040|178440|
|29-1023|       Orthodontists|     5350|185340|
|29-1024|     Prosthodontists|      380|169360|
|29-1061|   Anesthesiologists|    31030|192780|
|29-1062|Family and genera...|   113250|153640|
|29-1063| Internists, general|    46260|167270|
|29-1064|Obstetricians and...|    21340|183600|
|29-1067|            Surgeons|    50260|191410|
|29-1069|Physicians and su...|   237400|155150|
+-------+--------------------+---------+------+
- Create the DataFrame df_09 by joining df_07 and df_08, retaining only the code and description columns:
scala> val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"), df_07.col("description"))
scala> df_09.show()
The new DataFrame looks like:
+-------+--------------------+
|   code|         description|
+-------+--------------------+
|00-0000|     All Occupations|
|11-0000|Management occupa...|
|11-1011|    Chief executives|
|11-1021|General and opera...|
|11-1031|         Legislators|
|11-2011|Advertising and p...|
|11-2021|  Marketing managers|
|11-2022|      Sales managers|
|11-2031|Public relations ...|
|11-3011|Administrative se...|
|11-3021|Computer and info...|
|11-3031|  Financial managers|
|11-3041|Compensation and ...|
|11-3042|Training and deve...|
|11-3049|Human resources m...|
|11-3051|Industrial produc...|
|11-3061| Purchasing managers|
|11-3071|Transportation, s...|
|11-9011|Farm, ranch, and ...|
|11-9012|Farmers and ranchers|
+-------+--------------------+
- Save DataFrame df_09 as the Hive table sample_09:
scala> df_09.write.saveAsTable("sample_09")
- In Beeline, show the Hive tables:
0: jdbc:hive2://hostname.com:> show tables;
+------------+--+
|  tab_name  |
+------------+--+
| sample_07  |
| sample_08  |
| sample_09  |
+------------+--+
The following Python application performs the same operations using the DataFrame API:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

if __name__ == "__main__":

    # create Spark context with Spark configuration
    conf = SparkConf().setAppName("Data Frame Join")
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)

    df_07 = sqlContext.sql("SELECT * from sample_07")
    df_07.filter(df_07.salary > 150000).show()

    df_08 = sqlContext.sql("SELECT * from sample_08")
    tbls = sqlContext.sql("show tables")
    tbls.show()

    df_09 = df_07.join(df_08, df_07.code == df_08.code).select(df_07.code, df_07.description)
    df_09.show()
    df_09.write.saveAsTable("sample_09")

    tbls = sqlContext.sql("show tables")
    tbls.show()

Instead of displaying the tables using Beeline, the show tables query is run using the Spark SQL API.
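To run such an application outside the shell, you could save it to a file and submit it with spark-submit; the file name below is only an example:
$ spark-submit dataframe_join.py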
Ensuring HiveContext Enforces Secure Access
To ensure that HiveContext enforces ACLs, enable the HDFS-Sentry plug-in as described in Synchronizing HDFS ACLs and Sentry Permissions. Column-level access control for access from Spark SQL is not supported by the HDFS-Sentry plug-in.