Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: CDH5 5.6.0
Fix Version/s: None
Component/s: Spark
Labels: None
Environment: 8 hosts running DataNode and NodeManager.
Description
Hi,
I reported this problem on the user community forum:
http://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/CDH-Spark-SQL-window-functions-return-wrong-result-at-cluster/m-p/38254#U38254
However, I think this is not an application problem but a bug in CDH Spark, so I am reporting it in the Cloudera JIRA.
- Main.scala: the reproduction code.
- cdh560_result.log: the stdout output on CDH 5.6.0.
- fixed_expected_result.log: the expected result.
  - This result was produced by a CDH 5.6.0 build from source with the SPARK-11080 patch applied.
- executor_debug.log: an excerpt of the executor log at DEBUG level.
  - The excerpt is taken from the tail of the full log.
I generated the attached logs with a self-built CDH Spark package from:
https://github.com/cloudera/spark/tree/cdh5-1.5.0_5.6.0
However, I confirmed the same problem with the distributed RPM package.
Your results may not match cdh560_result.log exactly.
I think the result depends on the number of executors and the Spark configuration, but you should still see some values that are unexpectedly large.
If you cannot reproduce the problem, please increase rowNum and the number of distinct values in column 'b'.
executor_debug.log shows what happens.
The columns are 'a', 'b', 'count_a', and 'count_b', with types String, String, Long, and Long.
However, the generated code tries to write the value of column 'b' into column 'count_b', as shown below:
    /* input[1, LongType] */
    boolean isNull6 = i.isNullAt(1);
    long primitive7 = isNull6 ? -1L : (i.getLong(1));
    if (isNull6) {
      mutableRow.setNullAt(3);
    } else {
      mutableRow.setLong(3, primitive7);
    }
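The generated code reads input ordinal 1 (the String column 'b') with getLong and stores the result as 'count_b'. One plausible reason the wrong values look "unexpectedly large", assuming the input row is an UnsafeRow (the logs do not confirm this), is that UnsafeRow keeps a string's offset and size packed into the field's 8-byte slot, so getLong returns that packed word instead of a count. The sketch below models only that packing; the object name and the row layout numbers are hypothetical.

```scala
object PackedStringSlot {
  // Models an UnsafeRow-style fixed-width slot for a variable-length field:
  // the slot holds (offset << 32) | size, not the string bytes themselves.
  def packOffsetAndSize(offset: Int, size: Int): Long =
    (offset.toLong << 32) | (size.toLong & 0xFFFFFFFFL)

  def main(args: Array[String]): Unit = {
    // Hypothetical 4-field row: an 8-byte null bitset, four 8-byte slots,
    // then 'a' padded to 8 bytes, then a 1-byte value of 'b'.
    val offsetOfB = 8 + 4 * 8 + 8 // 48
    val sizeOfB = 1
    val slot = packOffsetAndSize(offsetOfB, sizeOfB)
    // Reading this slot as a long, as getLong(1) does, yields a huge "count".
    println(s"count_b would read as $slot") // 206158430209
  }
}
```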
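This mismatch occurs because the ExprId is used to find the column position.
The root cause is that ExprIds are not unique across executors: each JVM assigns them from its own counter, so an ExprId generated on an executor can collide with one generated on the driver.
As a result, the ExprId comparison in BindReferences.bindReference goes wrong, and the column ordinal of 'count_b' is resolved incorrectly.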
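A minimal model of the failure, assuming (as in Spark 1.5) that new ExprIds come from a counter local to each JVM and that binding picks the first input attribute with an equal ExprId. The types and names here are simplified, hypothetical stand-ins, not Spark's API, and the assumption that the executor's counter starts at 0 is chosen only to force a collision; the real ID values are not in the logs.

```scala
import java.util.concurrent.atomic.AtomicLong

object ExprIdCollision {
  // Simplified, hypothetical stand-ins for Catalyst's ExprId and AttributeReference.
  final case class ExprId(id: Long)
  final case class Attr(name: String, exprId: ExprId)

  // Models one JVM's ID counter; the driver and each executor have their own.
  final class JvmIdCounter {
    private val curId = new AtomicLong()
    def newExprId(): ExprId = ExprId(curId.getAndIncrement())
  }

  // Binding by ExprId: the first input attribute whose exprId is equal wins,
  // regardless of its name or type.
  def bindOrdinal(ref: Attr, input: Seq[Attr]): Int =
    input.indexWhere(_.exprId == ref.exprId)

  def main(args: Array[String]): Unit = {
    val driver = new JvmIdCounter
    val executor = new JvmIdCounter

    // Attributes planned on the driver get ExprId(0) and ExprId(1).
    val a = Attr("a", driver.newExprId())
    val b = Attr("b", driver.newExprId())

    // Attributes created inside a task draw from the executor's own counter,
    // which (by assumption) also starts at 0, so the IDs collide.
    val countA = Attr("count_a", executor.newExprId())
    val countB = Attr("count_b", executor.newExprId())

    val input = Seq(a, b, countA, countB)
    input.foreach { attr =>
      println(s"${attr.name} binds to ordinal ${bindOrdinal(attr, input)}")
    }
    // count_b binds to ordinal 1, i.e. the String column 'b'.
  }
}
```

Because a collision depends on how many IDs each JVM has already handed out, this model is consistent with the observation that the wrong results vary with the number of executors and the configuration.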
https://issues.apache.org/jira/browse/SPARK-11080
https://github.com/cloudera/spark/blob/cdh5-1.5.0_5.6.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala#L87
The Window operator calls BindReferences.bindReference here:
https://github.com/cloudera/spark/blob/cdh5-1.5.0_5.6.0/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala#L243
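SPARK-11080 is titled "NamedExpression.newExprId should only be called on driver". The sketch below models that principle only; it is not the actual patch, and every name in it is a hypothetical stand-in rather than Spark's API. All attributes, including the window function outputs, receive their IDs from a single driver-side counter during planning, and the executor only binds the attributes it is given, so every ordinal resolves to the attribute's own position.

```scala
import java.util.concurrent.atomic.AtomicLong

object DriverOnlyExprIds {
  // Simplified, hypothetical stand-ins for Catalyst's ExprId and AttributeReference.
  final case class ExprId(id: Long)
  final case class Attr(name: String, exprId: ExprId)

  // The only ID counter in this model; it is used during planning on the driver.
  private val driverCounter = new AtomicLong()
  private def newExprIdOnDriver(): ExprId = ExprId(driverCounter.getAndIncrement())

  // Driver side: create every attribute, including window outputs, up front.
  def plan(columnNames: Seq[String]): Seq[Attr] =
    columnNames.map(name => Attr(name, newExprIdOnDriver()))

  // Binding by ExprId: the first input attribute with an equal exprId wins.
  def bindOrdinal(ref: Attr, input: Seq[Attr]): Int =
    input.indexWhere(_.exprId == ref.exprId)

  // Executor side: reuse the shipped attributes and never create new IDs.
  def executorTask(planned: Seq[Attr]): Seq[Int] =
    planned.map(attr => bindOrdinal(attr, planned))

  def main(args: Array[String]): Unit = {
    val planned = plan(Seq("a", "b", "count_a", "count_b"))
    println(executorTask(planned)) // List(0, 1, 2, 3)
  }
}
```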