Spark / SPARK-28336

I ran the same code on my local machine in the PyCharm IDE and it runs fine, but the issue arises when I set everything up on EC2: my RDD holds JSON values, I convert it to a DataFrame, and the show() method fails to display the DataFrame.


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Invalid
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Environment: EC2, Ubuntu 18.04.2 LTS

      Spark version: Spark 2.4.3 built for Hadoop 2.7.3

      Kafka version: kafka_2.12-2.2.1

    • Flags: Important

    Description

      I am a beginner with PySpark and am building a pilot project in Spark. I developed the project in the PyCharm IDE, where it runs fine. Here is how the project works: I produce JSON messages to a Kafka topic, consume that topic in Spark Streaming, and convert each RDD of values (which are JSON strings) to a DataFrame with productInfo = sqlContext.read.json(rdd). This works perfectly on my local machine, and after the conversion I display the DataFrame on the console with the .show() method, which also works fine.
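      For readers unfamiliar with this step: when sqlContext.read.json() is given an RDD of strings, Spark treats each string as one JSON record and infers a schema covering the fields it finds. The following is a rough, stdlib-only emulation of that idea, not Spark's implementation. The function name json_strings_to_table is hypothetical, the alphabetical column order is a choice made in this sketch, and Spark's real handling of malformed input (a _corrupt_record column in the default PERMISSIVE mode) is only approximated by collecting bad strings separately.

      ```python
      import json


      def json_strings_to_table(messages):
          """Rough stdlib emulation of turning JSON strings into table rows.

          Hypothetical helper for illustration only; it is not how Spark's
          read.json() is implemented. Each string is parsed as one record, and
          the columns are the union of all keys seen (sorted alphabetically here).
          """
          rows = []
          columns = set()
          corrupt = []
          for msg in messages:
              try:
                  record = json.loads(msg)
              except ValueError:  # json.JSONDecodeError is a subclass of ValueError
                  # Spark's default PERMISSIVE mode would keep such input in a
                  # _corrupt_record column; this sketch just collects it separately.
                  corrupt.append(msg)
                  continue
              if not isinstance(record, dict):
                  # Only top-level JSON objects are handled in this sketch.
                  corrupt.append(msg)
                  continue
              rows.append(record)
              columns.update(record)
          ordered = sorted(columns)
          # Fields missing from a record become None (null in Spark terms).
          table = [tuple(row.get(col) for col in ordered) for row in rows]
          return ordered, table, corrupt


      cols, table, bad = json_strings_to_table(
          ['{"id": 1, "name": "pen"}', '{"id": 2}', 'not json']
      )
      print(cols)   # ['id', 'name']
      print(table)  # [(1, 'pen'), (2, None)]
      print(bad)    # ['not json']
      ```

      With an empty batch the sketch returns no columns and no rows, which is consistent with the empty `++` table that show() prints for the batches in which no data was produced.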

      But the problem arises when I set all of this up (Kafka, Apache Spark) on EC2 (Ubuntu 18.04.2 LTS) and run it with spark-submit: the console output stops when it reaches my show() call and displays nothing, then starts again with the next batch and stops at show() again. I can't figure out what the error is, because no error is shown in the console. I also checked whether my data is arriving in the RDD, and it is.

      My Code:

      # coding: utf-8
      from pyspark import SparkContext
      from pyspark import SparkConf
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils
      from pyspark.sql import Row, DataFrame, SQLContext
      import pandas as pd

      def getSqlContextInstance(sparkContext):
          # Lazily create one SQLContext and reuse it for every batch
          if ('sqlContextSingletonInstance' not in globals()):
              globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
          return globals()['sqlContextSingletonInstance']

      def process(time, rdd):
          print("========= %s =========" % str(time))

          try:
              #print("--------------Also cross check my data is present in rdd I checked by printing ----------------")
              #results = rdd.collect()
              #for result in results:
              #    print(result)

              # Get the singleton instance of SQLContext
              sqlContext = getSqlContextInstance(rdd.context)
              # Each element of the RDD is one JSON string; Spark infers the schema
              productInfo = sqlContext.read.json(rdd)

              # The problem occurs here, when I try to show it
              productInfo.show()
          except:
              # NOTE: a bare except with pass discards any exception raised
              # above, so no error message reaches the console
              pass

      if __name__ == '__main__':
          conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")
          sc = SparkContext(conf=conf)
          sc.setLogLevel("WARN")
          sqlContext = SQLContext(sc)
          # 10-second batch interval
          ssc = StreamingContext(sc, 10)
          # Receiver-based Kafka stream: ZooKeeper quorum, consumer group id,
          # {topic: number of partitions to consume, each in its own thread}
          kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'new_topic': 1})
          # Kafka records arrive as (key, value) pairs; keep only the JSON value
          lines = kafkaStream.map(lambda x: x[1])
          lines.foreachRDD(process)
          #lines.pprint()
          ssc.start()
          ssc.awaitTermination()
      

       

      My console:

      ./spark-submit ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
      19/07/10 11:13:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
      19/07/10 11:13:15 INFO SparkContext: Running Spark version 2.4.3
      19/07/10 11:13:15 INFO SparkContext: Submitted application: ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
      19/07/10 11:13:15 INFO SecurityManager: Changing view acls to: kafka
      19/07/10 11:13:15 INFO SecurityManager: Changing modify acls to: kafka
      19/07/10 11:13:15 INFO SecurityManager: Changing view acls groups to: 
      19/07/10 11:13:15 INFO SecurityManager: Changing modify acls groups to: 
      19/07/10 11:13:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(kafka); groups with view permissions: Set(); users with modify permissions: Set(kafka); groups with modify permissions: Set()
      19/07/10 11:13:16 INFO Utils: Successfully started service 'sparkDriver' on port 41655.
      19/07/10 11:13:16 INFO SparkEnv: Registering MapOutputTracker
      19/07/10 11:13:16 INFO SparkEnv: Registering BlockManagerMaster
      19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
      19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
      19/07/10 11:13:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-33f848fe-88d7-4c8f-8440-8384e094c59c
      19/07/10 11:13:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
      19/07/10 11:13:16 INFO SparkEnv: Registering OutputCommitCoordinator
      19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
      19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
      19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
      19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
      19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
      19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
      19/07/10 11:13:16 INFO Utils: Successfully started service 'SparkUI' on port 4046.
      19/07/10 11:13:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-172-31-92-134.ec2.internal:4046
      19/07/10 11:13:16 INFO Executor: Starting executor ID driver on host localhost
      19/07/10 11:13:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34719.
      19/07/10 11:13:16 INFO NettyBlockTransferService: Server created on ip-172-31-92-134.ec2.internal:34719
      19/07/10 11:13:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
      19/07/10 11:13:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
      19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-92-134.ec2.internal:34719 with 366.3 MB RAM, BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
      19/07/10 11:13:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
      19/07/10 11:13:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
      19/07/10 11:13:17 WARN AppInfo$: Can't read Kafka version from MANIFEST.MF. Possible cause: java.lang.NullPointerException
      19/07/10 11:13:18 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:13:18 WARN BlockManager: Block input-0-1562757198000 replicated to only 0 peer(s) instead of 1 peers
      

      This is the output while I am not producing any data to my Kafka topic:

      ========= 2019-07-10 11:13:20 =========
      ---------------------in function procces----------------------
      -----------------------before printing----------------------
      ========= 2019-07-10 11:13:30 =========
      ---------------------in function procces----------------------
      -----------------------before printing----------------------
      ++
      ||
      ++
      ++

      ------------------------after printing-----------------------
      ========= 2019-07-10 11:13:40 =========
      ---------------------in function procces----------------------
      -----------------------before printing----------------------
      ++
      ||
      ++
      ++

      ------------------------after printing-----------------------
      ========= 2019-07-10 11:15:40 =========
      ---------------------in function procces----------------------
      -----------------------before printing----------------------
      ++
      ||
      ++
      ++

      ------------------------after printing-----------------------
      19/07/10 11:15:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:15:47 WARN BlockManager: Block input-0-1562757347200 replicated to only 0 peer(s) instead of 1 peers
      

      This is the output once I start producing data to my Kafka topic:

      ========= 2019-07-10 11:15:50 =========
      ---------------------in function procces----------------------
      -----------------------before printing----------------------
      19/07/10 11:15:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:15:52 WARN BlockManager: Block input-0-1562757352200 replicated to only 0 peer(s) instead of 1 peers
      19/07/10 11:15:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:15:57 WARN BlockManager: Block input-0-1562757357200 replicated to only 0 peer(s) instead of 1 peers
      ========= 2019-07-10 11:16:00 =========
      ---------------------in function procces----------------------
      -----------------------before printing----------------------
      19/07/10 11:16:02 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:16:02 WARN BlockManager: Block input-0-1562757362200 replicated to only 0 peer(s) instead of 1 peers
      19/07/10 11:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:16:07 WARN BlockManager: Block input-0-1562757367400 replicated to only 0 peer(s) instead of 1 peers
      ========= 2019-07-10 11:16:10 =========
      ---------------------in function procces----------------------
      -----------------------before printing----------------------
      19/07/10 11:16:12 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:16:12 WARN BlockManager: Block input-0-1562757372400 replicated to only 0 peer(s) instead of 1 peers
      19/07/10 11:16:17 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/07/10 11:16:17 WARN BlockManager: Block input-0-1562757377400 replicated to only 0 peer(s) instead of 1 peers
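      One reading of the output above, offered as a hypothesis rather than a diagnosis: once data arrives, each batch prints "before printing" but never "after printing", yet the next batch still starts ten seconds later. That pattern is consistent with show() (or read.json()) raising an exception that the bare except: pass in process() silently discards, which would also explain why no error appears in the console. A minimal sketch of a wrapper that prints the traceback instead, and skips empty batches using RDD.isEmpty() (a method PySpark RDDs provide), might look like this; guarded_batch_handler is a hypothetical helper name, not a Spark API.

      ```python
      import traceback


      def guarded_batch_handler(handler, skip_empty=True):
          """Wrap a foreachRDD-style (time, rdd) callback.

          Hypothetical helper, not part of PySpark. It skips batches whose RDD
          reports isEmpty() and prints the full traceback of any exception
          instead of discarding it the way a bare `except: pass` does.
          """
          def wrapper(time, rdd):
              try:
                  if skip_empty and rdd.isEmpty():
                      print("========= %s: empty batch, skipped =========" % time)
                      return None
                  return handler(time, rdd)
              except Exception:
                  # Print the traceback to stderr so the failure is visible on
                  # the driver console, then keep the stream running.
                  traceback.print_exc()
                  return None
          return wrapper
      ```

      With the try/except removed from process(), the wrapper could be registered as lines.foreachRDD(guarded_batch_handler(process)). Keeping the stream alive after printing the traceback is a debugging-oriented choice; re-raising instead would let Spark surface the failure itself.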
       

      I don't know how to figure this out. Any help would be really appreciated.

      Thank you


          People

            Assignee: Unassigned
            Reporter: Aditya (Aditya.Takawale)