Qpid / QPID-6587

Qpid crashes when trying to convert a message


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.32
    • Fix Version/s: qpid-java-6.0
    • Component/s: Broker-J
    • Environment: Linux localhost 3.17.8-gentoo-r1 #6 SMP Sat May 16 15:11:24 GST 2015 x86_64 Intel(R) Core(TM) i5-2540M CPU @ 2.60GHz GenuineIntel GNU/Linux

    • Important

    Description

      Related Issues: QPID-5536

      This issue may be related to message format conversion.

      To reproduce this problem, create a new port (e.g. AMQP_0_10), enable the AMQP 0-10 protocol on it, and assign it a different port number (for example 5673).
      Set the security provider to Anonymous (create a provider for this).
      Create a queue "examples".
      Send a few messages to "examples" through the AMQP port.
      Try to read them from AMQP_0_10.

      The broker will crash and exit.

      In my case, the messages were written from Java and read from Python.

      The following code can be used:

      For Maven:

      <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>qpid-jms-client</artifactId>
          <version>0.2.0</version>
      </dependency>

      import java.util.Properties;

      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.DeliveryMode;
      import javax.jms.Destination;
      import javax.jms.ExceptionListener;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.naming.Context;
      import javax.naming.InitialContext;

      public class Hello
      {
          private static final String USER = "guest";
          private static final String PASSWORD = "guest";

          public static void main(String[] args)
          {
              try
              {
                  Properties prop = new Properties();
                  prop.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
                  prop.put("connectionfactory.localhost", "amqp://localhost:5672/");
                  prop.put("queue.myQueue", "examples");

                  Context context = new InitialContext(prop);
                  ConnectionFactory factory = (ConnectionFactory) context.lookup("localhost");
                  Destination queue = (Destination) context.lookup("myQueue");

                  Connection connection = factory.createConnection(USER, PASSWORD);

                  connection.setExceptionListener(new MyExceptionListener());
                  connection.start();

                  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  for (int i = 0; i <= 10; i++)
                  {
                      MessageProducer producer = session.createProducer(queue);
                      TextMessage message = session.createTextMessage("Hello world from Java " + i + " !");
                      Thread.sleep(1000L);
                      producer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                  }

                  connection.close();
              }
              catch (Exception exp)
              {
                  System.out.println("Caught exception, exiting.");
                  exp.printStackTrace();
                  System.exit(1);
              }
          }

          private static class MyExceptionListener implements ExceptionListener
          {
              public void onException(JMSException exception)
              {
                  System.out.println("Connection ExceptionListener fired, exiting.");
                  exception.printStackTrace();
                  System.exit(1);
              }
          }
      }

      #!/usr/bin/env python
      from qpid.messaging import *

      url = "localhost:5673"
      queue = "examples"
      connection = Connection(url)
      try:
          connection.open()
          session = connection.session()
          while True:
              receiver = session.receiver(queue)
              message = receiver.fetch()
              print "got message"
              print message.content
              #session.acknowledge()
      except MessagingError,m:
          print "exeption: " , m

      connection.close()

      Note: If the queue is empty and we try to read from it, no further messages can be written. The Java client throws the following exception:

      javax.jms.JMSException: java.lang.NullPointerException [condition = amqp:connection:forced]
      at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.getRemoteError(AmqpAbstractResource.java:234)
      at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.remotelyClosed(AmqpAbstractResource.java:170)
      at org.apache.qpid.jms.provider.amqp.AmqpAbstractResource.processRemoteClose(AmqpAbstractResource.java:291)
      at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:734)
      at org.apache.qpid.jms.provider.amqp.AmqpProvider.access$1500(AmqpProvider.java:87)
      at org.apache.qpid.jms.provider.amqp.AmqpProvider$16.run(AmqpProvider.java:667)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.run(FutureTask.java:262)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

      If the queue has messages, the broker crashes when the Python client consumes them.
      The broker generates the following trace:

      2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469] (transport.Connection) - RECV: [conn:29464cc5] ch=0 MessageFlow(destination=0, unit=MESSAGE, value=1)
      2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469] (transport.Session) - identify: ch=0, commandId=5
      2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469] (transport.Session) - ssn:"fb01ce9e-fc06-455a-abd7-c1c7df6b8664:0" ch=0 processed([5,5]) 4 4
      2015-06-13 11:48:57,262 DEBUG [IoReceiver - /127.0.0.1:48469] (transport.Session) - ssn:"fb01ce9e-fc06-455a-abd7-c1c7df6b8664:0" processed: {[0, 4]}

      2015-06-13 11:48:57,262 DEBUG [pool-1-thread-15] (transport.Connection) - FLUSH: [conn:29464cc5]
      2015-06-13 11:48:57,262 DEBUG [pool-1-thread-15] (transport.Connection) - FLUSH: [conn:29464cc5]
      ########################################################################
      #
      # Unhandled Exception java.lang.NullPointerException in Thread pool-1-thread-15
      #
      # Exiting
      #
      ########################################################################
        java.lang.NullPointerException
        at java.util.HashMap.putMapEntries(HashMap.java:500)
        at java.util.HashMap.<init>(HashMap.java:489)
        at org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0$MessageHeader_1_0.getHeadersAsMap(MessageMetaData_1_0.java:586)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertMetaData(MessageConverter_1_0_to_v0_10.java:181)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertToStoredMessage(MessageConverter_1_0_to_v0_10.java:70)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:60)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:42)
        at org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:218)
        at org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:469)
        at org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1323)
        at org.apache.qpid.server.queue.AbstractQueue.attemptDelivery(AbstractQueue.java:2069)
        at org.apache.qpid.server.queue.AbstractQueue.processQueue(AbstractQueue.java:2237)
        at org.apache.qpid.server.queue.QueueRunner$1.run(QueueRunner.java:77)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:360)
        at org.apache.qpid.server.queue.QueueRunner.run(QueueRunner.java:68)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
        2015-06-13 11:48:57,264 ERROR [pool-1-thread-15] (server.Main) - Uncaught exception, shutting down.
        java.lang.NullPointerException
        at java.util.HashMap.putMapEntries(HashMap.java:500)
        at java.util.HashMap.<init>(HashMap.java:489)
        at org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0$MessageHeader_1_0.getHeadersAsMap(MessageMetaData_1_0.java:586)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertMetaData(MessageConverter_1_0_to_v0_10.java:181)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convertToStoredMessage(MessageConverter_1_0_to_v0_10.java:70)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:60)
        at org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10.convert(MessageConverter_1_0_to_v0_10.java:42)
        at org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:218)
        at org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:469)
        at org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1323)
        at org.apache.qpid.server.queue.AbstractQueue.attemptDelivery(AbstractQueue.java:2069)
        at org.apache.qpid.server.queue.AbstractQueue.processQueue(AbstractQueue.java:2237)
        at org.apache.qpid.server.queue.QueueRunner$1.run(QueueRunner.java:77)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:360)
        at org.apache.qpid.server.queue.QueueRunner.run(QueueRunner.java:68)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
        2015-06-13 11:48:57,265 INFO [pool-1-thread-16] (subscription.state) - [Queue Delivery] [sub:1(vh(/default)/qu(examples)] SUB-1003 : State : SUSPENDED
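
The top frames of the trace above (HashMap.<init> called from getHeadersAsMap) suggest that a null map reached the java.util.HashMap copy constructor, which dereferences its argument. The standalone sketch below reproduces that failure mode and shows one possible guard. The class HeaderCopySketch and its methods unsafeCopy and safeCopy are hypothetical names used only for illustration; they are not taken from the Qpid source, and this is not necessarily how the issue was fixed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not Qpid code.
final class HeaderCopySketch
{
    // Copies the map without a null check, as the trace suggests happened.
    static Map<String, Object> unsafeCopy(Map<String, Object> source)
    {
        // The copy constructor throws NullPointerException when source is null.
        return new HashMap<>(source);
    }

    // Treats a missing (null) map as an empty set of headers.
    static Map<String, Object> safeCopy(Map<String, Object> source)
    {
        return source == null ? new HashMap<>() : new HashMap<>(source);
    }

    public static void main(String[] args)
    {
        try
        {
            unsafeCopy(null);
            System.out.println("unsafeCopy(null) returned normally");
        }
        catch (NullPointerException e)
        {
            System.out.println("unsafeCopy(null) threw NullPointerException");
        }

        System.out.println("safeCopy(null) size: " + safeCopy(null).size());
    }
}
```

A message with no application headers is a plausible way for such a map to be null, but the trace alone does not confirm which field was missing.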

      The receiver (Python) will produce this error:

      exeption: connection aborted
      Traceback (most recent call last):
      File "examples/api/hello", line 27, in <module>
      connection.close()
      File "<string>", line 6, in close
      File "/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py", line 345, in close
      ssn.close(timeout=timeout)
      File "<string>", line 6, in close
      File "/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py", line 795, in close
      self.sync(timeout=timeout)
      File "<string>", line 6, in sync
      File "/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py", line 786, in sync
      if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
      File "/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py", line 596, in _ewait
      result = self.connection._ewait(lambda: self.error or predicate(), timeout)
      File "/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py", line 234, in _ewait
      self.check_error()
      File "/home/mansour/workspace/sources/qpid-python-0.32/qpid/messaging/endpoints.py", line 227, in check_error
      raise e
      qpid.messaging.exceptions.ConnectionError: connection aborted

          People

            Assignee: rgodfrey Robert Godfrey
            Reporter: mansour Mansour Al Akeel