From dev-return-1969-archive-asf-public=cust-asf.ponee.io@servicecomb.apache.org Thu Feb 8 09:49:59 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 0A525180676 for ; Thu, 8 Feb 2018 09:49:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EE018160C4A; Thu, 8 Feb 2018 08:49:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A27B3160C3D for ; Thu, 8 Feb 2018 09:49:56 +0100 (CET) Received: (qmail 86792 invoked by uid 500); 8 Feb 2018 08:49:55 -0000 Mailing-List: contact dev-help@servicecomb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@servicecomb.apache.org Delivered-To: mailing list dev@servicecomb.apache.org Received: (qmail 86781 invoked by uid 99); 8 Feb 2018 08:49:55 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Feb 2018 08:49:55 +0000 From: GitBox To: dev@servicecomb.apache.org Subject: [GitHub] WillemJiang closed pull request #122: SCB-239 omega stateless Message-ID: <151807979474.23243.18142450854833585475.gitbox@gitbox.apache.org> WillemJiang closed pull request #122: SCB-239 omega stateless URL: https://github.com/apache/incubator-servicecomb-saga/pull/122 This is a PR merged from a forked repository. 
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java index 49c17560..1e6f21be 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/Command.java @@ -17,7 +17,7 @@ package org.apache.servicecomb.saga.alpha.core; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; import java.util.Date; diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java index 2d51a74f..a52ebe5a 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/EventScanner.java @@ -18,13 +18,14 @@ package org.apache.servicecomb.saga.alpha.core; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; +import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; import java.lang.invoke.MethodHandles; -import java.util.Date; import java.util.concurrent.ScheduledExecutorService; import org.slf4j.Logger; @@ -37,6 +38,7 @@ private 
final ScheduledExecutorService scheduler; private final TxEventRepository eventRepository; private final CommandRepository commandRepository; + private final TxTimeoutRepository timeoutRepository; private final OmegaCallback omegaCallback; private final int eventPollingInterval; @@ -46,12 +48,13 @@ public EventScanner(ScheduledExecutorService scheduler, TxEventRepository eventRepository, CommandRepository commandRepository, + TxTimeoutRepository timeoutRepository, OmegaCallback omegaCallback, int eventPollingInterval) { - this.scheduler = scheduler; this.eventRepository = eventRepository; this.commandRepository = commandRepository; + this.timeoutRepository = timeoutRepository; this.omegaCallback = omegaCallback; this.eventPollingInterval = eventPollingInterval; } @@ -64,16 +67,32 @@ public void run() { private void pollEvents() { scheduler.scheduleWithFixedDelay( () -> { + updateTimeoutStatus(); + findTimeoutEvents(); + abortTimeoutEvents(); saveUncompensatedEventsToCommands(); compensate(); updateCompensatedCommands(); deleteDuplicateSagaEndedEvents(); + updateTransactionStatus(); }, 0, eventPollingInterval, MILLISECONDS); } + private void findTimeoutEvents() { + eventRepository.findTimeoutEvents() + .forEach(event -> { + log.info("Found timeout event {}", event); + timeoutRepository.save(txTimeoutOf(event)); + }); + } + + private void updateTimeoutStatus() { + timeoutRepository.markTimeoutAsDone(); + } + private void saveUncompensatedEventsToCommands() { eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name()) .forEach(event -> { @@ -96,7 +115,7 @@ private void deleteDuplicateSagaEndedEvents() { try { eventRepository.deleteDuplicateEvents(SagaEndedEvent.name()); } catch (Exception e) { - log.warn("Failed to delete duplicate SagaEndedEvent", e); + log.warn("Failed to delete duplicate event", e); } } @@ -109,6 +128,23 @@ private void updateCompensationStatus(TxEvent event) { markSagaEnded(event); } + private void 
abortTimeoutEvents() { + timeoutRepository.findFirstTimeout().forEach(timeout -> { + log.info("Found timeout event {} to abort", timeout); + + eventRepository.save(toTxAbortedEvent(timeout)); + + if (timeout.type().equals(TxStartedEvent.name())) { + eventRepository.findTxStartedEventToCompensate(timeout.globalTxId(), timeout.localTxId()) + .ifPresent(omegaCallback::compensate); + } + }); + } + + private void updateTransactionStatus() { + eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd); + } + private void markSagaEnded(TxEvent event) { if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) { markGlobalTxEnd(event); @@ -120,11 +156,22 @@ private void markGlobalTxEnd(TxEvent event) { log.info("Marked end of transaction with globalTxId {}", event.globalTxId()); } + private TxEvent toTxAbortedEvent(TxTimeout timeout) { + return new TxEvent( + timeout.serviceName(), + timeout.instanceId(), + timeout.globalTxId(), + timeout.localTxId(), + timeout.parentTxId(), + TxAbortedEvent.name(), + "", + ("Transaction timeout").getBytes()); + } + private TxEvent toSagaEndedEvent(TxEvent event) { return new TxEvent( event.serviceName(), event.instanceId(), - new Date(), event.globalTxId(), event.globalTxId(), null, @@ -156,4 +203,18 @@ private TxEvent txStartedEventOf(Command command) { command.payloads() ); } + + private TxTimeout txTimeoutOf(TxEvent event) { + return new TxTimeout( + event.id(), + event.serviceName(), + event.instanceId(), + event.globalTxId(), + event.localTxId(), + event.parentTxId(), + event.type(), + event.expiryTime(), + NEW.name() + ); + } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java similarity index 96% rename from alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java rename to 
alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java index 0c9b78b8..442213b3 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/CommandStatus.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TaskStatus.java @@ -17,7 +17,7 @@ package org.apache.servicecomb.saga.alpha.core; -public enum CommandStatus { +public enum TaskStatus { NEW, PENDING, DONE diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java index 1364cb79..42a202fd 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEvent.java @@ -17,15 +17,21 @@ package org.apache.servicecomb.saga.alpha.core; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.util.Date; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; +import javax.persistence.Transient; @Entity public class TxEvent { + @Transient + private static final long MAX_TIMESTAMP = 253402214400000L; // 9999-12-31 00:00:00 + @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long surrogateId; @@ -38,6 +44,7 @@ private String parentTxId; private String type; private String compensationMethod; + private Date expiryTime; private byte[] payloads; private TxEvent() { @@ -53,6 +60,7 @@ public TxEvent(TxEvent event) { event.parentTxId, event.type, event.compensationMethod, + event.expiryTime, event.payloads); } @@ -65,33 +73,36 @@ public TxEvent( String type, String compensationMethod, byte[] payloads) { - this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + this(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, 
compensationMethod, 0, payloads); } public TxEvent( String serviceName, String instanceId, - Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, String compensationMethod, + int timeout, byte[] payloads) { - this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + this(-1L, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, timeout, + payloads); } public TxEvent( - long id, String serviceName, String instanceId, + Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, String compensationMethod, + int timeout, byte[] payloads) { - this(id, serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + this(-1L, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, compensationMethod, + timeout, payloads); } TxEvent(Long surrogateId, @@ -103,8 +114,25 @@ public TxEvent( String parentTxId, String type, String compensationMethod, + int timeout, byte[] payloads) { + this(surrogateId, serviceName, instanceId, creationTime, globalTxId, localTxId, parentTxId, type, + compensationMethod, + timeout == 0 ? 
new Date(MAX_TIMESTAMP) : new Date(creationTime.getTime() + SECONDS.toMillis(timeout)), + payloads); + } + TxEvent(Long surrogateId, + String serviceName, + String instanceId, + Date creationTime, + String globalTxId, + String localTxId, + String parentTxId, + String type, + String compensationMethod, + Date expiryTime, + byte[] payloads) { this.surrogateId = surrogateId; this.serviceName = serviceName; this.instanceId = instanceId; @@ -114,6 +142,7 @@ public TxEvent( this.parentTxId = parentTxId; this.type = type; this.compensationMethod = compensationMethod; + this.expiryTime = expiryTime; this.payloads = payloads; } @@ -157,6 +186,10 @@ public long id() { return surrogateId; } + public Date expiryTime() { + return expiryTime; + } + @Override public String toString() { return "TxEvent{" + @@ -169,6 +202,7 @@ public String toString() { ", parentTxId='" + parentTxId + '\'' + ", type='" + type + '\'' + ", compensationMethod='" + compensationMethod + '\'' + + ", expiryTime='" + expiryTime + '\'' + '}'; } } diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java index b61aa069..0af6fb5f 100644 --- a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxEventRepository.java @@ -23,6 +23,12 @@ public interface TxEventRepository { void save(TxEvent event); + Optional findFirstAbortedGlobalTransaction(); + + List findTimeoutEvents(); + + Optional findTxStartedEventToCompensate(String globalTxId, String localTxId); + List findTransactions(String globalTxId, String type); List findFirstUncompensatedEventByIdGreaterThan(long id, String type); diff --git a/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java 
b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java new file mode 100644 index 00000000..00ca2ec7 --- /dev/null +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeout.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.saga.alpha.core; + +import java.util.Date; + +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Version; + +@Entity +public class TxTimeout { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long surrogateId; + + private long eventId; + private String serviceName; + private String instanceId; + private String globalTxId; + private String localTxId; + private String parentTxId; + private String type; + private Date expiryTime; + private String status; + + @Version + private long version; + + TxTimeout() { + } + + TxTimeout(long eventId, String serviceName, String instanceId, String globalTxId, String localTxId, + String parentTxId, String type, Date expiryTime, String status) { + this.eventId = eventId; + this.serviceName = serviceName; + this.instanceId = instanceId; + this.globalTxId = globalTxId; + this.localTxId = localTxId; + this.parentTxId = parentTxId; + this.type = type; + this.expiryTime = expiryTime; + this.status = status; + } + + public String serviceName() { + return serviceName; + } + + public String instanceId() { + return instanceId; + } + + public String globalTxId() { + return globalTxId; + } + + public String localTxId() { + return localTxId; + } + + public String parentTxId() { + return parentTxId; + } + + public String type() { + return type; + } + + public Date expiryTime() { + return expiryTime; + } + + public String status() { + return status; + } + + @Override + public String toString() { + return "TxTimeout{" + + "eventId=" + eventId + + ", serviceName='" + serviceName + '\'' + + ", instanceId='" + instanceId + '\'' + + ", globalTxId='" + globalTxId + '\'' + + ", localTxId='" + localTxId + '\'' + + ", parentTxId='" + parentTxId + '\'' + + ", type='" + type + '\'' + + ", expiryTime=" + expiryTime + + ", status=" + status + + '}'; + } +} diff --git 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java similarity index 79% rename from omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java rename to alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java index eb820d61..97387a36 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/OmegaTxTimeoutException.java +++ b/alpha/alpha-core/src/main/java/org/apache/servicecomb/saga/alpha/core/TxTimeoutRepository.java @@ -15,10 +15,14 @@ * limitations under the License. */ -package org.apache.servicecomb.saga.omega.transaction; +package org.apache.servicecomb.saga.alpha.core; -public class OmegaTxTimeoutException extends RuntimeException { - public OmegaTxTimeoutException(String cause) { - super(cause); - } +import java.util.List; + +public interface TxTimeoutRepository { + void save(TxTimeout timeout); + + void markTimeoutAsDone(); + + List findFirstTimeout(); } diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java index 5cda4c54..4ded48af 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import java.util.Date; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -160,7 +159,6 @@ private TxEvent eventOf(String serviceName, String instanceId, EventType eventTy return new TxEvent( serviceName, instanceId, 
- new Date(), uniquify("globalTxId"), uniquify("localTxId"), UUID.randomUUID().toString(), diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 231d5bff..d2209940 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -29,7 +29,6 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import java.util.Date; import java.util.Deque; import java.util.List; import java.util.Optional; @@ -38,6 +37,7 @@ import java.util.stream.Collectors; import org.apache.servicecomb.saga.common.EventType; +import org.junit.Before; import org.junit.Test; public class TxConsistentServiceTest { @@ -48,6 +48,23 @@ public void save(TxEvent event) { events.add(event); } + @Override + public Optional findFirstAbortedGlobalTransaction() { + return Optional.empty(); + } + + @Override + public List findTimeoutEvents() { + return emptyList(); + } + + @Override + public Optional findTxStartedEventToCompensate(String globalTxId, String localTxId) { + return events.stream() + .filter(event -> globalTxId.equals(event.globalTxId()) && localTxId.equals(event.localTxId())) + .findFirst(); + } + @Override public List findTransactions(String globalTxId, String type) { return events.stream() @@ -81,6 +98,11 @@ public void deleteDuplicateEvents(String type) { private final TxConsistentService consistentService = new TxConsistentService(eventRepository); private final byte[] payloads = "yeah".getBytes(); + @Before + public void setUp() throws Exception { + events.clear(); + } + @Test public void persistEventOnArrival() throws Exception { TxEvent[] events = { @@ -111,11 +133,13 @@ public void skipTxStartedEvent_IfGlobalTxAlreadyFailed() { } private 
TxEvent newEvent(EventType eventType) { - return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, payloads); + return new TxEvent(serviceName, instanceId, globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, + payloads); } private TxEvent eventOf(EventType eventType, String localTxId) { - return new TxEvent(serviceName, instanceId, new Date(), + return new TxEvent(serviceName, + instanceId, globalTxId, localTxId, UUID.randomUUID().toString(), diff --git a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java index 68c33a91..c14ffd9e 100644 --- a/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java +++ b/alpha/alpha-core/src/test/java/org/apache/servicecomb/saga/alpha/core/TxEventMaker.java @@ -17,11 +17,9 @@ package org.apache.servicecomb.saga.alpha.core; -import org.apache.servicecomb.saga.common.EventType; - import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent; -import java.util.Date; import java.util.UUID; class TxEventMaker { @@ -29,11 +27,10 @@ static TxEvent someEvent() { return new TxEvent( uniquify("serviceName"), uniquify("instanceId"), - new Date(), uniquify("globalTxId"), uniquify("localTxId"), UUID.randomUUID().toString(), - EventType.TxStartedEvent.name(), + TxStartedEvent.name(), TxEventMaker.class.getCanonicalName(), uniquify("blah").getBytes()); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java index a4314377..6889c9f2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java +++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java @@ -35,6 +35,7 @@ import org.apache.servicecomb.saga.alpha.core.PushBackOmegaCallback; import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; +import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.context.annotation.Bean; @@ -69,6 +70,11 @@ CommandRepository springCommandRepository(TxEventEnvelopeRepository eventRepo, C return new SpringCommandRepository(eventRepo, commandRepository); } + @Bean + TxTimeoutRepository springTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) { + return new SpringTxTimeoutRepository(timeoutRepo); + } + @Bean ScheduledExecutorService compensationScheduler() { return scheduler; @@ -80,14 +86,13 @@ TxConsistentService txConsistentService(@Value("${alpha.server.port:8080}") int ScheduledExecutorService scheduler, TxEventRepository eventRepository, CommandRepository commandRepository, + TxTimeoutRepository timeoutRepository, OmegaCallback omegaCallback, Map> omegaCallbacks) { new EventScanner(scheduler, - eventRepository, - commandRepository, - omegaCallback, - eventPollingInterval).run(); + eventRepository, commandRepository, timeoutRepository, + omegaCallback, eventPollingInterval).run(); TxConsistentService consistentService = new TxConsistentService(eventRepository); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index eced7f9f..ee7e2e44 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ 
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -56,7 +56,7 @@ public void onConnected(GrpcServiceConfig request, StreamObserver responseObserver) { omegaCallbacks .computeIfAbsent(request.getServiceName(), key -> new ConcurrentHashMap<>()) - .computeIfAbsent(request.getInstanceId(), key -> new GrpcOmegaCallback(responseObserver)); + .put(request.getInstanceId(), new GrpcOmegaCallback(responseObserver)); } // TODO: 2018/1/5 connect is async and disconnect is sync, meaning callback may not be registered on disconnected @@ -84,6 +84,7 @@ public void onTxEvent(GrpcTxEvent message, StreamObserver responseObser message.getParentTxId().isEmpty() ? null : message.getParentTxId(), message.getType(), message.getCompensationMethod(), + message.getTimeout(), message.getPayloads().toByteArray() )); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java index afbdaf5d..086f88ec 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringCommandRepository.java @@ -17,9 +17,9 @@ package org.apache.servicecomb.saga.alpha.server; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.DONE; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.NEW; -import static org.apache.servicecomb.saga.alpha.core.CommandStatus.PENDING; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING; import java.lang.invoke.MethodHandles; import java.util.LinkedHashMap; @@ -33,11 +33,9 @@ import org.apache.servicecomb.saga.alpha.core.TxEvent; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.data.domain.PageRequest; public class SpringCommandRepository implements CommandRepository { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final PageRequest SINGLE_COMMAND_REQUEST = new PageRequest(0, 1); private final TxEventEnvelopeRepository eventRepository; private final CommandEntityRepository commandRepository; diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java index ad321482..d6ea21c3 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxEventRepository.java @@ -25,6 +25,7 @@ import org.springframework.data.domain.PageRequest; class SpringTxEventRepository implements TxEventRepository { + private static final PageRequest SINGLE_TX_EVENT_REQUEST = new PageRequest(0, 1); private final TxEventEnvelopeRepository eventRepo; SpringTxEventRepository(TxEventEnvelopeRepository eventRepo) { @@ -36,6 +37,21 @@ public void save(TxEvent event) { eventRepo.save(event); } + @Override + public Optional findFirstAbortedGlobalTransaction() { + return eventRepo.findFirstAbortedGlobalTxByType(); + } + + @Override + public List findTimeoutEvents() { + return eventRepo.findTimeoutEvents(SINGLE_TX_EVENT_REQUEST); + } + + @Override + public Optional findTxStartedEventToCompensate(String globalTxId, String localTxId) { + return eventRepo.findFirstStartedEventByGlobalTxIdAndLocalTxId(globalTxId, localTxId); + } + @Override public List findTransactions(String globalTxId, String type) { return eventRepo.findByEventGlobalTxIdAndEventType(globalTxId, type); @@ -43,7 +59,7 @@ public void save(TxEvent event) { @Override public List 
findFirstUncompensatedEventByIdGreaterThan(long id, String type) { - return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, new PageRequest(0, 1)); + return eventRepo.findFirstByTypeAndSurrogateIdGreaterThan(type, id, SINGLE_TX_EVENT_REQUEST); } @Override diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java new file mode 100644 index 00000000..ee754969 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/SpringTxTimeoutRepository.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.saga.alpha.server; + +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING; + +import java.util.List; + +import javax.transaction.Transactional; + +import org.apache.servicecomb.saga.alpha.core.TxTimeout; +import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; +import org.springframework.data.domain.PageRequest; + +public class SpringTxTimeoutRepository implements TxTimeoutRepository { + private final TxTimeoutEntityRepository timeoutRepo; + + SpringTxTimeoutRepository(TxTimeoutEntityRepository timeoutRepo) { + this.timeoutRepo = timeoutRepo; + } + + @Override + public void save(TxTimeout timeout) { + timeoutRepo.save(timeout); + } + + @Override + public void markTimeoutAsDone() { + timeoutRepo.updateStatusOfFinishedTx(); + } + + @Transactional + @Override + public List findFirstTimeout() { + List timeoutEvents = timeoutRepo.findFirstTimeoutTxOrderByExpireTimeAsc(new PageRequest(0, 1)); + timeoutEvents.forEach(event -> timeoutRepo + .updateStatusByGlobalTxIdAndLocalTxId(PENDING.name(), event.globalTxId(), event.localTxId())); + return timeoutEvents; + } +} \ No newline at end of file diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 2e52fef2..0eaf0898 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -31,8 +31,32 @@ interface TxEventEnvelopeRepository extends CrudRepository { List findByGlobalTxId(String globalTxId); + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type = 'TxAbortedEvent' AND NOT EXISTS( " + + " SELECT t1.globalTxId FROM TxEvent t1" + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.type IN ('TxEndedEvent', 
'SagaEndedEvent'))") + Optional findFirstAbortedGlobalTxByType(); + + @Query("SELECT t FROM TxEvent t " + + "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') " + + " AND t.expiryTime < CURRENT_TIMESTAMP AND NOT EXISTS( " + + " SELECT t1.globalTxId FROM TxEvent t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type != t.type" + + ")") + List findTimeoutEvents(Pageable pageable); + + @Query("SELECT t FROM TxEvent t " + + "WHERE t.globalTxId = ?1 " + + " AND t.localTxId = ?2 " + + " AND t.type = 'TxStartedEvent'") + Optional findFirstStartedEventByGlobalTxIdAndLocalTxId(String globalTxId, String localTxId); + @Query("SELECT DISTINCT new org.apache.servicecomb.saga.alpha.core.TxEvent(" - + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, t.type, t.compensationMethod, t.payloads" + + "t.serviceName, t.instanceId, t.globalTxId, t.localTxId, t.parentTxId, " + + "t.type, t.compensationMethod, t.payloads" + ") FROM TxEvent t " + "WHERE t.globalTxId = ?1 AND t.type = ?2") List findByEventGlobalTxIdAndEventType(String globalTxId, String type); @@ -75,7 +99,7 @@ @Query("DELETE FROM TxEvent t " + "WHERE t.type = ?1 AND t.surrogateId NOT IN (" + " SELECT MAX(t1.surrogateId) FROM TxEvent t1 " - + " WHERE t1.type = ?1" + + " WHERE t1.type = ?1 " + " GROUP BY t1.globalTxId" + ")") void deleteByType(String type); diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java new file mode 100644 index 00000000..f0e264a4 --- /dev/null +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/TxTimeoutEntityRepository.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.server; + +import java.util.List; + +import javax.persistence.LockModeType; +import javax.transaction.Transactional; + +import org.apache.servicecomb.saga.alpha.core.TxTimeout; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.CrudRepository; +import org.springframework.data.repository.query.Param; + +interface TxTimeoutEntityRepository extends CrudRepository { + + @Transactional + @Modifying(clearAutomatically = true) + @Query("UPDATE org.apache.servicecomb.saga.alpha.core.TxTimeout t " + + "SET t.status = :status " + + "WHERE t.globalTxId = :globalTxId " + + " AND t.localTxId = :localTxId") + void updateStatusByGlobalTxIdAndLocalTxId( + @Param("status") String status, + @Param("globalTxId") String globalTxId, + @Param("localTxId") String localTxId); + + @Lock(LockModeType.OPTIMISTIC) + @Query("SELECT t FROM TxTimeout AS t " + + "WHERE t.status = 'NEW' " + + " AND t.expiryTime < CURRENT_TIMESTAMP " + + "ORDER BY t.expiryTime ASC") + List findFirstTimeoutTxOrderByExpireTimeAsc(Pageable pageable); + + @Transactional + 
@Modifying(clearAutomatically = true) + @Query("UPDATE TxTimeout t " + + "SET t.status = 'DONE' " + + "WHERE t.status != 'DONE' AND EXISTS (" + + " SELECT t1.globalTxId FROM TxEvent t1 " + + " WHERE t1.globalTxId = t.globalTxId " + + " AND t1.localTxId = t.localTxId " + + " AND t1.type != t.type" + + ")") + void updateStatusOfFinishedTx(); +} diff --git a/alpha/alpha-server/src/main/resources/schema-postgresql.sql b/alpha/alpha-server/src/main/resources/schema-postgresql.sql index d6b51729..e7f774b7 100644 --- a/alpha/alpha-server/src/main/resources/schema-postgresql.sql +++ b/alpha/alpha-server/src/main/resources/schema-postgresql.sql @@ -8,6 +8,7 @@ CREATE TABLE IF NOT EXISTS TxEvent ( parentTxId varchar(36) DEFAULT NULL, type varchar(50) NOT NULL, compensationMethod varchar(256) NOT NULL, + expiryTime timestamp(6) NOT NULL, payloads bytea ); @@ -30,3 +31,20 @@ CREATE TABLE IF NOT EXISTS Command ( ); CREATE INDEX IF NOT EXISTS saga_commands_index ON Command (surrogateId, eventId, globalTxId, localTxId, status); + + +CREATE TABLE IF NOT EXISTS TxTimeout ( + surrogateId BIGSERIAL PRIMARY KEY, + eventId bigint NOT NULL UNIQUE, + serviceName varchar(16) NOT NULL, + instanceId varchar(36) NOT NULL, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + type varchar(50) NOT NULL, + expiryTime TIMESTAMP NOT NULL, + status varchar(12), + version bigint NOT NULL +); + +CREATE INDEX IF NOT EXISTS saga_timeouts_index ON TxTimeout (surrogateId, expiryTime, globalTxId, localTxId, status); diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java index a479e2b4..c212b3b4 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java +++ 
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaEventControllerTest.java @@ -27,7 +27,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import java.util.Date; import java.util.UUID; import org.apache.servicecomb.saga.alpha.core.TxEvent; @@ -40,13 +39,10 @@ import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; -import com.fasterxml.jackson.databind.ObjectMapper; - @RunWith(SpringRunner.class) @WebMvcTest(AlphaEventController.class) public class AlphaEventControllerTest { private final TxEvent someEvent = someEvent(); - private final ObjectMapper mapper = new ObjectMapper(); @Autowired private MockMvc mockMvc; @@ -72,7 +68,6 @@ private TxEvent someEvent() { return new TxEvent( uniquify("serviceName"), uniquify("instanceId"), - new Date(), uniquify("globalTxId"), uniquify("localTxId"), UUID.randomUUID().toString(), diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 539f610a..497c2443 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -19,7 +19,9 @@ import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.servicecomb.saga.alpha.core.TaskStatus.DONE; import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent; +import static org.apache.servicecomb.saga.common.EventType.SagaStartedEvent; import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent; import static 
org.apache.servicecomb.saga.common.EventType.TxCompensatedEvent; import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent; @@ -30,7 +32,6 @@ import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Queue; @@ -47,6 +48,8 @@ import org.apache.servicecomb.saga.alpha.core.TxConsistentService; import org.apache.servicecomb.saga.alpha.core.TxEvent; import org.apache.servicecomb.saga.alpha.core.TxEventRepository; +import org.apache.servicecomb.saga.alpha.core.TxTimeout; +import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcCompensateCommand; @@ -107,11 +110,17 @@ private CommandRepository commandRepository; @Autowired - private CommandEntityRepository commandEntityRepository; + private TxTimeoutRepository timeoutRepository; + + @Autowired + private TxTimeoutEntityRepository timeoutEntityRepository; @Autowired private OmegaCallback omegaCallback; + @Autowired + private CommandEntityRepository commandEntityRepository; + @Autowired private Map> omegaCallbacks; @@ -144,6 +153,7 @@ public void deleteAllTillSuccessful() { try { eventRepo.deleteAll(); commandEntityRepository.deleteAll(); + timeoutEntityRepository.deleteAll(); deleted = true; } catch (Exception ignored) { } @@ -367,6 +377,54 @@ public void sagaEndedEventIsAlwaysInTheEnd() throws Exception { }); } + @Test + public void abortTimeoutSagaStartedEvent() { + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEventWithTimeout(SagaStartedEvent, globalTxId, null, 1)); + + await().atMost(2, SECONDS).until(() -> eventRepo.count() == 3); + + List events = eventRepo.findByGlobalTxId(globalTxId); + assertThat(events.get(0).type(), is(SagaStartedEvent.name())); + 
assertThat(events.get(1).type(), is(TxAbortedEvent.name())); + assertThat(events.get(2).type(), is(SagaEndedEvent.name())); + + assertThat(timeoutEntityRepository.count(), is(1L)); + Iterable timeouts = timeoutEntityRepository.findAll(); + timeouts.forEach(timeout -> { + assertThat(timeout.status(), is(DONE.name())); + assertThat(timeout.globalTxId(), is(globalTxId)); + assertThat(timeout.localTxId(), is(globalTxId)); + }); + } + + @Test + public void abortTimeoutTxStartedEvent() { + asyncStub.onConnected(serviceConfig, compensateResponseObserver); + blockingStub.onTxEvent(someGrpcEvent(SagaStartedEvent, globalTxId, globalTxId)); + blockingStub.onTxEvent(someGrpcEventWithTimeout(TxStartedEvent, localTxId, globalTxId, 1)); + + await().atMost(2, SECONDS).until(() -> { + List events = eventRepo.findByGlobalTxId(globalTxId); + return eventRepo.count() == 5 && events.get(events.size() - 1).type().equals(SagaEndedEvent.name()); + }); + + List events = eventRepo.findByGlobalTxId(globalTxId); + assertThat(events.get(0).type(), is(SagaStartedEvent.name())); + assertThat(events.get(1).type(), is(TxStartedEvent.name())); + assertThat(events.get(2).type(), is(TxAbortedEvent.name())); + assertThat(events.get(3).type(), is(TxCompensatedEvent.name())); + assertThat(events.get(4).type(), is(SagaEndedEvent.name())); + + assertThat(timeoutEntityRepository.count(), is(1L)); + Iterable timeouts = timeoutEntityRepository.findAll(); + timeouts.forEach(timeout -> { + assertThat(timeout.status(), is(DONE.name())); + assertThat(timeout.globalTxId(), is(globalTxId)); + assertThat(timeout.localTxId(), is(localTxId)); + }); + } + private GrpcAck onCompensation(GrpcCompensateCommand command) { return blockingStub.onTxEvent( eventOf(TxCompensatedEvent, @@ -387,7 +445,6 @@ private TxEvent someTxAbortEvent(String serviceName, String instanceId) { return new TxEvent( serviceName, instanceId, - new Date(), globalTxId, localTxId, parentTxId, @@ -396,6 +453,10 @@ private TxEvent 
someTxAbortEvent(String serviceName, String instanceId) { payload.getBytes()); } + private GrpcTxEvent someGrpcEventWithTimeout(EventType type, String localTxId, String parentTxId, int timeout) { + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), timeout); + } + private GrpcTxEvent someGrpcEvent(EventType type) { return eventOf(type, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName()); } @@ -405,11 +466,11 @@ private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId) { } private GrpcTxEvent someGrpcEvent(EventType type, String globalTxId, String localTxId) { - return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName()); + return eventOf(type, globalTxId, localTxId, parentTxId, payload.getBytes(), getClass().getCanonicalName(), 0); } private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, String compensationMethod) { - return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod); + return eventOf(eventType, globalTxId, localTxId, parentTxId, payloads, compensationMethod, 0); } private GrpcTxEvent eventOf(EventType eventType, @@ -417,7 +478,8 @@ private GrpcTxEvent eventOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads, - String compensationMethod) { + String compensationMethod, + int timeout) { return GrpcTxEvent.newBuilder() .setServiceName(serviceName) @@ -428,6 +490,7 @@ private GrpcTxEvent eventOf(EventType eventType, .setParentTxId(parentTxId == null ? 
"" : parentTxId) .setType(eventType.name()) .setCompensationMethod(compensationMethod) + .setTimeout(timeout) .setPayloads(ByteString.copyFrom(payloads)) .build(); } @@ -472,7 +535,7 @@ void init() { Executors.newSingleThreadScheduledExecutor(), eventRepository, commandRepository, - omegaCallback, - 1).run(); + timeoutRepository, + omegaCallback, 1).run(); } } diff --git a/alpha/alpha-server/src/test/resources/schema.sql b/alpha/alpha-server/src/test/resources/schema.sql index 344fdda2..929c69f8 100644 --- a/alpha/alpha-server/src/test/resources/schema.sql +++ b/alpha/alpha-server/src/test/resources/schema.sql @@ -8,8 +8,8 @@ CREATE TABLE IF NOT EXISTS TxEvent ( parentTxId varchar(36) DEFAULT NULL, type varchar(50) NOT NULL, compensationMethod varchar(256) NOT NULL, - payloads varbinary(10240), --- version bigint NOT NULL + expiryTime TIMESTAMP NOT NULL, + payloads varbinary(10240) ); CREATE TABLE IF NOT EXISTS Command ( @@ -26,3 +26,17 @@ CREATE TABLE IF NOT EXISTS Command ( lastModified TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, version bigint NOT NULL ); + +CREATE TABLE IF NOT EXISTS TxTimeout ( + surrogateId bigint GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY, + eventId bigint NOT NULL UNIQUE, + serviceName varchar(36) NOT NULL, + instanceId varchar(36) NOT NULL, + globalTxId varchar(36) NOT NULL, + localTxId varchar(36) NOT NULL, + parentTxId varchar(36) DEFAULT NULL, + type varchar(50) NOT NULL, + expiryTime TIMESTAMP NOT NULL, + status varchar(12), + version bigint NOT NULL +); diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java index 8062ae90..bb24c5cb 100644 --- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/LoadBalancedClusterMessageSenderTest.java @@ -97,7 +97,8 @@ private final String localTxId = uniquify("localTxId"); private final String parentTxId = uniquify("parentTxId"); private final String compensationMethod = getClass().getCanonicalName(); - private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, "blah"); + private final TxEvent event = new TxEvent(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, + compensationMethod, 0, "blah"); private final String serviceName = uniquify("serviceName"); private final String[] addresses = {"localhost:8080", "localhost:8090"}; @@ -299,7 +300,7 @@ public void stopSendingWhenClusterIsDown() throws Exception { public void forwardSendResult() { assertThat(messageSender.send(event).aborted(), is(false)); - TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", "blah"); + TxEvent rejectEvent = new TxStartedEvent(globalTxId, localTxId, parentTxId, "reject", 0, "blah"); assertThat(messageSender.send(rejectEvent).aborted(), is(true)); } @@ -356,6 +357,7 @@ public void onTxEvent(GrpcTxEvent request, StreamObserver responseObser request.getLocalTxId(), request.getParentTxId(), request.getCompensationMethod(), + 0, new String(request.getPayloads().toByteArray()))); sleep(); diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java index 562c50f8..95bda85f 100644 --- 
a/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/org/apache/servicecomb/saga/omega/connector/grpc/RetryableMessageSenderTest.java @@ -42,7 +42,7 @@ private final String globalTxId = uniquify("globalTxId"); private final String localTxId = uniquify("localTxId"); - private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x"); + private final TxStartedEvent event = new TxStartedEvent(globalTxId, localTxId, null, "method x", 0); @Test public void sendEventWhenSenderIsAvailable() { @@ -56,7 +56,7 @@ public void sendEventWhenSenderIsAvailable() { @Test public void blowsUpWhenEventIsSagaStarted() { - TxEvent event = new SagaStartedEvent(globalTxId, localTxId); + TxEvent event = new SagaStartedEvent(globalTxId, localTxId, 0); try { messageSender.send(event); diff --git a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java index 7daf9547..9fd2a7ed 100644 --- a/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java +++ b/omega/omega-spring-tx/src/test/java/org/apache/servicecomb/saga/omega/transaction/spring/TransactionInterceptionTest.java @@ -131,7 +131,7 @@ public void sendsUserToRemote_AroundTransaction() throws Exception { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()}, toArray(messages) ); @@ -152,7 +152,7 @@ public void sendsAbortEvent_OnSubTransactionFailure() 
throws Exception { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, illegalUser).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, illegalUser).toString(), new TxAbortedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, throwable).toString()}, toArray(messages) ); @@ -174,9 +174,9 @@ public void compensateOnTransactionException() throws Exception { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), - new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, anotherUser).toString(), + new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, anotherUser).toString(), new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString(), new TxCompensatedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), new TxCompensatedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString() @@ -196,9 +196,9 @@ public void passesOmegaContextThroughDifferentThreads() throws Exception { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), - new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(), + new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(), new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()}, 
toArray(messages) ); @@ -215,9 +215,9 @@ public void passesOmegaContextInThreadPool() throws Exception { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString(), - new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, jack).toString(), + new TxStartedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod, 0, jack).toString(), new TxEndedEvent(globalTxId, anotherLocalTxId, localTxId, compensationMethod).toString()}, toArray(messages) ); @@ -237,7 +237,7 @@ public void passesOmegaContextThroughReactiveX() throws Exception { assertArrayEquals( new String[]{ - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()}, toArray(messages) ); @@ -255,7 +255,7 @@ public void passesOmegaContextAmongActors() throws Exception { assertArrayEquals( new String[] { - new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, user).toString(), + new TxStartedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod, 0, user).toString(), new TxEndedEvent(globalTxId, newLocalTxId, globalTxId, compensationMethod).toString()}, toArray(messages) ); diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java index 074a5eca..53e51581 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java +++ 
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptor.java @@ -29,9 +29,9 @@ } @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { - return sender - .send(new TxStartedEvent(context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, message)); + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) { + return sender.send(new TxStartedEvent( + context.globalTxId(), context.localTxId(), parentTxId, compensationMethod, timeout, message)); } @Override diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java index 44259496..bb2cca4b 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/EventAwareInterceptor.java @@ -20,7 +20,7 @@ public interface EventAwareInterceptor { EventAwareInterceptor NO_OP_INTERCEPTOR = new EventAwareInterceptor() { @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... message) { return new AlphaResponse(false); } @@ -33,9 +33,9 @@ public void onError(String parentTxId, String compensationMethod, Throwable thro } }; - AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message); + AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... 
message); - void postIntercept(String parentTxId, String compensationMethod) throws Throwable; + void postIntercept(String parentTxId, String compensationMethod); void onError(String parentTxId, String compensationMethod, Throwable throwable); } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java index 7074d8fd..8c70e3a1 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaEndedEvent.java @@ -22,6 +22,6 @@ public class SagaEndedEvent extends TxEvent { public SagaEndedEvent(String globalTxId, String localTxId) { - super(EventType.SagaEndedEvent, globalTxId, localTxId, null, ""); + super(EventType.SagaEndedEvent, globalTxId, localTxId, null, "", 0); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java index 7ef021a2..d3d55fe5 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessor.java @@ -32,9 +32,9 @@ } @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... message) { + public AlphaResponse preIntercept(String parentTxId, String compensationMethod, int timeout, Object... 
message) { try { - return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); + return sender.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout)); } catch (OmegaException e) { throw new TransactionalException(e.getMessage(), e.getCause()); } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java index 09517522..388f237d 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspect.java @@ -17,12 +17,8 @@ package org.apache.servicecomb.saga.omega.transaction; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.context.annotations.SagaStart; @@ -38,7 +34,7 @@ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor; - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final OmegaContext context; public SagaStartAspect(MessageSender sender, OmegaContext context) { @@ -51,15 +47,13 @@ Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwab initializeOmegaContext(); Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(sagaStartAnnotationProcessor); - interceptor.preIntercept(context.globalTxId(), method.toString()); + 
sagaStartAnnotationProcessor.preIntercept(context.globalTxId(), method.toString(), sagaStart.timeout()); LOG.debug("Initialized context {} before execution of method {}", context, method.toString()); - scheduleTimeoutTask(interceptor, method, sagaStart.timeout()); try { Object result = joinPoint.proceed(); - interceptor.postIntercept(context.globalTxId(), method.toString()); + sagaStartAnnotationProcessor.postIntercept(context.globalTxId(), method.toString()); LOG.debug("Transaction with context {} has finished.", context); return result; @@ -74,20 +68,4 @@ Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwab private void initializeOmegaContext() { context.setLocalTxId(context.newGlobalTxId()); } - - private void scheduleTimeoutTask( - TimeAwareInterceptor interceptor, - Method method, - int timeout) { - - if (timeout > 0) { - executor.schedule( - () -> interceptor.onTimeout( - context.globalTxId(), - method.toString(), - new OmegaTxTimeoutException("Saga " + method.toString() + " timed out")), - timeout, - MILLISECONDS); - } - } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java index 54f61e41..cb76a265 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/SagaStartedEvent.java @@ -20,9 +20,8 @@ import org.apache.servicecomb.saga.common.EventType; public class SagaStartedEvent extends TxEvent { - - public SagaStartedEvent(String globalTxId, String localTxId) { + public SagaStartedEvent(String globalTxId, String localTxId, int timeout) { // use "" instead of null as compensationMethod requires not null in sql - super(EventType.SagaStartedEvent, globalTxId, localTxId, null, ""); + 
super(EventType.SagaStartedEvent, globalTxId, localTxId, null, "", timeout); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java deleted file mode 100644 index 2057fbc5..00000000 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.saga.omega.transaction; - -import java.util.concurrent.atomic.AtomicReference; - -class TimeAwareInterceptor implements EventAwareInterceptor { - private final EventAwareInterceptor interceptor; - private final AtomicReference interceptorRef; - private Throwable throwable = null; - - TimeAwareInterceptor(EventAwareInterceptor interceptor) { - this.interceptor = interceptor; - this.interceptorRef = new AtomicReference<>(interceptor); - } - - @Override - public AlphaResponse preIntercept(String parentTxId, String signature, Object... 
args) { - return interceptor.preIntercept(parentTxId, signature, args); - } - - @Override - public void postIntercept(String parentTxId, String signature) throws Throwable { - if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) { - interceptor.postIntercept(parentTxId, signature); - } else if (throwable != null) { - throw throwable; - } - } - - @Override - public void onError(String parentTxId, String signature, Throwable throwable) { - if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) { - interceptor.onError(parentTxId, signature, throwable); - } - } - - void onTimeout(String parentTxId, String signature, Throwable throwable) { - if (interceptorRef.compareAndSet(interceptor, NO_OP_INTERCEPTOR)) { - interceptor.onError(parentTxId, signature, throwable); - this.throwable = throwable; - } - } -} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java index 5a61dc74..932b9901 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspect.java @@ -17,12 +17,8 @@ package org.apache.servicecomb.saga.omega.transaction; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import javax.transaction.InvalidTransactionException; @@ -40,7 +36,7 @@ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final OmegaContext context; - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final CompensableInterceptor interceptor; public 
TransactionAspect(MessageSender sender, OmegaContext context) { @@ -58,8 +54,7 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Thr String localTxId = context.localTxId(); context.newLocalTxId(); - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(this.interceptor); - AlphaResponse response = interceptor.preIntercept(localTxId, signature, joinPoint.getArgs()); + AlphaResponse response = interceptor.preIntercept(localTxId, signature, compensable.timeout(), joinPoint.getArgs()); if (response.aborted()) { String abortedLocalTxId = context.localTxId(); context.setLocalTxId(localTxId); @@ -68,9 +63,6 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Thr } LOG.debug("Updated context {} for compensable method {} ", context, method.toString()); - // TODO: 2018/1/15 omega shall be stateless, all states shall be on alpha - scheduleTimeoutTask(interceptor, localTxId, signature, method, compensable.timeout()); - try { Object result = joinPoint.proceed(); interceptor.postIntercept(localTxId, signature); @@ -85,24 +77,6 @@ Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Thr } } - private void scheduleTimeoutTask( - TimeAwareInterceptor interceptor, - String localTxId, - String signature, - Method method, - int timeout) { - - if (timeout > 0) { - executor.schedule( - () -> interceptor.onTimeout( - localTxId, - signature, - new OmegaTxTimeoutException("Transaction " + method.toString() + " timed out")), - timeout, - MILLISECONDS); - } - } - private String compensationMethodSignature(ProceedingJoinPoint joinPoint, Compensable compensable, Method method) throws NoSuchMethodException { diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java index 13df2f73..d6aa5333 100644 --- 
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxAbortedEvent.java @@ -17,14 +17,14 @@ package org.apache.servicecomb.saga.omega.transaction; -import org.apache.servicecomb.saga.common.EventType; - import java.io.PrintWriter; import java.io.StringWriter; +import org.apache.servicecomb.saga.common.EventType; + public class TxAbortedEvent extends TxEvent { public TxAbortedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Throwable throwable) { - super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, stackTrace(throwable)); + super(EventType.TxAbortedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0, stackTrace(throwable)); } private static String stackTrace(Throwable e) { diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java index dbbaeab5..8e288dfb 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxCompensatedEvent.java @@ -21,6 +21,6 @@ public class TxCompensatedEvent extends TxEvent { public TxCompensatedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { - super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod); + super(EventType.TxCompensatedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java 
index 4e587c85..8d6666a6 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEndedEvent.java @@ -21,6 +21,6 @@ public class TxEndedEvent extends TxEvent { public TxEndedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod) { - super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod); + super(EventType.TxEndedEvent, globalTxId, localTxId, parentTxId, compensationMethod, 0); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java index 1398d3ea..34be420e 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxEvent.java @@ -17,10 +17,10 @@ package org.apache.servicecomb.saga.omega.transaction; -import org.apache.servicecomb.saga.common.EventType; - import java.util.Arrays; +import org.apache.servicecomb.saga.common.EventType; + public class TxEvent { private final long timestamp; @@ -29,9 +29,11 @@ private final String localTxId; private final String parentTxId; private final String compensationMethod; + private final int timeout; private final Object[] payloads; - public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) { + public TxEvent(EventType type, String globalTxId, String localTxId, String parentTxId, String compensationMethod, + int timeout, Object... 
payloads) { this.timestamp = System.currentTimeMillis(); this.type = type; this.localTxId = localTxId; @@ -39,6 +41,7 @@ public TxEvent(EventType type, String globalTxId, String localTxId, String paren this.compensationMethod = compensationMethod; this.payloads = payloads; this.globalTxId = globalTxId; + this.timeout = timeout; } public long timestamp() { @@ -69,6 +72,10 @@ public String compensationMethod() { return compensationMethod; } + public int timeout() { + return timeout; + } + @Override public String toString() { return type.name() + "{" + @@ -76,6 +83,7 @@ public String toString() { ", localTxId='" + localTxId + '\'' + ", parentTxId='" + parentTxId + '\'' + ", compensationMethod='" + compensationMethod + '\'' + + ", timeout='" + timeout + '\'' + ", payloads=" + Arrays.toString(payloads) + '}'; } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java index ce93ea3e..4732d952 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/TxStartedEvent.java @@ -21,7 +21,8 @@ public class TxStartedEvent extends TxEvent { - public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, String compensationMethod, Object... payloads) { - super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, payloads); + public TxStartedEvent(String globalTxId, String localTxId, String parentTxId, + String compensationMethod, int timeout, Object... 
payloads) { + super(EventType.TxStartedEvent, globalTxId, localTxId, parentTxId, compensationMethod, timeout, payloads); } } diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java index 21af7e6e..0ef9d4dc 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/CompensableInterceptorTest.java @@ -62,7 +62,7 @@ public void setUp() throws Exception { @Test public void sendsTxStartedEventBefore() throws Exception { - interceptor.preIntercept(parentTxId, compensationMethod, message); + interceptor.preIntercept(parentTxId, compensationMethod, 0, message); TxEvent event = messages.get(0); diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java index 566a456b..cc84fc57 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAnnotationProcessorTest.java @@ -65,7 +65,7 @@ public void setUp() throws Exception { @Test public void sendsSagaStartedEvent() { - sagaStartAnnotationProcessor.preIntercept(null, null); + sagaStartAnnotationProcessor.preIntercept(null, null, 0); TxEvent event = messages.get(0); @@ -99,7 +99,7 @@ public void transformInterceptedException() { doThrow(exception).when(sender).send(any()); try { - sagaStartAnnotationProcessor.preIntercept(null, null); + sagaStartAnnotationProcessor.preIntercept(null, null, 0); 
expectFailing(TransactionalException.class); } catch (TransactionalException e) { assertThat(e.getMessage(), is("exception")); diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java index 1bc2b285..77d40ef9 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/SagaStartAspectTest.java @@ -18,20 +18,14 @@ package org.apache.servicecomb.saga.omega.transaction; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.servicecomb.saga.common.EventType; import org.apache.servicecomb.saga.omega.context.IdGenerator; @@ -62,8 +56,6 @@ private final OmegaContext omegaContext = new OmegaContext(idGenerator); private final SagaStartAspect aspect = new SagaStartAspect(sender, omegaContext); - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - @Before public void setUp() throws Exception { when(idGenerator.nextId()).thenReturn(globalTxId); @@ -120,42 +112,6 @@ public void clearContextOnSagaStartError() throws Throwable { assertThat(omegaContext.localTxId(), is(nullValue())); } - @Test - public void sendsAbortEventOnTimeout() throws Throwable { - CountDownLatch latch = new 
CountDownLatch(1); - when(sagaStart.timeout()).thenReturn(100); - when(joinPoint.proceed()).thenAnswer(invocationOnMock -> { - latch.await(); - assertThat(omegaContext.localTxId(), is(globalTxId)); - return null; - }); - - ExpectedException exception = ExpectedException.none(); - executor.execute(() -> { - try { - aspect.advise(joinPoint, sagaStart); - } catch (Throwable throwable) { - exception.expect(OmegaTxTimeoutException.class); - } - }); - - await().atMost(1, SECONDS).until(() -> messages.size() == 2); - - TxEvent event = messages.get(1); - - assertThat(event.globalTxId(), is(globalTxId)); - assertThat(event.localTxId(), is(globalTxId)); - assertThat(event.parentTxId(), is(nullValue())); - assertThat(event.type(), is(EventType.TxAbortedEvent)); - - latch.countDown(); - - await().atMost(1, SECONDS).until(() -> omegaContext.localTxId() == null); - - // no redundant ended message received - assertThat(messages.size(), is(2)); - } - private String doNothing() { return "doNothing"; } diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java deleted file mode 100644 index 1136a450..00000000 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TimeAwareInterceptorTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.servicecomb.saga.omega.transaction; - -import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; -import org.junit.rules.ExpectedException; - -public class TimeAwareInterceptorTest { - private static final int runningCounts = 1000; - - private final String localTxId = uniquify("localTxId"); - private final String signature = uniquify("signature"); - - private final AtomicInteger postInterceptInvoked = new AtomicInteger(); - private final AtomicInteger onErrorInvoked = new AtomicInteger(); - private final AtomicInteger onTimeoutInvoked = new AtomicInteger(); - - private final EventAwareInterceptor underlying = new EventAwareInterceptor() { - @Override - public AlphaResponse preIntercept(String parentTxId, String compensationMethod, Object... 
message) { - return new AlphaResponse(false); - } - - @Override - public void postIntercept(String parentTxId, String compensationMethod) { - postInterceptInvoked.incrementAndGet(); - } - - @Override - public void onError(String parentTxId, String compensationMethod, Throwable throwable) { - if (throwable instanceof OmegaTxTimeoutException) { - onTimeoutInvoked.incrementAndGet(); - } else { - onErrorInvoked.incrementAndGet(); - } - } - }; - - private final ExecutorService executorService = Executors.newFixedThreadPool(2); - private final RuntimeException timeoutException = new OmegaTxTimeoutException("timed out"); - - - @Test(timeout = 5000) - public void invokeEitherPostInterceptOrOnTimeoutConcurrently() throws Exception { - List> futures = new LinkedList<>(); - - for (int i = 0; i < runningCounts; i++) { - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying); - CyclicBarrier cyclicBarrier = new CyclicBarrier(2); - ExpectedException exception = ExpectedException.none(); - - futures.add(executorService.submit(() -> { - try { - waitForSignal(cyclicBarrier); - interceptor.postIntercept(localTxId, signature); - } catch (Throwable throwable) { - exception.expect(OmegaTxTimeoutException.class); - } - })); - - futures.add(executorService.submit(() -> { - waitForSignal(cyclicBarrier); - interceptor.onTimeout(localTxId, signature, timeoutException); - })); - } - - waitTillAllDone(futures); - - assertThat(postInterceptInvoked.get() + onTimeoutInvoked.get(), is(runningCounts)); - } - - @Test(timeout = 5000) - public void invokeEitherOnErrorOrOnTimeoutConcurrently() throws Exception { - RuntimeException oops = new RuntimeException("oops"); - List> futures = new LinkedList<>(); - - for (int i = 0; i < runningCounts; i++) { - TimeAwareInterceptor interceptor = new TimeAwareInterceptor(underlying); - CyclicBarrier cyclicBarrier = new CyclicBarrier(2); - - - futures.add(executorService.submit(() -> { - waitForSignal(cyclicBarrier); - 
interceptor.onError(localTxId, signature, oops); - })); - - futures.add(executorService.submit(() -> { - waitForSignal(cyclicBarrier); - interceptor.onTimeout(localTxId, signature, timeoutException); - })); - } - - waitTillAllDone(futures); - - assertThat(onErrorInvoked.get() + onTimeoutInvoked.get(), is(runningCounts)); - } - - private void waitForSignal(CyclicBarrier cyclicBarrier) { - try { - cyclicBarrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - fail(e.getMessage()); - } - } - - private void waitTillAllDone(List> futures) - throws InterruptedException, java.util.concurrent.ExecutionException { - for (Future future : futures) { - future.get(); - } - } -} diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java index 8689a1e6..31d148fe 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/TransactionAspectTest.java @@ -18,11 +18,8 @@ package org.apache.servicecomb.saga.omega.transaction; import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.awaitility.Awaitility.await; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -32,9 +29,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.transaction.InvalidTransactionException; @@ -69,8 +63,6 @@ private 
final OmegaContext omegaContext = new OmegaContext(idGenerator); private final TransactionAspect aspect = new TransactionAspect(sender, omegaContext); - private final ExecutorService executor = Executors.newSingleThreadExecutor(); - @Before public void setUp() throws Exception { when(idGenerator.nextId()).thenReturn(newLocalTxId); @@ -130,46 +122,6 @@ public void restoreContextOnCompensableError() throws Throwable { assertThat(omegaContext.localTxId(), is(localTxId)); } - @Test - public void sendsAbortEventOnTimeout() throws Throwable { - CountDownLatch latch = new CountDownLatch(1); - when(compensable.timeout()).thenReturn(100); - when(joinPoint.proceed()).thenAnswer(invocationOnMock -> { - latch.await(); - assertThat(omegaContext.localTxId(), is(newLocalTxId)); - return null; - }); - - ExpectedException exception = ExpectedException.none(); - executor.execute(() -> { - try { - // need to setup the thread local for it - omegaContext.setGlobalTxId(globalTxId); - omegaContext.setLocalTxId(localTxId); - - aspect.advise(joinPoint, compensable); - } catch (Throwable throwable) { - exception.expect(OmegaTxTimeoutException.class); - } - }); - - await().atMost(1, SECONDS).until(() -> messages.size() == 2); - - TxEvent event = messages.get(1); - - assertThat(event.globalTxId(), is(globalTxId)); - assertThat(event.localTxId(), is(newLocalTxId)); - assertThat(event.parentTxId(), is(localTxId)); - assertThat(event.type(), is(EventType.TxAbortedEvent)); - - latch.countDown(); - - await().atMost(1, SECONDS).until(() -> localTxId.equals(omegaContext.localTxId())); - - // no redundant ended message received - assertThat(messages.size(), is(2)); - } - @Test public void returnImmediatelyWhenReceivedRejectResponse() throws Throwable { MessageSender sender = mock(MessageSender.class); diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto index 26368814..3944eee9 100644 --- 
a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto +++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto @@ -46,6 +46,7 @@ message GrpcTxEvent { bytes payloads = 7; string serviceName = 8; string instanceId = 9; + int32 timeout = 10; } message GrpcCompensateCommand { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services