  1. ActiveMQ Classic
  2. AMQ-6067

OutOfMemoryError when expiring a large number of topic messages

    Description

      There is a problem in

      org.apache.activemq.broker.region.Topic.expireMessagesTask
      

      When a large number of topic messages are due to expire, expireMessagesTask loads all of them into memory. This causes the following error:

      2015-11-24 11:05:46.359 WARN  [ActiveMQ Broker[JmsEngineActivemqBroker] Scheduler] [Topic] Failed to browse Topic: test-topic
      java.lang.OutOfMemoryError: Java heap space
      	at oracle.sql.BLOB.getBytes(BLOB.java:204)
      	at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
      	at oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
      	at org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
      	at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
      	at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      	at java.lang.reflect.Method.invoke(Method.java:597)
      	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
      	at org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
      	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
      	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
      	at $Proxy14.doRecover(Unknown Source)
      	at org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
      	at org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
      	at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
      	at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
      	at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
      	at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
      	at java.util.TimerThread.mainLoop(Timer.java:512)
      	at java.util.TimerThread.run(Timer.java:462)
      

      The problem occurs when using JDBC persistence with ActiveMQ 5.10.0. From a brief look at the source code, the same problem probably affects 5.12.1 as well.

      Test case:

      • run an ActiveMQ broker with JDBC persistence
      • create a subscription to a topic, but do not receive the messages
      • send a large number of messages with a short TimeToLive
      • when expireMessagesTask is scheduled, it tries to load all of the messages and causes the OutOfMemoryError

      It would help if

      org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
      

      were updated like this:

      public void recover(final MessageRecoveryListener listener) throws Exception {
        // Recover messages from the database, stopping once the listener has no space left.
        TransactionContext c = persistenceAdapter.getTransactionContext();
        try {
          adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
            public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
              if (listener.hasSpace()) {
                Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                msg.getMessageId().setBrokerSequenceId(sequenceId);
                return listener.recoverMessage(msg);
              } else {
                // Returning false tells the adapter to stop reading further rows.
                logger.debug("Message recovery limit has been exceeded.");
                return false;
              }
            }
      
            public boolean recoverMessageReference(String reference) throws Exception {
              if (listener.hasSpace()) {
                return listener.recoverMessageReference(new MessageId(reference));
              } else {
                logger.debug("Message reference recovery limit has been exceeded.");
                return false;
              }
            }
          });
        } catch (SQLException e) {
          JDBCPersistenceAdapter.log("JDBC Failure: ", e);
          throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
        } finally {
          c.close();
        }
      }
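
      To make the effect of the hasSpace() check easier to see in isolation, here is a minimal, self-contained sketch. It does not use the ActiveMQ classes: SketchRecoveryListener, BoundedListener and BoundedRecoverySketch are hypothetical stand-ins, and the "store" is just an in-memory list. It only illustrates the idea that the row loop stops as soon as the listener reports it has no space.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a recovery listener (not the ActiveMQ interface).
interface SketchRecoveryListener {
    boolean hasSpace();
    boolean recoverMessage(String message);
}

// Collects at most `limit` messages, then reports that it has no space left.
final class BoundedListener implements SketchRecoveryListener {
    private final int limit;
    final List<String> recovered = new ArrayList<>();

    BoundedListener(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean hasSpace() {
        return recovered.size() < limit;
    }

    @Override
    public boolean recoverMessage(String message) {
        recovered.add(message);
        return true;
    }
}

final class BoundedRecoverySketch {
    // Mimics a store cursor: hands rows to the listener until it asks to stop.
    // Returns the number of rows that were read from the "store".
    static int recover(Iterable<String> rows, SketchRecoveryListener listener) {
        int read = 0;
        for (String row : rows) {
            read++;
            boolean keepGoing = listener.hasSpace() && listener.recoverMessage(row);
            if (!keepGoing) {
                break; // same role as returning false from recoverMessage above
            }
        }
        return read;
    }
}
```

      With a limit of 3 and 10 rows in the store, only 3 messages are held in memory and the loop stops on the fourth row instead of reading all 10.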
      

      However, I am not sure this limit is the best approach, because some messages that should be expired would then have to wait for a later run. A better solution might be to do this work in several separate transactions.
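
      As a rough illustration of the "several separate transactions" idea, the sketch below expires messages in fixed-size batches, so that every expired message is eventually handled but no more than one batch is held in memory at a time. It is not ActiveMQ code: BatchedExpirySketch, Msg, expireInBatches and the in-memory list standing in for the JDBC store are all assumptions made for the example, and each loop iteration merely stands in for one transaction.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchedExpirySketch {
    // Hypothetical message record: an id plus an absolute expiry time in milliseconds.
    static final class Msg {
        final long id;
        final long expiresAt;

        Msg(long id, long expiresAt) {
            this.id = id;
            this.expiresAt = expiresAt;
        }
    }

    // Removes expired messages from `store` in batches of at most `batchSize`.
    // Each loop iteration stands in for one separate transaction.
    // Returns the number of batches that were processed.
    static int expireInBatches(List<Msg> store, long now, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        while (true) {
            // Load at most one batch of expired messages into memory.
            List<Msg> batch = new ArrayList<>(batchSize);
            for (Msg m : store) {
                if (now > m.expiresAt) {
                    batch.add(m);
                    if (batch.size() == batchSize) {
                        break;
                    }
                }
            }
            if (batch.isEmpty()) {
                return batches; // nothing left to expire
            }
            store.removeAll(batch); // "commit" this batch before loading the next one
            batches++;
        }
    }
}
```

      With 7 expired messages and a batch size of 3, this runs three batches (3, 3 and 1 messages) and leaves the unexpired messages untouched.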

          People

            cshannon Christopher L. Shannon
            phavran Petr Havránek
