Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B5E7718FB5 for ; Sun, 5 Jul 2015 23:45:03 +0000 (UTC) Received: (qmail 44561 invoked by uid 500); 5 Jul 2015 23:45:03 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 44455 invoked by uid 500); 5 Jul 2015 23:45:03 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 44179 invoked by uid 99); 5 Jul 2015 23:45:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Jul 2015 23:45:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CFA1BE1809; Sun, 5 Jul 2015 23:45:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rhs@apache.org To: commits@qpid.apache.org Date: Sun, 05 Jul 2015 23:45:06 -0000 Message-Id: <31a60dffbe6e40e1b0f64ce574a9276b@git.apache.org> In-Reply-To: <232a9358b57044748b5da9fc6483eafb@git.apache.org> References: <232a9358b57044748b5da9fc6483eafb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/38] qpid-proton git commit: PROTON-881: Add a Send example, and supporting changes in the reactor. PROTON-881: Add a Send example, and supporting changes in the reactor. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd09de66 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd09de66 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd09de66 Branch: refs/heads/master Commit: cd09de66362580f0c5ceab464d71c7ad4300b517 Parents: 88df5e7 Author: Adrian Preston Authored: Tue Apr 21 16:23:12 2015 +0100 Committer: Adrian Preston Committed: Wed May 6 23:23:47 2015 +0100 ---------------------------------------------------------------------- .../qpid/proton/example/reactor/Send.java | 142 +++++++++++++++++++ .../apache/qpid/proton/engine/Connection.java | 7 +- .../qpid/proton/engine/impl/ConnectionImpl.java | 32 ++++- .../qpid/proton/engine/impl/EventImpl.java | 12 +- .../qpid/proton/engine/impl/TransportImpl.java | 8 +- .../apache/qpid/proton/reactor/Handshaker.java | 72 ++++++++++ .../org/apache/qpid/proton/reactor/Reactor.java | 5 +- .../qpid/proton/reactor/ReactorChild.java | 27 ++++ .../apache/qpid/proton/reactor/Selectable.java | 2 +- .../qpid/proton/reactor/impl/IOHandler.java | 19 ++- .../qpid/proton/reactor/impl/ReactorImpl.java | 21 ++- .../qpid/proton/reactor/impl/SelectorImpl.java | 30 ++-- 12 files changed, 342 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java new file mode 100644 index 0000000..5cd5811 --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; +import java.nio.BufferOverflowException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.reactor.Handshaker; +import org.apache.qpid.proton.reactor.Reactor; + +// This is a send in terms of low level AMQP events. +public class Send extends BaseHandler { + + private class SendHandler extends BaseHandler { + + private final String hostname; + private final Message message; + private int nextTag = 0; + + private SendHandler(String hostname, Message message) { + this.hostname = hostname; + this.message = message; + + // Add a child handler that performs some default handshaking + // behaviour. + add(new Handshaker()); + } + + @Override + public void onConnectionInit(Event event) { + Connection conn = event.getConnection(); + conn.setHostname(hostname); + + // Every session or link could have their own handler(s) if we + // wanted simply by adding the handler to the given session + // or link + Session ssn = conn.session(); + + // If a link doesn't have an event handler, the events go to + // its parent session. If the session doesn't have a handler + // the events go to its parent connection. If the connection + // doesn't have a handler, the events go to the reactor. + Sender snd = ssn.sender("sender"); + conn.open(); + ssn.open(); + snd.open(); + } + + @Override + public void onLinkFlow(Event event) { + Sender snd = (Sender)event.getLink(); + if (snd.getCredit() > 0 && message != null) { + byte[] msgData = new byte[1024]; + int length; + while(true) { + try { + length = message.encode(msgData, 0, msgData.length); + break; + } catch(BufferOverflowException e) { + msgData = new byte[msgData.length * 2]; + } + } + byte[] tag = String.valueOf(nextTag++).getBytes(); + Delivery dlv = snd.delivery(tag); + snd.send(msgData, 0, length); + dlv.settle(); + snd.advance(); + snd.close(); + snd.getSession().close(); + snd.getSession().getConnection().close(); + } + } + + @Override + public void onTransportError(Event event) { + ErrorCondition condition = event.getTransport().getCondition(); + if (condition != null) { + System.err.println("Error: " + condition.getDescription()); + } else { + System.err.println("Error (no description returned)."); + } + } + } + + private final String hostname; + private final Message message; + + private Send(String hostname, String content) { + this.hostname = hostname; + message = Proton.message(); + message.setBody(new AmqpValue(content)); + } + + @Override + public void onReactorInit(Event event) { + // You can use the connection method to create AMQP connections. + + // This connection's handler is the SendHandler object. All the events + // for this connection will go to the SendHandler object instead of + // going to the reactor. If you were to omit the SendHandler object, + // all the events would go to the reactor. + event.getReactor().connection(new SendHandler(hostname, message)); + } + + public static void main(String[] args) throws IOException { + String hostname = args.length > 0 ? args[0] : "localhost"; + String content = args.length > 1 ? args[1] : "Hello World!"; + + Reactor r = Proton.reactor(new Send(hostname, content)); + r.run(); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java index 5cb57a2..3dccbb1 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java @@ -25,6 +25,8 @@ import java.util.Map; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.impl.ConnectionImpl; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.ReactorChild; /** @@ -35,7 +37,7 @@ import org.apache.qpid.proton.engine.impl.ConnectionImpl; * {@link #sessionHead(EnumSet, EnumSet)}, {@link #linkHead(EnumSet, EnumSet)} * {@link #getWorkHead()} respectively. */ -public interface Connection extends HandlerEndpoint +public interface Connection extends HandlerEndpoint, ReactorChild { public static final class Factory @@ -110,12 +112,15 @@ public interface Connection extends HandlerEndpoint void setProperties(Map properties); + @Override Object getContext(); + @Override void setContext(Object context); void collect(Collector collector); Transport getTransport(); + Reactor getReactor(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java index eecc05e..b018a95 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java @@ -25,9 +25,16 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.Map; + import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.engine.*; import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.ProtonJConnection; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.reactor.Reactor; public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnection { @@ -66,6 +73,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec private Object _context; private CollectorImpl _collector; + private Reactor _reactor; private static final Symbol[] EMPTY_SYMBOL_ARRAY = new Symbol[0]; @@ -77,6 +85,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec { } + @Override public SessionImpl session() { SessionImpl session = new SessionImpl(this); @@ -154,6 +163,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec } + @Override public Session sessionHead(final EnumSet local, final EnumSet remote) { if(_sessionHead == null) @@ -168,6 +178,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec } } + @Override public Link linkHead(EnumSet local, EnumSet remote) { if(_linkHead == null) @@ -274,6 +285,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec } } + @Override public int getMaxChannels() { return _maxChannels; @@ -290,6 +302,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec _localContainerId = localContainerId; } + @Override public DeliveryImpl getWorkHead() { return _workHead; @@ -376,11 +389,13 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec return _properties; } + @Override public void setProperties(Map properties) { _properties = properties; } + @Override public Map getRemoteProperties() { return _remoteProperties; @@ -391,6 +406,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec _remoteProperties = remoteProperties; } + @Override public String getHostname() { return _localHostname; @@ -471,6 +487,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec _transport = transport; } + @Override public TransportImpl getTransport() { return _transport; @@ -497,6 +514,7 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec throw new UnsupportedOperationException(); } + @Override public DeliveryImpl next() { DeliveryImpl next = _next; @@ -588,16 +606,19 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec } } + @Override public Object getContext() { return _context; } + @Override public void setContext(Object context) { _context = context; } + @Override public void collect(Collector collector) { _collector = (CollectorImpl) collector; @@ -637,4 +658,13 @@ public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnec { put(Event.Type.CONNECTION_LOCAL_CLOSE, this); } + + @Override + public Reactor getReactor() { + return _reactor; + } + + public void setReactor(Reactor reactor) { + _reactor = reactor; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java index 65a2000..6abec58 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java @@ -295,17 +295,13 @@ class EventImpl implements Event } else if (context instanceof Transport) { return ((TransportImpl)context).getReactor(); } else if (context instanceof Delivery) { - Transport transport = ((Delivery)context).getLink().getSession().getConnection().getTransport(); - return ((TransportImpl)transport).getReactor(); + return ((Delivery)context).getLink().getSession().getConnection().getReactor(); } else if (context instanceof Link) { - Transport transport = ((Link)context).getSession().getConnection().getTransport(); - return ((TransportImpl)transport).getReactor(); + return ((Link)context).getSession().getConnection().getReactor(); } else if (context instanceof Session) { - Transport transport = ((Session)context).getConnection().getTransport(); - return ((TransportImpl)transport).getReactor(); + return ((Session)context).getConnection().getReactor(); } else if (context instanceof Connection) { - Transport transport = ((Connection)context).getTransport(); - return ((TransportImpl)transport).getReactor(); + return ((Connection)context).getReactor(); } else if (context instanceof Selectable) { return ((Selectable)context).getReactor(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index f4813cd..694e23b 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -121,6 +121,7 @@ public class TransportImpl extends EndpointImpl private boolean postedHeadClosed = false; private boolean postedTailClosed = false; + private boolean postedTransportError = false; private int _localIdleTimeout = 0; private int _remoteIdleTimeout = 0; @@ -591,7 +592,9 @@ public class TransportImpl extends EndpointImpl session.incrementOutgoingBytes(-delta); } - getConnectionImpl().put(Event.Type.LINK_FLOW, snd); + if (snd.getLocalState() != EndpointState.CLOSED) { + getConnectionImpl().put(Event.Type.LINK_FLOW, snd); + } } if(wasDone && delivery.getLocalState() != null) @@ -1319,8 +1322,9 @@ public class TransportImpl extends EndpointImpl } _head_closed = true; } - if (_condition != null) { + if (_condition != null && !postedTransportError) { put(Event.Type.TRANSPORT_ERROR, this); + postedTransportError = true; } if (!postedTailClosed) { put(Event.Type.TRANSPORT_TAIL_CLOSED, this); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java new file mode 100644 index 0000000..f9b670a --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; + +public class Handshaker extends BaseHandler { + + private void open(Endpoint endpoint) { + if (endpoint.getLocalState() == EndpointState.UNINITIALIZED) { + endpoint.open(); + } + } + + private void close(Endpoint endpoint) { + if (endpoint.getLocalState() != EndpointState.CLOSED) { + endpoint.close(); + } + } + + @Override + public void onConnectionRemoteOpen(Event event) { + open(event.getConnection()); + } + + @Override + public void onSessionRemoteOpen(Event event) { + open(event.getSession()); + } + + @Override + public void onLinkRemoteOpen(Event event) { + open(event.getLink()); + } + + @Override + public void onConnectionRemoteClose(Event event) { + close(event.getConnection()); + } + + @Override + public void onSessionRemoteClose(Event event) { + close(event.getSession()); + } + + @Override + public void onLinkRemoteClose(Event event) { + close(event.getLink()); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java index 02c5de2..0c56a48 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Set; import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.reactor.impl.ReactorImpl; @@ -64,7 +65,7 @@ public interface Reactor { */ - public Set children(); + public Set children(); public Collector collector(); @@ -93,7 +94,7 @@ public interface Reactor { // pn_reactor_schedule from reactor.c public Task schedule(int delay, Handler handler); // TODO: acceptor - // TODO: connection // TODO: acceptorClose + Connection connection(Handler handler); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java new file mode 100644 index 0000000..d020d1a --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor; + +// Tagging interface used to identify classes that can be a child of a reactor. +public interface ReactorChild { + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java index c2b560f..7bb64c7 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java @@ -27,7 +27,7 @@ import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.Transport; -public interface Selectable { +public interface Selectable extends ReactorChild { public interface Callback { void run(Selectable selectable); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index 08aca1f..ee988ee 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -22,6 +22,7 @@ package org.apache.qpid.proton.reactor.impl; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SocketChannel; import java.util.Iterator; @@ -96,16 +97,20 @@ public class IOHandler extends BaseHandler { hostname = hostname.substring(0, colonIndex); } - Transport transport = event.getTransport(); + Transport transport = event.getConnection().getTransport(); Socket socket = null; // TODO: null is our equivalent of PN_INVALID_SOCKET try { - socket = new Socket(hostname, port); + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.connect(new InetSocketAddress(hostname, port)); + socket = socketChannel.socket(); } catch(IOException ioException) { - ErrorCondition condition = transport.getCondition(); + ErrorCondition condition = new ErrorCondition(); condition.setCondition(Symbol.getSymbol("proton:io")); condition.setDescription(ioException.getMessage()); + transport.setCondition(condition); transport.close_tail(); transport.close_head(); + transport.pop(transport.pending()); // TODO: force generation of TRANSPORT_HEAD_CLOSE (not in C code) } selectableTransport(reactor, socket, transport); } @@ -170,9 +175,10 @@ public class IOHandler extends BaseHandler { transport.process(); } } catch (IOException e) { - ErrorCondition condition = transport.getCondition(); + ErrorCondition condition = new ErrorCondition(); condition.setCondition(Symbol.getSymbol("proton:io")); condition.setDescription(e.getMessage()); + transport.setCondition(condition); transport.close_tail(); } } @@ -201,9 +207,10 @@ public class IOHandler extends BaseHandler { transport.pop(n); } } catch(IOException ioException) { - ErrorCondition condition = transport.getCondition(); + ErrorCondition condition = new ErrorCondition(); condition.setCondition(Symbol.getSymbol("proton:io")); condition.setDescription(ioException.getMessage()); + transport.setCondition(condition); transport.close_head(); } } @@ -259,7 +266,7 @@ public class IOHandler extends BaseHandler { private Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) { // TODO: this code needs to be able to deal with a null socket (this is our equivalent of PN_INVALID_SOCKET) Selectable selectable = reactor.selectable(); - selectable.setChannel(socket.getChannel()); + selectable.setChannel(socket != null ? socket.getChannel() : null); selectable.onReadable(new ConnectionReadable()); selectable.onWritable(new ConnectionWritable()); selectable.onError(new ConnectionError()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 5072958..0a7f84d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -30,12 +30,15 @@ import java.util.Set; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.apache.qpid.proton.engine.impl.ConnectionImpl; import org.apache.qpid.proton.engine.impl.HandlerEndpointImpl; import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.ReactorChild; import org.apache.qpid.proton.reactor.Selectable; import org.apache.qpid.proton.reactor.Selectable.Callback; import org.apache.qpid.proton.reactor.Selectable.RecordKeyType; @@ -68,7 +71,7 @@ public class ReactorImpl implements Reactor { private long timeout; private Handler global; private Handler handler; - private Set children; + private Set children; private int selectables; private boolean yield; private Selectable selectable; @@ -109,7 +112,7 @@ public class ReactorImpl implements Reactor { collector = (CollectorImpl)Proton.collector(); global = new IOHandler(); handler = new BaseHandler(); - children = new HashSet(); + children = new HashSet(); selectables = 0; timer = new Timer(collector); wakeup = Pipe.open(); @@ -182,7 +185,7 @@ public class ReactorImpl implements Reactor { */ @Override - public Set children() { + public Set children() { return children; } @@ -442,4 +445,16 @@ public class ReactorImpl implements Reactor { protected void setSelector(Selector selector) { this.selector = selector; } + + // pn_reactor_connection from connection.c + @Override + public Connection connection(Handler handler) { + Connection connection = Proton.connection(); + connection.add(handler); + connection.collect(collector); + children.add(connection); + ((ConnectionImpl)connection).setReactor(this); + // TODO: C code adds a reference back to the reactor from connection + return connection; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd09de66/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index c74853e..35a6555 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -44,27 +44,34 @@ public class SelectorImpl implements Selector { @Override public void add(Selectable selectable) throws IOException { - selectable.getChannel().configureBlocking(false); - SelectionKey key = selectable.getChannel().register(selector, 0); - key.attach(selectable); + // TODO: valid for selectable to have a 'null' channel - in this case it can only expire... + if (selectable.getChannel() != null) { + selectable.getChannel().configureBlocking(false); + SelectionKey key = selectable.getChannel().register(selector, 0); + key.attach(selectable); + } selectables.add(selectable); update(selectable); } @Override public void update(Selectable selectable) { - int interestedOps = 0; - if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ; - if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE; - SelectionKey key = selectable.getChannel().keyFor(selector); - key.interestOps(interestedOps); + if (selectable.getChannel() != null) { + int interestedOps = 0; + if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ; + if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE; + SelectionKey key = selectable.getChannel().keyFor(selector); + key.interestOps(interestedOps); + } } @Override public void remove(Selectable selectable) { - SelectionKey key = selectable.getChannel().keyFor(selector); - key.cancel(); - key.attach(null); + if (selectable.getChannel() != null) { + SelectionKey key = selectable.getChannel().keyFor(selector); + key.cancel(); + key.attach(null); + } selectables.remove(selectable); } @@ -106,6 +113,7 @@ public class SelectorImpl implements Selector { if (key.isReadable()) readable.add(selectable); if (key.isWritable()) writeable.add(selectable); } + selector.selectedKeys().clear(); for (Selectable selectable : selectables) { // TODO: this is different to the C code which evaluates expiry at the point the selectable is iterated over. long deadline = selectable.getDeadline(); if (deadline > 0 && awoken >= deadline) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org