diff --git a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java index 132c0c0..263486e 100644 --- a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java +++ b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java @@ -355,6 +355,23 @@ public abstract class PipeMapRed { Reporter reporter = Reporter.NULL;//dummy reporter startOutputThreads(collector, reporter); } + checkExitValue(); + if (outThread_ != null) { + outThread_.join(joinDelay_); + } + if (errThread_ != null) { + errThread_.join(joinDelay_); + } + if (outerrThreadsThrowable != null) { + throw new RuntimeException(outerrThreadsThrowable); + } + } catch (InterruptedException e) { + //ignore + } + } + + void checkExitValue() { + try{ int exitVal = sim.waitFor(); // how'd it go? if (exitVal != 0) { @@ -366,15 +383,6 @@ public abstract class PipeMapRed { + " in " + PipeMapRed.class.getName()); } } - if (outThread_ != null) { - outThread_.join(joinDelay_); - } - if (errThread_ != null) { - errThread_.join(joinDelay_); - } - if (outerrThreadsThrowable != null) { - throw new RuntimeException(outerrThreadsThrowable); - } } catch (InterruptedException e) { //ignore } @@ -572,7 +580,8 @@ public abstract class PipeMapRed { waitOutputThreads(); } catch (IOException io) { LOG.warn(StringUtils.stringifyException(io)); - } + checkExitValue(); + } if (sim != null) sim.destroy(); logprintln("mapRedFinished"); } catch (RuntimeException e) {