apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tweise <...@git.apache.org>
Subject [GitHub] apex-malhar pull request #319: REVIEW ONLY: Operator supporting the Beam con...
Date Sat, 18 Jun 2016 03:47:33 GMT
Github user tweise commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/319#discussion_r67594809
  
    --- Diff: stream/src/main/java/org/apache/apex/malhar/stream/window/impl/WindowedOperatorImpl.java
---
    @@ -0,0 +1,436 @@
    +package org.apache.apex.malhar.stream.window.impl;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeMap;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.common.util.BaseOperator;
    +import com.datatorrent.stram.engine.WindowGenerator;
    +import com.datatorrent.stram.plan.logical.LogicalPlan;
    +import org.apache.apex.malhar.stream.api.function.Function;
    +import org.apache.apex.malhar.stream.window.Accumulation;
    +import org.apache.apex.malhar.stream.window.SessionWindowedStorage;
    +import org.apache.apex.malhar.stream.window.WindowedStorage;
    +import org.apache.apex.malhar.stream.window.TriggerOption;
    +import org.apache.apex.malhar.stream.window.Watermark;
    +import org.apache.apex.malhar.stream.window.Window;
    +import org.apache.apex.malhar.stream.window.WindowOption;
    +import org.apache.apex.malhar.stream.window.WindowState;
    +import org.apache.apex.malhar.stream.window.WindowedOperator;
    +import org.joda.time.Duration;
    +
    +
    +/**
    + * Created by david on 6/13/16.
    + */
    +public class WindowedOperatorImpl<InputT, KeyT, AccumT, OutputT>
    +    extends BaseOperator implements WindowedOperator<InputT, KeyT, AccumT, OutputT>
    +{
    +  // TODO: Need further discussion on the type parameters. InputT and OutputT may be a
watermark, a KV pair, a WindowedValue, or a plain data object
    +
    +  private WindowOption windowOption;
    +  private Accumulation<InputT, AccumT, OutputT> accumulation;
    +  private WindowedStorage<KeyT, AccumT> dataStorage;
    +  private WindowedStorage<KeyT, AccumT> retractionStorage;
    +
    +  private TreeMap<Window, WindowState> windowStateMap = new TreeMap<>();
    +  // TODO: Make this window state storage a pluggable interface
    +
    +  private Function.MapFunction<InputT, Long> timestampExtractor;
    +  private Function.MapFunction<InputT, KeyT> keyExtractor;
    +  private long currentWatermark;
    +  private boolean triggerAtWatermark;
    +  private long earlyTriggerCount;
    +  private long earlyTriggerMillis;
    +  private long lateTriggerCount;
    +  private long lateTriggerMillis;
    +  private long currentApexWindowId = -1;
    +  private long currentDerivedTimestamp;
    +  private long firstWindowMillis;
    +  private long windowWidthMillis;
    +
    +  /**
    +   * Input port of the operator. Each tuple delivered to this port is handed
    +   * to {@link #processTuple(Object)} unchanged.
    +   */
    +  public transient DefaultInputPort<InputT> input = new DefaultInputPort<InputT>()
    +  {
    +    @Override
    +    public void process(InputT tuple)
    +    {
    +      // Delegate so subclasses can customize handling by overriding processTuple.
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  // TODO: multiple input ports for join operations
    +
    +  public transient DefaultOutputPort<WindowedValue<OutputT>> output = new
DefaultOutputPort<>();
    +
    +  /**
    +   * Handles one incoming tuple.
    +   *
    +   * A {@link Watermark} tuple is passed to {@code processWatermark}. Any other tuple
    +   * has its timestamp extracted; if {@code isTooLate} rejects that timestamp the tuple
    +   * is dropped, otherwise it is assigned to windows, accumulated, and the count-based
    +   * triggers of every assigned window are evaluated.
    +   *
    +   * @param tuple the tuple received on the input port
    +   */
    +  protected void processTuple(InputT tuple)
    +  {
    +    if (tuple instanceof Watermark) {
    +      processWatermark((Watermark)tuple);
    +      return;
    +    }
    +
    +    long eventTime = timestampExtractor.f(tuple);
    +    if (isTooLate(eventTime)) {
    +      dropTuple(tuple);
    +      return;
    +    }
    +
    +    WindowedValue<InputT> assigned = getWindowedValue(tuple);
    +    // do the accumulation
    +    accumulateTuple(assigned);
    +
    +    // process any count based triggers
    +    for (Window window : assigned.windows) {
    +      WindowState state = windowStateMap.get(window);
    +      state.tupleCount++;
    +      // A watermarkArrivalTime of -1 means the watermark has not arrived yet, so the
    +      // early trigger count applies; afterwards the late trigger count applies.
    +      long triggerEvery = (state.watermarkArrivalTime == -1) ? earlyTriggerCount : lateTriggerCount;
    +      if (triggerEvery > 0 && (state.tupleCount % triggerEvery) == 0) {
    +        fireTrigger(window, state);
    +      }
    +    }
    +  }
    +
    +  @Override
    +  public void setWindowOption(WindowOption windowOption)
    +  {
    +    this.windowOption = windowOption;
    +    TriggerOption triggerOption = this.windowOption.getTriggerOption();
    +    for (TriggerOption.Trigger trigger : triggerOption.getTriggerList()) {
    +      switch (trigger.getWatermarkOpt()) {
    +        case ON_TIME:
    +          triggerAtWatermark = true;
    +          break;
    +        case EARLY:
    +          if (trigger instanceof TriggerOption.TimeTrigger) {
    +            earlyTriggerMillis = ((TriggerOption.TimeTrigger) trigger).getDuration().getMillis();
    +          } else if (trigger instanceof TriggerOption.CountTrigger) {
    +            earlyTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount();
    +          }
    +          break;
    +        case LATE:
    +          if (trigger instanceof TriggerOption.TimeTrigger) {
    +            lateTriggerMillis = ((TriggerOption.TimeTrigger) trigger).getDuration().getMillis();
    +          } else if (trigger instanceof TriggerOption.CountTrigger) {
    +            lateTriggerCount = ((TriggerOption.CountTrigger)trigger).getCount();
    +          }
    +          break;
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Sets the accumulation used by this operator (it is used, for example, to merge
    +   * the data of two session windows).
    +   *
    +   * @param accumulation the accumulation to use
    +   */
    +  @Override
    +  public void setAccumulation(Accumulation<InputT, AccumT, OutputT> accumulation)
    +  {
    +    this.accumulation = accumulation;
    +  }
    +
    +  /**
    +   * Sets the storage that holds the accumulated data per window. When session windows
    +   * are configured, window assignment casts this storage to
    +   * {@link SessionWindowedStorage}.
    +   *
    +   * @param storageAgent the storage for accumulated window data
    +   */
    +  @Override
    +  public void setDataStorage(WindowedStorage<KeyT, AccumT> storageAgent)
    +  {
    +    this.dataStorage = storageAgent;
    +  }
    +
    +  /**
    +   * Sets the storage kept for retractions.
    +   *
    +   * @param storageAgent the storage for retraction data
    +   */
    +  @Override
    +  public void setRetractionStorage(WindowedStorage<KeyT, AccumT> storageAgent)
    +  {
    +    this.retractionStorage = storageAgent;
    +  }
    +
    +  /**
    +   * Sets the function that extracts the timestamp from an input tuple.
    +   *
    +   * @param timestampExtractor maps an input tuple to its timestamp
    +   */
    +  @Override
    +  public void setTimestampExtractor(Function.MapFunction<InputT, Long> timestampExtractor)
    +  {
    +    this.timestampExtractor = timestampExtractor;
    +  }
    +
    +  /**
    +   * Sets the function that extracts the key from an input tuple. The key is used
    +   * when assigning session windows.
    +   *
    +   * @param keyExtractor maps an input tuple to its key
    +   */
    +  @Override
    +  public void setKeyExtractor(Function.MapFunction<InputT, KeyT> keyExtractor)
    +  {
    +    this.keyExtractor = keyExtractor;
    +  }
    +
    +  /**
    +   * Builds a {@code WindowedValue} for the given tuple: its timestamp comes from the
    +   * timestamp extractor and its window list is filled in by {@code assignWindows}.
    +   *
    +   * NOTE(review): only the timestamp and windows are populated here; the tuple itself
    +   * is not stored on the returned object in this method. Confirm that is intended.
    +   *
    +   * @param input the input tuple
    +   * @return a windowed value carrying the tuple's timestamp and assigned windows
    +   */
    +  @Override
    +  public WindowedValue<InputT> getWindowedValue(InputT input)
    +  {
    +    WindowedValue<InputT> result = new WindowedValue<>();
    +    result.timestamp = timestampExtractor.f(input);
    +    assignWindows(result.windows, input);
    +    return result;
    +  }
    +
    +  /**
    +   * Determines the windows the given tuple belongs to and appends them to {@code windows}.
    +   *
    +   * <ul>
    +   *   <li>Global window option: the tuple is assigned to {@code Window.GLOBAL_WINDOW}.</li>
    +   *   <li>Time windows: the tuple is assigned to every window returned by
    +   *       {@code getTimeWindowsFromTimestamp}; window state is created on first use.</li>
    +   *   <li>Session windows: existing sessions for the tuple's key within the minimum gap
    +   *       are looked up. With none, a new 1 ms session is created; with one, it is reused
    +   *       or enlarged to cover the timestamp; with two, they are merged into one session.
    +   *       When the accumulation mode is ACCUMULATING_AND_RETRACTING, a retraction is
    +   *       fired for every session that is replaced.</li>
    +   * </ul>
    +   *
    +   * In every session case the tuple ends up assigned to exactly one session window, and
    +   * that window has an entry in {@code windowStateMap}.
    +   *
    +   * @param windows output list that receives the assigned windows
    +   * @param input the input tuple
    +   * @throws IllegalStateException if more than two sessions match the timestamp
    +   */
    +  private void assignWindows(List<Window> windows, InputT input)
    +  {
    +    if (windowOption instanceof WindowOption.GlobalWindow) {
    +
    +      windows.add(Window.GLOBAL_WINDOW);
    +
    +    } else {
    +
    +      long timestamp = timestampExtractor.f(input);
    +      if (windowOption instanceof WindowOption.TimeWindows) {
    +
    +        for (Window.TimeWindow window : getTimeWindowsFromTimestamp(timestamp)) {
    +          if (!windowStateMap.containsKey(window)) {
    +            windowStateMap.put(window, new WindowState());
    +          }
    +          windows.add(window);
    +        }
    +
    +      } else if (windowOption instanceof WindowOption.SessionWindows) {
    +
    +        WindowOption.SessionWindows sessionWindowOption = (WindowOption.SessionWindows)windowOption;
    +        SessionWindowedStorage<KeyT, AccumT> sessionStorage = (SessionWindowedStorage<KeyT, AccumT>)dataStorage;
    +        KeyT key = keyExtractor.f(input);
    +        Collection<Map.Entry<Window.SessionWindow, AccumT>> sessionEntries =
    +            sessionStorage.getSessionEntries(key, timestamp, sessionWindowOption.getMinGap().getMillis());
    +        boolean retracting =
    +            windowOption.getAccumulationMode() == WindowOption.AccumulationMode.ACCUMULATING_AND_RETRACTING;
    +        switch (sessionEntries.size()) {
    +          case 0: {
    +            // There are no existing windows within the minimum gap. Create a new session window
    +            Window.SessionWindow<KeyT> sessionWindow = new Window.SessionWindow<>(key, timestamp, 1);
    +            windowStateMap.put(sessionWindow, new WindowState());
    +            windows.add(sessionWindow);
    +            break;
    +          }
    +          case 1: {
    +            Map.Entry<Window.SessionWindow, AccumT> sessionWindowEntry = sessionEntries.iterator().next();
    +            Window.SessionWindow<KeyT> sessionWindow = sessionWindowEntry.getKey();
    +            long sessionEnd = sessionWindow.getBeginTimestamp() + sessionWindow.getDurationMillis();
    +            if (sessionWindow.getBeginTimestamp() <= timestamp && timestamp < sessionEnd) {
    +              // The session window already covers the event
    +              windows.add(sessionWindow);
    +            } else {
    +              // The session window does not cover the event but is within the min gap
    +              if (retracting) {
    +                // fire a retraction trigger because the session window will be enlarged
    +                fireRetractionTrigger(sessionWindow);
    +              }
    +              // create a new session window that covers the timestamp
    +              long newBeginTimestamp = Math.min(sessionWindow.getBeginTimestamp(), timestamp);
    +              long newEndTimestamp = Math.max(sessionEnd, timestamp + 1);
    +              Window.SessionWindow<KeyT> newSessionWindow =
    +                  new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
    +              windowStateMap.remove(sessionWindow);
    +              sessionStorage.migrateWindow(sessionWindow, newSessionWindow);
    +              windowStateMap.put(newSessionWindow, new WindowState());
    +              // The tuple belongs to the enlarged window; without this it would be
    +              // accumulated into no window at all.
    +              windows.add(newSessionWindow);
    +            }
    +            break;
    +          }
    +          case 2: {
    +            // merge the two windows
    +            // Use ONE iterator: calling sessionEntries.iterator() twice would return the
    +            // first entry both times.
    +            Iterator<Map.Entry<Window.SessionWindow, AccumT>> entryIterator = sessionEntries.iterator();
    +            Map.Entry<Window.SessionWindow, AccumT> sessionWindowEntry1 = entryIterator.next();
    +            Map.Entry<Window.SessionWindow, AccumT> sessionWindowEntry2 = entryIterator.next();
    +            Window.SessionWindow<KeyT> sessionWindow1 = sessionWindowEntry1.getKey();
    +            Window.SessionWindow<KeyT> sessionWindow2 = sessionWindowEntry2.getKey();
    +            AccumT sessionData1 = sessionWindowEntry1.getValue();
    +            AccumT sessionData2 = sessionWindowEntry2.getValue();
    +            if (retracting) {
    +              // fire a retraction trigger because the two session windows will be merged to a new window
    +              fireRetractionTrigger(sessionWindow1);
    +              fireRetractionTrigger(sessionWindow2);
    +            }
    +            long newBeginTimestamp = Math.min(sessionWindow1.getBeginTimestamp(), sessionWindow2.getBeginTimestamp());
    +            long newEndTimestamp = Math.max(sessionWindow1.getBeginTimestamp() + sessionWindow1.getDurationMillis(),
    +                sessionWindow2.getBeginTimestamp() + sessionWindow2.getDurationMillis());
    +
    +            Window.SessionWindow<KeyT> newSessionWindow =
    +                new Window.SessionWindow<>(key, newBeginTimestamp, newEndTimestamp - newBeginTimestamp);
    +            AccumT newSessionData = accumulation.merge(sessionData1, sessionData2);
    +            // Drop state and data of the two replaced windows, then register the merged one.
    +            windowStateMap.remove(sessionWindow1);
    +            windowStateMap.remove(sessionWindow2);
    +            sessionStorage.remove(sessionWindow1);
    +            sessionStorage.remove(sessionWindow2);
    +            sessionStorage.put(newSessionWindow, key, newSessionData);
    +            windowStateMap.put(newSessionWindow, new WindowState());
    +            // The tuple belongs to the merged window.
    +            windows.add(newSessionWindow);
    +            break;
    +          }
    +          default:
    +            throw new IllegalStateException("There are more than two sessions matching one timestamp");
    +        }
    +      }
    +    }
    +  }
    +
    +  private List<Window.TimeWindow> getTimeWindowsFromTimestamp(long timestamp)
    --- End diff --
    
    Rename to getTimeWindowsForTimestamp? Even though the method is private, some Javadoc wouldn't hurt here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact the infrastructure team at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Mime
View raw message