Livy / LIVY-995

JsonParseException is thrown when closing Livy session when using python profile


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0
    • Component/s: REPL
    • Labels: None

    Description

      Start PySpark and enable spark.python.profile:

      ./bin/pyspark --master local --conf spark.python.profile=true
      

       
      Execute some code that touches a Spark RDD. When PySpark exits, it prints the profile information to stdout:

      >>> rdd = sc.parallelize(range(100)).map(str)
      >>> rdd.count()
      [Stage 0:>                                                          (0 + 1) / 1]
      100
      >>>
      ============================================================
      Profile of RDD<id=1>
      ============================================================
               244 function calls (241 primitive calls) in 0.001 seconds
       
         Ordered by: internal time, cumulative time
       
         ncalls  tottime  percall  cumtime  percall filename:lineno(function)
            101    0.000    0.000    0.000    0.000 rdd.py:1237(<genexpr>)
            101    0.000    0.000    0.000    0.000 util.py:72(wrapper)
              1    0.000    0.000    0.000    0.000 serializers.py:255(dump_stream)
              1    0.000    0.000    0.000    0.000 serializers.py:213(load_stream)
               2    0.000    0.000    0.000    0.000 {built-in method builtins.sum}
               1    0.000    0.000    0.001    0.001 worker.py:607(process)
               1    0.000    0.000    0.000    0.000 context.py:549(f)
               1    0.000    0.000    0.000    0.000 {built-in method _pickle.dumps}
               1    0.000    0.000    0.000    0.000 serializers.py:561(read_int)
               1    0.000    0.000    0.000    0.000 serializers.py:568(write_int)
             4/1    0.000    0.000    0.000    0.000 rdd.py:2917(pipeline_func)
               1    0.000    0.000    0.000    0.000 serializers.py:426(dumps)
               1    0.000    0.000    0.000    0.000 rdd.py:1237(<lambda>)
               1    0.000    0.000    0.000    0.000 serializers.py:135(load_stream)
               2    0.000    0.000    0.000    0.000 rdd.py:1072(func)
               1    0.000    0.000    0.000    0.000 rdd.py:384(func)
               1    0.000    0.000    0.000    0.000 util.py:67(fail_on_stopiteration)
               1    0.000    0.000    0.000    0.000 serializers.py:151(_read_with_length)
               2    0.000    0.000    0.000    0.000 context.py:546(getStart)
               3    0.000    0.000    0.000    0.000 rdd.py:416(func)
               1    0.000    0.000    0.000    0.000 serializers.py:216(_load_stream_without_unbatching)
               2    0.000    0.000    0.000    0.000 {method 'write' of '_io.BufferedWriter' objects}
               1    0.000    0.000    0.000    0.000 {method 'read' of '_io.BufferedReader' objects}
               1    0.000    0.000    0.000    0.000 {built-in method _operator.add}
               1    0.000    0.000    0.000    0.000 {built-in method builtins.hasattr}
               3    0.000    0.000    0.000    0.000 {built-in method builtins.len}
               1    0.000    0.000    0.000    0.000 {built-in method _struct.unpack}
               1    0.000    0.000    0.000    0.000 rdd.py:1226(<lambda>)
               1    0.000    0.000    0.000    0.000 {method 'close' of 'generator' objects}
               1    0.000    0.000    0.000    0.000 {built-in method from_iterable}
               1    0.000    0.000    0.000    0.000 {built-in method _struct.pack}
               1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
               1    0.000    0.000    0.000    0.000 {built-in method builtins.iter}
      

       
      This happens because, in profile.py, Spark registers show_profiles as an atexit handler the first time a profiler is added:

          def add_profiler(self, id, profiler):
              """Add a profiler for RDD/UDF `id`"""
              if not self.profilers:
                  if self.profile_dump_path:
                      atexit.register(self.dump_profiles, self.profile_dump_path)
                  else:
                      atexit.register(self.show_profiles)
       
              self.profilers.append([id, profiler, False])
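      The effect of that registration can be reproduced with plain Python (a minimal sketch; `show_profiles` here just mimics the banner PySpark prints, and the `shutdown_reply` message shape is an assumption for illustration). Any atexit handler that writes to stdout appends its output after the last protocol message the process emitted, so a line-oriented reader on the other end sees non-JSON text:

      ```python
      import json
      import subprocess
      import sys
      import textwrap

      # Child process mimicking PySpark's behavior: a plain-text profiler
      # banner is registered with atexit, so it is printed *after* the final
      # JSON protocol message when the interpreter exits.
      child = textwrap.dedent("""
          import atexit, json

          def show_profiles():
              # Plain-text banner, like PySpark's profiler output -- not JSON.
              print("=" * 60)
              print("Profile of RDD<id=1>")
              print("=" * 60)

          atexit.register(show_profiles)
          print(json.dumps({"msg_type": "shutdown_reply", "content": {}}))
      """)

      lines = subprocess.run([sys.executable, "-c", child],
                             capture_output=True, text=True).stdout.splitlines()
      print(lines[0])  # valid JSON reply
      print(lines[1])  # "=" banner line that breaks a JSON parser
      ```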
      

       
       
      For a Livy session, this profile output is not JSON, so parsing it throws the exception below:
       

      com.fasterxml.jackson.core.JsonParseException: Unexpected character ('=' (code 61)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
       at [Source: (String)"============================================================"; line: 1, column: 2]
      	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
      	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
      	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:635)
      	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1952)
      	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
      	at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:355)
      	at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2023)
      	at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1491)
      	at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse(JsonMethods.scala:33)
      	at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse$(JsonMethods.scala:20)
      	at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:71)
      	at org.apache.livy.repl.PythonInterpreter.$anonfun$sendRequest$1(PythonInterpreter.scala:288)
      	at scala.Option.map(Option.scala:230)
      	at org.apache.livy.repl.PythonInterpreter.sendRequest(PythonInterpreter.scala:287)
      	at org.apache.livy.repl.PythonInterpreter.sendShutdownRequest(PythonInterpreter.scala:277)
      	at org.apache.livy.repl.ProcessInterpreter.close(ProcessInterpreter.scala:62)
      	at org.apache.livy.repl.PythonInterpreter.close(PythonInterpreter.scala:234)
      	at org.apache.livy.repl.Session.$anonfun$close$1(Session.scala:232)
      	at org.apache.livy.repl.Session.$anonfun$close$1$adapted(Session.scala:232)
      	at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
      	at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
      	at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
      	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
      	at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
      	at org.apache.livy.repl.Session.close(Session.scala:232)
      	at org.apache.livy.toolkit.IpynbBootstrap.close(IpynbBootstrap.scala:246)
      	at org.apache.livy.toolkit.IpynbBootstrap$.main(IpynbBootstrap.scala:72)
      	at org.apache.livy.toolkit.IpynbBootstrap.main(IpynbBootstrap.scala)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:764) 

       Livy's sendShutdownRequest in PythonInterpreter.scala expects the next line on the process's stdout to be JSON:

        override protected def sendShutdownRequest(): Unit = {
          sendRequest(Map(
            "msg_type" -> "shutdown_request",
            "content" -> ()
          )).foreach { case rep =>
            warn(f"process failed to shut down while returning $rep")
          }
        }
      
        private def sendRequest(request: Map[String, Any]): Option[JValue] = {
          stdin.println(write(request))
          stdin.flush()
      
          Option(stdout.readLine()).map { case line =>
            parse(line)
          }
        }
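      Feeding that banner line to a JSON parser fails exactly as in the stack trace above. A Python equivalent (standing in here for the shaded Jackson/json4s parser that sendRequest actually uses):

      ```python
      import json

      # sendRequest reads one line from the subprocess's stdout and parses it
      # as JSON. With the profiler enabled, the next line is the banner:
      line = "=" * 60

      try:
          json.loads(line)
      except json.JSONDecodeError as err:
          # Analogous to Jackson's JsonParseException:
          # "Unexpected character ('='): expected a valid value"
          print("parse failed:", err.msg)  # prints "parse failed: Expecting value"
      ```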
      

      In fake_shell.py, the shutdown_request handler exits without writing a JSON response to stdout, so the profiler banner becomes the next line Livy reads:

      def shutdown_request(_content):
          sys.exit()
      
      msg_type_router = {
          'execute_request': execute_request,
          'shutdown_request': shutdown_request,
      }
      
                  try:
                      handler = msg_type_router[msg_type]
                  except KeyError:
                      LOG.error('unknown message type: %s', msg_type)
                      continue
      
                  response = handler(content)
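      One way to avoid the crash (a sketch of a possible fix, not necessarily the change that shipped; the reply shape is hypothetical) is to write a valid JSON reply and flush it before exiting, so sendRequest reads a well-formed line even when atexit handlers later append the profiler banner:

      ```python
      import json
      import sys

      def shutdown_request(_content):
          # Sketch: reply on the line-oriented JSON protocol *before* exiting,
          # so Livy's sendRequest sees a parseable line; any plain-text
          # profiler output from atexit handlers only comes afterwards.
          print(json.dumps({'msg_type': 'shutdown_reply', 'content': {}}))
          sys.stdout.flush()
          sys.exit()
      ```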
      


    People

      Assignee: jianzhenwu (Jianzhen Wu)
      Reporter: jianzhenwu (Jianzhen Wu)


      Time Tracking

        Original Estimate: Not Specified
        Remaining Estimate: 0h
        Time Spent: 1h 20m