Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D5161100BF for ; Tue, 11 Feb 2014 11:13:13 +0000 (UTC) Received: (qmail 47880 invoked by uid 500); 11 Feb 2014 11:13:10 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 47735 invoked by uid 500); 11 Feb 2014 11:13:09 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 47699 invoked by uid 99); 11 Feb 2014 11:13:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Feb 2014 11:13:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9E20D92392C; Tue, 11 Feb 2014 11:13:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Tue, 11 Feb 2014 11:13:03 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/4] git commit: CAMEL-7184 Allow lazy creating QuickfixJ engines Updated Branches: refs/heads/master 1099b8ce9 -> 16f112aa0 CAMEL-7184 Allow lazy creating QuickfixJ engines Introduced "lazyCreateEngines" component setting and "lazyCreateEngine" endpoint URI parameter. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6f1bc4e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6f1bc4e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6f1bc4e Branch: refs/heads/master Commit: e6f1bc4e923810aa789425fb2bb86b1e3e8f1f81 Parents: 9e383f4 Author: Grzegorz Grzybek Authored: Tue Feb 11 11:04:00 2014 +0100 Committer: Grzegorz Grzybek Committed: Tue Feb 11 11:04:00 2014 +0100 ---------------------------------------------------------------------- .../component/quickfixj/QuickfixjComponent.java | 36 ++++++- .../component/quickfixj/QuickfixjEndpoint.java | 14 +++ .../component/quickfixj/QuickfixjEngine.java | 70 +++++++++++-- .../component/quickfixj/QuickfixjProducer.java | 1 + .../quickfixj/QuickfixjComponentTest.java | 100 ++++++++++++++++++- .../quickfixj/QuickfixjLazyProducerTest.java | 85 ++++++++++++++++ .../quickfixj/QuickfixjSpringTest.java | 7 ++ .../camel/component/quickfixj/TestSupport.java | 8 +- .../quickfixj/QuickfixjSpringTest-context.xml | 20 ++++ 9 files changed, 321 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java index 03b89cd..890d1d7 100644 --- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java +++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjComponent.java @@ -26,6 +26,7 @@ import org.apache.camel.StartupListener; import org.apache.camel.impl.DefaultComponent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import quickfix.LogFactory; import quickfix.MessageFactory; import quickfix.MessageStoreFactory; @@ -33,6 +34,7 @@ import quickfix.SessionSettings; public class QuickfixjComponent extends DefaultComponent implements StartupListener { private static final Logger LOG = LoggerFactory.getLogger(QuickfixjComponent.class); + private static final String PARAMETER_LAZY_CREATE_ENGINE = "lazyCreateEngine"; private final Object engineInstancesLock = new Object(); private final Map engines = new HashMap(); @@ -43,6 +45,7 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe private LogFactory logFactory; private MessageFactory messageFactory; private Map configurations = new HashMap(); + private boolean lazyCreateEngines = false; @Override protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { @@ -58,12 +61,17 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe } if (engine == null) { QuickfixjConfiguration configuration = configurations.get(remaining); + SessionSettings settings = null; if (configuration != null) { - SessionSettings settings = configuration.createSessionSettings(); - engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory); + settings = configuration.createSessionSettings(); } else { - engine = new QuickfixjEngine(uri, remaining, messageStoreFactory, logFactory, messageFactory); + settings = QuickfixjEngine.loadSettings(remaining); } + Boolean lazyCreateEngineForEndpoint = super.getAndRemoveParameter(parameters, PARAMETER_LAZY_CREATE_ENGINE, Boolean.TYPE); + if (lazyCreateEngineForEndpoint == null) + lazyCreateEngineForEndpoint = isLazyCreateEngines(); + engine = new QuickfixjEngine(uri, settings, messageStoreFactory, logFactory, messageFactory, + lazyCreateEngineForEndpoint); // only start engine if CamelContext is already started, otherwise the engines gets started // automatic later when CamelContext has been started using the StartupListener @@ -112,8 +120,12 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe } private void startQuickfixjEngine(QuickfixjEngine engine) throws Exception { - LOG.info("Starting QuickFIX/J engine: {}", engine.getUri()); - engine.start(); + if (!engine.isLazy()) { + LOG.info("Starting QuickFIX/J engine: {}", engine.getUri()); + engine.start(); + } else { + LOG.info("QuickFIX/J engine: {} will start lazily", engine.getUri()); + } } // Test Support @@ -153,6 +165,20 @@ public class QuickfixjComponent extends DefaultComponent implements StartupListe this.configurations = configurations; } + public boolean isLazyCreateEngines() { + return this.lazyCreateEngines; + } + + /** + * If set to true, the engines will be created and started when needed (when first message + * is send) + * + * @param lazyCreateEngines + */ + public void setLazyCreateEngines(boolean lazyCreateEngines) { + this.lazyCreateEngines = lazyCreateEngines; + } + @Override public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception { // only start quickfix engines when CamelContext have finished starting http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java index 44a117d..09f0822 100644 --- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java +++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java @@ -136,6 +136,20 @@ public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEvent return true; } + /** + * Initializing and starts the engine if it wasn't initialized so far. + */ + public void ensureInitialized() throws Exception { + if (!engine.isInitialized()) { + synchronized (engine) { + if (!engine.isInitialized()) { + engine.initializeEngine(); + engine.start(); + } + } + } + } + public QuickfixjEngine getEngine() { return engine; } http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java index 673fd51..c1fb9ff 100644 --- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java +++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEngine.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import javax.management.JMException; import javax.management.ObjectName; @@ -87,17 +88,20 @@ public class QuickfixjEngine extends ServiceSupport { private static final Logger LOG = LoggerFactory.getLogger(QuickfixjEngine.class); - private final Acceptor acceptor; - private final Initiator initiator; - private final JmxExporter jmxExporter; - private final MessageStoreFactory messageStoreFactory; - private final LogFactory sessionLogFactory; - private final MessageFactory messageFactory; + private Acceptor acceptor; + private Initiator initiator; + private JmxExporter jmxExporter; + private MessageStoreFactory messageStoreFactory; + private LogFactory sessionLogFactory; + private MessageFactory messageFactory; private final MessageCorrelator messageCorrelator = new MessageCorrelator(); private List eventListeners = new CopyOnWriteArrayList(); private final String uri; private ObjectName acceptorObjectName; private ObjectName initiatorObjectName; + private final SessionSettings settings; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private boolean lazy = false; public enum ThreadModel { ThreadPerConnector, ThreadPerSession; @@ -148,13 +152,48 @@ public class QuickfixjEngine extends ServiceSupport { public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride, MessageFactory messageFactoryOverride) throws ConfigError, FieldConvertError, IOException, JMException { + this(uri, settings, messageStoreFactoryOverride, sessionLogFactoryOverride, messageFactoryOverride, false); + } + + public QuickfixjEngine(String uri, SessionSettings settings, MessageStoreFactory messageStoreFactoryOverride, LogFactory sessionLogFactoryOverride, + MessageFactory messageFactoryOverride, boolean lazy) throws ConfigError, FieldConvertError, IOException, JMException { addEventListener(messageCorrelator); this.uri = uri; - - messageFactory = messageFactoryOverride != null ? messageFactoryOverride : new DefaultMessageFactory(); - sessionLogFactory = sessionLogFactoryOverride != null ? sessionLogFactoryOverride : inferLogFactory(settings); - messageStoreFactory = messageStoreFactoryOverride != null ? messageStoreFactoryOverride : inferMessageStoreFactory(settings); + this.lazy = lazy; + this.settings = settings; + + // overrides + if (messageFactoryOverride != null) { + messageFactory = messageFactoryOverride; + } + if (sessionLogFactoryOverride != null) { + sessionLogFactory = sessionLogFactoryOverride; + } + if (messageStoreFactoryOverride != null) { + messageStoreFactory = messageStoreFactoryOverride; + } + + if (!lazy) { + initializeEngine(); + } + } + + /** + * Initializes the engine on demand. May be called immediately in constructor or when needed. + * If initializing later, it should be started afterwards. + */ + void initializeEngine() throws ConfigError, + FieldConvertError, JMException { + if (messageFactory == null) { + messageFactory = new DefaultMessageFactory(); + } + if (sessionLogFactory == null) { + sessionLogFactory = inferLogFactory(settings); + } + if (messageStoreFactory == null) { + messageStoreFactory = inferMessageStoreFactory(settings); + } // Set default session schedule if not specified in configuration if (!settings.isSetting(Session.SETTING_START_TIME)) { @@ -208,9 +247,10 @@ public class QuickfixjEngine extends ServiceSupport { } finally { Thread.currentThread().setContextClassLoader(ccl); } + initialized.set(true); } - private static SessionSettings loadSettings(String settingsResourceName) throws ConfigError { + static SessionSettings loadSettings(String settingsResourceName) throws ConfigError { InputStream inputStream = ObjectHelper.loadResourceAsStream(settingsResourceName); if (inputStream == null) { throw new IllegalArgumentException("Could not load " + settingsResourceName); @@ -507,6 +547,14 @@ public class QuickfixjEngine extends ServiceSupport { return messageCorrelator; } + public boolean isInitialized() { + return this.initialized.get(); + } + + public boolean isLazy() { + return this.lazy; + } + // For Testing Initiator getInitiator() { return initiator; http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java index 4114cc0..ff262c0 100644 --- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java +++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjProducer.java @@ -43,6 +43,7 @@ public class QuickfixjProducer extends DefaultProducer { @Override public void process(Exchange exchange) throws Exception { try { + getEndpoint().ensureInitialized(); sendMessage(exchange, exchange.getIn()); } catch (Exception e) { exchange.setException(e); http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java index 532a9b6..a8a369b 100644 --- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java +++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjComponentTest.java @@ -168,6 +168,7 @@ public class QuickfixjComponentTest { Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null)); assertThat(component.getProvisionalEngines().size(), is(1)); assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue())); + assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true)); assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false)); assertThat(component.getEngines().size(), is(0)); assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue())); @@ -178,6 +179,7 @@ public class QuickfixjComponentTest { Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile2.getName(), null)); assertThat(component.getProvisionalEngines().size(), is(2)); assertThat(component.getProvisionalEngines().get(settingsFile.getName()), is(notNullValue())); + assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isInitialized(), is(true)); assertThat(component.getProvisionalEngines().get(settingsFile.getName()).isStarted(), is(false)); assertThat(component.getEngines().size(), is(0)); assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(nullValue())); @@ -187,15 +189,16 @@ public class QuickfixjComponentTest { assertThat(component.getProvisionalEngines().size(), is(0)); assertThat(component.getEngines().size(), is(2)); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true)); assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true)); - + // Move these too an endpoint testcase if one exists assertThat(e1.isSingleton(), is(true)); assertThat(((MultipleConsumersSupport)e1).isMultipleConsumersSupported(), is(true)); assertThat(e2.isSingleton(), is(true)); assertThat(((MultipleConsumersSupport)e2).isMultipleConsumersSupported(), is(true)); } - + @Test public void createEndpointAfterComponentStart() throws Exception { setUpComponent(); @@ -211,6 +214,7 @@ public class QuickfixjComponentTest { Endpoint e1 = component.createEndpoint(getEndpointUri(settingsFile.getName(), null)); assertThat(component.getEngines().size(), is(1)); assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue())); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true)); assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true)); assertThat(component.getProvisionalEngines().size(), is(0)); assertThat(((QuickfixjEndpoint)e1).getSessionID(), is(nullValue())); @@ -218,12 +222,102 @@ public class QuickfixjComponentTest { Endpoint e2 = component.createEndpoint(getEndpointUri(settingsFile.getName(), sessionID)); assertThat(component.getEngines().size(), is(1)); assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue())); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true)); assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true)); assertThat(component.getProvisionalEngines().size(), is(0)); assertThat(((QuickfixjEndpoint)e2).getSessionID(), is(sessionID)); } @Test + public void createEnginesLazily() throws Exception { + setUpComponent(); + component.setLazyCreateEngines(true); + + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); + settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234); + + writeSettings(); + + // start the component + camelContext.start(); + + QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null)); + assertThat(component.getEngines().size(), is(1)); + assertThat(component.getProvisionalEngines().size(), is(0)); + assertThat(component.getEngines().get(settingsFile.getName()), is(notNullValue())); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false)); + assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false)); + + e1.ensureInitialized(); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true)); + assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true)); + } + + @Test + public void createEndpointsInNonLazyComponent() throws Exception { + setUpComponent(); + // configuration will be done per endpoint + component.setLazyCreateEngines(false); + + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); + settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234); + + writeSettings(); + + // will start the component + camelContext.start(); + + QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null) + "?lazyCreateEngine=true"); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false)); + assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false)); + assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true)); + + e1.ensureInitialized(); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true)); + assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true)); + + writeSettings(settings, false); + + // will use connector's lazyCreateEngines setting + component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID)); + assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true)); + assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true)); + assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false)); + } + + @Test + public void createEndpointsInLazyComponent() throws Exception { + setUpComponent(); + component.setLazyCreateEngines(true); + + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.INITIATOR_CONNECTION_TYPE); + settings.setLong(sessionID, Initiator.SETTING_SOCKET_CONNECT_PORT, 1234); + + writeSettings(); + + // will start the component + camelContext.start(); + + // will use connector's lazyCreateEngines setting + QuickfixjEndpoint e1 = (QuickfixjEndpoint) component.createEndpoint(getEndpointUri(settingsFile.getName(), null)); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(false)); + assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false)); + assertThat(component.getEngines().get(settingsFile.getName()).isLazy(), is(true)); + + e1.ensureInitialized(); + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true)); + assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(true)); + + writeSettings(settings, false); + + // will override connector's lazyCreateEngines setting + component.createEndpoint(getEndpointUri(settingsFile2.getName(), sessionID) + "&lazyCreateEngine=false"); + assertThat(component.getEngines().get(settingsFile2.getName()).isInitialized(), is(true)); + assertThat(component.getEngines().get(settingsFile2.getName()).isStarted(), is(true)); + assertThat(component.getEngines().get(settingsFile2.getName()).isLazy(), is(false)); + } + + @Test public void componentStop() throws Exception { setUpComponent(); @@ -259,6 +353,8 @@ public class QuickfixjComponentTest { component.stop(); assertThat(component.getEngines().get(settingsFile.getName()).isStarted(), is(false)); + // it should still be initialized (ready to start again) + assertThat(component.getEngines().get(settingsFile.getName()).isInitialized(), is(true)); } @Test http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java new file mode 100644 index 0000000..dc1a2e3 --- /dev/null +++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjLazyProducerTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; + +import quickfix.FixVersions; +import quickfix.Message; +import quickfix.MessageUtils; +import quickfix.SessionID; +import quickfix.field.BeginString; +import quickfix.field.SenderCompID; +import quickfix.field.TargetCompID; + +public class QuickfixjLazyProducerTest { + private Exchange mockExchange; + private QuickfixjEndpoint endpoint; + private org.apache.camel.Message mockCamelMessage; + private QuickfixjProducer producer; + private SessionID sessionID; + private Message inboundFixMessage; + private QuickfixjEngine quickfixjEngine; + + @Before + public void setUp() throws Exception { + mockExchange = Mockito.mock(Exchange.class); + mockCamelMessage = Mockito.mock(org.apache.camel.Message.class); + Mockito.when(mockExchange.getIn()).thenReturn(mockCamelMessage); + Mockito.when(mockExchange.getPattern()).thenReturn(ExchangePattern.InOnly); + + quickfixjEngine = TestSupport.createEngine(true); + endpoint = Mockito.spy(new QuickfixjEndpoint(quickfixjEngine, "", new QuickfixjComponent())); + + inboundFixMessage = new Message(); + inboundFixMessage.getHeader().setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44); + inboundFixMessage.getHeader().setString(SenderCompID.FIELD, "SENDER"); + inboundFixMessage.getHeader().setString(TargetCompID.FIELD, "TARGET"); + sessionID = MessageUtils.getSessionID(inboundFixMessage); + + Mockito.when(mockCamelMessage.getBody(Message.class)).thenReturn(inboundFixMessage); + + Mockito.when(endpoint.getSessionID()).thenReturn(sessionID); + + producer = Mockito.spy(new QuickfixjProducer(endpoint)); + } + + @Test + public void processWithLazyEngine() throws Exception { + QuickfixjEngine engine = (QuickfixjEngine) ReflectionTestUtils.getField(endpoint, "engine"); + assertThat(engine.isInitialized(), is(false)); + assertThat(engine.isStarted(), is(false)); +// Session mockSession = Mockito.spy(TestSupport.createSession(sessionID)); +// Mockito.doReturn(mockSession).when(producer).getSession(MessageUtils.getSessionID(inboundFixMessage)); +// Mockito.doReturn(true).when(mockSession).send(Matchers.isA(Message.class)); + + producer.process(mockExchange); + assertThat(engine.isInitialized(), is(true)); + assertThat(engine.isStarted(), is(true)); +// +// Mockito.verify(mockExchange, Mockito.never()).setException(Matchers.isA(IllegalStateException.class)); +// Mockito.verify(mockSession).send(inboundFixMessage); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java index f071bc9..1915a5f 100644 --- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java +++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/QuickfixjSpringTest.java @@ -67,7 +67,14 @@ public class QuickfixjSpringTest extends CamelSpringTestSupport { assertThat(sessionProperties.get("SocketConnectProtocol").toString(), CoreMatchers.is("VM_PIPE")); QuickfixjComponent component = context.getComponent("quickfix", QuickfixjComponent.class); + assertThat(component.isLazyCreateEngines(), is(false)); QuickfixjEngine engine = component.getEngines().values().iterator().next(); + assertThat(engine.isInitialized(), is(true)); + + QuickfixjComponent lazyComponent = context.getComponent("lazyQuickfix", QuickfixjComponent.class); + assertThat(lazyComponent.isLazyCreateEngines(), is(true)); + QuickfixjEngine lazyEngine = lazyComponent.getEngines().values().iterator().next(); + assertThat(lazyEngine.isInitialized(), is(false)); assertThat(engine.getMessageFactory(), is(instanceOf(CustomMessageFactory.class))); } http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java index c0d415a..5a14e63 100644 --- a/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java +++ b/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java @@ -93,8 +93,12 @@ public final class TestSupport { return factory.create(sessionID, settings); } - + public static QuickfixjEngine createEngine() throws ConfigError, FieldConvertError, IOException, JMException { + return createEngine(false); + } + + public static QuickfixjEngine createEngine(boolean lazy) throws ConfigError, FieldConvertError, IOException, JMException { SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET"); MessageStoreFactory mockMessageStoreFactory = Mockito.mock(MessageStoreFactory.class); @@ -114,6 +118,6 @@ public final class TestSupport { return new QuickfixjEngine("", settings, mockMessageStoreFactory, Mockito.mock(LogFactory.class), - Mockito.mock(MessageFactory.class)); + Mockito.mock(MessageFactory.class), lazy); } } http://git-wip-us.apache.org/repos/asf/camel/blob/e6f1bc4e/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml ---------------------------------------------------------------------- diff --git a/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml b/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml index 03725b7..10f9d58 100644 --- a/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml +++ b/components/camel-quickfix/src/test/resources/org/apache/camel/component/quickfixj/QuickfixjSpringTest-context.xml @@ -34,6 +34,13 @@ + + + + ${in.header.EventCategory} == 'AppMessageReceived' + + + @@ -48,6 +55,19 @@ + + + + + + + + + + + + +