  1. Apache NiFi
  2. NIFI-7050

ConsumeJMS is not yielded in case of exception

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.10.0
    • Fix Version/s: 1.12.0, 1.11.4
    • Component/s: Extensions
    • Labels: None

    Description

      If any exception occurs while ConsumeJMS is reading messages, the processor retries immediately instead of yielding.

        try {
            consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
                @Override
                public void accept(final JMSResponse response) {
                    if (response == null) {
                        return;
                    }

                    FlowFile flowFile = processSession.create();
                    flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));

                    final Map<String, String> jmsHeaders = response.getMessageHeaders();
                    final Map<String, String> jmsProperties = response.getMessageProperties();

                    flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
                    flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
                    flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);

                    processSession.getProvenanceReporter().receive(flowFile, destinationName);
                    processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
                    processSession.transfer(flowFile, REL_SUCCESS);
                    processSession.commit();
                }
            });
        } catch(Exception e) {
            consumer.setValid(false);
            throw e; // for backward compatibility with exception handling in flows
        }

      The processor should call context.yield() in the catch block. Note that PublishJMS already yields in the same scenario, so the change is required only in the ConsumeJMS processor.
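
The requested change can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual patch: `Context` and `Consumer` are hypothetical stand-ins for NiFi's `ProcessContext` and the JMS consumer used in the snippet above, and `CountingContext` and `FailingConsumer` exist only to make the behaviour observable.

```java
class YieldOnFailureSketch {

    /** Hypothetical stand-in for the single ProcessContext method used here. */
    interface Context {
        void yield();
    }

    /** Hypothetical stand-in for the JMS consumer in the snippet above. */
    interface Consumer {
        void consume() throws Exception;
        void setValid(boolean valid);
    }

    /** Mirrors the try/catch from the description, with the yield added. */
    static void onTrigger(Context context, Consumer consumer) throws Exception {
        try {
            consumer.consume();
        } catch (Exception e) {
            consumer.setValid(false);
            context.yield(); // back off instead of being rescheduled immediately
            throw e;         // still rethrown, as in the original code
        }
    }

    /** Test double that records how often yield() was requested. */
    static class CountingContext implements Context {
        int yields = 0;

        @Override
        public void yield() {
            yields++;
        }
    }

    /** Test double whose consume() always fails, e.g. a broker that is down. */
    static class FailingConsumer implements Consumer {
        boolean valid = true;

        @Override
        public void consume() throws Exception {
            throw new Exception("broker unavailable");
        }

        @Override
        public void setValid(boolean valid) {
            this.valid = valid;
        }
    }

    public static void main(String[] args) {
        CountingContext context = new CountingContext();
        FailingConsumer consumer = new FailingConsumer();
        try {
            onTrigger(context, consumer);
        } catch (Exception e) {
            System.out.println("rethrown: " + e.getMessage());
        }
        System.out.println("yields: " + context.yields + ", valid: " + consumer.valid);
    }
}
```

Yielding before the rethrow keeps the existing exception handling in flows unchanged while giving the framework a chance to pause the processor for its configured yield duration.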

            People

              Assignee: Gardella Juan Pablo (gardellajuanpablo)
              Reporter: Gardella Juan Pablo (gardellajuanpablo)
              Votes: 0
              Watchers: 3

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 0.5h