Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-13751

On the broker side, OAUTHBEARER is not compatible with other SASL mechanisms

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Critical
    • Resolution: Unresolved
    • 3.0.1
    • None
    • security
    • None

    Description

      Phenomenon:

       SASL/OAUTHBEARER, whether implemented by default or customized by the user, is not compatible with other SASL mechanisms.

       

      case1:

      kafka_server_jaas_oauth.conf

      KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin"
        user_admin="admin"
        user_alice="alice"; 
      
         org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
      
         org.apache.kafka.common.security.scram.ScramLoginModule required
        username="admin"
        password="admin_scram";
      }; 

       server.properties

      advertised.listeners=SASL_PLAINTEXT://publicIp:8779,SASL_SSL://publicIp:8889,OAUTH://publicIp:8669 
      listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OAUTH:SASL_SSL
      sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER

      Error when starting kafka:
      server.log

      [2022-03-16 13:18:42,658] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
      org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
              at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
              at kafka.network.Processor.<init>(SocketServer.scala:724)
              at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
              at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
              at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
              at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
              at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
              at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
              at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
              at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
              at kafka.network.SocketServer.startup(SocketServer.scala:122)
              at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
              at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
              at kafka.Kafka$.main(Kafka.scala:82)
              at kafka.Kafka.main(Kafka.scala)
      Caused by: java.lang.IllegalArgumentException: Must supply exactly 1 non-null JAAS mechanism configuration (size was 3)
              at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler.configure(OAuthBearerUnsecuredValidatorCallbackHandler.java:117)
              at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:139)
              ... 17 more
      [2022-03-16 13:18:42,662] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
      [2022-03-16 13:18:42,664] INFO [SocketServer brokerId=1] Stopping socket server request processors (kafka.network.SocketServer) 

      The default implementation class of oauthbearer's `sasl.server.callback.handler.class` is OAuthBearerUnsecuredValidatorCallbackHandler.
      In the OAuthBearerUnsecuredValidatorCallbackHandler#configure(...) method, the jaasConfigEntries parameter is verified.
      What I want to say is that the verification logic here is completely reasonable, but the jaasConfigEntries passed in from the upper layer should not contain the AppConfigurationEntry of other loginModules. There are several other codes for the check of the same keyword "Must supply exactly 1 non-null JAAS mechanism configuration".

      Rootcause elaborates later.

      By the way, at present, KafkaServer allows the same LoginModule to be configured multiple times in kafkaJaasConfigFile, which will also lead to the phenomenon of case1.

      kafka_server_jaas_oauth.conf eg:

      KafkaServer {
         org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
      
        org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
       
      }; 

       

      case2:

      On the basis of case1, modify the default implementation of oauthbearer's `sasl.server.callback.handler.class`

       server.properties add new configuration

      listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
      listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler
      listener.name.oauth.oauthbearer.sasl.server.callback.handler.class=us.zoom.mq.security.oauth.AsyncMQOAuthCallbackHandler 

      The specific implementation class code is omitted here.

      When we start kafka again, we still encounter exceptions
      server.log

      [2022-03-16 14:47:46,037] ERROR Unrecognized SASL Login callback (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
      javax.security.auth.callback.UnsupportedCallbackException: Unrecognized SASL Login callback
              at org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler.handle(AbstractLogin.java:105)
              at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
              at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
              at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
              at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
              at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
              at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
              at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
              at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
              at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
              at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
              at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
              at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
              at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
              at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
              at kafka.network.Processor.<init>(SocketServer.scala:724)
              at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
              at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
              at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
              at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
              at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
              at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
              at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
              at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
              at kafka.network.SocketServer.startup(SocketServer.scala:122)
              at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
              at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
              at kafka.Kafka$.main(Kafka.scala:82)
              at kafka.Kafka.main(Kafka.scala)
      [2022-03-16 14:47:46,048] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
      org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
              at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:172)
              at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
              at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
              at kafka.network.Processor.<init>(SocketServer.scala:724)
              at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
              at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
              at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
              at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:214)
              at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:211)
              at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
              at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
              at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:211)
              at kafka.network.SocketServer.startup(SocketServer.scala:122)
              at kafka.server.KafkaServer.startup(KafkaServer.scala:266)
              at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
              at kafka.Kafka$.main(Kafka.scala:82)
              at kafka.Kafka.main(Kafka.scala)
      Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
              at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
              at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
              at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:726)
              at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:665)
              at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:663)
              at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
              at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:663)
              at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:574)
              at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
              at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
              at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:112)
              at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
              ... 17 more
      [2022-03-16 14:47:46,049] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
      [2022-03-16 14:47:46,052] INFO [SocketServer brokerId=1] Stopping socket server request processors (kafka.network.SocketServer) 

      By analyzing the stack, it is easy to see the problem:
      1) AbstractLogin.login should not call the OAuthBearerLoginModule.login method, only OAuthBearerRefreshingLogin.login can call the OAuthBearerLoginModule.login method.
      2) OAuthBearerLoginModule.login should not call the AbstractLogin$DefaultLoginCallbackHandler.handle method, it should call the OAuthBearerUnsecuredLoginCallbackHandler.handle method.

       

      Summary

      Analyze the stack, read the source code, the root cause is:
      In the ChannelBuilders#create method, if the corresponding JaasContext of each SASL mechanism is returned by the JaasContext#defaultContext() method, the JaasContext.configuration and JaasContext.configurationEntries fields both contain the AppConfigurationEntry of other mechanisms. And then in the subsequent code stack, resulting in case1 or case2.
      In fact, you can find from some source codes of Kafka saslModule that Kafka has this original intention: JaasContext per saslMechanism should be pure.
      1) In the ChannelBuilders#create(...) method, when constructing the SaslChannelBuilder, `Map<String, JaasContext> jaasContexts` is passed in
      2) In the SaslChannelBuilder#configure(...) method, after constructing LoginManager, LoginManager is also stored with `Map<String, LoginManager> loginManagers`.
      The keys of these two maps are saslMechanism, but their values ​​both contain all saslMechanism information. Affected by this behavior, jaasConfigEntries in OAuthBearerUnsecuredValidatorCallbackHandler also contains other saslMechanism information, other implementation classes of sasl.server.callback.handler.class have the same problem.

      In fact, I have reported an issue a long time ago, and KAFKA-13422 has the same RC and Solution as this issue. 
      https://issues.apache.org/jira/browse/KAFKA-13422

      For more detailed RC analysis and solutions you can read the last two parts of the description of KAFKA-13422: Root Cause and Suggestion & Solutions.

       

       

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            RivenSun RivenSun
            Ismael Juma Ismael Juma
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: