sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [sling-org-apache-sling-distribution-journal] branch master updated: SLING-9560 - Configure BookKeeper via BookKeeperConfig. Move PackageHandling into BookKeeper. Extract editable from CommandPoller. (#47)
Date Tue, 30 Jun 2020 16:53:47 GMT
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 757422e  SLING-9560 - Configure BookKeeper via BookKeeperConfig. Move PackageHandling
into BookKeeper. Extract editable from CommandPoller. (#47)
757422e is described below

commit 757422e40ea79fa05140af991c1f627cd0c96784
Author: Christian Schneider <chris@die-schneider.net>
AuthorDate: Tue Jun 30 18:53:38 2020 +0200

    SLING-9560 - Configure BookKeeper via BookKeeperConfig. Move PackageHandling into BookKeeper.
Extract editable from CommandPoller. (#47)
---
 .../journal/impl/subscriber/BookKeeper.java        | 54 ++++++++++----------
 .../journal/impl/subscriber/BookKeeperConfig.java  | 59 ++++++++++++++++++++++
 .../journal/impl/subscriber/CommandPoller.java     | 18 +------
 .../impl/subscriber/DistributionSubscriber.java    | 30 +++++------
 .../journal/impl/subscriber/BookKeeperTest.java    | 15 ++++--
 .../journal/impl/subscriber/CommandPollerTest.java | 39 +++++---------
 6 files changed, 125 insertions(+), 90 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 18fe950..49e44bb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -45,6 +46,7 @@ import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -87,43 +89,40 @@ public class BookKeeper implements Closeable {
     private final PackageHandler packageHandler;
     private final EventAdmin eventAdmin;
     private final Consumer<PackageStatusMessage> sender;
-    private final boolean editable;
-    private final int maxRetries;
+    private final BookKeeperConfig config;
     private final boolean errorQueueEnabled;
 
     private final PackageRetries packageRetries = new PackageRetries();
     private final LocalStore statusStore;
     private final LocalStore processedOffsets;
-    private final String subAgentName;
-    private final String subSlingId;
     private final GaugeService<Integer> retriesGauge;
     private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
             DistributionMetricsService distributionMetricsService,
-            PackageHandler packageHandler,
+            Packaging packaging,
+            DistributionPackageBuilder packageBuilder,
             EventAdmin eventAdmin,
             Consumer<PackageStatusMessage> sender,
-            String subAgentName,
-            String subSlingId,
-            boolean editable, 
-            int maxRetries) { 
+            BookKeeperConfig config) { 
+        String pkgType = packageBuilder.getType();
+        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.getPackageHandling());
+        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
         this.packageHandler = packageHandler;
         this.eventAdmin = eventAdmin;
-        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name="
+ subAgentName;
+        this.sender = sender;
+        this.config = config;
+        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name="
+ config.getSubAgentName();
         this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries
of current package", packageRetries::getSum);
         this.resolverFactory = resolverFactory;
         this.distributionMetricsService = distributionMetricsService;
-        this.sender = sender;
-        this.subAgentName = subAgentName;
-        this.subSlingId = subSlingId;
-        this.editable = editable;
-        this.maxRetries = maxRetries;
         // Error queues are enabled when the number
         // of retry attempts is limited ; disabled otherwise
-        this.errorQueueEnabled = (maxRetries >= 0);
-        this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, subAgentName);
-        this.processedOffsets = new LocalStore(resolverFactory, STORE_TYPE_PACKAGE, subAgentName);
+        this.errorQueueEnabled = (config.getMaxRetries() >= 0);
+        this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName());
+        this.processedOffsets = new LocalStore(resolverFactory, STORE_TYPE_PACKAGE, config.getSubAgentName());
+        log.info("Started bookkeeper {} with package builder {} editable {} maxRetries {}",
+                config.getSubAgentName(), pkgType, config.isEditable(), config.getMaxRetries());
     }
     
     /**
@@ -149,7 +148,7 @@ public class BookKeeper implements Closeable {
         try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER))
{
             packageHandler.apply(importerResolver, pkgMsg);
-            if (editable) {
+            if (config.isEditable()) {
                 storeStatus(importerResolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED,
offset, pkgMsg.getPubAgentName()));
             }
             storeOffset(importerResolver, offset);
@@ -157,7 +156,7 @@ public class BookKeeper implements Closeable {
             distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
             distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis()
- createdTime), TimeUnit.MILLISECONDS);
             packageRetries.clear(pkgMsg.getPubAgentName());
-            Event event = new ImportedEvent(pkgMsg, subAgentName).toEvent();
+            Event event = new ImportedEvent(pkgMsg, config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
         } catch (DistributionException | LoginException | IOException | RuntimeException
e) {
             failure(pkgMsg, offset, e);
@@ -176,8 +175,8 @@ public class BookKeeper implements Closeable {
         MDC.put("pub-agent-name", pubAgentName);
         MDC.put("distribution-message-type", pkgMsg.getReqType().name());
         MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
-        MDC.put("sub-sling-id", subSlingId);
-        MDC.put("sub-agent-name", subAgentName);
+        MDC.put("sub-sling-id", config.getSubSlingId());
+        MDC.put("sub-agent-name", config.getSubAgentName());
     }
     
     /**
@@ -194,13 +193,14 @@ public class BookKeeper implements Closeable {
 
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
-        if (errorQueueEnabled && retries >= maxRetries) {
+        if (errorQueueEnabled && retries >= config.getMaxRetries()) {
             log.warn("Failed to import distribution package {} at offset {} after {} retries,
removing the package.", 
                     pkgMsg.getPkgId(), offset, retries);
             removeFailedPackage(pkgMsg, offset);
         } else {
             packageRetries.increase(pubAgentName);
-            String msg = format("Error processing distribution package %s. Retry attempts
%s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite");
+            String retriesSt = errorQueueEnabled ? Integer.toString(config.getMaxRetries())
: "infinite";
+            String msg = format("Error processing distribution package %s. Retry attempts
%s/%s.", pkgMsg.getPkgId(), retries, retriesSt);
             throw new DistributionException(msg, e);
         }
     }
@@ -210,7 +210,7 @@ public class BookKeeper implements Closeable {
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
-            if (editable) {
+            if (config.isEditable()) {
                 storeStatus(resolver, new PackageStatus(Status.REMOVED, offset, pkgMsg.getPubAgentName()));
             }
             storeOffset(resolver, offset);
@@ -263,8 +263,8 @@ public class BookKeeper implements Closeable {
     
     private void sendStatusMessage(PackageStatus status) {
         PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder()
-                .subSlingId(subSlingId)
-                .subAgentName(subAgentName)
+                .subSlingId(config.getSubSlingId())
+                .subAgentName(config.getSubAgentName())
                 .pubAgentName(status.pubAgentName)
                 .offset(status.offset)
                 .status(status.status)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
new file mode 100644
index 0000000..334f24f
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+public class BookKeeperConfig {
+    private final String subAgentName;
+    private final String subSlingId;
+    private final boolean editable;
+    private final int maxRetries;
+    private final PackageHandling packageHandling;
+
+    public BookKeeperConfig(String subAgentName,
+            String subSlingId,
+            boolean editable, 
+            int maxRetries,
+            PackageHandling packageHandling) {
+                this.subAgentName = subAgentName;
+                this.subSlingId = subSlingId;
+                this.editable = editable;
+                this.maxRetries = maxRetries;
+                this.packageHandling = packageHandling;
+    }
+    
+    public String getSubAgentName() {
+        return subAgentName;
+    }
+    
+    public String getSubSlingId() {
+        return subSlingId;
+    }
+    
+    public boolean isEditable() {
+        return editable;
+    }
+    
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+    
+    public PackageHandling getPackageHandling() {
+        return packageHandling;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 703a9d1..45011a6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -40,28 +40,14 @@ public class CommandPoller implements Closeable {
     private final Closeable poller;
     private final AtomicLong clearOffset = new AtomicLong(-1);
 
-    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId,
String subAgentName, boolean editable) {
+    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId,
String subAgentName) {
         this.subSlingId = subSlingId;
         this.subAgentName = subAgentName;
-        if (editable) {
-
-            /*
-             * We currently only support commands requiring editable mode.
-             * As an optimisation, we don't register a poller for non
-             * editable subscribers.
-             *
-             * When supporting commands independent from editable mode,
-             * this optimisation will be removed.
-             */
-
-            poller = messagingProvider.createPoller(
+        this.poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
                     Reset.earliest,
                     create(ClearCommand.class, this::handleCommandMessage)
                     );
-        } else {
-            poller = null;
-        }
     }
     
     public boolean isCleared(long offset) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index b739dcb..1999a2f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -124,7 +124,7 @@ public class DistributionSubscriber {
     
     private Closeable packagePoller;
 
-    private CommandPoller commandPoller;
+    private Optional<CommandPoller> commandPoller;
 
     private BookKeeper bookKeeper;
 
@@ -167,15 +167,12 @@ public class DistributionSubscriber {
         }
         
         queueNames = getNotEmpty(config.agentNames());
-        int maxRetries = config.maxRetries();
-        boolean editable = config.editable();
         pkgType = requireNonNull(packageBuilder.getType());
 
-        ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling());
-        PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor);
         Consumer<PackageStatusMessage> sender = messagingProvider.createSender(topics.getStatusTopic());
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler,
eventAdmin,
-                sender, subAgentName, subSlingId, editable, maxRetries);
+        BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(),
config.maxRetries(), config.packageHandling());
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging,
packageBuilder, eventAdmin,
+                sender, bkConfig);
         
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
@@ -183,20 +180,20 @@ public class DistributionSubscriber {
         packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest,
assign,
                 HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage));
 
-        commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName,
editable);
+        if (config.editable()) {
+            commandPoller = Optional.of(new CommandPoller(messagingProvider, topics, subSlingId,
subAgentName));
+        } else {
+            commandPoller = Optional.empty();
+        }
 
         queueThread = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", subAgentName));
 
         int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
         announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(topics.getDiscoveryTopic()),
bookKeeper,
-                maxRetries, config.editable(), announceDelay);
+                config.maxRetries(), config.editable(), announceDelay);
 
-        boolean errorQueueEnabled = (maxRetries >= 0);
-        String msg = format(
-                "Started Subscriber agent %s at offset %s, subscribed to agent names %s with
package builder %s editable %s maxRetries %s errorQueueEnabled %s",
-                subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries,
errorQueueEnabled);
-        LOG.info(msg);
+        LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}",
subAgentName, startOffset, queueNames);
     }
 
     private Set<String> getNotEmpty(String[] agentNames) {
@@ -214,7 +211,8 @@ public class DistributionSubscriber {
          */
 
         IOUtils.closeQuietly(announcer, bookKeeper, 
-                packagePoller, commandPoller);
+                packagePoller);
+        commandPoller.ifPresent(IOUtils::closeQuietly);
         subscriberIdle.ifPresent(IOUtils::closeQuietly);
         running = false;
         try {
@@ -363,7 +361,7 @@ public class DistributionSubscriber {
     }
 
     private boolean shouldSkip(long offset) {
-        boolean cleared = commandPoller.isCleared(offset);
+        boolean cleared = commandPoller.isPresent() ? commandPoller.get().isCleared(offset)
: false;
         Decision decision = waitPrecondition(offset);
         return cleared || decision == Decision.SKIP;
     }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index 3afa938..e2688a7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -23,11 +23,13 @@ import static org.junit.Assert.assertThat;
 
 import java.util.function.Consumer;
 
+import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,9 +49,6 @@ public class BookKeeperTest {
     private DistributionMetricsService distributionMetricsService;
 
     @Mock
-    private PackageHandler packageHandler;
-
-    @Mock
     private EventAdmin eventAdmin;
 
     @Mock
@@ -57,10 +56,16 @@ public class BookKeeperTest {
 
     private BookKeeper bookKeeper;
 
+    @Mock
+    private Packaging packaging;
+
+    @Mock
+    private DistributionPackageBuilder packageBuilder;
+
     @Before
     public void before() {
-        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler,
eventAdmin, sender,
-                "subAgentName", "subSlingId", true, 10);
+        BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true,
10, PackageHandling.Extract);
+        bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging,
packageBuilder, eventAdmin, sender, bkConfig);
     }
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 9cc3503..a596dcf 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -83,7 +82,7 @@ public class CommandPollerTest {
 
     @Test
     public void testSkipped() throws DistributionException, InterruptedException, IOException
{
-        createCommandPoller(true);
+        createCommandPoller();
         
         commandHandler.handle(info, commandMessage(SUBSLING_ID_OTHER, SUB_AGENT_OTHER, 1L));
         assertSkipped();
@@ -93,11 +92,15 @@ public class CommandPollerTest {
         
         commandHandler.handle(info, commandMessage(SUB_SLING_ID, SUB_AGENT_OTHER, 1L));
         assertSkipped();
+        
+        commandPoller.close();
+        
+        verify(poller).close();
     }
     
     @Test
     public void testClearOffsets() throws DistributionException, InterruptedException, IOException
{
-        createCommandPoller(true);
+        createCommandPoller();
 
         commandHandler.handle(info, commandMessage(10L));
         assertClearedUpTo(10);
@@ -108,6 +111,10 @@ public class CommandPollerTest {
         // Clearing lower offset should not change cleared offset
         commandHandler.handle(info, commandMessage(1L));
         assertClearedUpTo(11);
+        
+        commandPoller.close();
+        
+        verify(poller).close();
     }
 
     private void assertClearedUpTo(int max) {
@@ -118,24 +125,6 @@ public class CommandPollerTest {
 
     }
 
-    @Test
-    public void testEditable() throws DistributionException, InterruptedException, IOException
{
-        createCommandPoller(true);
-        
-        commandPoller.close();
-        
-        verify(poller).close();
-    }
-    
-    @Test
-    public void testNotEditable() throws DistributionException, InterruptedException, IOException
{
-        createCommandPoller(false);
-        
-        commandPoller.close();
-        
-        verify(poller, never()).close();
-    }
-
     private void assertSkipped() {
         assertThat(commandPoller.isCleared(1), equalTo(false));
     }
@@ -152,16 +141,14 @@ public class CommandPollerTest {
                 .build();
     }
 
-    private void createCommandPoller(boolean editable) {
+    private void createCommandPoller() {
         when(clientProvider.createPoller(
                 Mockito.anyString(),
                 Mockito.eq(Reset.earliest), 
                 handlerCaptor.capture()))
             .thenReturn(poller);
-        commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME,
editable);
-        if (editable) {
-            commandHandler = handlerCaptor.getValue().getHandler();
-        }
+        commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME);
+        commandHandler = handlerCaptor.getValue().getHandler();
     }
 
 }


Mime
View raw message