Apache Spark SQL: Running SQL Queries on a DataFrame Using Scala
Apache Spark is a big data processing engine with components such as Spark SQL, Spark MLlib, and Spark Streaming. We generally use Apache Spark to process big data in memory, in batch or in real time. A common use case is querying a large data set, or writing map and reduce logic over it, for analytics, ETL, or preparing data for machine learning.
One feature I am going to explain is Spark SQL, a module for working with structured data. Spark SQL lets you query structured data using DataFrames and the underlying API. In this tutorial we will read data from a CSV file, load it into a DataFrame, and write a handful of SQL queries in Scala. First, download the sample listings.csv file, which is used throughout the tutorial.
Step 1. Creating a standalone Spark session in Scala
A Spark session is the entry point to the functionality provided by Spark. You can connect to a hosted Spark instance or create a standalone (local) one. In this tutorial we create a standalone instance for simplicity.
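import org.apache.spark.sql.SparkSession

// Build (or reuse) a session that runs Spark locally inside this JVM.
val spark = SparkSession
  .builder()
  .appName("test")
  .config("spark.master", "local")
  .getOrCreate();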
The snippet above creates the Spark session that we will use to load data from the CSV file.
Step 2. Loading a CSV file as a DataFrame
A DataFrame is a distributed collection of data organized into rows and columns, just like a SQL table. It can be used for ETL, aggregation, grouping, and filtering, and it can be constructed from HDFS, databases, CSV files, JSON files, and many other sources. We will now read the CSV file and convert it to a DataFrame.
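// inferSchema detects column types; without it every CSV column is read as a string.
val data = spark.read.format("csv").option("header", "true")
  .option("inferSchema", "true").load("/<path>/listings.csv");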
The Scala snippet above uses the Spark session to read the data from the CSV file and store it in a DataFrame.
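By default the CSV reader treats every column as a string, which matters later for numeric comparisons and aggregations. The sketch below is a minimal illustration of how the inferSchema option changes the resulting schema. The object name SchemaCheck, its helper methods, and the tiny sample file are all made up for this example and are not part of the tutorial's data set.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

object SchemaCheck {
  // Writes a tiny, made-up CSV to a temp file and returns its path.
  def writeSampleCsv(): String = {
    val path = Files.createTempFile("listings-sample", ".csv")
    Files.write(path, "id,price,number_of_reviews\n1,150,9\n2,80,120\n".getBytes("UTF-8"))
    path.toString
  }

  // Reads the CSV with a header row; `infer` toggles Spark's type detection.
  def load(spark: SparkSession, path: String, infer: Boolean): DataFrame =
    spark.read
      .option("header", "true")
      .option("inferSchema", infer.toString)
      .csv(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("schema-check").master("local[*]").getOrCreate()
    val path = writeSampleCsv()
    load(spark, path, infer = false).printSchema() // all columns are strings
    load(spark, path, infer = true).printSchema()  // numeric columns are detected as integers
    spark.stop()
  }
}
```

Calling printSchema() on the real listings DataFrame is a quick way to check which of its columns Spark managed to read as numbers.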
Step 3. Running Spark SQL queries on a DataFrame
To run a SQL query on a DataFrame, we first register the DataFrame as a temporary view. We can then run SQL queries against that view by name.
Creating a temporary view.
data.createOrReplaceTempView("listings");
Running count query.
val selectedData = spark.sql("SELECT count(*) FROM listings");
selectedData.show();
The output will be:
+--------+
|count(1)|
+--------+
|   20703|
+--------+
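The same count is also available directly from the DataFrame API, without SQL. The following is a small sketch that registers an in-memory DataFrame as a view and counts it both ways. The object name TempViewSketch, the view name sample_listings, and the three rows are hypothetical stand-ins for the real listings data.

```scala
import org.apache.spark.sql.SparkSession

object TempViewSketch {
  // Returns the row count obtained through SQL and through the DataFrame API.
  def countBothWays(spark: SparkSession): (Long, Long) = {
    import spark.implicits._
    // Made-up rows standing in for listings.csv.
    val sample = Seq((1, "Loft"), (2, "Studio"), (3, "Cabin")).toDF("id", "name")
    sample.createOrReplaceTempView("sample_listings")

    val viaSql = spark.sql("SELECT count(*) FROM sample_listings").head().getLong(0)
    val viaApi = sample.count()
    (viaSql, viaApi)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("temp-view-sketch").master("local[*]").getOrCreate()
    println(countBothWays(spark))
    spark.stop()
  }
}
```

A temporary view is scoped to the session that created it, so it only needs to be registered once before any number of spark.sql calls.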
Running aggregation queries.
Aggregations select the minimum, maximum, or average value from the records, just as in typical SQL.
- Finding the minimum value
spark.sql("SELECT min(number_of_reviews) from listings").show();
- Finding the maximum value
spark.sql("SELECT Max(number_of_reviews) from listings").show();
- Finding the average value
spark.sql("SELECT AVG(number_of_reviews) from listings").show();
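The three aggregates can also be computed in a single pass with the DataFrame API. This is a minimal sketch using the min, max, and avg functions from org.apache.spark.sql.functions. The object name AggregateSketch and the three review counts are invented for illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{avg, col, max, min}

object AggregateSketch {
  // Returns one Row holding (min, max, avg) of the sample review counts.
  def summarize(spark: SparkSession): Row = {
    import spark.implicits._
    // Made-up integer review counts.
    val reviews = Seq(3, 10, 47).toDF("number_of_reviews")
    reviews
      .agg(
        min(col("number_of_reviews")),
        max(col("number_of_reviews")),
        avg(col("number_of_reviews")))
      .head()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("aggregate-sketch").master("local[*]").getOrCreate()
    println(summarize(spark))
    spark.stop()
  }
}
```

Note that min and max compare values according to the column type, so on a column that was read as a string they compare lexicographically rather than numerically.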
WHERE clauses
WHERE conditions filter a large set of records down to the focused subset we need.
- Filtering null values
spark.sql("SELECT * from listings where price is not NULL ").show(3);
- General WHERE conditions with the AND and OR operators
spark.sql("SELECT * from listings where price is not NULL AND price > 100 AND price < 200").show(3);
spark.sql("SELECT * from listings where price is not NULL OR (reviews_per_month > 0 OR reviews_per_month is not null )").show(3);
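The AND filter above can be written equivalently with the DataFrame API's filter method. The sketch below runs the same condition through SQL and through filter on a small in-memory DataFrame and returns the matching ids from each. The object name WhereSketch, the view name sample_prices, and the rows (including one with a NULL price) are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object WhereSketch {
  // Returns ids priced strictly between 100 and 200, via SQL and via the DataFrame API.
  def priceBand(spark: SparkSession): (Seq[Int], Seq[Int]) = {
    import spark.implicits._
    // Made-up rows; None becomes a NULL price.
    val rows: Seq[(Int, Option[Int])] =
      Seq((1, Some(150)), (2, Some(80)), (3, None), (4, Some(199)))
    val sample = rows.toDF("id", "price")
    sample.createOrReplaceTempView("sample_prices")

    val viaSql = spark
      .sql("SELECT id FROM sample_prices WHERE price IS NOT NULL AND price > 100 AND price < 200")
      .as[Int].collect().toSeq.sorted
    val viaApi = sample
      .filter(col("price").isNotNull && col("price") > 100 && col("price") < 200)
      .select("id").as[Int].collect().toSeq.sorted
    (viaSql, viaApi)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("where-sketch").master("local[*]").getOrCreate()
    println(priceBand(spark))
    spark.stop()
  }
}
```

A comparison against NULL evaluates to NULL, and WHERE drops such rows, so the explicit IS NOT NULL check documents the intent but does not change the result of the range filter.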
ORDER BY and GROUP BY clauses.
spark.sql("SELECT * from listings order by price desc").show(3);
spark.sql("SELECT id, MAX(number_of_reviews) from listings GROUP BY id").show(3);
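Grouping is most informative on a column that repeats across rows. As a rough sketch of the same two clauses in the DataFrame API, the example below sorts by price and groups by a room_type column. The object name OrderGroupSketch, the room_type column, and the sample rows are assumptions made for this illustration and may not match the columns in your copy of listings.csv.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max}

object OrderGroupSketch {
  // Returns the two most expensive ids and the max review count per room type.
  def run(spark: SparkSession): (Seq[Int], Map[String, Int]) = {
    import spark.implicits._
    // Made-up rows: (id, room_type, price, number_of_reviews).
    val sample = Seq(
      (1, "Entire home", 150, 9),
      (2, "Private room", 80, 120),
      (3, "Entire home", 199, 47)
    ).toDF("id", "room_type", "price", "number_of_reviews")

    // Equivalent of ORDER BY price DESC, keeping the first two rows.
    val topByPrice = sample.orderBy(col("price").desc).select("id").as[Int].take(2).toSeq

    // Equivalent of SELECT room_type, MAX(number_of_reviews) ... GROUP BY room_type.
    val maxReviews = sample
      .groupBy("room_type")
      .agg(max("number_of_reviews").as("max_reviews"))
      .as[(String, Int)]
      .collect()
      .toMap
    (topByPrice, maxReviews)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("order-group-sketch").master("local[*]").getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```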
Consolidated code
import org.apache.spark.sql.SparkSession

object Main extends App {
  print("hello world");
  // Build (or reuse) a session that runs Spark locally inside this JVM.
  val spark = SparkSession
    .builder()
    .appName("test")
    .config("spark.master", "local")
    .getOrCreate();
  // inferSchema detects column types; without it every CSV column is read as a string.
  val data = spark.read.format("csv").option("header", "true")
    .option("inferSchema", "true").load("/home/ashrith/listings.csv");
  // Register the DataFrame so it can be queried by name from SQL.
  data.createOrReplaceTempView("listings");
  spark.sql("SELECT count(*) FROM listings").show();
  spark.sql("SELECT Max(number_of_reviews) from listings").show();
  spark.sql("SELECT min(number_of_reviews) from listings").show();
  spark.sql("SELECT AVG(number_of_reviews) from listings").show();
  spark.sql("SELECT * from listings where price is not NULL ").show(3);
  spark.sql("SELECT * from listings where price is not NULL AND price > 100 AND price < 200").show(3);
  spark.sql("SELECT * from listings where price is not NULL OR (reviews_per_month > 0 OR reviews_per_month is not null )").show(3);
  spark.sql("SELECT * from listings order by price desc").show(3);
  spark.sql("SELECT id, MAX(number_of_reviews) from listings GROUP BY id").show(3);
  // Release the local Spark resources once all queries have run.
  spark.stop();
}