beam-user mailing list archives

From Tobias Feldhaus <Tobias.Feldh...@localsearch.ch>
Subject Testing/Running a pipeline with a BigQuery Sink locally with the DirectRunner
Date Fri, 17 Feb 2017 10:36:17 GMT
Hello,

Could it be that it is no longer possible to run pipelines with a BigQuery sink 
locally on the dev machine? I migrated a "Read JSON from GCS, parse and 
write to BQ" pipeline from the Dataflow SDK to Apache Beam 0.5.0. 
All tests are green and the pipeline runs successfully on the Dataflow service with 
the test files, but locally with the DirectRunner I get an NPE.

It happens right after I create the TableRow element, which I even 
double-checked is not null. Even when I artificially create a LogLine 
element in this step, without taking the one from the input, the NPE is thrown:


static class Outputter extends DoFn<LogLine, TableRow> {
(...)
	LogLine logLine = c.element();

	TableRow tableRow = logLine.toTableRow();
	tableRow.set("ts", c.timestamp().toString());

	if (c != null && tableRow != null){
	    try {

	        c.output(tableRow);
	    }
	    catch(NullPointerException e){
	        LOG.error("catched NPE");
	        e.printStackTrace();
	    }
	}

The corresponding stack trace looks like this:

ERROR: catched NPE
java.lang.NullPointerException
	at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
	at java.util.AbstractMap.hashCode(AbstractMap.java:530)
	at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
	at java.util.AbstractMap.hashCode(AbstractMap.java:530)
	at java.util.Arrays.hashCode(Arrays.java:4146)
	at java.util.Objects.hash(Objects.java:128)
	at org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow.hashCode(WindowedValue.java:409)
	at java.util.HashMap.hash(HashMap.java:338)
	at java.util.HashMap.get(HashMap.java:556)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:193)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:128)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:49)
	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:198)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:352)
	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:553)
	at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter.processElement(FrontendPipeline.java:181)
	at ch.localsearch.dataintel.logfiles.FrontendPipeline$Outputter$auxiliary$sxgOpc6N.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:161)
	at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElement(PushbackSideInputDoFnRunner.java:111)
	at org.apache.beam.runners.core.PushbackSideInputDoFnRunner.processElementInReadyWindows(PushbackSideInputDoFnRunner.java:77)
	at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:134)
	at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:51)
	at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
	at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
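
The trace fails inside ArrayMap$Entry.hashCode while the DirectRunner hashes the output element, not on the TableRow reference itself. One possible cause (an assumption, not confirmed here) is a null value nested somewhere inside the row. Below is a minimal sketch for checking that. It treats the row as a plain java.util.Map, which TableRow implements; NullFieldFinder and findNullFields are hypothetical names, and the sample data in main is made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullFieldFinder {

    /** Returns the dotted path of every null value found in the row, at any depth. */
    public static List<String> findNullFields(Map<?, ?> row) {
        List<String> paths = new ArrayList<>();
        collect(row, "", paths);
        return paths;
    }

    // Walks nested maps and iterables (e.g. repeated fields), recording where nulls sit.
    private static void collect(Object node, String path, List<String> paths) {
        if (node == null) {
            paths.add(path);
        } else if (node instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                String child = path.isEmpty()
                        ? String.valueOf(e.getKey())
                        : path + "." + e.getKey();
                collect(e.getValue(), child, paths);
            }
        } else if (node instanceof Iterable) {
            int i = 0;
            for (Object item : (Iterable<?>) node) {
                collect(item, path + "[" + i++ + "]", paths);
            }
        }
    }

    public static void main(String[] args) {
        // Made-up sample row: a nested record with one null field.
        Map<String, Object> user = new LinkedHashMap<>();
        user.put("id", 42);
        user.put("agent", null);

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ts", "2017-02-17T10:36:17Z");
        row.put("user", user);

        System.out.println(findNullFields(row)); // prints [user.agent]
    }
}
```

Calling findNullFields(tableRow) just before c.output(tableRow) and logging the result would show whether any field is null. The walk only reads keys and values and never calls hashCode on the entries, so it should not trigger the same exception.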

Best,
Tobias
