asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "abdullah alamoudi (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-1443][FEED] Remove Frame Distributor
Date Sat, 24 Jun 2017 02:11:11 GMT
abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1853

Change subject: [ASTERIXDB-1443][FEED] Remove Frame Distributor
......................................................................

[ASTERIXDB-1443][FEED] Remove Frame Distributor

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- FrameDistributor and DistributeFeedFrameWriter are not used
  anymore.

Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5
---
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
2 files changed, 0 insertions(+), 302 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/53/1853/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
deleted file mode 100644
index ae2e0b9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Provides mechanism for distributing the frames, as received from an operator to a
- * set of registered readers. Each reader typically operates at a different pace. Readers
- * are isolated from each other to ensure that a slow reader does not impact the progress of
- * others.
- **/
-public class DistributeFeedFrameWriter implements IFrameWriter {
-
-    /** A unique identifier for the feed to which the incoming tuples belong. **/
-    private final EntityId feedId;
-
-    /**
-     * An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each
-     * operating in isolation.
-     **/
-    private final FrameDistributor frameDistributor;
-
-    /** The original frame writer instantiated as part of job creation **/
-    private final IFrameWriter writer;
-
-    /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/
-    private final FeedRuntimeType feedRuntimeType;
-
-    /** The value of the partition 'i' if this is the i'th instance of the associated operator **/
-    private final int partition;
-
-    public DistributeFeedFrameWriter(EntityId feedId, IFrameWriter writer, FeedRuntimeType feedRuntimeType,
-            int partition) throws IOException {
-        this.feedId = feedId;
-        this.frameDistributor = new FrameDistributor();
-        this.feedRuntimeType = feedRuntimeType;
-        this.partition = partition;
-        this.writer = writer;
-    }
-
-    /**
-     * @param fpa
-     *            Feed policy accessor
-     * @param nextOnlyWriter
-     *            the writer which will deliver the buffers
-     * @param connectionId
-     *            (Dataverse - Dataset - Feed)
-     * @return A frame collector.
-     * @throws HyracksDataException
-     */
-    public void subscribe(FeedFrameCollector collector) throws HyracksDataException {
-        frameDistributor.registerFrameCollector(collector);
-    }
-
-    public void unsubscribeFeed(FeedConnectionId connectionId) throws HyracksDataException {
-        frameDistributor.deregisterFrameCollector(connectionId);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            frameDistributor.close();
-        } finally {
-            writer.close();
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        frameDistributor.nextFrame(frame);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        writer.open();
-    }
-
-    @Override
-    public String toString() {
-        return feedId.toString() + feedRuntimeType + "[" + partition + "]";
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        frameDistributor.flush();
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
deleted file mode 100644
index 6ca4b77..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class FrameDistributor implements IFrameWriter {
-
-    public static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName());
-    /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/
-    private final Map<FeedConnectionId, FeedFrameCollector> registeredCollectors;
-    private Throwable rootFailureCause = null;
-
-    public FrameDistributor() throws HyracksDataException {
-        this.registeredCollectors = new HashMap<FeedConnectionId, FeedFrameCollector>();
-    }
-
-    public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
-                    rootFailureCause);
-        }
-        // registering a new collector.
-        try {
-            frameCollector.open();
-        } catch (Throwable th) {
-            rootFailureCause = th;
-            try {
-                frameCollector.fail();
-            } catch (Throwable failThrowable) {
-                th.addSuppressed(failThrowable);
-            } finally {
-                try {
-                    frameCollector.close();
-                } catch (Throwable closeThrowable) {
-                    th.addSuppressed(closeThrowable);
-                }
-            }
-            throw th;
-        }
-        registeredCollectors.put(frameCollector.getConnectionId(), frameCollector);
-    }
-
-    public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException {
-        deregisterFrameCollector(frameCollector.getConnectionId());
-    }
-
-    public synchronized void deregisterFrameCollector(FeedConnectionId connectionId) throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
-                    rootFailureCause);
-        }
-        FeedFrameCollector frameCollector = removeFrameCollector(connectionId);
-        try {
-            frameCollector.close();
-        } catch (Throwable th) {
-            rootFailureCause = th;
-            throw th;
-        }
-    }
-
-    public synchronized FeedFrameCollector removeFrameCollector(FeedConnectionId connectionId) {
-        return registeredCollectors.remove(connectionId);
-    }
-
-    /*
-     * Fix. What should be done?:
-     * 0. mark failure so no one can subscribe or unsubscribe.
-     * 1. Throw the throwable.
-     * 2. when fail() is called, call fail on all subscribers
-     * 3. close all the subscribers.
-     * (non-Javadoc)
-     * @see org.apache.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
-     */
-    @Override
-    public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new HyracksDataException(rootFailureCause);
-        }
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            try {
-                collector.nextFrame(frame);
-            } catch (Throwable th) {
-                rootFailureCause = th;
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        Collection<FeedFrameCollector> collectors = registeredCollectors.values();
-        Iterator<FeedFrameCollector> it = collectors.iterator();
-        while (it.hasNext()) {
-            FeedFrameCollector collector = it.next();
-            try {
-                collector.fail();
-            } catch (Throwable th) {
-                while (it.hasNext()) {
-                    FeedFrameCollector innerCollector = it.next();
-                    try {
-                        innerCollector.fail();
-                    } catch (Throwable innerTh) {
-                        th.addSuppressed(innerTh);
-                    }
-                }
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        Collection<FeedFrameCollector> collectors = registeredCollectors.values();
-        Iterator<FeedFrameCollector> it = collectors.iterator();
-        while (it.hasNext()) {
-            FeedFrameCollector collector = it.next();
-            try {
-                collector.close();
-            } catch (Throwable th) {
-                while (it.hasNext()) {
-                    FeedFrameCollector innerCollector = it.next();
-                    try {
-                        innerCollector.close();
-                    } catch (Throwable innerTh) {
-                        th.addSuppressed(innerTh);
-                    } finally {
-                        innerCollector.setState(State.FINISHED);
-                    }
-                }
-                // resume here
-                throw th;
-            } finally {
-                collector.setState(State.FINISHED);
-            }
-        }
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new HyracksDataException(rootFailureCause);
-        }
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            try {
-                collector.flush();
-            } catch (Throwable th) {
-                rootFailureCause = th;
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        // Nothing to do here :)
-    }
-}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1853
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message