skywalking-notifications mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [skywalking] wu-sheng commented on a change in pull request #7153: Support prepare and save metrics concurrency
Date Thu, 24 Jun 2021 09:14:18 GMT

wu-sheng commented on a change in pull request #7153:
URL: https://github.com/apache/skywalking/pull/7153#discussion_r657774208



##########
File path: oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
##########
@@ -92,73 +105,165 @@ public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
         }
     }
 
-    private void extractDataAndSave(IBatchDAO batchDAO) {
+    @VisibleForTesting
+    void extractDataAndSave(IBatchDAO batchDAO) {
+
         if (log.isDebugEnabled()) {
             log.debug("Extract data and save");
         }
 
         long startTime = System.currentTimeMillis();
+        HistogramMetrics.Timer allTimer = allLatency.createTimer();
+        // Use `stop` as a control signal to make fail-fast in the persistence process.
+        AtomicBoolean stop = new AtomicBoolean(false);
 
+        BlockingBatchQueue<PrepareRequest> prepareQueue = new BlockingBatchQueue(this.maxSyncoperationNum);
         try {
-            HistogramMetrics.Timer timer = prepareLatency.createTimer();
+            List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
+            persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
+            persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
 
-            try {
-                List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
-                persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
-                persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
+            // CountDownLatch makes sure all prepare threads done eventually.
+            CountDownLatch prepareStageCountDownLatch = new CountDownLatch(persistenceWorkers.size());
 
-                persistenceWorkers.forEach(worker -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("extract {} worker data and save", worker.getClass().getName());
-                    }
+            /*
+                Here we use `this.prepareRequests` as a FIFO queue, for a producer-consumer model.
+                The prepareExecutorService is for making the executable requests ready, and batchExecutorService consumes from the queue to flush.
+                When the number of metrics produced reaches maxSyncoperationNum or the prepare stage is done,
+                the data would flush into the storage.
 
-                    worker.buildBatchRequests(prepareRequests);
+                When the consumer ends or an exception occurs in the middle, the entire process is completed.
+             */
 
-                    worker.endOfRound(System.currentTimeMillis() - lastTime);
-                });
+            persistenceWorkers.forEach(worker -> {
+                prepareExecutorService.submit(() -> {
+                    if (stop.get()) {
+                        prepareStageCountDownLatch.countDown();
+                        return;
+                    }
 
-                if (debug) {
-                    log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
-                }
-            } finally {
-                timer.finish();
-            }
+                    HistogramMetrics.Timer timer = prepareLatency.createTimer();
+                    try {
+                        if (log.isDebugEnabled()) {
+                            log.debug("extract {} worker data and save", worker.getClass().getName());
+                        }
+                        List<PrepareRequest> innerPrepareRequests = new ArrayList<>(5000);
+                        worker.buildBatchRequests(innerPrepareRequests);
+                        prepareQueue.putMany(innerPrepareRequests);
+                        worker.endOfRound(System.currentTimeMillis() - lastTime);
+                    } finally {
+                        timer.finish();
+                        prepareStageCountDownLatch.countDown();
+                    }
+                });
+            });
 
-            HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
-            try {
-                List<List<PrepareRequest>> partitions = Lists.partition(prepareRequests, maxSyncoperationNum);
-                CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
-                for (final List<PrepareRequest> partition : partitions) {
-                    executorService.submit(() -> {
+            Future<?> batchFuture = batchExecutorService.submit(() -> {

Review comment:
       I asked too. https://github.com/apache/skywalking/pull/7153#discussion_r657171941
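
For readers following the thread, the producer-consumer flow described in the diff's block comment can be sketched roughly as below. This is only an illustration, not the code from the pull request: the class `PrepareFlushSketch`, the method `run`, and its parameters are hypothetical, `java.util.concurrent.LinkedBlockingQueue` stands in for the PR's `BlockingBatchQueue` (whose implementation is not shown in this message), and "flushing" is reduced to counting items. It assumes several prepare tasks feed one queue, a single consumer drains it in batches of at most `batchSize`, a `CountDownLatch` tracks when the prepare stage is done, and an `AtomicBoolean` lets pending prepare tasks skip their work once the consumer has failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; none of these names come from the SkyWalking code base.
final class PrepareFlushSketch {

    // Runs `workers` prepare tasks that each enqueue `perWorker` items and
    // returns how many items the single consumer "flushed".
    static int run(int workers, int perWorker, int batchSize) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean stop = new AtomicBoolean(false);            // fail-fast signal
        CountDownLatch prepareDone = new CountDownLatch(workers); // one count per prepare task
        ExecutorService preparePool = Executors.newFixedThreadPool(4);
        ExecutorService flushPool = Executors.newSingleThreadExecutor();
        try {
            for (int w = 0; w < workers; w++) {
                final int id = w;
                preparePool.submit(() -> {
                    try {
                        if (stop.get()) {
                            return; // consumer already failed, skip the work
                        }
                        for (int i = 0; i < perWorker; i++) {
                            queue.add(id * perWorker + i);
                        }
                    } finally {
                        prepareDone.countDown(); // always counted, even when skipped
                    }
                });
            }

            Future<Integer> flushed = flushPool.submit(() -> {
                int total = 0;
                List<Integer> batch = new ArrayList<>(batchSize);
                try {
                    while (true) {
                        // Read the latch before polling: if producers were already
                        // done and the poll still finds nothing, the queue is final.
                        boolean producersDone = prepareDone.getCount() == 0;
                        Integer first = queue.poll(10, TimeUnit.MILLISECONDS);
                        if (first != null) {
                            batch.add(first);
                            queue.drainTo(batch, batchSize - 1); // cap the batch size
                            total += batch.size();               // stand-in for a storage flush
                            batch.clear();
                        } else if (producersDone) {
                            return total;
                        }
                    }
                } catch (Exception e) {
                    stop.set(true); // tell pending prepare tasks to give up
                    throw e;
                }
            });
            return flushed.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            preparePool.shutdown();
            flushPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("flushed " + run(8, 100, 32));
    }
}
```

In this sketch the latch is counted down in a `finally` block, so the consumer can always detect the end of the prepare stage, whether a task finished normally, threw, or was skipped because `stop` was set.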




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


