beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4707) Change fields and table names in nexmark perfkit tables
Date Tue, 03 Jul 2018 08:34:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4707?focusedWorklogId=118622&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118622
]

ASF GitHub Bot logged work on BEAM-4707:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jul/18 08:33
            Start Date: 03/Jul/18 08:33
    Worklog Time Spent: 10m 
      Work Description: lgajowy closed pull request #5855: [BEAM-4707] Add timestamp field
to Nexmark tables and use explicit mode in table name
URL: https://github.com/apache/beam/pull/5855
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
index afc38442095..ccb52138bce 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java
@@ -99,7 +99,7 @@ void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) throws IOException
         }
       }
       if (options.getExportSummaryToBigQuery()) {
-        savePerfsToBigQuery(options, actual, null);
+        savePerfsToBigQuery(options, actual, null, start);
       }
     } finally {
       if (options.getMonitorJobs()) {
@@ -117,7 +117,8 @@ void runAll(OptionT options, NexmarkLauncher nexmarkLauncher) throws IOException
   static void savePerfsToBigQuery(
       NexmarkOptions options,
       Map<NexmarkConfiguration, NexmarkPerf> perfs,
-      @Nullable BigQueryServices testBigQueryServices) {
+      @Nullable BigQueryServices testBigQueryServices,
+      Instant start) {
     Pipeline pipeline = Pipeline.create(options);
     PCollection<KV<NexmarkConfiguration, NexmarkPerf>> perfsPCollection =
         pipeline.apply(
@@ -145,6 +146,7 @@ public NexmarkPerf decode(InputStream inStream)
         new TableSchema()
             .setFields(
                 ImmutableList.of(
+                    new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
                     new TableFieldSchema().setName("runtimeSec").setType("FLOAT"),
                     new TableFieldSchema().setName("eventsPerSec").setType("FLOAT"),
                     new TableFieldSchema().setName("numResults").setType("INTEGER")));
@@ -162,6 +164,7 @@ public NexmarkPerf decode(InputStream inStream)
           NexmarkPerf nexmarkPerf = input.getValue();
           TableRow row =
               new TableRow()
+                  .set("timestamp", start.getMillis())
                   .set("runtimeSec", nexmarkPerf.runtimeSec)
                   .set("eventsPerSec", nexmarkPerf.eventsPerSec)
                   .set("numResults", nexmarkPerf.numResults);
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
index 796e08a19c0..cd01cca3149 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -178,7 +178,7 @@ static String tableSpec(NexmarkOptions options, String queryName, long now, Stri
                 baseTableName,
                 queryName,
                 options.getRunner().getSimpleName(),
-                options.isStreaming(),
+                options.isStreaming() ? "streaming" : "batch",
                 version)
             : String.format(
                 "%s:%s.%s_%s_%s_%s",
@@ -187,7 +187,7 @@ static String tableSpec(NexmarkOptions options, String queryName, long now, Stri
                 baseTableName,
                 queryName,
                 options.getRunner().getSimpleName(),
-                options.isStreaming());
+                options.isStreaming() ? "streaming" : "batch");
     }
     throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
   }
diff --git a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
index 6ab056ad9db..7fb4b4ce806 100644
--- a/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
+++ b/sdks/java/nexmark/src/test/java/org/apache/beam/sdk/nexmark/PerfsToBigQueryTest.java
@@ -33,6 +33,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.FakeJobService;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -91,7 +92,8 @@ public void testSavePerfsToBigQuery() throws IOException, InterruptedException {
     HashMap<NexmarkConfiguration, NexmarkPerf> perfs = new HashMap<>(2);
     perfs.put(nexmarkConfiguration1, nexmarkPerf1);
     perfs.put(nexmarkConfiguration2, nexmarkPerf2);
-    Main.savePerfsToBigQuery(options, perfs, fakeBqServices);
+    Instant start = Instant.now();
+    Main.savePerfsToBigQuery(options, perfs, fakeBqServices, start);
 
     String tableSpec = NexmarkUtils.tableSpec(options, String.valueOf(QUERY), 0L, null);
     List<TableRow> actualRows =
@@ -103,6 +105,7 @@ public void testSavePerfsToBigQuery() throws IOException, InterruptedException {
     List<TableRow> expectedRows = new ArrayList<>();
     TableRow row1 =
         new TableRow()
+            .set("timestamp", start.getMillis())
             .set("runtimeSec", nexmarkPerf1.runtimeSec)
             .set("eventsPerSec", nexmarkPerf1.eventsPerSec)
             // when read using TableRowJsonCoder the row field is boxed into an Integer, cast it to int
@@ -111,6 +114,7 @@ public void testSavePerfsToBigQuery() throws IOException, InterruptedException {
     expectedRows.add(row1);
     TableRow row2 =
         new TableRow()
+            .set("timestamp", start.getMillis())
             .set("runtimeSec", nexmarkPerf2.runtimeSec)
             .set("eventsPerSec", nexmarkPerf2.eventsPerSec)
             // when read using TableRowJsonCoder the row field is boxed into an Integer, cast it to int


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 118622)
    Time Spent: 2h  (was: 1h 50m)

> Change fields and table names in nexmark perfkit tables
> -------------------------------------------------------
>
>                 Key: BEAM-4707
>                 URL: https://issues.apache.org/jira/browse/BEAM-4707
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Etienne Chauchot
>            Assignee: Etienne Chauchot
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Nexmark BQ tables for perfkit lack a timestamp field. Also, the table name contains a boolean
> that shows the mode of execution. It would be better to have a batch or streaming label in place
> of the boolean.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message