Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: None
Fix Version/s: None
Component/s: Data Module
Labels: None
Description
Writing data to a dataset that has no partitions defined failed with the following stack trace:
java.io.IOException: Spill failed
	at org.apache.crunch.impl.mr.emit.OutputEmitter.emit(OutputEmitter.java:43)
	at org.apache.crunch.MapFn.process(MapFn.java:34)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
	at org.kitesdk.data.crunch.CrunchDatasets$AsKeyTable.process(CrunchDatasets.java:250)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
	at org.apache.crunch.MapFn.process(MapFn.java:34)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
	at org.apache.crunch.MapFn.process(MapFn.java:34)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
	at org.apache.crunch.MapFn.process(MapFn.java:34)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:98)
	at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:109)
	at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: Spill failed
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.checkSpillException(MapTask.java:1559)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1468)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
	at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2016)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:797)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.avro.AvroRuntimeException: Can't compare maps!
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:134)
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:139)
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:92)
	at org.apache.avro.io.BinaryData.compare(BinaryData.java:72)
	at org.apache.avro.mapred.AvroKeyComparator.compare(AvroKeyComparator.java:43)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1269)
	at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:99)
	at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:129)
	at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1597)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.access$900(MapTask.java:876)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer$SpillThread.run(MapTask.java:1529)
We're using the Avro format, and our Avro schema does have map fields.
Here are some comments from an internal discussion about this:
Essentially, when using the CrunchDatasets.partition(...) API, the goal is to group data for the same partition together. Kite uses the partition strategy to generate a key for each record's partition and then uses that as the key for the MapReduce shuffle and reduce. When there is no partition strategy, it still performs the reduce and still has to build a key, so it uses the entire record payload as the key. Because keys are shuffled and sorted, they must be comparable, and Avro throws an error when it tries to compare maps.
So a possible enhancement would be either for Kite to no-op when partition is called on a dataset that has no partitions (though each mapper would then write its own file), or for Kite to parse the Avro schema and generate a key from all fields that aren't maps.
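A minimal, dependency-free Java sketch of the failure mode described in the comment above. It is a toy model, not Kite's or Avro's code: records are plain `Map<String, Object>` values, the hypothetical `shuffleKey` method stands in for the key that `CrunchDatasets.partition(...)` derives, and the hypothetical `compareKeys` method mimics Avro's comparator by throwing "Can't compare maps!" when it reaches a map value (an `IllegalStateException` is used here in place of `AvroRuntimeException`). With a partition strategy the key holds only the partition values; without one, the whole record becomes the key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Kite's or Avro's code) of how the shuffle key is chosen
// when a dataset is partitioned, and why sorting whole-record keys can fail.
class ShuffleKeyModel {

    // With partition fields, only their values form the key. With no partition strategy
    // (empty list), every field value of the record becomes part of the key.
    static List<Object> shuffleKey(Map<String, Object> record, List<String> partitionFields) {
        List<Object> key = new ArrayList<>();
        if (partitionFields.isEmpty()) {
            key.addAll(record.values()); // entire payload, in the map's iteration order
        } else {
            for (String field : partitionFields) {
                key.add(record.get(field));
            }
        }
        return key;
    }

    // Field-by-field comparison that, like Avro's comparator, refuses to order map values.
    // It stops at the first differing field, so it only fails once it actually reaches a map.
    @SuppressWarnings("unchecked")
    static int compareKeys(List<Object> a, List<Object> b) {
        int n = Math.min(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            Object x = a.get(i);
            Object y = b.get(i);
            if (x instanceof Map || y instanceof Map) {
                // Stand-in for org.apache.avro.AvroRuntimeException
                throw new IllegalStateException("Can't compare maps!");
            }
            int c = ((Comparable<Object>) x).compareTo(y);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.size(), b.size());
    }
}
```

Keying two records on a `region` partition field compares cleanly, while keying the same records on their whole payload fails as soon as the comparison reaches their map field, which is what the map-side sort in the stack trace runs into.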
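The second proposed enhancement can be sketched in the same toy model: build the key from every field that is not a map, so the key stays comparable even when there is no partition strategy. This is a hypothetical illustration, not Kite code; the class name `NonMapKey` is invented, and it detects maps by checking runtime values, whereas a real fix would presumably inspect the Avro schema instead (maps nested inside records, arrays, or unions are just as incomparable to Avro's comparator, so they would need handling too).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Kite's API): key an unpartitioned record on its non-map fields only.
class NonMapKey {

    // Collects the values of every top-level field that is not a map, in iteration order.
    // Assumes the remaining values are mutually Comparable (e.g. String, Integer, Long).
    static List<Object> of(Map<String, Object> record) {
        List<Object> key = new ArrayList<>();
        for (Object value : record.values()) {
            if (!(value instanceof Map)) {
                key.add(value);
            }
        }
        return key;
    }

    // Lexicographic comparison of two keys; map values have already been dropped.
    @SuppressWarnings("unchecked")
    static int compare(List<Object> a, List<Object> b) {
        int n = Math.min(a.size(), b.size());
        for (int i = 0; i < n; i++) {
            int c = ((Comparable<Object>) a.get(i)).compareTo(b.get(i));
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.size(), b.size());
    }
}
```

One consequence of this design is that records differing only in their map fields get equal keys. That should be acceptable here, since the key exists only to satisfy the shuffle's sort rather than to separate partitions.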