apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ilooner <...@git.apache.org>
Subject [GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...
Date Wed, 20 Jul 2016 06:44:10 GMT
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71473044
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
---
    @@ -0,0 +1,198 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K,
V>
    +{
    +  @NotNull
    +  private SpillableStateStore store;
    +  private SpillableComplexComponentImpl sccImpl;
    +  private long bucket;
    +  @NotNull
    +  private String identifier;
    +  @NotNull
    +  private Serde<Window, Slice> windowSerde;
    +  @NotNull
    +  private Serde<ImmutablePair<Window, K>, Slice> windowKeyPairSerde;
    +  @NotNull
    +  private Serde<K, Slice> keySerde;
    +  @NotNull
    +  private Serde<V, Slice> valueSerde;
    +
    +  protected Spillable.SpillableByteMap<ImmutablePair<Window, K>, V> internValues;
    +  protected Spillable.SpillableByteArrayListMultimap<Window, K> internKeys;
    +
    +  public SpillableWindowedKeyedStorage()
    +  {
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, String identifier,
    +      Serde<Window, Slice> windowSerde, Serde<ImmutablePair<Window, K>,
Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde)
    +  {
    +    this.bucket = bucket;
    +    this.identifier = identifier;
    +    this.windowSerde = windowSerde;
    +    this.windowKeyPairSerde = windowKeyPairSerde;
    +    this.keySerde = keySerde;
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  public void setStore(SpillableStateStore store)
    +  {
    +    this.store = store;
    +  }
    +
    +  public void setBucket(long bucket)
    +  {
    +    this.bucket = bucket;
    +  }
    +
    +  public void setIdentifier(String identifier)
    +  {
    +    this.identifier = identifier;
    +  }
    +
    +  public void setWindowSerde(Serde<Window, Slice> windowSerde)
    +  {
    +    this.windowSerde = windowSerde;
    +  }
    +
    +  public void setWindowKeyPairSerde(Serde<ImmutablePair<Window, K>, Slice>
windowKeyPairSerde)
    +  {
    +    this.windowKeyPairSerde = windowKeyPairSerde;
    +  }
    +
    +  public void setValueSerde(Serde<V, Slice> valueSerde)
    +  {
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store == null) {
    +      // provide a default managed state store
    +      store = new ManagedStateSpillableStateStore();
    +    }
    +    sccImpl = new SpillableComplexComponentImpl(store);
    +    internValues = sccImpl.newSpillableByteMap((identifier + "#values").getBytes(), bucket,
windowKeyPairSerde, valueSerde);
    --- End diff --
    
    There is currently a constraint on the identifiers: they must all be the same number
of bytes in length. You also want the identifiers to be small, since all the keys stored in
managed state will be prefixed with them. In this case you can simply omit the identifier. If
you don't specify an identifier, a single-byte identifier will automatically be generated for
each data structure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message