ZooKeeper / ZOOKEEPER-4711

a data race in org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers

Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 3.9.0
    • Fix Version/s: None
    • Component/s: server
    • Labels: None

    Description

      When we run:

      mvn test -Dmaven.test.failure.ignore=true -Dtest=org.apache.zookeeper.server.watch.WatchManagerTest#testDeadWatchers -DfailIfNoTests=false -DredirectTestOutputToFile=false

      the following two methods of class org.apache.zookeeper.server.watch.WatcherCleaner are executed by different threads:

          public void addDeadWatcher(int watcherBit) {
              // Wait if there are too many watchers waiting to be closed,
              // this is will slow down the socket packet processing and
              // the adding watches in the ZK pipeline.
              while (maxInProcessingDeadWatchers > 0 && !stopped && totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
                  try {
                      RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
                      long startTime = Time.currentElapsedTime();
                      synchronized (processingCompletedEvent) {
                          processingCompletedEvent.wait(100);
                      }
                      long latency = Time.currentElapsedTime() - startTime;
                      ServerMetrics.getMetrics().ADD_DEAD_WATCHER_STALL_TIME.add(latency);
                  } catch (InterruptedException e) {
                      LOG.info("Got interrupted while waiting for dead watches queue size");
                      break;
                  }
              }
              synchronized (this) {
                  
                  if (deadWatchers.add(watcherBit)) {
                      totalDeadWatchers.incrementAndGet();
                      ServerMetrics.getMetrics().DEAD_WATCHERS_QUEUED.add(1);
                      if (deadWatchers.size() >= watcherCleanThreshold) {
                          synchronized (cleanEvent) {
                              cleanEvent.notifyAll();
                          }
                      }
                  }
      
              }
          }

          @Override
          public void run() {
              while (!stopped) {
                  synchronized (cleanEvent) {
                      try {
                          // add some jitter to avoid cleaning dead watchers at the
                          // same time in the quorum
                          if (!stopped && deadWatchers.size() < watcherCleanThreshold) {
                              
                              int maxWaitMs = (watcherCleanIntervalInSeconds
                                               + ThreadLocalRandom.current().nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
                              cleanEvent.wait(maxWaitMs);
                          }
                      } catch (InterruptedException e) {
                          LOG.info("Received InterruptedException while waiting for cleanEvent");
                          break;
                      }
                  }
                  if (deadWatchers.isEmpty()) {
                      continue;
                  }
                  synchronized (this) {
                      // Clean the dead watchers need to go through all the current
                      // watches, which is pretty heavy and may take a second if
                      // there are millions of watches, that's why we're doing lazily
                      // batch clean up in a separate thread with a snapshot of the
                      // current dead watchers.
                      final Set<Integer> snapshot = new HashSet<>(deadWatchers);
                      deadWatchers.clear();
                      int total = snapshot.size();
                      LOG.info("Processing {} dead watchers", total);
                      cleaners.schedule(new WorkRequest() {
                          @Override
                          public void doWork() throws Exception {
                              long startTime = Time.currentElapsedTime();
                              listener.processDeadWatchers(snapshot);
                              long latency = Time.currentElapsedTime() - startTime;
                              LOG.info("Takes {} to process {} watches", latency, total);
                              ServerMetrics.getMetrics().DEAD_WATCHERS_CLEANER_LATENCY.add(latency);
                              ServerMetrics.getMetrics().DEAD_WATCHERS_CLEARED.add(total);
                              totalDeadWatchers.addAndGet(-total);
                              synchronized (processingCompletedEvent) {
                                  processingCompletedEvent.notifyAll();
                              }
                          }
                      });
                  }
              }
              LOG.info("WatcherCleaner thread exited");
          }

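      As we can see, the two methods access the deadWatchers object from different threads. The thread executing run() reads deadWatchers: it calls deadWatchers.size() while holding only the cleanEvent monitor, and deadWatchers.isEmpty() while holding no lock at all. The thread calling addDeadWatcher writes to deadWatchers inside synchronized (this). Because these reads and the write are not guarded by a common lock, this is a data race.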
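
      One possible way to remove the race is to route every access to the set through the same monitor. The following is a minimal sketch under stated assumptions, not the ZooKeeper implementation: the class DeadWatcherQueueSketch and its methods add, reachedThreshold and drain are hypothetical names, and the set is assumed to be a plain, non-thread-safe HashSet.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical stand-in for the dead-watcher bookkeeping in WatcherCleaner.
 * Not ZooKeeper code: all names here are assumptions made for illustration.
 */
public class DeadWatcherQueueSketch {
    // Assumed to be a non-thread-safe set, so every access must hold the same lock.
    private final Set<Integer> deadWatchers = new HashSet<>();
    private final int cleanThreshold;

    public DeadWatcherQueueSketch(int cleanThreshold) {
        this.cleanThreshold = cleanThreshold;
    }

    /** Writer side: returns true if the watcher bit was not queued yet. */
    public synchronized boolean add(int watcherBit) {
        return deadWatchers.add(watcherBit);
    }

    /** Reader side: the threshold check takes the same monitor as add(). */
    public synchronized boolean reachedThreshold() {
        return deadWatchers.size() >= cleanThreshold;
    }

    /** Copies and clears the set in one critical section; empty if nothing is queued. */
    public synchronized Set<Integer> drain() {
        Set<Integer> snapshot = new HashSet<>(deadWatchers);
        deadWatchers.clear();
        return snapshot;
    }

    public static void main(String[] args) throws InterruptedException {
        final int total = 10_000;
        DeadWatcherQueueSketch queue = new DeadWatcherQueueSketch(1000);
        final int[] drained = {0}; // touched only by the cleaner thread until join()

        Thread writer = new Thread(() -> {
            for (int i = 0; i < total; i++) {
                queue.add(i);
            }
        });
        Thread cleaner = new Thread(() -> {
            // Busy loop for brevity; the real cleaner waits on cleanEvent instead.
            while (drained[0] < total) {
                drained[0] += queue.drain().size();
            }
        });

        writer.start();
        cleaner.start();
        writer.join();
        cleaner.join();
        System.out.println("drained " + drained[0] + " dead watchers");
    }
}
```

      If this pattern were applied to WatcherCleaner itself, lock ordering would need care: addDeadWatcher acquires this and then cleanEvent, so taking this while already holding cleanEvent in run() would invert that order and could deadlock. Reading the size before entering the cleanEvent block, or replacing the set with a concurrent collection, are alternatives worth evaluating.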

          People

            Assignee: Unassigned
            Reporter: xiaoheipangzi lujie