From: thw@apache.org
To: commits@apex.apache.org
Reply-To: dev@apex.apache.org
Date: Mon, 27 Mar 2017 15:19:19 -0000
Subject: [18/19] apex-malhar git commit: APEXMALHAR-2233 Updated the examples to follow the structure of apex-malhar examples. Specified dependencies in pom.xmls of individual examples correctly.

APEXMALHAR-2233 Updated the examples to follow the structure of apex-malhar examples. Specified dependencies in pom.xmls of individual examples correctly.
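The commit restructures each example to inherit its build from the shared malhar-examples parent POM and to declare only the dependencies that example actually needs. As a rough illustration, the parent declaration now used by the csvformatter example looks like the sketch below; it is reconstructed from values visible in the diff (the archive stripped the original XML tags), so the element layout is inferred rather than a verbatim copy of the committed pom.xml.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>

  <!-- Build configuration is inherited from the shared examples parent (inferred layout) -->
  <parent>
    <groupId>org.apache.apex</groupId>
    <artifactId>malhar-examples</artifactId>
    <version>3.7.0-SNAPSHOT</version>
  </parent>

  <artifactId>malhar-examples-csvformatter</artifactId>
  <packaging>jar</packaging>
  <name>Formatter Apps</name>
  <description>Applications to showcase different formatters</description>

  <!-- Only example-specific dependencies are declared here, e.g. jettison,
       jackson-databind and super-csv for the csvformatter application -->
</project>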
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9c154f20
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9c154f20
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9c154f20
Branch: refs/heads/master
Commit: 9c154f204042a9e1974c2466e8783505b2c6da03
Parents: 8e20097
Author: Lakshmi Prasanna Velineni
Authored: Sun Mar 19 22:40:04 2017 -0700
Committer: Lakshmi Prasanna Velineni
Committed: Sun Mar 26 11:43:48 2017 -0700
----------------------------------------------------------------------
 examples/csvformatter/pom.xml | 299 +++----------------
 .../java/com/demo/myapexapp/Application.java | 45 ---
 .../com/demo/myapexapp/HDFSOutputOperator.java | 87 ------
 .../java/com/demo/myapexapp/JsonGenerator.java | 78 -----
 .../main/java/com/demo/myapexapp/PojoEvent.java | 141 ---------
 .../apex/examples/csvformatter/Application.java | 37 +++
 .../csvformatter/HDFSOutputOperator.java | 87 ++++++
 .../examples/csvformatter/JsonGenerator.java | 76 +++++
 .../apex/examples/csvformatter/PojoEvent.java | 141 +++++++++
 .../src/main/resources/META-INF/properties.xml | 2 +-
 .../com/demo/myapexapp/ApplicationTest.java | 67 -----
 .../examples/csvformatter/ApplicationTest.java | 65 ++++
 examples/dedup/pom.xml | 277 +----------------
 .../java/com/example/dedup/Application.java | 123 --------
 .../apache/apex/examples/dedup/Application.java | 122 ++++++++
 .../java/com/example/dedup/ApplicationTest.java | 38 ---
 .../apex/examples/dedup/ApplicationTest.java | 37 +++
 examples/dynamic-partition/pom.xml | 270 +----------------
 .../src/main/java/com/example/dynamic/App.java | 23 --
 .../src/main/java/com/example/dynamic/Gen.java | 169 -----------
 .../org/apache/apex/examples/dynamic/App.java | 23 ++
 .../org/apache/apex/examples/dynamic/Gen.java | 171 +++++++++++
 .../com/example/dynamic/ApplicationTest.java | 34 ---
 .../apex/examples/dynamic/ApplicationTest.java | 33 ++
 examples/enricher/pom.xml | 298 ++----------------
 .../com/example/myapexapp/DataGenerator.java | 94 ------
 .../myapexapp/EnricherAppWithJSONFile.java | 47 ---
 .../example/myapexapp/LineOutputOperator.java | 34 ---
 .../main/java/com/example/myapexapp/POJO.java | 49 ---
 .../com/example/myapexapp/POJOEnriched.java | 71 -----
 .../apex/examples/enricher/DataGenerator.java | 94 ++++++
 .../enricher/EnricherAppWithJSONFile.java | 47 +++
 .../examples/enricher/LineOutputOperator.java | 34 +++
 .../org/apache/apex/examples/enricher/POJO.java | 49 +++
 .../apex/examples/enricher/POJOEnriched.java | 71 +++++
 .../src/main/resources/META-INF/properties.xml | 6 +-
 .../com/example/myapexapp/ApplicationTest.java | 31 --
 .../apex/examples/enricher/ApplicationTest.java | 31 ++
 examples/filter/pom.xml | 276 +----------------
 .../tutorial/filter/Application.java | 49 ---
 .../tutorial/filter/TransactionPOJO.java | 64 ----
 .../apex/examples/filter/Application.java | 49 +++
 .../apex/examples/filter/TransactionPOJO.java | 62 ++++
 .../src/main/resources/META-INF/properties.xml | 8 +-
 .../tutorial/filter/ApplicationTest.java | 111 -------
 .../apex/examples/filter/ApplicationTest.java | 96 ++++++
 examples/innerjoin/pom.xml | 269 ++---------------
 .../com/example/join/InnerJoinApplication.java | 39 ---
 .../java/com/example/join/POJOGenerator.java | 260 ----------------
 .../innerjoin/InnerJoinApplication.java | 38 +++
 .../apex/examples/innerjoin/POJOGenerator.java | 260 ++++++++++++++++
 .../example/join/InnerJoinApplicationTest.java | 21 --
 .../innerjoin/InnerJoinApplicationTest.java | 21 ++
 examples/parser/pom.xml | 268 ++---------------
 .../tutorial/jsonparser/Application.java | 35 ---
 .../tutorial/jsonparser/Campaign.java | 74 -----
 .../tutorial/jsonparser/JsonGenerator.java | 83 -----
 .../examples/parser/jsonparser/Application.java | 35 +++
 .../examples/parser/jsonparser/Campaign.java | 74 +++++
 .../parser/jsonparser/JsonGenerator.java | 83 +++++
 .../src/main/resources/META-INF/properties.xml | 4 +-
 .../tutorial/jsonparser/ApplicationTest.java | 36 ---
 .../parser/jsonparser/ApplicationTest.java | 35 +++
 examples/partition/pom.xml | 276 +----------------
 .../java/com/example/myapexapp/Application.java | 27 --
 .../main/java/com/example/myapexapp/Codec3.java | 13 -
 .../myapexapp/RandomNumberGenerator.java | 83 -----
 .../com/example/myapexapp/TestPartition.java | 164 ----------
 .../apex/examples/partition/Application.java | 25 ++
 .../apache/apex/examples/partition/Codec3.java | 13 +
 .../partition/RandomNumberGenerator.java | 76 +++++
 .../apex/examples/partition/TestPartition.java | 149 +++++++++
 .../src/main/resources/my-log4j.properties | 2 +-
 .../com/example/myapexapp/ApplicationTest.java | 37 ---
 .../examples/partition/ApplicationTest.java | 36 +++
 examples/pom.xml | 10 +
 examples/recordReader/pom.xml | 284 +-----------------
 .../com/example/recordReader/Application.java | 32 --
 .../recordReader/TransactionsSchema.java | 168 -----------
 .../apex/examples/recordReader/Application.java | 32 ++
 .../recordReader/TransactionsSchema.java | 168 +++++++++++
 .../src/main/resources/META-INF/properties.xml | 4 +-
 .../example/recordReader/ApplicationTest.java | 91 ------
 .../examples/recordReader/ApplicationTest.java | 91 ++++++
 examples/throttle/pom.xml | 256 +---------------
 .../examples/throttle/Application.java | 51 ----
 .../examples/throttle/PassThroughOperator.java | 20 --
 .../throttle/RandomNumberGenerator.java | 64 ----
 .../examples/throttle/SlowDevNullOperator.java | 35 ---
 .../throttle/ThrottlingStatsListener.java | 150 ----------
 .../apex/examples/throttle/Application.java | 51 ++++
 .../examples/throttle/PassThroughOperator.java | 20 ++
 .../throttle/RandomNumberGenerator.java | 64 ++++
 .../examples/throttle/SlowDevNullOperator.java | 35 +++
 .../throttle/ThrottlingStatsListener.java | 150 ++++++++++
 .../examples/throttle/ApplicationTest.java | 37 ---
 .../apex/examples/throttle/ApplicationTest.java | 36 +++
 examples/transform/pom.xml | 250 +---------------
 .../java/com/example/transform/Application.java | 39 ---
 .../com/example/transform/CustomerEvent.java | 74 -----
 .../com/example/transform/CustomerInfo.java | 60 ----
 .../transform/DynamicTransformApplication.java | 52 ----
 .../com/example/transform/POJOGenerator.java | 125 --------
 .../apex/examples/transform/Application.java | 39 +++
 .../apex/examples/transform/CustomerEvent.java | 74 +++++
 .../apex/examples/transform/CustomerInfo.java | 60 ++++
 .../transform/DynamicTransformApplication.java | 51 ++++
 .../apex/examples/transform/POJOGenerator.java | 125 ++++++++
 .../com/example/transform/ApplicationTest.java | 21 --
 .../examples/transform/ApplicationTest.java | 21 ++
 110 files changed, 3404 insertions(+), 6088 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/csvformatter/pom.xml b/examples/csvformatter/pom.xml
index 9033db5..be3be7a 100644
--- a/examples/csvformatter/pom.xml
+++
b/examples/csvformatter/pom.xml @@ -1,266 +1,61 @@ + 4.0.0 - com.example - 1.0-SNAPSHOT - formatter + + org.apache.apex + malhar-examples + 3.7.0-SNAPSHOT + + + malhar-examples-csvformatter jar Formatter Apps Applications to showcase different formatters - - - 3.5.0 - lib/*.jar - 3.6.0 - - - - - - org.apache.maven.plugins - maven-eclipse-plugin - 2.9 - - true - - - - maven-compiler-plugin - 3.3 - - UTF-8 - 1.7 - 1.7 - true - false - true - true - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - prepare-package - - copy-dependencies - - - target/deps - runtime - - - - - - - maven-assembly-plugin - - - app-package-assembly - package - - single - - - ${project.artifactId}-${project.version}-apexapp - false - - src/assemble/appPackage.xml - - - 0755 - - - - ${apex.apppackage.classpath} - ${apex.version} - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.name} - ${project.description} - - - - - - - - - maven-antrun-plugin - 1.7 - - - package - - - - - - - run - - - - - createJavadocDirectory - generate-resources - - - - - - - - run - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 1.9.1 - - - attach-artifacts - package - - attach-artifact - - - - - target/${project.artifactId}-${project.version}.apa - apa - - - false - - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - - xml-doclet - generate-resources - - javadoc - - - com.github.markusbernhardt.xmldoclet.XmlDoclet - -d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml - false - - com.github.markusbernhardt - xml-doclet - 1.0.4 - - - - - - - - org.codehaus.mojo - xml-maven-plugin - 1.0 - - - transform-xmljavadoc - generate-resources - - transform - - - - - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - XmlJavadocCommentsExtractor.xsl - ${project.build.directory}/generated-resources/xml-javadoc - - - - - - - maven-resources-plugin - 2.6 - - - copy-resources - process-resources - - copy-resources - - - ${basedir}/target/classes - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - true - - - - - - - - - - - - org.apache.apex - malhar-library - ${malhar.version} - - - org.apache.apex - malhar-contrib - ${malhar.version} + org.codehaus.jettison + jettison + 1.1 com.fasterxml.jackson.core jackson-databind - 2.5.4 + 2.7.0 com.github.fge @@ -269,29 +64,9 @@ true - org.apache.apex - apex-common - ${apex.version} - provided - - - junit - junit - 4.10 - test - - - org.apache.apex - apex-engine - ${apex.version} - test - - - net.sf.supercsv super-csv 2.4.0 - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java deleted file mode 100644 index a4ff06f..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.demo.myapexapp; - -import java.util.Arrays; - -import com.datatorrent.contrib.parser.JsonParser; - -import org.apache.apex.malhar.contrib.parser.StreamingJsonParser; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import 
com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StatsListener; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.formatter.CsvFormatter; -import com.datatorrent.lib.appdata.schemas.SchemaUtils; -import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; -import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; - -@ApplicationAnnotation(name = "CustomOutputFormatter") -public class Application implements StreamingApplication -{ - //Set the delimiters and schema structure for the custom output in schema.json - private static final String filename = "schema.json"; - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class); - JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class); - - CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class); - formatter.setSchema(SchemaUtils.jarResourceFileToString(filename)); - dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class); - - HDFSOutputOperator hdfsOutput = dag.addOperator("HDFSOutputOperator", HDFSOutputOperator.class); - hdfsOutput.setLineDelimiter(""); - - dag.addStream("parserStream", generator.out, jsonParser.in); - dag.addStream("formatterStream", jsonParser.out, formatter.in); - dag.addStream("outputStream", formatter.out, hdfsOutput.input); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java deleted file mode 100644 index 5cb162c..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.demo.myapexapp; - -import javax.validation.constraints.NotNull; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; - -/** - * HDFSoutput operator with implementation to write Objects to HDFS - * - * @param - */ -public class HDFSOutputOperator extends AbstractFileOutputOperator -{ - - @NotNull - String outFileName; - - //setting default value - String lineDelimiter = "\n"; - - //Switch to write the files to HDFS - set to false to diable writes - private boolean writeFilesFlag = true; - - int id; - - @Override - public void setup(OperatorContext context) - { - super.setup(context); - id = context.getId(); - } - - public boolean isWriteFilesFlag() - { - return writeFilesFlag; - } - - public void setWriteFilesFlag(boolean writeFilesFlag) - { - this.writeFilesFlag = writeFilesFlag; - } - - public String getOutFileName() - { - return outFileName; - } - - public void setOutFileName(String outFileName) - { - this.outFileName = outFileName; - } - - @Override - protected String getFileName(T tuple) - { - return getOutFileName() + id; - } - - public String getLineDelimiter() - { - return lineDelimiter; - } - - public void setLineDelimiter(String lineDelimiter) - { - this.lineDelimiter = lineDelimiter; - } - - @Override - protected byte[] getBytesForTuple(T tuple) - { - String temp = tuple.toString().concat(String.valueOf(lineDelimiter)); - byte[] 
theByteArray = temp.getBytes(); - - return theByteArray; - } - - @Override - protected void processTuple(T tuple) - { - if (writeFilesFlag) { - } - super.processTuple(tuple); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java deleted file mode 100644 index f50f300..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.demo.myapexapp; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Random; - -import javax.validation.constraints.Min; - -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; - -public class JsonGenerator extends BaseOperator implements InputOperator -{ - - private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class); - - @Min(1) - private int numTuples = 20; - private transient int count = 0; - - public static Random rand = new Random(); - private int sleepTime=5; - - public final transient DefaultOutputPort out = new DefaultOutputPort(); - - private static String getJson() - { - - JSONObject obj = new JSONObject(); - try { - obj.put("campaignId", 1234); - obj.put("campaignName", "SimpleCsvFormatterExample"); - obj.put("campaignBudget", 10000.0); - obj.put("weatherTargeting", "false"); - obj.put("securityCode", "APEX"); - } catch (JSONException e) { - return null; - } - return obj.toString(); - } - - @Override - public void beginWindow(long windowId) - { - count = 0; - } - - @Override - public void emitTuples() - { - if (count++ < numTuples) { - out.emit(getJson().getBytes()); - } else { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted"); - } - } - } - - public int getNumTuples() - { - return numTuples; - } - - public void setNumTuples(int numTuples) - { - this.numTuples = numTuples; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java deleted file mode 100644 index 8514856..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java +++ /dev/null @@ -1,141 +0,0 @@ -package com.demo.myapexapp; - -import java.util.Date; - -public class PojoEvent -{ - - private int advId; - private int campaignId; - private String campaignName; - private double campaignBudget; - private Date startDate; - private Date endDate; - private String securityCode; - private boolean weatherTargeting; - private boolean optimized; - private String parentCampaign; - private Character weatherTargeted; - private String valid; - - public int getAdvId() - { - return advId; - } - - public void setAdvId(int AdId) - { - this.advId = advId; - } - - public int getCampaignId() - { - return campaignId; - } - - public void setCampaignId(int 
campaignId) - { - this.campaignId = campaignId; - } - - public String getCampaignName() - { - return campaignName; - } - - public void setCampaignName(String campaignName) - { - this.campaignName = campaignName; - } - - public double getCampaignBudget() - { - return campaignBudget; - } - - public void setCampaignBudget(double campaignBudget) - { - this.campaignBudget = campaignBudget; - } - - public Date getStartDate() - { - return startDate; - } - - public void setStartDate(Date startDate) - { - this.startDate = startDate; - } - - public Date getEndDate() - { - return endDate; - } - - public void setEndDate(Date endDate) - { - this.endDate = endDate; - } - - public String getSecurityCode() - { - return securityCode; - } - - public void setSecurityCode(String securityCode) - { - this.securityCode = securityCode; - } - - public boolean isWeatherTargeting() - { - return weatherTargeting; - } - - public void setWeatherTargeting(boolean weatherTargeting) - { - this.weatherTargeting = weatherTargeting; - } - - public boolean isOptimized() - { - return optimized; - } - - public void setOptimized(boolean optimized) - { - this.optimized = optimized; - } - - public String getParentCampaign() - { - return parentCampaign; - } - - public void setParentCampaign(String parentCampaign) - { - this.parentCampaign = parentCampaign; - } - - public Character getWeatherTargeted() - { - return weatherTargeted; - } - - public void setWeatherTargeted(Character weatherTargeted) - { - this.weatherTargeted = weatherTargeted; - } - - public String getValid() - { - return valid; - } - - public void setValid(String valid) - { - this.valid = valid; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java new file mode 100644 index 0000000..cc9ee79 --- /dev/null +++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java @@ -0,0 +1,37 @@ +package org.apache.apex.examples.csvformatter; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.formatter.CsvFormatter; +import com.datatorrent.contrib.parser.JsonParser; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; + +@ApplicationAnnotation(name = "CustomOutputFormatter") +public class Application implements StreamingApplication +{ + //Set the delimiters and schema structure for the custom output in schema.json + private static final String filename = "schema.json"; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class); + JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class); + + CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class); + formatter.setSchema(SchemaUtils.jarResourceFileToString(filename)); + dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class); + + HDFSOutputOperator hdfsOutput = dag.addOperator("HDFSOutputOperator", HDFSOutputOperator.class); + 
hdfsOutput.setLineDelimiter(""); + + dag.addStream("parserStream", generator.out, jsonParser.in); + dag.addStream("formatterStream", jsonParser.out, formatter.in); + dag.addStream("outputStream", formatter.out, hdfsOutput.input); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java new file mode 100644 index 0000000..7cdd8bb --- /dev/null +++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java @@ -0,0 +1,87 @@ +package org.apache.apex.examples.csvformatter; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * HDFSoutput operator with implementation to write Objects to HDFS + * + * @param + */ +public class HDFSOutputOperator extends AbstractFileOutputOperator +{ + + @NotNull + String outFileName; + + //setting default value + String lineDelimiter = "\n"; + + //Switch to write the files to HDFS - set to false to diable writes + private boolean writeFilesFlag = true; + + int id; + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + id = context.getId(); + } + + public boolean isWriteFilesFlag() + { + return writeFilesFlag; + } + + public void setWriteFilesFlag(boolean writeFilesFlag) + { + this.writeFilesFlag = writeFilesFlag; + } + + public String getOutFileName() + { + return outFileName; + } + + public void setOutFileName(String outFileName) + { + this.outFileName = outFileName; + } + + @Override + protected String getFileName(T tuple) + { + return getOutFileName() + id; + } + + public String getLineDelimiter() + { + return lineDelimiter; + } + + public void setLineDelimiter(String lineDelimiter) + { + this.lineDelimiter = lineDelimiter; + } + + @Override + protected byte[] getBytesForTuple(T tuple) + { + String temp = tuple.toString().concat(String.valueOf(lineDelimiter)); + byte[] theByteArray = temp.getBytes(); + + return theByteArray; + } + + @Override + protected void processTuple(T tuple) + { + if (writeFilesFlag) { + } + super.processTuple(tuple); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java new file mode 100644 index 0000000..9b7698c --- /dev/null +++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java @@ -0,0 +1,76 @@ +package org.apache.apex.examples.csvformatter; + +import java.util.Random; + +import javax.validation.constraints.Min; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public class 
JsonGenerator extends BaseOperator implements InputOperator +{ + + private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class); + + @Min(1) + private int numTuples = 20; + private transient int count = 0; + + public static Random rand = new Random(); + private int sleepTime=5; + + public final transient DefaultOutputPort out = new DefaultOutputPort(); + + private static String getJson() + { + + JSONObject obj = new JSONObject(); + try { + obj.put("campaignId", 1234); + obj.put("campaignName", "SimpleCsvFormatterExample"); + obj.put("campaignBudget", 10000.0); + obj.put("weatherTargeting", "false"); + obj.put("securityCode", "APEX"); + } catch (JSONException e) { + return null; + } + return obj.toString(); + } + + @Override + public void beginWindow(long windowId) + { + count = 0; + } + + @Override + public void emitTuples() + { + if (count++ < numTuples) { + out.emit(getJson().getBytes()); + } else { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + } + + public int getNumTuples() + { + return numTuples; + } + + public void setNumTuples(int numTuples) + { + this.numTuples = numTuples; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java new file mode 100644 index 0000000..03fda93 --- /dev/null +++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java @@ -0,0 +1,141 @@ +package org.apache.apex.examples.csvformatter; + +import java.util.Date; + +public class PojoEvent +{ + + private int advId; + private int campaignId; + private String campaignName; + private double campaignBudget; + private Date startDate; + private Date endDate; + private String securityCode; + private boolean weatherTargeting; + private boolean optimized; + private String parentCampaign; + private Character weatherTargeted; + private String valid; + + public int getAdvId() + { + return advId; + } + + public void setAdvId(int AdId) + { + this.advId = advId; + } + + public int getCampaignId() + { + return campaignId; + } + + public void setCampaignId(int campaignId) + { + this.campaignId = campaignId; + } + + public String getCampaignName() + { + return campaignName; + } + + public void setCampaignName(String campaignName) + { + this.campaignName = campaignName; + } + + public double getCampaignBudget() + { + return campaignBudget; + } + + public void setCampaignBudget(double campaignBudget) + { + this.campaignBudget = campaignBudget; + } + + public Date getStartDate() + { + return startDate; + } + + public void setStartDate(Date startDate) + { + this.startDate = startDate; + } + + public Date getEndDate() + { + return endDate; + } + + public void setEndDate(Date endDate) + { + this.endDate = endDate; + } + + public String getSecurityCode() + { + return securityCode; + } + + public void setSecurityCode(String securityCode) + { + this.securityCode = securityCode; + } + + public boolean isWeatherTargeting() + { + return weatherTargeting; + } + + public void setWeatherTargeting(boolean weatherTargeting) + { + this.weatherTargeting = weatherTargeting; + } + + public boolean isOptimized() + { + return optimized; + } + + public void 
setOptimized(boolean optimized) + { + this.optimized = optimized; + } + + public String getParentCampaign() + { + return parentCampaign; + } + + public void setParentCampaign(String parentCampaign) + { + this.parentCampaign = parentCampaign; + } + + public Character getWeatherTargeted() + { + return weatherTargeted; + } + + public void setWeatherTargeted(Character weatherTargeted) + { + this.weatherTargeted = weatherTargeted; + } + + public String getValid() + { + return valid; + } + + public void setValid(String valid) + { + this.valid = valid; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/resources/META-INF/properties.xml b/examples/csvformatter/src/main/resources/META-INF/properties.xml index ed2b5ce..8d67c93 100644 --- a/examples/csvformatter/src/main/resources/META-INF/properties.xml +++ b/examples/csvformatter/src/main/resources/META-INF/properties.xml @@ -20,7 +20,7 @@ dt.application.CustomOutputFormatter.operator.jsonParser.port.out.attr.TUPLE_CLASS - com.demo.myapexapp.PojoEvent + org.apache.apex.examples.csvformatter.PojoEvent http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java b/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java deleted file mode 100644 index efe5946..0000000 --- a/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.demo.myapexapp; - -import java.io.File; -import java.io.IOException; -import java.util.Collection; - -import javax.validation.ConstraintViolationException; - -import org.apache.commons.io.FileUtils; - -import org.junit.AfterClass; -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; - -import org.junit.Test; - -import com.datatorrent.api.LocalMode; - -/** - * Test the DAG declaration in local mode. 
- */ -public class ApplicationTest -{ - - private static final String FILE_NAME = "/tmp/formatterApp"; - - @AfterClass - public static void cleanup() - { - try { - FileUtils.deleteDirectory(new File(FILE_NAME)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Test - public void testApplication() throws Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - - // wait for output files to roll - Thread.sleep(5000); - - String[] extensions = {"dat.0", "tmp"}; - Collection list = FileUtils.listFiles(new File(FILE_NAME), extensions, false); - - for (File file : list) { - for (String line : FileUtils.readLines(file)) { - Assert.assertEquals("Delimiter in record", true, (line.equals( - "1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||"))); - } - } - - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java b/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java new file mode 100644 index 0000000..67d5fd0 --- /dev/null +++ b/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java @@ -0,0 +1,65 @@ +package org.apache.apex.examples.csvformatter; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import javax.validation.ConstraintViolationException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest +{ + + private static final String FILE_NAME = "/tmp/formatterApp"; + + @AfterClass + public static void cleanup() + { + try { + FileUtils.deleteDirectory(new File(FILE_NAME)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for output files to roll + Thread.sleep(5000); + + String[] extensions = {"dat.0", "tmp"}; + Collection list = FileUtils.listFiles(new File(FILE_NAME), extensions, false); + + for (File file : list) { + for (String line : FileUtils.readLines(file)) { + Assert.assertEquals("Delimiter in record", true, (line.equals( + "1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||"))); + } + } + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/pom.xml ---------------------------------------------------------------------- diff --git a/examples/dedup/pom.xml b/examples/dedup/pom.xml index f777784..ba5a24d 100644 --- a/examples/dedup/pom.xml +++ b/examples/dedup/pom.xml @@ -2,279 +2,30 @@ 4.0.0 - com.example - 1.0-SNAPSHOT - dedup + + org.apache.apex + malhar-examples + 3.7.0-SNAPSHOT + + + malhar-examples-dedup jar - My Apex Application - My Apex Application Description - - - - 3.5.0 - 3.6.0 - lib/*.jar - - - - - - org.apache.maven.plugins - maven-eclipse-plugin - 2.9 - - true - - - - maven-compiler-plugin - 3.3 - - UTF-8 - 1.7 - 1.7 - true - false - true - true - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - prepare-package - - copy-dependencies - - - target/deps - runtime - - - - - - - maven-assembly-plugin - - - app-package-assembly - package - - single - - - ${project.artifactId}-${project.version}-apexapp - false - - src/assemble/appPackage.xml - - - 0755 - - - - ${apex.apppackage.classpath} - ${apex.version} - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.name} - ${project.description} - - - - - - - - - maven-antrun-plugin - 1.7 - - - package - - - - - - - run - - - - - createJavadocDirectory - generate-resources - - - - - - - - run - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 1.9.1 - - - attach-artifacts - package - - attach-artifact - - - - - target/${project.artifactId}-${project.version}.apa - apa - - - false - - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - - xml-doclet - generate-resources - - javadoc - - - com.github.markusbernhardt.xmldoclet.XmlDoclet - -d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml - false - - com.github.markusbernhardt - xml-doclet - 1.0.4 - - - - - - - - org.codehaus.mojo - xml-maven-plugin - 1.0 - - - transform-xmljavadoc - generate-resources - - transform - - - - - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - XmlJavadocCommentsExtractor.xsl - ${project.build.directory}/generated-resources/xml-javadoc - - - - - - - maven-resources-plugin - 2.6 - - - copy-resources - process-resources - - copy-resources - - - 
${basedir}/target/classes - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - true - - - - - - - - - - + Dedup Application + Dedup Application - - - org.apache.apex - malhar-library - ${malhar.version} - - - - - org.apache.apex - apex-common - ${apex.version} - provided - - - junit - junit - 4.10 - test - - - org.apache.apex - apex-engine - ${apex.version} - test - org.codehaus.janino janino 2.7.8 + + joda-time + joda-time + 2.9.1 + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/main/java/com/example/dedup/Application.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/main/java/com/example/dedup/Application.java b/examples/dedup/src/main/java/com/example/dedup/Application.java deleted file mode 100644 index cabdce2..0000000 --- a/examples/dedup/src/main/java/com/example/dedup/Application.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Put your copyright and license info here. - */ -package com.example.dedup; - -import java.util.Date; -import java.util.Random; - -import org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; - -@ApplicationAnnotation(name="DedupExample") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Test Data Generator Operator - RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator()); - - // Dedup Operator. Configuration through resources/META-INF/properties.xml - TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator()); - - // Console output operator for unique tuples - ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator()); - - // Console output operator for duplicate tuples - ConsoleOutputOperator consoleDuplicate = dag.addOperator("ConsoleDuplicate", new ConsoleOutputOperator()); - - // Console output operator for duplicate tuples - ConsoleOutputOperator consoleExpired = dag.addOperator("ConsoleExpired", new ConsoleOutputOperator()); - - // Streams - dag.addStream("Generator to Dedup", gen.output, dedup.input); - - // Connect Dedup unique to Console - dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input); - // Connect Dedup duplicate to Console - dag.addStream("Dedup Duplicate to Console", dedup.duplicate, consoleDuplicate.input); - // Connect Dedup expired to Console - dag.addStream("Dedup Expired to Console", dedup.expired, consoleExpired.input); - - // Set Attribute TUPLE_CLASS for supplying schema information to the port - dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); - - // Uncomment the following line to create multiple partitions for Dedup operator. 
In this case: 2 - // dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2)); - } - - public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator - { - - public final transient DefaultOutputPort output = new DefaultOutputPort<>(); - private final transient Random r = new Random(); - private int tuplesPerWindow = 100; - private transient int count = 0; - - @Override - public void beginWindow(long windowId) { - count = 0; - } - - @Override - public void emitTuples() - { - if (count++ > tuplesPerWindow) { - return; - } - TestEvent event = new TestEvent(); - event.id = r.nextInt(100); - event.eventTime = new Date(System.currentTimeMillis() - (r.nextInt(60 * 1000))); - output.emit(event); - } - } - - public static class TestEvent - { - private int id; - private Date eventTime; - - public TestEvent() - { - } - - public int getId() - { - return id; - } - - public void setId(int id) - { - this.id = id; - } - - public Date getEventTime() - { - return eventTime; - } - - public void setEventTime(Date eventTime) - { - this.eventTime = eventTime; - } - - @Override - public String toString() { - return "TestEvent [id=" + id + ", eventTime=" + eventTime + "]"; - } - - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java b/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java new file mode 100644 index 0000000..2498d62 --- /dev/null +++ b/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java @@ -0,0 +1,122 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.dedup; + +import java.util.Date; +import java.util.Random; + +import org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +@ApplicationAnnotation(name="DedupExample") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Test Data Generator Operator + RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator()); + + // Dedup Operator. 
Configuration through resources/META-INF/properties.xml + TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator()); + + // Console output operator for unique tuples + ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator()); + + // Console output operator for duplicate tuples + ConsoleOutputOperator consoleDuplicate = dag.addOperator("ConsoleDuplicate", new ConsoleOutputOperator()); + + // Console output operator for duplicate tuples + ConsoleOutputOperator consoleExpired = dag.addOperator("ConsoleExpired", new ConsoleOutputOperator()); + + // Streams + dag.addStream("Generator to Dedup", gen.output, dedup.input); + + // Connect Dedup unique to Console + dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input); + // Connect Dedup duplicate to Console + dag.addStream("Dedup Duplicate to Console", dedup.duplicate, consoleDuplicate.input); + // Connect Dedup expired to Console + dag.addStream("Dedup Expired to Console", dedup.expired, consoleExpired.input); + + // Set Attribute TUPLE_CLASS for supplying schema information to the port + dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); + + // Uncomment the following line to create multiple partitions for Dedup operator. In this case: 2 + // dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(2)); + } + + public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator + { + + public final transient DefaultOutputPort output = new DefaultOutputPort<>(); + private final transient Random r = new Random(); + private int tuplesPerWindow = 100; + private transient int count = 0; + + @Override + public void beginWindow(long windowId) { + count = 0; + } + + @Override + public void emitTuples() + { + if (count++ > tuplesPerWindow) { + return; + } + TestEvent event = new TestEvent(); + event.id = r.nextInt(100); + event.eventTime = new Date(System.currentTimeMillis() - (r.nextInt(60 * 1000))); + output.emit(event); + } + } + + public static class TestEvent + { + private int id; + private Date eventTime; + + public TestEvent() + { + } + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public Date getEventTime() + { + return eventTime; + } + + public void setEventTime(Date eventTime) + { + this.eventTime = eventTime; + } + + @Override + public String toString() { + return "TestEvent [id=" + id + ", eventTime=" + eventTime + "]"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java b/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java deleted file mode 100644 index 9c9f17c..0000000 --- a/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Put your copyright and license info here. - */ -package com.example.dedup; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import com.datatorrent.api.LocalMode; -import com.example.dedup.Application; - -/** - * Test the DAG declaration in local mode. 
- */ -public class ApplicationTest { - - @Test - public void testApplication() throws IOException, Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - Thread.sleep(10 * 1000); - lc.shutdown(); - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java b/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java new file mode 100644 index 0000000..3304a04 --- /dev/null +++ b/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java @@ -0,0 +1,37 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.dedup; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest { + + @Test + public void testApplication() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + Thread.sleep(10 * 1000); + lc.shutdown(); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/pom.xml ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/pom.xml b/examples/dynamic-partition/pom.xml index 34e91ee..21b1c30 100644 --- a/examples/dynamic-partition/pom.xml +++ b/examples/dynamic-partition/pom.xml @@ -1,273 +1,25 @@ 4.0.0 - - com.example - 1.0-SNAPSHOT - dynamic-partition + + + org.apache.apex + malhar-examples + 3.7.0-SNAPSHOT + + + malhar-examples-dynamic-partition jar Dynamic Partitioning Example showing dynamic partitioning - - - 3.5.0 - lib/*.jar - - - - - - org.apache.maven.plugins - maven-eclipse-plugin - 2.9 - - true - - - - maven-compiler-plugin - 3.3 - - UTF-8 - 1.7 - 1.7 - true - false - true - true - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - prepare-package - - copy-dependencies - - - target/deps - runtime - - - - - - - maven-assembly-plugin - - - app-package-assembly - package - - single - - - ${project.artifactId}-${project.version}-apexapp - false - - src/assemble/appPackage.xml - - - 0755 - - - - ${apex.apppackage.classpath} - ${apex.version} - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.name} - ${project.description} - - - - - - - - - maven-antrun-plugin - 1.7 - - - package - - - - - - - run - - - - - createJavadocDirectory - generate-resources - - - - - - - - run - - - - - - - org.codehaus.mojo - 
build-helper-maven-plugin - 1.9.1 - - - attach-artifacts - package - - attach-artifact - - - - - target/${project.artifactId}-${project.version}.apa - apa - - - false - - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - - xml-doclet - generate-resources - - javadoc - - - com.github.markusbernhardt.xmldoclet.XmlDoclet - -d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml - false - - com.github.markusbernhardt - xml-doclet - 1.0.4 - - - - - - - - org.codehaus.mojo - xml-maven-plugin - 1.0 - - - transform-xmljavadoc - generate-resources - - transform - - - - - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - XmlJavadocCommentsExtractor.xsl - ${project.build.directory}/generated-resources/xml-javadoc - - - - - - - maven-resources-plugin - 2.6 - - - copy-resources - process-resources - - copy-resources - - - ${basedir}/target/classes - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - true - - - - - - - - - - - - - - org.apache.apex - malhar-library - 3.6.0 - - - - - org.apache.apex - apex-common - ${apex.version} - provided - - - junit - junit - 4.10 - test - - org.apache.apex - apex-engine - ${apex.version} - test + com.esotericsoftware.kryo + kryo + 2.24.0 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java b/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java deleted file mode 100644 index 9eec263..0000000 --- a/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.example.dynamic; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.DAG; - -import com.datatorrent.lib.stream.DevNull; - -@ApplicationAnnotation(name="Dyn") -public class App implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - Gen gen = dag.addOperator("gen", Gen.class); - DevNull devNull = dag.addOperator("devNull", DevNull.class); - - dag.addStream("data", gen.out, devNull.data); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java b/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java deleted file mode 100644 index 4cccd23..0000000 --- a/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.example.dynamic; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.validation.constraints.NotNull; -import java.io.ByteArrayOutputStream; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import 
com.datatorrent.api.DefaultPartition; -import com.datatorrent.api.Partitioner; -import com.datatorrent.api.StatsListener; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; - -/** - * Operator that dynamically partitions itself after 500 tuples have been emitted - */ -public class Gen extends BaseOperator implements InputOperator, Partitioner, StatsListener -{ - private static final Logger LOG = LoggerFactory.getLogger(Gen.class); - - private static final int MAX_PARTITIONS = 4; // maximum number of partitions - - private int partitions = 2; // initial number of partitions - - @NotNull - private int numTuples; // number of tuples to emit per window - - private transient int count = 0; - - public final transient DefaultOutputPort out = new DefaultOutputPort(); - - @Override - public void partitioned(Map> map) - { - if (partitions != map.size()) { - String msg = String.format("partitions = %d, map.size = %d%n", partitions, map.size()); - throw new RuntimeException(msg); - } - } - - @Override - public void beginWindow(long windowId) - { - count = 0; - } - - @Override - public void emitTuples() - { - if (count < numTuples) { - ++count; - out.emit(Math.random()); - } - } - - public int getNumTuples() - { - return numTuples; - } - - /** - * Sets the number of tuples to be emitted every window. - * @param numTuples number of tuples - */ - public void setNumTuples(int numTuples) - { - this.numTuples = numTuples; - } - - @Override - public Response processStats(BatchedOperatorStats batchedOperatorStats) { - - final long emittedCount = batchedOperatorStats.getTuplesEmittedPSMA(); - - // we only perform a single dynamic repartition - Response res = new Response(); - res.repartitionRequired = false; - if (emittedCount > 500 && partitions < MAX_PARTITIONS) { - LOG.info("processStats: trying repartition of input operator current {} required {}", - partitions, MAX_PARTITIONS); - LOG.info("**** operator id = {}, window id = {}, tuplesProcessedPSMA = {}, tuplesEmittedPSMA = {}", - batchedOperatorStats.getOperatorId(), - batchedOperatorStats.getCurrentWindowId(), - batchedOperatorStats.getTuplesProcessedPSMA(), - emittedCount); - partitions = MAX_PARTITIONS; - res.repartitionRequired = true; - } - - return res; - } // processStats - - /** - * Clone object by serializing and deserializing using Kryo. - * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields. 
- * - * @param kryo kryo object used to clone objects - * @param src src object that copy from - * @return cloned object - */ - @SuppressWarnings("unchecked") - private static SRC cloneObject(Kryo kryo, SRC src) - { - kryo.setClassLoader(src.getClass().getClassLoader()); - ByteArrayOutputStream bos = null; - Output output; - Input input = null; - try { - bos = new ByteArrayOutputStream(); - output = new Output(bos); - kryo.writeObject(output, src); - output.close(); - input = new Input(bos.toByteArray()); - return (SRC)kryo.readObject(input, src.getClass()); - } finally { - IOUtils.closeQuietly(input); - IOUtils.closeQuietly(bos); - } - } - - @Override - public Collection> definePartitions( - Collection> list, PartitioningContext context) - { - if (partitions < 0) { // error - String msg = String.format("Error: Bad value: partitions = %d%n", partitions); - LOG.error(msg); - throw new RuntimeException(msg); - } - - final int prevCount = list.size(); - if (1 == prevCount) { // initial call - LOG.info("definePartitions: First call, prevCount = {}, partitions = {}", - prevCount, partitions); - } - - if (prevCount == partitions) { - LOG.info("definePartitions: Nothing to do in definePartitions"); - return list; // nothing to do - } - - LOG.debug("definePartitions: Repartitioning from {} to {}", prevCount, partitions); - - Kryo kryo = new Kryo(); - - // return value: new list of partitions (includes old list) - List> newPartitions = Lists.newArrayListWithExpectedSize(partitions); - - for (int i = 0; i < partitions; i++) { - Gen oper = cloneObject(kryo, this); - newPartitions.add(new DefaultPartition<>(oper)); - } - - LOG.info("definePartition: returning {} partitions", newPartitions.size()); - return newPartitions; - } - -}