Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1D6DA200CE4 for ; Sun, 20 Aug 2017 17:03:14 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1BEF016441A; Sun, 20 Aug 2017 15:03:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 21CE7164417 for ; Sun, 20 Aug 2017 17:03:10 +0200 (CEST) Received: (qmail 29378 invoked by uid 500); 20 Aug 2017 15:03:10 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 25394 invoked by uid 99); 20 Aug 2017 15:03:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 20 Aug 2017 15:03:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4DA44F5EFE; Sun, 20 Aug 2017 15:03:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: pei@apache.org To: commits@beam.apache.org Date: Sun, 20 Aug 2017 15:03:22 -0000 Message-Id: <040a254e883646958ba2f48655e0993b@git.apache.org> In-Reply-To: <24a6f93ff9bc49bb8bb97d5d7383db66@git.apache.org> References: <24a6f93ff9bc49bb8bb97d5d7383db66@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduece their visibility to package private. archived-at: Sun, 20 Aug 2017 15:03:14 -0000 http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java deleted file mode 100644 index a26472c..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * JStorm {@link Executor} for {@link DoFn} with multi-output. - * @param - * @param - */ -public class MultiOutputDoFnExecutor extends DoFnExecutor { - private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class); - - /** - * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated - * tag is used in downstream consumer. So before output, we need to map this "local" tag to - * "external" tag. See PCollectionTuple for details. - */ - public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager { - @Override - public void output(TupleTag tag, WindowedValue output) { - if (localTupleTagMap.containsKey(tag)) { - executorsBolt.processExecutorElem((TupleTag) localTupleTagMap.get(tag), output); - } else { - executorsBolt.processExecutorElem(tag, output); - } - } - } - - protected Map, TupleTag> localTupleTagMap; - - public MultiOutputDoFnExecutor( - String stepName, - String description, - JStormPipelineOptions pipelineOptions, - DoFn doFn, - Coder> inputCoder, - WindowingStrategy windowingStrategy, - TupleTag mainInputTag, - Collection> sideInputs, - Map> sideInputTagToView, - TupleTag mainTupleTag, - List> sideOutputTags, - Map, TupleTag> localTupleTagMap - ) { - super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, - sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); - this.localTupleTagMap = localTupleTagMap; - this.outputManager = new MultiOutputDoFnExecutorOutputManager(); - LOG.info("localTupleTagMap: {}", localTupleTagMap); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java deleted file mode 100644 index 5e87cff..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; -import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * JStorm {@link Executor} for stateful {@link DoFn} with multi-output. - * @param - */ -public class MultiStatefulDoFnExecutor extends MultiOutputDoFnExecutor { - - public MultiStatefulDoFnExecutor( - String stepName, String description, - JStormPipelineOptions pipelineOptions, DoFn doFn, - Coder> inputCoder, WindowingStrategy windowingStrategy, - TupleTag mainInputTag, Collection> sideInputs, - Map> sideInputTagToView, TupleTag mainTupleTag, - List> sideOutputTags, Map, TupleTag> localTupleTagMap) { - super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, - sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - if (mainInputTag.equals(tag)) { - WindowedValue kvElem = (WindowedValue) elem; - stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, - executorContext.getExecutorsBolt().timerService())); - stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - processMainInput(elem); - } else { - processSideInput(tag, elem); - } - } - - @Override - public void onTimer(Object key, TimerInternals.TimerData timerData) { - stepContext.setStateInternals(new JStormStateInternals<>(key, - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - super.onTimer(key, timerData); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java deleted file mode 100644 index 77ae844..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals; -import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * JStorm {@link Executor} for stateful {@link DoFn}. - * @param - */ -public class StatefulDoFnExecutor extends DoFnExecutor { - public StatefulDoFnExecutor( - String stepName, String description, JStormPipelineOptions pipelineOptions, - DoFn doFn, Coder> inputCoder, - WindowingStrategy windowingStrategy, TupleTag mainInputTag, - Collection> sideInputs, Map> - sideInputTagToView, TupleTag mainTupleTag, List> sideOutputTags) { - super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, - mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - if (mainInputTag.equals(tag)) { - WindowedValue kvElem = (WindowedValue) elem; - stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this, - executorContext.getExecutorsBolt().timerService())); - stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(), - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - processMainInput(elem); - } else { - processSideInput(tag, elem); - } - } - - @Override - public void onTimer(Object key, TimerInternals.TimerData timerData) { - stepContext.setStateInternals(new JStormStateInternals<>(key, - kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId)); - super.onTimer(key, timerData); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java deleted file mode 100644 index 5c41bda..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.runners.core.TimerInternals; -import org.joda.time.Instant; - -/** - * Interface that tracks input watermarks and manages timers in each bolt. - */ -public interface TimerService extends Serializable { - - void init(List upStreamTasks); - - /** - * - * @param task - * @param inputWatermark - * @return new watermark if any timer is triggered during the update of watermark, otherwise 0 - */ - long updateInputWatermark(Integer task, long inputWatermark); - - long currentInputWatermark(); - - long currentOutputWatermark(); - - void clearWatermarkHold(String namespace); - - void addWatermarkHold(String namespace, Instant watermarkHold); - - void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor); - - void fireTimers(long newWatermark); -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java deleted file mode 100644 index 0103095..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import com.alibaba.jstorm.utils.Pair; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.joda.time.Instant; - -/** - * Default implementation of {@link TimerService}. - */ -public class TimerServiceImpl implements TimerService { - private transient ExecutorContext executorContext; - private transient Map idToDoFnExecutor; - - private final ConcurrentMap upStreamTaskToInputWatermark = - new ConcurrentHashMap<>(); - private final PriorityQueue inputWatermarks = new PriorityQueue<>(); - private final PriorityQueue watermarkHolds = new PriorityQueue<>(); - private final Map namespaceToWatermarkHold = new HashMap<>(); - private final transient PriorityQueue eventTimeTimersQueue = - new PriorityQueue<>(); - private final Map>> - timerDataToKeyedExecutors = Maps.newHashMap(); - - private boolean initialized = false; - - public TimerServiceImpl() { - } - - public TimerServiceImpl(ExecutorContext executorContext) { - this.executorContext = executorContext; - this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor(); - } - - @Override - public void init(List upStreamTasks) { - for (Integer task : upStreamTasks) { - upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); - inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()); - } - initialized = true; - } - - @Override - public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) { - checkState(initialized, "TimerService has not been initialized."); - Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task); - // Make sure the input watermark don't go backward. - if (taskInputWatermark > oldTaskInputWatermark) { - upStreamTaskToInputWatermark.put(task, taskInputWatermark); - inputWatermarks.add(taskInputWatermark); - inputWatermarks.remove(oldTaskInputWatermark); - - long newLocalInputWatermark = currentInputWatermark(); - if (newLocalInputWatermark > oldTaskInputWatermark) { - return newLocalInputWatermark; - } - } - return 0; - } - - @Override - public void fireTimers(long newWatermark) { - TimerInternals.TimerData timerData; - while ((timerData = eventTimeTimersQueue.peek()) != null - && timerData.getTimestamp().getMillis() <= newWatermark) { - for (Pair keyedExecutor : timerDataToKeyedExecutors.get(timerData)) { - DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst()); - executor.onTimer(keyedExecutor.getSecond(), timerData); - } - eventTimeTimersQueue.remove(); - timerDataToKeyedExecutors.remove(timerData); - } - } - - @Override - public long currentInputWatermark() { - return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); - } - - @Override - public long currentOutputWatermark() { - if (watermarkHolds.isEmpty()) { - return currentInputWatermark(); - } else { - return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis()); - } - } - - @Override - public void clearWatermarkHold(String namespace) { - Instant currentHold = namespaceToWatermarkHold.get(namespace); - if (currentHold != null) { - watermarkHolds.remove(currentHold); - namespaceToWatermarkHold.remove(namespace); - } - } - - @Override - public void addWatermarkHold(String namespace, Instant watermarkHold) { - Instant currentHold = namespaceToWatermarkHold.get(namespace); - if (currentHold == null) { - namespaceToWatermarkHold.put(namespace, watermarkHold); - watermarkHolds.add(watermarkHold); - } else if (currentHold != null && watermarkHold.isBefore(currentHold)) { - namespaceToWatermarkHold.put(namespace, watermarkHold); - watermarkHolds.add(watermarkHold); - watermarkHolds.remove(currentHold); - } - } - - @Override - public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) { - checkArgument( - TimeDomain.EVENT_TIME.equals(timerData.getDomain()), - String.format("Does not support domain: %s.", timerData.getDomain())); - Set> keyedExecutors = timerDataToKeyedExecutors.get(timerData); - if (keyedExecutors == null) { - keyedExecutors = Sets.newHashSet(); - eventTimeTimersQueue.add(timerData); - } - keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key)); - timerDataToKeyedExecutors.put(timerData, keyedExecutors); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java deleted file mode 100644 index 8dc51b5..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; -import com.alibaba.jstorm.cache.IKvStore; -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor; -import java.io.IOException; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Transactional executors bolt handles the checkpoint and restore of state and timer. - */ -public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor { - private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class); - - private static final String TIME_SERVICE_STORE_ID = "timer_service_store"; - private static final String TIMER_SERVICE_KET = "timer_service_key"; - - private ExecutorsBolt executorsBolt; - private IKvStoreManager kvStoreManager; - private IKvStore timerServiceStore; - - public TxExecutorsBolt(ExecutorsBolt executorsBolt) { - this.executorsBolt = executorsBolt; - this.executorsBolt.setStatefulBolt(true); - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - try { - executorsBolt.prepare(stormConf, context, collector); - kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager(); - timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); - } catch (IOException e) { - LOG.error("Failed to prepare stateful bolt", e); - throw new RuntimeException(e.getMessage()); - } - } - - @Override - public void execute(Tuple input) { - executorsBolt.execute(input); - } - - @Override - public void cleanup() { - executorsBolt.cleanup(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - executorsBolt.declareOutputFields(declarer); - } - - @Override - public Map getComponentConfiguration() { - return executorsBolt.getComponentConfiguration(); - } - - @Override - public void initState(Object userState) { - LOG.info("Begin to init from state: {}", userState); - restore(userState); - } - - @Override - public Object finishBatch(long batchId) { - try { - timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService()); - } catch (IOException e) { - LOG.error("Failed to store current timer service status", e); - throw new RuntimeException(e.getMessage()); - } - kvStoreManager.checkpoint(batchId); - return null; - } - - @Override - public Object commit(long batchId, Object state) { - return kvStoreManager.backup(batchId); - } - - @Override - public void rollBack(Object userState) { - LOG.info("Begin to rollback from state: {}", userState); - restore(userState); - } - - @Override - public void ackCommit(long batchId, long timeStamp) { - kvStoreManager.remove(batchId); - } - - private void restore(Object userState) { - try { - // restore all states - kvStoreManager.restore(userState); - - // init timer service - timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID); - TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET); - if (timerService == null) { - timerService = executorsBolt.initTimerService(); - } - executorsBolt.setTimerService(timerService); - } catch (IOException e) { - LOG.error("Failed to restore state", e); - throw new RuntimeException(e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java deleted file mode 100644 index 48b410f..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import com.alibaba.jstorm.cache.IKvStore; -import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.cache.KvStoreManagerFactory; -import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor; -import java.io.IOException; -import java.util.Map; -import org.apache.beam.sdk.io.UnboundedSource; -import org.slf4j.LoggerFactory; - -/** - * Transactional unbounded source spout handles the checkpoint and restore of state and timer. - */ -public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor { - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class); - - private static final String SOURCE_STORE_ID = "SourceCheckpoint"; - private static final String CHECKPOINT_MARK = "CheckpointMark"; - - private UnboundedSourceSpout sourceSpout; - private UnboundedSource.UnboundedReader reader; - private IKvStoreManager kvStoreManager; - private IKvStore sourceCheckpointStore; - - public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) { - this.sourceSpout = sourceSpout; - } - - private void restore(Object userState) { - try { - kvStoreManager.restore(userState); - sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID); - UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK); - sourceSpout.createSourceReader(checkpointMark); - reader = sourceSpout.getUnboundedSourceReader(); - } catch (IOException e) { - LOG.error("Failed to init state", e); - throw new RuntimeException(e.getMessage()); - } - } - - @Override - public void initState(Object userState) { - restore(userState); - } - - @Override - public Object finishBatch(long checkpointId) { - try { - // Store check point mark from unbounded source reader - UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark(); - sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark); - - // checkpoint all kv stores in current manager - kvStoreManager.checkpoint(checkpointId); - } catch (IOException e) { - LOG.error(String.format("Failed to finish batch-%s", checkpointId), e); - throw new RuntimeException(e.getMessage()); - } - return null; - } - - @Override - public Object commit(long batchId, Object state) { - // backup kv stores to remote state backend - return kvStoreManager.backup(batchId); - } - - @Override - public void rollBack(Object userState) { - restore(userState); - } - - @Override - public void ackCommit(long batchId, long timeStamp) { - // remove obsolete state in bolt local and remote state backend - kvStoreManager.remove(batchId); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - sourceSpout.declareOutputFields(declarer); - } - - @Override - public Map getComponentConfiguration() { - return sourceSpout.getComponentConfiguration(); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - try { - sourceSpout.open(conf, context, collector); - String storeName = String.format("task-%s", context.getThisTaskId()); - String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName); - kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor( - context, storeName, storePath, true); - - reader = sourceSpout.getUnboundedSourceReader(); - } catch (IOException e) { - LOG.error("Failed to open transactional unbounded source spout", e); - throw new RuntimeException(e.getMessage()); - } - } - - @Override - public void close() { - sourceSpout.close(); - } - - @Override - public void activate() { - sourceSpout.activate(); - } - - @Override - public void deactivate() { - sourceSpout.deactivate(); - } - - @Override - public void nextTuple() { - sourceSpout.nextTuple(); - } - - @Override - public void ack(Object msgId) { - throw new UnsupportedOperationException(); - } - - @Override - public void fail(Object msgId) { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java deleted file mode 100644 index 690824d..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkNotNull; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.tuple.Values; -import com.alibaba.jstorm.utils.KryoSerializer; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.beam.runners.jstorm.JStormPipelineOptions; -import org.apache.beam.runners.jstorm.translation.util.CommonInstance; -import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Spout implementation that wraps a Beam UnboundedSource. - * TODO: add wrapper to support metrics in UnboundedSource. - */ -public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout { - private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class); - - private final String description; - private final UnboundedSource source; - private final SerializedPipelineOptions serializedOptions; - private final TupleTag outputTag; - - private transient JStormPipelineOptions pipelineOptions; - private transient UnboundedSource.UnboundedReader reader; - private transient SpoutOutputCollector collector; - - private volatile boolean hasNextRecord; - private AtomicBoolean activated = new AtomicBoolean(); - - private KryoSerializer serializer; - - private long lastWaterMark = 0L; - - public UnboundedSourceSpout( - String description, - UnboundedSource source, - JStormPipelineOptions options, - TupleTag outputTag) { - this.description = checkNotNull(description, "description"); - this.source = checkNotNull(source, "source"); - this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options")); - this.outputTag = checkNotNull(outputTag, "outputTag"); - } - - @Override - public synchronized void close() { - try { - activated.set(false); - this.reader.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void activate() { - activated.set(true); - - } - - @Override - public void deactivate() { - activated.set(false); - } - - @Override - public void ack(Object msgId) { - throw new UnsupportedOperationException(); - } - - @Override - public void fail(Object msgId) { - throw new UnsupportedOperationException(); - } - - @Override - public Map getComponentConfiguration() { - return null; - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - try { - this.collector = collector; - this.pipelineOptions = - this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class); - - createSourceReader(null); - - this.serializer = new KryoSerializer<>(conf); - } catch (IOException e) { - throw new RuntimeException("Unable to create unbounded reader.", e); - } - } - - public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException { - if (reader != null) { - reader.close(); - } - reader = this.source.createReader(this.pipelineOptions, checkpointMark); - hasNextRecord = this.reader.start(); - } - - @Override - public synchronized void nextTuple() { - if (!activated.get()) { - return; - } - try { - if (!hasNextRecord) { - hasNextRecord = reader.advance(); - } - - while (hasNextRecord && activated.get()) { - Object value = reader.getCurrent(); - Instant timestamp = reader.getCurrentTimestamp(); - - WindowedValue wv = - WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); - LOG.debug("Source output: " + wv.getValue()); - if (keyedEmit(outputTag.getId())) { - KV kv = (KV) wv.getValue(); - // Convert WindowedValue to > - byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue())); - collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue)); - } else { - byte[] immutableValue = serializer.serialize(wv); - collector.emit(outputTag.getId(), new Values(immutableValue)); - } - - // move to next record - hasNextRecord = reader.advance(); - } - - Instant waterMark = reader.getWatermark(); - if (waterMark != null && lastWaterMark < waterMark.getMillis()) { - lastWaterMark = waterMark.getMillis(); - collector.flush(); - collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis())); - LOG.debug("Source output: WM-{}", waterMark.toDateTime()); - } - } catch (IOException e) { - throw new RuntimeException("Exception reading values from source.", e); - } - } - - public UnboundedSource getUnboundedSource() { - return source; - } - - public UnboundedSource.UnboundedReader getUnboundedSourceReader() { - return reader; - } - - @Override - public String toString() { - return description; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java deleted file mode 100644 index 4320967..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * JStorm {@link Executor} for {@link View}. - */ -public class ViewExecutor implements Executor { - - private final String description; - private final TupleTag outputTag; - private ExecutorsBolt executorsBolt; - - public ViewExecutor(String description, TupleTag outputTag) { - this.description = description; - this.outputTag = outputTag; - } - - @Override - public void init(ExecutorContext context) { - this.executorsBolt = context.getExecutorsBolt(); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - executorsBolt.processExecutorElem(outputTag, elem); - } - - @Override - public void cleanup() { - } - - @Override - public String toString() { - return description; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java deleted file mode 100644 index 3cd0aa9..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}. - * @param - * @param - */ -public class WindowAssignExecutor implements Executor { - private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class); - - private final String description; - private WindowFn windowFn; - private ExecutorsBolt executorsBolt; - private TupleTag outputTag; - - class JStormAssignContext - extends WindowFn.AssignContext { - private final WindowedValue value; - - JStormAssignContext(WindowFn fn, WindowedValue value) { - fn.super(); - checkArgument( - Iterables.size(value.getWindows()) == 1, - String.format( - "%s passed to window assignment must be in a single window, but it was in %s: %s", - WindowedValue.class.getSimpleName(), - Iterables.size(value.getWindows()), - value.getWindows())); - this.value = value; - } - - @Override - public InputT element() { - return value.getValue(); - } - - @Override - public Instant timestamp() { - return value.getTimestamp(); - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(value.getWindows()); - } - } - - public WindowAssignExecutor(String description, WindowFn windowFn, TupleTag outputTag) { - this.description = description; - this.windowFn = windowFn; - this.outputTag = outputTag; - } - - @Override - public void init(ExecutorContext context) { - this.executorsBolt = context.getExecutorsBolt(); - } - - @Override - public void process(TupleTag tag, WindowedValue elem) { - Collection windows = null; - try { - windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem)); - for (W window : windows) { - executorsBolt.processExecutorElem( - outputTag, - WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane())); - } - } catch (Exception e) { - LOG.warn("Failed to assign windows for elem=" + elem, e); - } - } - - @Override - public void cleanup() { - } - - - @Override - public String toString() { - return description; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java deleted file mode 100644 index df54383..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.state; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.alibaba.jstorm.cache.ComposedKey; -import com.alibaba.jstorm.cache.IKvStore; -import com.alibaba.jstorm.cache.KvStoreIterable; -import java.io.IOException; -import java.util.Iterator; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.ReadableState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of {@link BagState} in JStorm runner. - */ -class JStormBagState implements BagState { - private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class); - - @Nullable - private final K key; - private final StateNamespace namespace; - private final IKvStore kvState; - private final IKvStore stateInfoKvState; - private int elemIndex; - - public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore kvState, - IKvStore stateInfoKvState) throws IOException { - this.key = key; - this.namespace = checkNotNull(namespace, "namespace"); - this.kvState = checkNotNull(kvState, "kvState"); - this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState"); - - Integer index = (Integer) stateInfoKvState.get(getComposedKey()); - this.elemIndex = index != null ? ++index : 0; - } - - @Override - public void add(T input) { - try { - kvState.put(getComposedKey(elemIndex), input); - stateInfoKvState.put(getComposedKey(), elemIndex); - elemIndex++; - } catch (IOException e) { - throw new RuntimeException(e.getCause()); - } - } - - @Override - public ReadableState isEmpty() { - return new ReadableState() { - @Override - public Boolean read() { - return elemIndex <= 0; - } - - @Override - public ReadableState readLater() { - // TODO: support prefetch. - return this; - } - }; - } - - @Override - public Iterable read() { - return new BagStateIterable(elemIndex); - } - - @Override - public BagState readLater() { - // TODO: support prefetch. - return this; - } - - @Override - public void clear() { - try { - for (int i = 0; i < elemIndex; i++) { - kvState.remove(getComposedKey(i)); - } - stateInfoKvState.remove(getComposedKey()); - elemIndex = 0; - } catch (IOException e) { - throw new RuntimeException(e.getCause()); - } - } - - private ComposedKey getComposedKey() { - return ComposedKey.of(key, namespace); - } - - private ComposedKey getComposedKey(int elemIndex) { - return ComposedKey.of(key, namespace, elemIndex); - } - - /** - * Implementation of Bag state Iterable. - */ - private class BagStateIterable implements KvStoreIterable { - - private class BagStateIterator implements Iterator { - private final int size; - private int cursor = 0; - - BagStateIterator() { - Integer s = null; - try { - s = (Integer) stateInfoKvState.get(getComposedKey()); - } catch (IOException e) { - LOG.error("Failed to get elemIndex for key={}", getComposedKey()); - } - this.size = s != null ? ++s : 0; - } - - @Override - public boolean hasNext() { - return cursor < size; - } - - @Override - public T next() { - if (cursor >= size) { - throw new NoSuchElementException(); - } - - T value = null; - try { - value = kvState.get(getComposedKey(cursor)); - } catch (IOException e) { - LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor)); - } - cursor++; - return value; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - private final int size; - - BagStateIterable(int size) { - this.size = size; - } - - @Override - public Iterator iterator() { - return new BagStateIterator(); - } - - @Override - public String toString() { - return String.format("BagStateIterable: composedKey=%s", getComposedKey()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java deleted file mode 100644 index 7c6a239..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.state; - -import static com.google.common.base.Preconditions.checkNotNull; - -import javax.annotation.Nullable; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.ReadableState; -import org.apache.beam.sdk.transforms.Combine; - -/** - * JStorm implementation of {@link CombiningState}. - */ -public class JStormCombiningState - implements CombiningState { - - @Nullable - private final BagState accumBagState; - private final Combine.CombineFn combineFn; - - JStormCombiningState( - BagState accumBagState, - Combine.CombineFn combineFn) { - this.accumBagState = checkNotNull(accumBagState, "accumBagState"); - this.combineFn = checkNotNull(combineFn, "combineFn"); - } - - @Override - public AccumT getAccum() { - // TODO: replacing the accumBagState with the merged accum. - return combineFn.mergeAccumulators(accumBagState.read()); - } - - @Override - public void addAccum(AccumT accumT) { - accumBagState.add(accumT); - } - - @Override - public AccumT mergeAccumulators(Iterable iterable) { - return combineFn.mergeAccumulators(iterable); - } - - @Override - public void add(InputT input) { - accumBagState.add( - combineFn.addInput(combineFn.createAccumulator(), input)); - } - - @Override - public ReadableState isEmpty() { - return accumBagState.isEmpty(); - } - - @Override - public OutputT read() { - return combineFn.extractOutput( - combineFn.mergeAccumulators(accumBagState.read())); - } - - @Override - public CombiningState readLater() { - // TODO: support prefetch. - return this; - } - - @Override - public void clear() { - accumBagState.clear(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java deleted file mode 100644 index ac3f91f..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.state; - -import com.alibaba.jstorm.cache.IKvStore; -import java.io.IOException; -import java.util.Map; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.sdk.state.MapState; -import org.apache.beam.sdk.state.ReadableState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implementation of {@link MapState} in JStorm runner. - * @param - * @param - */ -public class JStormMapState implements MapState { - private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class); - - private final K key; - private final StateNamespace namespace; - private IKvStore kvStore; - - public JStormMapState(K key, StateNamespace namespace, IKvStore kvStore) { - this.key = key; - this.namespace = namespace; - this.kvStore = kvStore; - } - - @Override - public void put(K var1, V var2) { - try { - kvStore.put(var1, var2); - } catch (IOException e) { - reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e); - } - } - - @Override - public ReadableState putIfAbsent(K var1, V var2) { - ReadableState ret = null; - try { - V value = kvStore.get(var1); - if (value == null) { - kvStore.put(var1, var2); - ret = new MapReadableState<>(null); - } else { - ret = new MapReadableState<>(value); - } - } catch (IOException e) { - reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e); - } - return ret; - } - - @Override - public void remove(K var1) { - try { - kvStore.remove(var1); - } catch (IOException e) { - reportError(String.format("Failed to remove key=%s", var1), e); - } - } - - @Override - public ReadableState get(K var1) { - ReadableState ret = new MapReadableState<>(null); - try { - ret = new MapReadableState(kvStore.get(var1)); - } catch (IOException e) { - reportError(String.format("Failed to get value for key=%s", var1), e); - } - return ret; - } - - @Override - public ReadableState> keys() { - ReadableState> ret = new MapReadableState<>(null); - try { - ret = new MapReadableState<>(kvStore.keys()); - } catch (IOException e) { - reportError(String.format("Failed to get keys"), e); - } - return ret; - } - - @Override - public ReadableState> values() { - ReadableState> ret = new MapReadableState<>(null); - try { - ret = new MapReadableState<>(kvStore.values()); - } catch (IOException e) { - reportError(String.format("Failed to get values"), e); - } - return ret; - } - - @Override - public ReadableState>> entries() { - ReadableState>> ret = new MapReadableState<>(null); - try { - ret = new MapReadableState<>(kvStore.entries()); - } catch (IOException e) { - reportError(String.format("Failed to get values"), e); - } - return ret; - } - - @Override - public void clear() { - try { - Iterable keys = kvStore.keys(); - kvStore.removeBatch(keys); - } catch (IOException e) { - reportError(String.format("Failed to clear map state"), e); - } - } - - private void reportError(String errorInfo, IOException e) { - LOG.error(errorInfo, e); - throw new RuntimeException(errorInfo); - } - - private class MapReadableState implements ReadableState { - private T value; - - public MapReadableState(T value) { - this.value = value; - } - - @Override - public T read() { - return value; - } - - @Override - public ReadableState readLater() { - return this; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java deleted file mode 100644 index 80ef3a2..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.state; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.alibaba.jstorm.cache.ComposedKey; -import com.alibaba.jstorm.cache.IKvStoreManager; -import java.io.IOException; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateTag; -import org.apache.beam.runners.jstorm.translation.runtime.TimerService; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.MapState; -import org.apache.beam.sdk.state.SetState; -import org.apache.beam.sdk.state.State; -import org.apache.beam.sdk.state.StateBinder; -import org.apache.beam.sdk.state.StateContext; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.joda.time.Instant; - -/** - * JStorm implementation of {@link StateInternals}. - */ -public class JStormStateInternals implements StateInternals { - - private static final String STATE_INFO = "state-info:"; - - @Nullable - private final K key; - private final IKvStoreManager kvStoreManager; - private final TimerService timerService; - private final int executorId; - - public JStormStateInternals(K key, IKvStoreManager kvStoreManager, - TimerService timerService, int executorId) { - this.key = key; - this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager"); - this.timerService = checkNotNull(timerService, "timerService"); - this.executorId = executorId; - } - - @Nullable - @Override - public K getKey() { - return key; - } - - @Override - public T state( - StateNamespace namespace, StateTag address, StateContext c) { - // throw new UnsupportedOperationException("StateContext is not supported."); - /** - * TODOļ¼š - * Same implementation as state() which is without StateContext. This might be updated after - * we figure out if we really need StateContext for JStorm state internals. - */ - return state(namespace, address); - } - - @Override - public T state(final StateNamespace namespace, StateTag address) { - return address.getSpec().bind(address.getId(), new StateBinder() { - @Override - public ValueState bindValue(String id, StateSpec> spec, Coder coder) { - try { - return new JStormValueState<>( - getKey(), namespace, kvStoreManager.getOrCreate(getStoreId(id))); - } catch (IOException e) { - throw new RuntimeException(); - } - } - - @Override - public BagState bindBag(String id, StateSpec> spec, Coder elemCoder) { - try { - return new JStormBagState( - getKey(), namespace, kvStoreManager.getOrCreate(getStoreId(id)), - kvStoreManager.getOrCreate(STATE_INFO + getStoreId(id))); - } catch (IOException e) { - throw new RuntimeException(); - } - } - - @Override - public SetState bindSet(String id, StateSpec> spec, Coder elemCoder) { - throw new UnsupportedOperationException(); - } - - @Override - public MapState bindMap( - String id, - StateSpec> spec, - Coder mapKeyCoder, - Coder mapValueCoder) { - try { - return new JStormMapState<>( - getKey(), namespace, kvStoreManager.getOrCreate(getStoreId(id))); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public CombiningState bindCombining( - String id, - StateSpec> spec, - Coder accumCoder, - Combine.CombineFn combineFn) { - try { - BagState accumBagState = new JStormBagState( - getKey(), namespace, - kvStoreManager.getOrCreate(getStoreId(id)), - kvStoreManager.getOrCreate(STATE_INFO + getStoreId(id))); - return new JStormCombiningState<>(accumBagState, combineFn); - } catch (IOException e) { - throw new RuntimeException(); - } - } - - - @Override - public CombiningState - bindCombiningWithContext( - String id, - StateSpec> stateSpec, Coder coder, - CombineWithContext.CombineFnWithContext combineFnWithContext) { - throw new UnsupportedOperationException(); - } - - @Override - public WatermarkHoldState bindWatermark( - String id, - StateSpec spec, - final TimestampCombiner timestampCombiner) { - try { - BagState> accumBagState = new JStormBagState( - getKey(), namespace, - kvStoreManager.>getOrCreate(getStoreId(id)), - kvStoreManager.getOrCreate(STATE_INFO + getStoreId(id))); - - Combine.CombineFn, Instant> outputTimeCombineFn = - new BinaryCombineFn() { - @Override - public Instant apply(Instant left, Instant right) { - return timestampCombiner.combine(left, right); - } - }; - return new JStormWatermarkHoldState( - namespace, - new JStormCombiningState<>( - accumBagState, - outputTimeCombineFn), - timestampCombiner, - timerService); - } catch (IOException e) { - throw new RuntimeException(); - } - } - }); - } - - private String getStoreId(String stateId) { - return String.format("%s-%s", stateId, executorId); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java deleted file mode 100644 index 79ff6b4..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.state; - -import com.alibaba.jstorm.cache.ComposedKey; -import com.alibaba.jstorm.cache.IKvStore; -import java.io.IOException; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.sdk.state.ValueState; - -/** - * JStorm implementation of {@link ValueState}. - */ -public class JStormValueState implements ValueState { - - @Nullable - private final K key; - private final StateNamespace namespace; - private final IKvStore kvState; - - JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore kvState) { - this.key = key; - this.namespace = namespace; - this.kvState = kvState; - } - - @Override - public void write(T t) { - try { - kvState.put(getComposedKey(), t); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t)); - } - } - - @Override - public T read() { - try { - return kvState.get(getComposedKey()); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to read key: %s, namespace: %s.", key, namespace)); - } - } - - @Override - public ValueState readLater() { - // TODO: support prefetch. - return this; - } - - @Override - public void clear() { - try { - kvState.remove(getComposedKey()); - } catch (IOException e) { - throw new RuntimeException(String.format( - "Failed to clear key: %s, namespace: %s.", key, namespace)); - } - } - - private ComposedKey getComposedKey() { - return ComposedKey.of(key, namespace); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java deleted file mode 100644 index dc3ba43..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.state; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.jstorm.translation.runtime.TimerService; -import org.apache.beam.sdk.state.GroupingState; -import org.apache.beam.sdk.state.ReadableState; -import org.apache.beam.sdk.state.WatermarkHoldState; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.joda.time.Instant; - -/** - * JStorm implementation of {@link WatermarkHoldState}. - */ -public class JStormWatermarkHoldState implements WatermarkHoldState { - - private final StateNamespace namespace; - private final GroupingState watermarkHoldsState; - private final TimestampCombiner timestampCombiner; - private final TimerService timerService; - - JStormWatermarkHoldState( - StateNamespace namespace, - GroupingState watermarkHoldsState, - TimestampCombiner timestampCombiner, - TimerService timerService) { - this.namespace = checkNotNull(namespace, "namespace"); - this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState"); - this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner"); - this.timerService = checkNotNull(timerService, "timerService"); - } - - @Override - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; - } - - @Override - public void add(Instant instant) { - timerService.addWatermarkHold(namespace.stringKey(), instant); - watermarkHoldsState.add(instant); - } - - @Override - public ReadableState isEmpty() { - return watermarkHoldsState.isEmpty(); - } - - @Override - public Instant read() { - return watermarkHoldsState.read(); - } - - @Override - public WatermarkHoldState readLater() { - // TODO: support prefetch. - return this; - } - - @Override - public void clear() { - timerService.clearWatermarkHold(namespace.stringKey()); - watermarkHoldsState.clear(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java deleted file mode 100644 index 184a957..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.runtime.timer; - -import static com.google.common.base.Preconditions.checkNotNull; - -import javax.annotation.Nullable; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor; -import org.apache.beam.runners.jstorm.translation.runtime.TimerService; -import org.apache.beam.sdk.state.TimeDomain; -import org.joda.time.Instant; - -/** - * JStorm implementation of {@link TimerInternals}. - */ -public class JStormTimerInternals implements TimerInternals { - - private final K key; - private final DoFnExecutor doFnExecutor; - private final TimerService timerService; - - - public JStormTimerInternals( - @Nullable K key, DoFnExecutor doFnExecutor, TimerService timerService) { - this.key = key; - this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor"); - this.timerService = checkNotNull(timerService, "timerService"); - } - - @Override - public void setTimer( - StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - setTimer(TimerData.of(timerId, namespace, target, timeDomain)); - } - - @Override - @Deprecated - public void setTimer(TimerData timerData) { - timerService.setTimer(key, timerData, doFnExecutor); - } - - @Override - public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { - throw new UnsupportedOperationException( - "Canceling of a timer is not yet supported."); - } - - @Override - @Deprecated - public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException( - "Canceling of a timer is not yet supported."); - } - - @Override - @Deprecated - public void deleteTimer(TimerData timerData) { - throw new UnsupportedOperationException( - "Canceling of a timer is not yet supported."); - } - - @Override - public Instant currentProcessingTime() { - return Instant.now(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return null; - } - - @Override - public Instant currentInputWatermarkTime() { - return new Instant(timerService.currentInputWatermark()); - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return new Instant(timerService.currentOutputWatermark()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java deleted file mode 100644 index 7e7a54a..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Translates a {@link Read.Bounded} into a Storm spout. - * - * @param - */ -public class BoundedSourceTranslator extends TransformTranslator.Default> { - - @Override - public void translateNode(Read.Bounded transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = - describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - - TupleTag outputTag = userGraphContext.getOutputTag(); - PValue outputValue = userGraphContext.getOutput(); - UnboundedSourceSpout spout = new UnboundedSourceSpout( - description, - new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()), - userGraphContext.getOptions(), outputTag); - - context.getExecutionGraphContext().registerSpout( - spout, TaggedPValue.of(outputTag, outputValue)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java deleted file mode 100644 index 44ce8d8..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import com.google.common.collect.Maps; -import java.util.Map; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}. - * @param - */ -public class FlattenTranslator extends TransformTranslator.Default> { - - @Override - public void translateNode(Flatten.PCollections transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - - // Since a new tag is created in PCollectionList, retrieve the real tag here. - Map, PValue> inputs = Maps.newHashMap(); - for (Map.Entry, PValue> entry : userGraphContext.getInputs().entrySet()) { - PCollection pc = (PCollection) entry.getValue(); - inputs.putAll(pc.expand()); - } - System.out.println("Real inputs: " + inputs); - System.out.println("FlattenList inputs: " + userGraphContext.getInputs()); - String description = describeTransform(transform, inputs, userGraphContext.getOutputs()); - FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag()); - context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java deleted file mode 100644 index 85cb85d..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.List; -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}. - * @param - * @param - */ -public class GroupByKeyTranslator extends TransformTranslator.Default> { - // information of transform - protected PCollection> input; - protected PCollection>> output; - protected List> inputTags; - protected TupleTag>> mainOutputTag; - protected List> sideOutputTags; - protected List> sideInputs; - protected WindowingStrategy windowingStrategy; - - @Override - public void translateNode(GroupByKey transform, TranslationContext context) { - TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - String description = - describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs()); - - input = (PCollection>) userGraphContext.getInput(); - output = (PCollection>>) userGraphContext.getOutput(); - - inputTags = userGraphContext.getInputTags(); - mainOutputTag = (TupleTag>>) userGraphContext.getOutputTag(); - sideOutputTags = Lists.newArrayList(); - - sideInputs = Collections.>emptyList(); - windowingStrategy = input.getWindowingStrategy(); - - GroupByWindowExecutor groupByWindowExecutor = new GroupByWindowExecutor<>( - userGraphContext.getStepName(), - description, - context, - context.getUserGraphContext().getOptions(), - windowingStrategy, - mainOutputTag, - sideOutputTags); - context.addTransformExecutor(groupByWindowExecutor); - } -}