Spark SQL Joins: Beware of Table Order

I decided to try out how the combination of Excel -> Mondrian (XMLA) -> Spark SQL works. How to set up and configure this “trinity” is a topic for a separate post (which I will also publish on this blog). Here I would like to share some results.

I don’t remember exactly where I got the Foodmart database for MySQL from, but I have published a link here where you can get it. There you will find a script to create the MySQL database and a sqoop script to import the data into Hive (run “create database foodmart” first).

I installed a VM with the HDP 2.2 stack following the instructions, launched the Spark standalone cluster, and then launched the Thrift server using the following commands:

export HIVE_SERVER2_THRIFT_PORT=10001
./start-thriftserver.sh --master spark://myvmserver:7077 --conf spark.executor.memory=6000m --conf spark.driver.memory=1024m

I dedicated 6 GB of RAM to the executor, which is more than enough to handle requests against the Foodmart database, which fits into a 50 MB SQL dump file.

I configured Mondrian to use Spark SQL with mondrian-xmla-spike (as a lightweight web application) by getting and compiling the Mondrian code from GitHub (I used version 3.9) and putting all the jar files into the lib directory of the web application. The datasources.xml contained the following connection string:

<DataSourceInfo>Provider=mondrian;Jdbc=jdbc:hive2://myvmserver:10001/foodmart;JdbcUser=Ihor_Bobak;JdbcPassword=mypassword;JdbcDrivers=com.simba.hive.jdbc4.HS2Driver</DataSourceInfo>

The Simba JDBC driver for Hive is used, but the connection is made to Spark SQL (port 10001 in my case).
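As a quick sanity check outside of Mondrian, the same Thrift server can be queried directly over JDBC. Below is a minimal sketch; it assumes the open-source Apache Hive JDBC driver (org.apache.hive.jdbc.HiveDriver) is on the classpath instead of the Simba one, and reuses the host, port, database and credentials from the connection string above.

import java.sql.DriverManager

object ThriftServerCheck {
  def main(args: Array[String]): Unit = {
    // Load the Hive JDBC driver (swap in the Simba class name if you use that driver)
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // Same endpoint as in datasources.xml: the Spark SQL Thrift server on port 10001
    val conn = DriverManager.getConnection(
      "jdbc:hive2://myvmserver:10001/foodmart", "Ihor_Bobak", "mypassword")
    try {
      val rs = conn.createStatement().executeQuery(
        "SELECT COUNT(*) FROM foodmart.sales_fact_1997")
      while (rs.next()) println("rows in sales_fact_1997: " + rs.getLong(1))
    } finally {
      conn.close()
    }
  }
}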

So, let us connect from Excel to Mondrian and see some data from Spark:

So far so good. But let us now do the following:

And here is what happens with Spark SQL:

The SQL statement is the following (let us format it a little bit):

SELECT `time_by_day`.`the_year` `c0`
    ,`product_class`.`product_family` `c1`
    ,SUM(`sales_fact_1997`.`unit_sales`) `m0`
    ,SUM(`sales_fact_1997`.`store_cost`) `m1`
    ,SUM(`sales_fact_1997`.`store_sales`) `m2`
    ,COUNT(`sales_fact_1997`.`product_id`) `m3`
    ,COUNT(DISTINCT `sales_fact_1997`.`customer_id`) `m4`
    ,SUM((
        CASE
            WHEN `sales_fact_1997`.`promotion_id` = 0 THEN 0
            ELSE `sales_fact_1997`.`store_sales`
        END
    )) `m5`
FROM `foodmart`.`time_by_day` `time_by_day`
CROSS JOIN `foodmart`.`sales_fact_1997` `sales_fact_1997`
CROSS JOIN `foodmart`.`product_class` `product_class`
CROSS JOIN `foodmart`.`product` `product`
WHERE (
    `product`.`product_class_id` = `product_class`.`product_class_id`
    AND (
        `sales_fact_1997`.`time_id` = `time_by_day`.`time_id`
        AND `sales_fact_1997`.`product_id` = `product`.`product_id`
    )
)
GROUP BY `time_by_day`.`the_year`
    ,`product_class`.`product_family`

 

And here is what we see on the Linux machine:

All the memory is taken and all 4 cores are busy.

If we run the same SQL on the MySQL database, we have this:

The result is immediate.

The version of Spark is 1.2.2 in my case: this is the one that is installed with HDP 2.2 using Ambari 2.0.

OK, let us repeat the same experiment with the same database using Spark 1.3.1 (the latest one as of 03.06.2015).

Here you are:

Nothing is different from 1.2.2.

The logs from the failing executor are similar on the machines with 1.2.2 and 1.3.1; note the CartesianRDD frames in the stack trace, which show that the plan contains a full cartesian product. Here is the log from 1.2.2:

15/06/03 10:33:36 ERROR Executor: Exception in task 2.0 in stage 1225.0 (TID 75530)
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.util.LineReader.<init>(LineReader.java:140)
at org.apache.hadoop.mapreduce.lib.input.SplitLineReader.<init>(SplitLineReader.java:37)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:127)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:234)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:211)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:99)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
at org.apache.spark.rdd.CartesianRDD$$anonfun$compute$1.apply(CartesianRDD.scala:76)
at org.apache.spark.rdd.CartesianRDD$$anonfun$compute$1.apply(CartesianRDD.scala:75)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.joins.HashJoin$$anon$1.fetchNext(HashJoin.scala:87)
at org.apache.spark.sql.execution.joins.HashJoin$$anon$1.hasNext(HashJoin.scala:66)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:616)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:616)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:278)

I’ve made a little Scala application that runs


val query = hiveContext.sql(".... (the query) ...")

query.explain(true)
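For reference, here is a minimal sketch of what such an application looks like on Spark 1.3.x; the object name is mine, the query placeholder stands for the Mondrian-generated statement above, and the master URL matches the one the Thrift server was started with.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ExplainQuery {
  def main(args: Array[String]): Unit = {
    // Connect to the same standalone cluster as the Thrift server
    val conf = new SparkConf().setAppName("ExplainQuery").setMaster("spark://myvmserver:7077")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // Paste the Mondrian-generated query here
    val query = hiveContext.sql(".... (the query) ...")

    // Print the parsed, analyzed, optimized and physical plans without executing the query
    query.explain(true)

    sc.stop()
  }
}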

And here is what I noticed.

If we run the query with the tables listed in the original order,

[screenshot 1: the query]

it fails to build a good plan:

[screenshot 2: the execution plan]

But as soon as we change the order of the tables,

[screenshot 3: the query with the tables reordered]

everything goes fine:

[screenshot 4: the execution plan]
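To make “change the order of the tables” concrete, here is a sketch of one reordering that should produce a good plan. It is my reconstruction (with shortened aliases and only one measure kept), not the exact query from the screenshot: every table in the FROM clause shares a WHERE condition with the table right before it, so each cross join can be turned into an equi-join. It reuses the hiveContext from the sketch above.

// Hypothetical reordering: sales_fact_1997 -> time_by_day -> product -> product_class.
// Each adjacent pair of tables is linked by one of the WHERE conditions, so the optimizer
// never has to materialize a full cartesian product.
val reordered = hiveContext.sql("""
  SELECT t.the_year c0, pc.product_family c1, SUM(f.unit_sales) m0
  FROM foodmart.sales_fact_1997 f
  CROSS JOIN foodmart.time_by_day t
  CROSS JOIN foodmart.product p
  CROSS JOIN foodmart.product_class pc
  WHERE f.time_id = t.time_id
    AND f.product_id = p.product_id
    AND p.product_class_id = pc.product_class_id
  GROUP BY t.the_year, pc.product_family
""")
reordered.explain(true)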

For me the conclusion is this: Spark SQL is still missing join-reordering features that relational SQL engines (MySQL, SQL Server, Oracle and others) already have.

MySQL did not care about the order of the tables and successfully detected that A points to B and B points to C: as long as all the join conditions are in the WHERE clause, their order does not matter.

Spark SQL doesn’t handle this: if you specify the tables in the wrong order, it fails to build a good execution plan and blows up with an OutOfMemoryError.

2 Comments

  1. nem

     Hi – did you raise a Spark JIRA for this?

  2. Ihor Bobak (Post author)

     I did, but it seems they simply don’t care.
