beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [09/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
Date Thu, 14 Apr 2016 04:47:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
deleted file mode 100644
index 9e7c16e..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.JsonIgnorePredicate;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.Registration;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.common.ReflectHelpers;
-import com.google.common.base.Defaults;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ClassToInstanceMap;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.MutableClassToInstanceMap;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-import java.beans.PropertyDescriptor;
-import java.io.IOException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * Represents and {@link InvocationHandler} for a {@link Proxy}. The invocation handler uses bean
- * introspection of the proxy class to store and retrieve values based off of the property name.
- *
- * <p>Unset properties use the {@code @Default} metadata on the getter to return values. If there
- * is no {@code @Default} annotation on the getter, then a <a
- * href="https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html">default</a> as
- * per the Java Language Specification for the expected return type is returned.
- *
- * <p>In addition to the getter/setter pairs, this proxy invocation handler supports
- * {@link Object#equals(Object)}, {@link Object#hashCode()}, {@link Object#toString()} and
- * {@link PipelineOptions#as(Class)}.
- */
-@ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler {
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-  /**
-   * No two instances of this class are considered equivalent hence we generate a random hash code
-   * between 0 and {@link Integer#MAX_VALUE}.
-   */
-  private final int hashCode = (int) (Math.random() * Integer.MAX_VALUE);
-  private final Set<Class<? extends PipelineOptions>> knownInterfaces;
-  private final ClassToInstanceMap<PipelineOptions> interfaceToProxyCache;
-  private final Map<String, Object> options;
-  private final Map<String, JsonNode> jsonOptions;
-  private final Map<String, String> gettersToPropertyNames;
-  private final Map<String, String> settersToPropertyNames;
-
-  ProxyInvocationHandler(Map<String, Object> options) {
-    this(options, Maps.<String, JsonNode>newHashMap());
-  }
-
-  private ProxyInvocationHandler(Map<String, Object> options, Map<String, JsonNode> jsonOptions) {
-    this.options = options;
-    this.jsonOptions = jsonOptions;
-    this.knownInterfaces = new HashSet<>(PipelineOptionsFactory.getRegisteredOptions());
-    gettersToPropertyNames = Maps.newHashMap();
-    settersToPropertyNames = Maps.newHashMap();
-    interfaceToProxyCache = MutableClassToInstanceMap.create();
-  }
-
-  @Override
-  public Object invoke(Object proxy, Method method, Object[] args) {
-    if (args == null && "toString".equals(method.getName())) {
-      return toString();
-    } else if (args != null && args.length == 1 && "equals".equals(method.getName())) {
-      return equals(args[0]);
-    } else if (args == null && "hashCode".equals(method.getName())) {
-      return hashCode();
-    } else if (args != null && "as".equals(method.getName()) && args[0] instanceof Class) {
-      @SuppressWarnings("unchecked")
-      Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
-      return as(clazz);
-    } else if (args != null && "cloneAs".equals(method.getName()) && args[0] instanceof Class) {
-      @SuppressWarnings("unchecked")
-      Class<? extends PipelineOptions> clazz = (Class<? extends PipelineOptions>) args[0];
-      return cloneAs(proxy, clazz);
-    }
-    String methodName = method.getName();
-    synchronized (this) {
-      if (gettersToPropertyNames.keySet().contains(methodName)) {
-        String propertyName = gettersToPropertyNames.get(methodName);
-        if (!options.containsKey(propertyName)) {
-          // Lazy bind the default to the method.
-          Object value = jsonOptions.containsKey(propertyName)
-              ? getValueFromJson(propertyName, method)
-              : getDefault((PipelineOptions) proxy, method);
-          options.put(propertyName, value);
-        }
-        return options.get(propertyName);
-      } else if (settersToPropertyNames.containsKey(methodName)) {
-        options.put(settersToPropertyNames.get(methodName), args[0]);
-        return Void.TYPE;
-      }
-    }
-    throw new RuntimeException("Unknown method [" + method + "] invoked with args ["
-        + Arrays.toString(args) + "].");
-  }
-
-  /**
-   * Backing implementation for {@link PipelineOptions#as(Class)}.
-   *
-   * @param iface The interface that the returned object needs to implement.
-   * @return An object that implements the interface <T>.
-   */
-  synchronized <T extends PipelineOptions> T as(Class<T> iface) {
-    Preconditions.checkNotNull(iface);
-    Preconditions.checkArgument(iface.isInterface());
-    if (!interfaceToProxyCache.containsKey(iface)) {
-      Registration<T> registration =
-          PipelineOptionsFactory.validateWellFormed(iface, knownInterfaces);
-      List<PropertyDescriptor> propertyDescriptors = registration.getPropertyDescriptors();
-      Class<T> proxyClass = registration.getProxyClass();
-      gettersToPropertyNames.putAll(generateGettersToPropertyNames(propertyDescriptors));
-      settersToPropertyNames.putAll(generateSettersToPropertyNames(propertyDescriptors));
-      knownInterfaces.add(iface);
-      interfaceToProxyCache.putInstance(iface,
-          InstanceBuilder.ofType(proxyClass)
-              .fromClass(proxyClass)
-              .withArg(InvocationHandler.class, this)
-              .build());
-    }
-    return interfaceToProxyCache.getInstance(iface);
-  }
-
-  /**
-   * Backing implementation for {@link PipelineOptions#cloneAs(Class)}.
-   *
-   * @return A copy of the PipelineOptions.
-   */
-  synchronized <T extends PipelineOptions> T cloneAs(Object proxy, Class<T> iface) {
-    PipelineOptions clonedOptions;
-    try {
-      clonedOptions = MAPPER.readValue(MAPPER.writeValueAsBytes(proxy), PipelineOptions.class);
-    } catch (IOException e) {
-      throw new IllegalStateException("Failed to serialize the pipeline options to JSON.", e);
-    }
-    for (Class<? extends PipelineOptions> knownIface : knownInterfaces) {
-      clonedOptions.as(knownIface);
-    }
-    return clonedOptions.as(iface);
-  }
-
-  /**
-   * Returns true if the other object is a ProxyInvocationHandler or is a Proxy object and has the
-   * same ProxyInvocationHandler as this.
-   *
-   * @param obj The object to compare against this.
-   * @return true iff the other object is a ProxyInvocationHandler or is a Proxy object and has the
-   *         same ProxyInvocationHandler as this.
-   */
-  @Override
-  public boolean equals(Object obj) {
-    return obj != null && ((obj instanceof ProxyInvocationHandler && this == obj)
-        || (Proxy.isProxyClass(obj.getClass()) && this == Proxy.getInvocationHandler(obj)));
-  }
-
-  /**
-   * Each instance of this ProxyInvocationHandler is unique and has a random hash code.
-   *
-   * @return A hash code that was generated randomly.
-   */
-  @Override
-  public int hashCode() {
-    return hashCode;
-  }
-
-  /**
-   * This will output all the currently set values. This is a relatively costly function
-   * as it will call {@code toString()} on each object that has been set and format
-   * the results in a readable format.
-   *
-   * @return A pretty printed string representation of this.
-   */
-  @Override
-  public synchronized String toString() {
-    SortedMap<String, Object> sortedOptions = new TreeMap<>();
-    // Add the options that we received from deserialization
-    sortedOptions.putAll(jsonOptions);
-    // Override with any programmatically set options.
-    sortedOptions.putAll(options);
-
-    StringBuilder b = new StringBuilder();
-    b.append("Current Settings:\n");
-    for (Map.Entry<String, Object> entry : sortedOptions.entrySet()) {
-      b.append("  " + entry.getKey() + ": " + entry.getValue() + "\n");
-    }
-    return b.toString();
-  }
-
-  /**
-   * Uses a Jackson {@link ObjectMapper} to attempt type conversion.
-   *
-   * @param method The method whose return type you would like to return.
-   * @param propertyName The name of the property that is being returned.
-   * @return An object matching the return type of the method passed in.
-   */
-  private Object getValueFromJson(String propertyName, Method method) {
-    try {
-      JavaType type = MAPPER.getTypeFactory().constructType(method.getGenericReturnType());
-      JsonNode jsonNode = jsonOptions.get(propertyName);
-      return MAPPER.readValue(jsonNode.toString(), type);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to parse representation", e);
-    }
-  }
-
-  /**
-   * Returns a default value for the method based upon {@code @Default} metadata on the getter
-   * to return values. If there is no {@code @Default} annotation on the getter, then a <a
-   * href="https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html">default</a> as
-   * per the Java Language Specification for the expected return type is returned.
-   *
-   * @param proxy The proxy object for which we are attempting to get the default.
-   * @param method The getter method that was invoked.
-   * @return The default value from an {@link Default} annotation if present, otherwise a default
-   *         value as per the Java Language Specification.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  private Object getDefault(PipelineOptions proxy, Method method) {
-    for (Annotation annotation : method.getAnnotations()) {
-      if (annotation instanceof Default.Class) {
-        return ((Default.Class) annotation).value();
-      } else if (annotation instanceof Default.String) {
-        return ((Default.String) annotation).value();
-      } else if (annotation instanceof Default.Boolean) {
-        return ((Default.Boolean) annotation).value();
-      } else if (annotation instanceof Default.Character) {
-        return ((Default.Character) annotation).value();
-      } else if (annotation instanceof Default.Byte) {
-        return ((Default.Byte) annotation).value();
-      } else if (annotation instanceof Default.Short) {
-        return ((Default.Short) annotation).value();
-      } else if (annotation instanceof Default.Integer) {
-        return ((Default.Integer) annotation).value();
-      } else if (annotation instanceof Default.Long) {
-        return ((Default.Long) annotation).value();
-      } else if (annotation instanceof Default.Float) {
-        return ((Default.Float) annotation).value();
-      } else if (annotation instanceof Default.Double) {
-        return ((Default.Double) annotation).value();
-      } else if (annotation instanceof Default.Enum) {
-        return Enum.valueOf((Class<Enum>) method.getReturnType(),
-            ((Default.Enum) annotation).value());
-      } else if (annotation instanceof Default.InstanceFactory) {
-        return InstanceBuilder.ofType(((Default.InstanceFactory) annotation).value())
-            .build()
-            .create(proxy);
-      }
-    }
-
-    /*
-     * We need to make sure that we return something appropriate for the return type. Thus we return
-     * a default value as defined by the JLS.
-     */
-    return Defaults.defaultValue(method.getReturnType());
-  }
-
-  /**
-   * Returns a map from the getters method name to the name of the property based upon the passed in
-   * {@link PropertyDescriptor}s property descriptors.
-   *
-   * @param propertyDescriptors A list of {@link PropertyDescriptor}s to use when generating the
-   *        map.
-   * @return A map of getter method name to property name.
-   */
-  private static Map<String, String> generateGettersToPropertyNames(
-      List<PropertyDescriptor> propertyDescriptors) {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    for (PropertyDescriptor descriptor : propertyDescriptors) {
-      if (descriptor.getReadMethod() != null) {
-        builder.put(descriptor.getReadMethod().getName(), descriptor.getName());
-      }
-    }
-    return builder.build();
-  }
-
-  /**
-   * Returns a map from the setters method name to its matching getters method name based upon the
-   * passed in {@link PropertyDescriptor}s property descriptors.
-   *
-   * @param propertyDescriptors A list of {@link PropertyDescriptor}s to use when generating the
-   *        map.
-   * @return A map of setter method name to getter method name.
-   */
-  private static Map<String, String> generateSettersToPropertyNames(
-      List<PropertyDescriptor> propertyDescriptors) {
-    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-    for (PropertyDescriptor descriptor : propertyDescriptors) {
-      if (descriptor.getWriteMethod() != null) {
-        builder.put(descriptor.getWriteMethod().getName(), descriptor.getName());
-      }
-    }
-    return builder.build();
-  }
-
-  static class Serializer extends JsonSerializer<PipelineOptions> {
-    @Override
-    public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvider provider)
-        throws IOException, JsonProcessingException {
-      ProxyInvocationHandler handler = (ProxyInvocationHandler) Proxy.getInvocationHandler(value);
-      synchronized (handler) {
-        // We first filter out any properties that have been modified since
-        // the last serialization of this PipelineOptions and then verify that
-        // they are all serializable.
-        Map<String, Object> filteredOptions = Maps.newHashMap(handler.options);
-        removeIgnoredOptions(handler.knownInterfaces, filteredOptions);
-        ensureSerializable(handler.knownInterfaces, filteredOptions);
-
-        // Now we create the map of serializable options by taking the original
-        // set of serialized options (if any) and updating them with any properties
-        // instances that have been modified since the previous serialization.
-        Map<String, Object> serializableOptions =
-            Maps.<String, Object>newHashMap(handler.jsonOptions);
-        serializableOptions.putAll(filteredOptions);
-        jgen.writeStartObject();
-        jgen.writeFieldName("options");
-        jgen.writeObject(serializableOptions);
-        jgen.writeEndObject();
-      }
-    }
-
-    /**
-     * We remove all properties within the passed in options where there getter is annotated with
-     * {@link JsonIgnore @JsonIgnore} from the passed in options using the passed in interfaces.
-     */
-    private void removeIgnoredOptions(
-        Set<Class<? extends PipelineOptions>> interfaces, Map<String, Object> options) {
-      // Find all the method names that are annotated with JSON ignore.
-      Set<String> jsonIgnoreMethodNames = FluentIterable.from(
-          ReflectHelpers.getClosureOfMethodsOnInterfaces(interfaces))
-          .filter(JsonIgnorePredicate.INSTANCE).transform(new Function<Method, String>() {
-            @Override
-            public String apply(Method input) {
-              return input.getName();
-            }
-          }).toSet();
-
-      // Remove all options that have the same method name as the descriptor.
-      for (PropertyDescriptor descriptor
-          : PipelineOptionsFactory.getPropertyDescriptors(interfaces)) {
-        if (jsonIgnoreMethodNames.contains(descriptor.getReadMethod().getName())) {
-          options.remove(descriptor.getName());
-        }
-      }
-    }
-
-    /**
-     * We use an {@link ObjectMapper} to verify that the passed in options are serializable
-     * and deserializable.
-     */
-    private void ensureSerializable(Set<Class<? extends PipelineOptions>> interfaces,
-        Map<String, Object> options) throws IOException {
-      // Construct a map from property name to the return type of the getter.
-      Map<String, Type> propertyToReturnType = Maps.newHashMap();
-      for (PropertyDescriptor descriptor
-          : PipelineOptionsFactory.getPropertyDescriptors(interfaces)) {
-        if (descriptor.getReadMethod() != null) {
-          propertyToReturnType.put(descriptor.getName(),
-              descriptor.getReadMethod().getGenericReturnType());
-        }
-      }
-
-      // Attempt to serialize and deserialize each property.
-      for (Map.Entry<String, Object> entry : options.entrySet()) {
-        try {
-          String serializedValue = MAPPER.writeValueAsString(entry.getValue());
-          JavaType type = MAPPER.getTypeFactory()
-              .constructType(propertyToReturnType.get(entry.getKey()));
-          MAPPER.readValue(serializedValue, type);
-        } catch (Exception e) {
-          throw new IOException(String.format(
-              "Failed to serialize and deserialize property '%s' with value '%s'",
-              entry.getKey(), entry.getValue()), e);
-        }
-      }
-    }
-  }
-
-  static class Deserializer extends JsonDeserializer<PipelineOptions> {
-    @Override
-    public PipelineOptions deserialize(JsonParser jp, DeserializationContext ctxt)
-        throws IOException, JsonProcessingException {
-      ObjectNode objectNode = (ObjectNode) jp.readValueAsTree();
-      ObjectNode optionsNode = (ObjectNode) objectNode.get("options");
-
-      Map<String, JsonNode> fields = Maps.newHashMap();
-      for (Iterator<Map.Entry<String, JsonNode>> iterator = optionsNode.fields();
-          iterator.hasNext(); ) {
-        Map.Entry<String, JsonNode> field = iterator.next();
-        fields.put(field.getKey(), field.getValue());
-      }
-      PipelineOptions options =
-          new ProxyInvocationHandler(Maps.<String, Object>newHashMap(), fields)
-              .as(PipelineOptions.class);
-      return options;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java
deleted file mode 100644
index deb19e9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/PubsubOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Properties that can be set when using Pubsub with the Beam SDK.
- */
-@Description("Options that are used to configure BigQuery. See "
-    + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface PubsubOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-
-  /**
-   * Root URL for use with the Pubsub API.
-   */
-  @Description("Root URL for use with the Pubsub API")
-  @Default.String("https://pubsub.googleapis.com")
-  @Hidden
-  String getPubsubRootUrl();
-  void setPubsubRootUrl(String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
deleted file mode 100644
index 67cb386..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/StreamingOptions.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options used to configure streaming.
- */
-public interface StreamingOptions extends
-    ApplicationNameOptions, GcpOptions, PipelineOptions {
-  /**
-   * Set to true if running a streaming pipeline.
-   */
-  @Description("Set to true if running a streaming pipeline.")
-  boolean isStreaming();
-  void setStreaming(boolean value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
deleted file mode 100644
index b7725a6..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Validation.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * {@link Validation} represents a set of annotations that can be used to annotate getter
- * properties on {@link PipelineOptions} with information representing the validation criteria to
- * be used when validating with the {@link PipelineOptionsValidator}.
- */
-public @interface Validation {
-  /**
-   * This criteria specifies that the value must be not null. Note that this annotation
-   * should only be applied to methods that return nullable objects.
-   */
-  @Target(value = ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Required {
-    /**
-     * The groups that the annotated attribute is a member of. A member can be in 0 or more groups.
-     * Members not in any groups are considered to be in a group consisting exclusively of
-     * themselves. At least one member of a group must be non-null if the options are to be valid.
-     */
-    String[] groups() default {};
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
deleted file mode 100644
index 63a03f5..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines {@link com.google.cloud.dataflow.sdk.options.PipelineOptions} for
- * configuring pipeline execution.
- *
- * <p>{@link com.google.cloud.dataflow.sdk.options.PipelineOptions} encapsulates the various
- * parameters that describe how a pipeline should be run. {@code PipelineOptions} are created
- * using a {@link com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory}.
- */
-package com.google.cloud.dataflow.sdk.options;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
deleted file mode 100644
index ab54533..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/package-info.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Provides a simple, powerful model for building both batch and
- * streaming parallel data processing
- * {@link com.google.cloud.dataflow.sdk.Pipeline}s.
- *
- * <p>To use the Google Cloud Dataflow SDK, you build a
- * {@link com.google.cloud.dataflow.sdk.Pipeline}, which manages a graph of
- * {@link com.google.cloud.dataflow.sdk.transforms.PTransform}s
- * and the {@link com.google.cloud.dataflow.sdk.values.PCollection}s that
- * the PTransforms consume and produce.
- *
- * <p>Each Pipeline has a
- * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner} to specify
- * where and how it should run after pipeline construction is complete.
- *
- */
-package com.google.cloud.dataflow.sdk;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
deleted file mode 100644
index 3008c6c..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorPipelineExtractor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AggregatorRetriever;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
- * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
- */
-public class AggregatorPipelineExtractor {
-  private final Pipeline pipeline;
-
-  /**
-   * Creates an {@code AggregatorPipelineExtractor} for the given {@link Pipeline}.
-   */
-  public AggregatorPipelineExtractor(Pipeline pipeline) {
-    this.pipeline = pipeline;
-  }
-
-  /**
-   * Returns a {@link Map} between each {@link Aggregator} in the {@link Pipeline} to the {@link
-   * PTransform PTransforms} in which it is used.
-   */
-  public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
-    HashMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps = HashMultimap.create();
-    pipeline.traverseTopologically(new AggregatorVisitor(aggregatorSteps));
-    return aggregatorSteps.asMap();
-  }
-
-  private static class AggregatorVisitor implements PipelineVisitor {
-    private final SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps;
-
-    public AggregatorVisitor(SetMultimap<Aggregator<?, ?>, PTransform<?, ?>> aggregatorSteps) {
-      this.aggregatorSteps = aggregatorSteps;
-    }
-
-    @Override
-    public void enterCompositeTransform(TransformTreeNode node) {}
-
-    @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {}
-
-    @Override
-    public void visitTransform(TransformTreeNode node) {
-      PTransform<?, ?> transform = node.getTransform();
-      addStepToAggregators(transform, getAggregators(transform));
-    }
-
-    private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
-      if (transform != null) {
-        if (transform instanceof ParDo.Bound) {
-          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
-        } else if (transform instanceof ParDo.BoundMulti) {
-          return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn());
-        }
-      }
-      return Collections.emptyList();
-    }
-
-    private void addStepToAggregators(
-        PTransform<?, ?> transform, Collection<Aggregator<?, ?>> aggregators) {
-      for (Aggregator<?, ?> aggregator : aggregators) {
-        aggregatorSteps.put(aggregator, transform);
-      }
-    }
-
-    @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
deleted file mode 100644
index 2ed0afa..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorRetrievalException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-
-/**
- * Signals that an exception has occurred while retrieving {@link Aggregator}s.
- */
-public class AggregatorRetrievalException extends Exception {
-  /**
-   * Constructs a new {@code AggregatorRetrievalException} with the specified detail message and
-   * cause.
-   */
-  public AggregatorRetrievalException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
deleted file mode 100644
index 8f26b36..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/AggregatorValues.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * A collection of values associated with an {@link Aggregator}. Aggregators declared in a
- * {@link DoFn} are emitted on a per-{@code DoFn}-application basis.
- *
- * @param <T> the output type of the aggregator
- */
-public abstract class AggregatorValues<T> {
-  /**
-   * Get the values of the {@link Aggregator} at all steps it was used.
-   */
-  public Collection<T> getValues() {
-    return getValuesAtSteps().values();
-  }
-
-  /**
-   * Get the values of the {@link Aggregator} by the user name at each step it was used.
-   */
-  public abstract Map<String, T> getValuesAtSteps();
-
-  /**
-   * Get the total value of this {@link Aggregator} by applying the specified {@link CombineFn}.
-   */
-  public T getTotalValue(CombineFn<T, ?, T> combineFn) {
-    return combineFn.apply(getValues());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java
deleted file mode 100644
index df596b2..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipeline.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
-
-/**
- * A {@link DirectPipeline} is a {@link Pipeline} that returns
- * {@link DirectPipelineRunner.EvaluationResults} when it is
- * {@link com.google.cloud.dataflow.sdk.Pipeline#run()}.
- */
-public class DirectPipeline extends Pipeline {
-
-  /**
-   * Creates and returns a new DirectPipeline instance for tests.
-   */
-  public static DirectPipeline createForTest() {
-    DirectPipelineRunner runner = DirectPipelineRunner.createForTest();
-    return new DirectPipeline(runner, runner.getPipelineOptions());
-  }
-
-  private DirectPipeline(DirectPipelineRunner runner, DirectPipelineOptions options) {
-    super(runner, options);
-  }
-
-  @Override
-  public DirectPipelineRunner.EvaluationResults run() {
-    return (DirectPipelineRunner.EvaluationResults) super.run();
-  }
-
-  @Override
-  public DirectPipelineRunner getRunner() {
-    return (DirectPipelineRunner) super.getRunner();
-  }
-
-  @Override
-  public String toString() {
-    return "DirectPipeline#" + hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java
deleted file mode 100644
index f4a5600..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRegistrar.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.runners;
-
-import com.google.auto.service.AutoService;
-import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsRegistrar;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DirectPipeline}.
- */
-public class DirectPipelineRegistrar {
-  private DirectPipelineRegistrar() { }
-
-  /**
-   * Register the {@link DirectPipelineRunner}.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class Runner implements PipelineRunnerRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectPipelineRunner.class);
-    }
-  }
-
-  /**
-   * Register the {@link DirectPipelineOptions}.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class Options implements PipelineOptionsRegistrar {
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(DirectPipelineOptions.class);
-    }
-  }
-}



Mime
View raw message