From 7c2767426ea41464eeccf18718b8e739b5851995 Mon Sep 17 00:00:00 2001 From: Rick Bernotas Date: Thu, 2 Mar 2017 16:38:09 -0600 Subject: [PATCH] LIVY-322 - subprocess.call() commands in a PySpark snippet can potentially insert raw text into the sys_stdout in the fake_shell main(). This will then fail to be correctly parsed by PythonInterpreter in the sendRequest, as it will trigger a JsonParseException that is not caught. Added code to catch the JsonParseException and then retry reads of stdout until a valid line of JSON is reached, or 100 retries have been attempted. --- .../com/cloudera/livy/repl/PythonInterpreter.scala | 30 +++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala index 2195d0e..f5c0f45 100644 --- a/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala +++ b/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala @@ -42,6 +42,8 @@ import com.cloudera.livy.rsc.BaseProtocol import com.cloudera.livy.rsc.driver.BypassJobWrapper import com.cloudera.livy.sessions._ +import com.fasterxml.jackson.core.JsonParseException + // scalastyle:off println object PythonInterpreter extends Logging { @@ -254,7 +256,33 @@ private class PythonInterpreter(process: Process, gatewayServer: GatewayServer, stdin.flush() Option(stdout.readLine()).map { case line => - parse(line) + try { + parse(line) + } + catch { + // LIVY-322 - If a statement puts raw text in the stdout without fake_shell parsing it, + // retry the readLine up to 100 times in an attempt to find the next parsable line of JSON. + case e: JsonParseException => retryRead(100) + } + } + } + + // LIVY-322 - Method for retrying reads from stdout if a JsonParseException is encountered + // due to non-JSON formatted text in the stdout. Accepts a param of the number of times + // to retry the read, and recurses until either it finds a parsable line of JSON, or + // exhausts the maximum level of retries. If the maximum number of retries is exhausted, + // an Exception is thrown. + private def retryRead(maxRetries:Int): JValue = { + if (maxRetries > 0) { + try { + parse(stdout.readLine()) + } catch { + case e: JsonParseException => + retryRead(maxRetries - 1) + } + } + else { + throw new Exception("Livy is unable to find valid JSON in the response from fake_shell. Please recreate the session.") } } -- 1.8.1.3