I have Spark Context, SQL context, Hive context already!
SparkSession has been the unified entry point of a Spark application since Spark 2.0. It provides a way to interact with Spark's various functionality through fewer constructs: instead of separate Spark, Hive, and SQL contexts, everything is now encapsulated in a single Spark session.
Prior to Spark 2.0, SparkContext was the entry point of any Spark application. It was used to access all Spark features, and it needed a SparkConf, which held the cluster configs and parameters, to create a SparkContext object. With the SparkContext alone we could essentially only create RDDs, and we had to create a specific context for any other kind of interaction: SQLContext for SQL, HiveContext for Hive, and StreamingContext for streaming applications. In a nutshell, a Spark session combines these contexts (the DStream-based StreamingContext is still created separately). Internally, the Spark session creates a SparkContext (or reuses an existing one) for all operations, and the SparkContext and SQLContext can be accessed through the SparkSession object.
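For contrast, here is a rough sketch of what the pre-2.0 setup described above looked like. It assumes Spark 2.x or 3.x, where the deprecated SQLContext constructor is still available; the app name and the local master are placeholders chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Pre-2.0 style: settings go into a SparkConf, which is used to build the SparkContext.
val conf = new SparkConf()
  .setAppName("LegacyEntryPoints") // hypothetical app name
  .setMaster("local[*]")           // assumption: run locally
val sc = SparkContext.getOrCreate(conf)

// RDDs come straight from the SparkContext.
val rdd = sc.parallelize(Seq(1, 2, 3))

// DataFrame/SQL work needed a separate SQLContext wrapped around the same SparkContext.
val sqlContext = new SQLContext(sc) // deprecated since 2.0 in favour of SparkSession
val df = sqlContext.range(3)
```

With a Spark session, the second half of this sketch collapses into calls on a single object.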
How do I create a Spark session?
A Spark Session can be created using a builder pattern.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("SparkSessionExample")
  .master("local")
  .config("spark.sql.warehouse.dir", "target/spark-warehouse")
  .enableHiveSupport()
  .getOrCreate()
The Spark session builder returns the existing Spark session if one has already been created; otherwise it creates a new one and assigns it as the global default. Note that enableHiveSupport here is similar to creating a HiveContext: all it does is enable access to the Hive metastore, Hive SerDes, and Hive UDFs.
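To see the get-or-create behaviour in isolation, here is a minimal sketch. The app name is made up for illustration and a local master is assumed. Calling the builder twice should hand back the same session object rather than a second one.

```scala
import org.apache.spark.sql.SparkSession

// First call: creates the session (or picks up one that already exists).
val first = SparkSession.builder()
  .appName("GetOrCreateSketch") // hypothetical app name
  .master("local[*]")           // assumption: run locally
  .getOrCreate()

// Second call: no new session is built, the existing one is returned.
val second = SparkSession.builder().getOrCreate()

// Reference equality: both vals point at the same SparkSession instance.
val sameSession = first eq second
println(s"same session object: $sameSession")
```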
Note that we don't have to create a Spark session object when using spark-shell. It is already created for us as the variable spark.
scala> spark
res1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...
We can access spark context and other contexts using the spark session object.
scala> spark.sparkContext
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...

scala> spark.sqlContext
res3: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@...
Accessing Spark's configuration:
We can still access Spark's configuration through the Spark session, the same way we used to with SparkConf.
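As a small sketch of what that looks like (the app name and the key spark.some.unset.key are hypothetical, and a local master is assumed), the same setting can be read through the session's runtime config or through the SparkConf of the underlying context:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ConfAccessSketch") // hypothetical app name
  .master("local[*]")          // assumption: run locally
  .getOrCreate()

// Runtime configuration exposed by the session.
val appName = spark.conf.get("spark.app.name")

// The same key read through the SparkConf of the underlying SparkContext.
val appNameFromConf = spark.sparkContext.getConf.get("spark.app.name")

// Passing a default avoids an exception when a key is not set.
val missing = spark.conf.get("spark.some.unset.key", "not-set") // hypothetical key
```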
Why do I need Spark session when I already have Spark context?
Part of the answer is that it unifies all the different contexts in Spark and spares the developer from creating separate contexts. But apart from this big advantage, the developers of Spark have also tried to solve the problem of multiple users sharing the same Spark context.
Let's say multiple users access the same notebook environment, which has a shared Spark context, and the requirement is to give each user an isolated environment on top of that shared context. Prior to 2.0, the solution was to create multiple Spark contexts, i.e. one Spark context per isolated environment or user, which is an expensive operation (generally there is one Spark context per JVM). With the introduction of the Spark session, this issue has been addressed.
Note: before Spark 3.0 we could have multiple Spark contexts by setting spark.driver.allowMultipleContexts to true (the setting was removed in Spark 3.0). But having multiple Spark contexts in the same JVM is discouraged and not considered good practice: it makes the JVM less stable, and the crash of one Spark context can affect the others.
How do I create multiple sessions?
Spark gives a straightforward API to create a new session that shares the same Spark context: spark.newSession() creates a new Spark session object. If we look closely at the hashes of spark and session2, they are different. In contrast, the underlying Spark context is the same.
scala> val session2 = spark.newSession()
session2: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

scala> spark
res22: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

scala> spark.sparkContext
res26: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...

scala> session2.sparkContext
res27: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...
We can also verify that the Spark session gives a unified view of all the contexts while isolating configuration and environment. We can query directly, without creating a SQLContext as we used to, and run queries the same way. Let's say we have a table called people_session1. This table is visible only in the session spark. Now take the new session, session2. The table won't be visible when we try to access it from session2, and we can even create another table with the same name there without affecting the table in the spark session.
scala> people.createOrReplaceTempView("people_session1")

scala> spark.sql("show tables").show()
+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
|        |people_session1|       true|
+--------+---------------+-----------+

scala> spark.catalog.listTables.show()
+---------------+--------+-----------+---------+-----------+
|           name|database|description|tableType|isTemporary|
+---------------+--------+-----------+---------+-----------+
|people_session1|    null|       null|TEMPORARY|       true|
+---------------+--------+-----------+---------+-----------+

scala> session2.sql("show tables").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> session2.catalog.listTables.show()
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+
This isolation is for the configurations as well. Both sessions can have their own configs.
scala> spark.conf.get("spark.sql.crossJoin.enabled")
res21: String = true

scala> session2.conf.get("spark.sql.crossJoin.enabled")
res25: String = false
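The isolation described above can also be checked in one self-contained sketch. The view name only_in_spark, the app name, and the partition value 7 are arbitrary choices for illustration, and a local master is assumed.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SessionIsolationSketch") // hypothetical app name
  .master("local[*]")                // assumption: run locally
  .getOrCreate()
val session2 = spark.newSession()

// Both sessions sit on top of the same SparkContext.
val sharedContext = spark.sparkContext eq session2.sparkContext

// A temp view registered in one session is not listed in the other.
spark.range(3).createOrReplaceTempView("only_in_spark") // hypothetical view name
val visibleInSpark =
  spark.catalog.listTables().collect().exists(_.name == "only_in_spark")
val visibleInSession2 =
  session2.catalog.listTables().collect().exists(_.name == "only_in_spark")

// A runtime config set on one session does not leak into the other.
spark.conf.set("spark.sql.shuffle.partitions", "7")
val partitionsInSpark = spark.conf.get("spark.sql.shuffle.partitions")
val partitionsInSession2 = session2.conf.get("spark.sql.shuffle.partitions")
```

Here session2 should keep whatever value the application started with, since the set call only touches the runtime config of spark.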
Get the existing configurations:
scala> spark.conf.set("spark.sql.shuffle.partitions", 100)

scala> spark.conf.getAll
res55: Map[String,String] = Map(spark.driver.host -> 19e0778ea843, spark.driver.port -> 38121, spark.repl.class.uri -> spark://19e0778ea843:38121/classes, spark.jars -> "", spark.repl.class.outputDir -> /tmp/spark-cfe820cd-b2f1-4d23-9c9a-3ee42bc78e01/repl-fae1a516-761a-4f31-b957-f5860882478f, spark.sql.crossJoin.enabled -> true, spark.app.name -> Spark shell, spark.ui.showConsoleProgress -> true, spark.executor.id -> driver, spark.submit.deployMode -> client, spark.master -> local[*], spark.home -> /opt/spark, spark.notebook.name -> SparkSessionSimpleZipExample, spark.sql.catalogImplementation -> hive, spark.app.id -> local-1553489583142, spark.sql.shuffle.partitions -> 100)
To set any configurations, we can either set the configs when we create our spark session using the .config option or use the set method.
scala> spark.conf.get("spark.sql.crossJoin.enabled")
res4: String = false

scala> spark.conf.set("spark.sql.crossJoin.enabled", "true")

scala> spark.conf.get("spark.sql.crossJoin.enabled")
res6: String = true
Some common operations:
1) Create a DataFrame from a Seq.
val emp = Seq((101, "Amy", Some(2)))
val employee = spark.createDataFrame(emp).toDF("employeeId", "employeeName", "managerId")
2) Create a Dataset
scala> case class Employee(employeeId: Int, employeeName: String, managerId: Option[Int])
scala> val employeeDs = spark.createDataset(emp.map(Employee.tupled))
employeeDs: org.apache.spark.sql.Dataset[Employee] = [employeeId: int, employeeName: string ... 1 more field]
3) Read data from a source: read gives access to the DataFrameReader, which can be used to read from multiple sources such as CSV, JSON, Avro, JDBC, and many third-party data source API implementations.
scala> val df = spark.read.option("header", "true").csv("src/main/resources/Characters.csv")
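Since the CSV file above is not included here, the following sketch writes a tiny temporary file first and then reads it back. The file contents, column names, and app name are invented for illustration, and the inferSchema option is added only to show how reader options chain.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ReadCsvSketch") // hypothetical app name
  .master("local[*]")       // assumption: run locally
  .getOrCreate()

// Write a throwaway CSV with a header row (made-up data).
val csvPath = Files.createTempFile("characters", ".csv")
Files.write(csvPath, "id,name\n1,Jon\n2,Arya\n".getBytes("UTF-8"))

// header: take column names from the first line; inferSchema: guess column types.
val characters = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvPath.toString)

characters.printSchema()
```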
4) Accessing the catalog and listenerManager.
The catalog provides information about the underlying databases, tables, functions, and a lot more for the current session.
scala> spark.catalog.listColumns("people_session1").show
+------+-----------+--------+--------+-----------+--------+
|  name|description|dataType|nullable|isPartition|isBucket|
+------+-----------+--------+--------+-----------+--------+
|    id|       null|     int|   false|      false|   false|
|gender|       null|  string|    true|      false|   false|
| alive|       null| boolean|   false|      false|   false|
+------+-----------+--------+--------+-----------+--------+
The listenerManager can be used to register a custom QueryExecutionListener for execution metrics. It is still marked as experimental, though.
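A minimal sketch of such a listener might look like the following. The listener itself (timingListener) and the app name are hypothetical and only print what they receive, and a local master is assumed. Depending on the Spark version, the callbacks may be delivered asynchronously, so the printed line can appear slightly after the action returns.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

val spark = SparkSession.builder()
  .appName("ListenerSketch") // hypothetical app name
  .master("local[*]")        // assumption: run locally
  .getOrCreate()

// Hypothetical listener: reports how long each action took, or why it failed.
val timingListener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName took ${durationNs / 1e6} ms")

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
}

// Register the listener on this session, then run an action for it to observe.
spark.listenerManager.register(timingListener)
val rowCount = spark.range(1000).count()
```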
5) Spark implicits
The Spark session provides spark.implicits._, one of the most useful imports in all of the Spark packages. It brings in a lot of implicit methods for converting Scala objects to Datasets, along with some other handy utilities.
import spark.implicits.StringToColumn
got.select($"id")

import spark.implicits.symbolToColumn
got.select('id)

import spark.implicits.newProductEncoder
import spark.implicits.localSeqToDatasetHolder
val people = Seq((1, "male", false), (2, "female", true)).toDF("id", "gender", "alive")
6) Create UDFs
val capitalizeAndRemoveSpaces = spark.udf.register("capitalizeAndRemoveSpaces", capAndRemoveSpaces(_: String): String)
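The body of capAndRemoveSpaces is not shown above, so the sketch below makes one up (upper-casing the input and stripping whitespace) purely to illustrate how a registered UDF can be used from both the DataFrame API and SQL. The sample names, view name, and app name are also invented.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("UdfSketch") // hypothetical app name
  .master("local[*]")   // assumption: run locally
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: the real implementation is not shown in the article.
def capAndRemoveSpaces(s: String): String =
  if (s == null) null else s.toUpperCase.replaceAll("\\s", "")

// register returns a UserDefinedFunction and also makes the name usable in SQL.
val capitalizeAndRemoveSpaces =
  spark.udf.register("capitalizeAndRemoveSpaces", capAndRemoveSpaces _)

val names = Seq("jon snow", "arya stark").toDF("name") // made-up sample rows

// Use the returned function with the DataFrame API...
val viaColumn = names.select(capitalizeAndRemoveSpaces($"name")).as[String].collect()

// ...or call it by its registered name from SQL.
names.createOrReplaceTempView("names")
val viaSql = spark.sql("SELECT capitalizeAndRemoveSpaces(name) FROM names").as[String].collect()
```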
7) The sql method to run SQL queries, and access to a SQLContext.
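As a short sketch of this item, reusing the people data from the implicits example (a local master and the app name are assumptions), spark.sql returns a DataFrame, and the older SQLContext is still reachable for code that expects one:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SqlSketch") // hypothetical app name
  .master("local[*]")   // assumption: run locally
  .getOrCreate()
import spark.implicits._

val people = Seq((1, "male", false), (2, "female", true)).toDF("id", "gender", "alive")
people.createOrReplaceTempView("people_session1")

// spark.sql returns a DataFrame, so the result composes with the usual DataFrame API.
val alive = spark.sql("SELECT id, gender FROM people_session1 WHERE alive = true")
alive.show()

// The SQLContext wrapped by the session sees the same temp view.
val total = spark.sqlContext.sql("SELECT count(*) AS n FROM people_session1")
```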
8) Utility method to calculate the time taken to run some function.
scala> spark.time(spark.catalog.cacheTable("people_session1"))
Time taken: 9 ms
9) The table method gives a way of converting a Hive table into a DataFrame.
val peopleFromTable = spark.table("people_session1")
10) The close and stop methods are provided to stop the underlying spark context.
scala> session2.close

scala> session2.sparkContext.isStopped
res5: Boolean = true
Also note that closing/stopping session2 kills the spark session as well, because the underlying Spark context is the same.
scala> spark.sparkContext.isStopped
res6: Boolean = true
As always, thanks for reading! Please share the article if you liked it. Any comments or suggestions are welcome!