flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/6] flink git commit: [tests] Remove subsumed low-level API tests for broadcast variables
Date Wed, 01 Jul 2015 15:56:02 GMT
Repository: flink
Updated Branches:
  refs/heads/master a9dc4307d -> a137321ac


[tests] Remove subsumed low-level API tests for broadcast variables

 - BroadcastVarsNepheleITCase is subsumed by BroadcastBranchingITCase
 - KMeansIterativeNepheleITCase is subsumed by the Java/Scala API KMeans example programs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b975058
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b975058
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b975058

Branch: refs/heads/master
Commit: 1b9750588a1ef0c4767ebad8aafaf50e67f5e7ee
Parents: a9dc430
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Jun 14 15:26:53 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jul 1 16:09:00 2015 +0200

----------------------------------------------------------------------
 .../BroadcastVarsNepheleITCase.java             | 340 -------------------
 .../KMeansIterativeNepheleITCase.java           | 324 ------------------
 2 files changed, 664 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b975058/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
deleted file mode 100644
index 9366058..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.broadcastvars;
-
-import java.io.BufferedReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
-
-	private static final long SEED_POINTS = 0xBADC0FFEEBEEFL;
-
-	private static final long SEED_MODELS = 0x39134230AFF32L;
-
-	private static final int NUM_POINTS = 10000;
-
-	private static final int NUM_MODELS = 42;
-
-	private static final int NUM_FEATURES = 3;
-
-	private static final int parallelism = 4;
-
-	protected String pointsPath;
-
-	protected String modelsPath;
-
-	protected String resultPath;
-
-	public BroadcastVarsNepheleITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-
-	public static final String getInputPoints(int numPoints, int numDimensions, long seed) {
-		if (numPoints < 1 || numPoints > 1000000)
-			throw new IllegalArgumentException();
-
-		Random r = new Random();
-
-		StringBuilder bld = new StringBuilder(3 * (1 + numDimensions) * numPoints);
-		for (int i = 1; i <= numPoints; i++) {
-			bld.append(i);
-			bld.append(' ');
-
-			r.setSeed(seed + 1000 * i);
-			for (int j = 1; j <= numDimensions; j++) {
-				bld.append(r.nextInt(1000));
-				bld.append(' ');
-			}
-			bld.append('\n');
-		}
-		return bld.toString();
-	}
-
-	public static final String getInputModels(int numModels, int numDimensions, long seed) {
-		if (numModels < 1 || numModels > 100)
-			throw new IllegalArgumentException();
-
-		Random r = new Random();
-
-		StringBuilder bld = new StringBuilder(3 * (1 + numDimensions) * numModels);
-		for (int i = 1; i <= numModels; i++) {
-			bld.append(i);
-			bld.append(' ');
-
-			r.setSeed(seed + 1000 * i);
-			for (int j = 1; j <= numDimensions; j++) {
-				bld.append(r.nextInt(100));
-				bld.append(' ');
-			}
-			bld.append('\n');
-		}
-		return bld.toString();
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		this.pointsPath = createTempFile("points.txt", getInputPoints(NUM_POINTS, NUM_FEATURES,
SEED_POINTS));
-		this.modelsPath = createTempFile("models.txt", getInputModels(NUM_MODELS, NUM_FEATURES,
SEED_MODELS));
-		this.resultPath = getTempFilePath("results");
-	}
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		return createJobGraphV1(this.pointsPath, this.modelsPath, this.resultPath, parallelism);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		final Random randPoints = new Random();
-		final Random randModels = new Random();
-		final Pattern p = Pattern.compile("(\\d+) (\\d+) (\\d+)");
-		
-		long [][] results = new long[NUM_POINTS][NUM_MODELS];
-		boolean [][] occurs = new boolean[NUM_POINTS][NUM_MODELS];
-		for (int i = 0; i < NUM_POINTS; i++) {
-			for (int j = 0; j < NUM_MODELS; j++) {
-				long actDotProd = 0;
-				randPoints.setSeed(SEED_POINTS + 1000 * (i+1));
-				randModels.setSeed(SEED_MODELS + 1000 * (j+1));
-				for (int z = 1; z <= NUM_FEATURES; z++) {
-					actDotProd += randPoints.nextInt(1000) * randModels.nextInt(100);
-				}
-				results[i][j] = actDotProd;
-				occurs[i][j] = false;
-			}
-		}
-
-		for (BufferedReader reader : getResultReader(this.resultPath)) {
-			String line = null;
-			while (null != (line = reader.readLine())) {
-				final Matcher m = p.matcher(line);
-				Assert.assertTrue(m.matches());
-
-				int modelId = Integer.parseInt(m.group(1));
-				int pointId = Integer.parseInt(m.group(2));
-				long expDotProd = Long.parseLong(m.group(3));
-
-				Assert.assertFalse("Dot product for record (" + pointId + ", " + modelId + ") occurs
more than once", occurs[pointId-1][modelId-1]);
-				Assert.assertEquals(String.format("Bad product for (%04d, %04d)", pointId, modelId),
expDotProd, results[pointId-1][modelId-1]);
-
-				occurs[pointId-1][modelId-1] = true;
-			}
-		}
-
-		for (int i = 0; i < NUM_POINTS; i++) {
-			for (int j = 0; j < NUM_MODELS; j++) {
-				Assert.assertTrue("Dot product for record (" + (i+1) + ", " + (j+1) + ") does not occur",
occurs[i][j]);
-			}
-		}
-	}
-
-	// -------------------------------------------------------------------------------------------------------------
-	// UDFs
-	// -------------------------------------------------------------------------------------------------------------
-
-	public static final class DotProducts extends MapFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		private final Record result = new Record(3);
-
-		private final LongValue lft = new LongValue();
-
-		private final LongValue rgt = new LongValue();
-
-		private final LongValue prd = new LongValue();
-
-		private Collection<Record> models;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			List<Record> shared = this.getRuntimeContext().getBroadcastVariable("models");
-			this.models = new ArrayList<Record>(shared.size());
-			synchronized (shared) {
-				for (Record r : shared) {
-					this.models.add(r.createCopy());
-				}
-			}
-		}
-
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-
-			for (Record model : this.models) {
-				// compute dot product between model and pair
-				long product = 0;
-				for (int i = 1; i <= NUM_FEATURES; i++) {
-					product += model.getField(i, this.lft).getValue() * record.getField(i, this.rgt).getValue();
-				}
-				this.prd.setValue(product);
-
-				// construct result
-				this.result.copyFrom(model, new int[] { 0 }, new int[] { 0 });
-				this.result.copyFrom(record, new int[] { 0 }, new int[] { 1 });
-				this.result.setField(2, this.prd);
-
-				// emit result
-				out.collect(this.result);
-			}
-		}
-	}
-
-	// -------------------------------------------------------------------------------------------------------------
-	// Job vertex builder methods
-	// -------------------------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	private static InputFormatVertex createPointsInput(JobGraph jobGraph, String pointsPath,
int numSubTasks, TypeSerializerFactory<?> serializer) {
-		CsvInputFormat pointsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class,
LongValue.class, LongValue.class);
-		InputFormatVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]",
jobGraph, numSubTasks);
-
-		{
-			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
-			taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			taskConfig.setOutputSerializer(serializer);
-		}
-
-		return pointsInput;
-	}
-
-	@SuppressWarnings("unchecked")
-	private static InputFormatVertex createModelsInput(JobGraph jobGraph, String pointsPath,
int numSubTasks, TypeSerializerFactory<?> serializer) {
-		CsvInputFormat modelsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class,
LongValue.class, LongValue.class);
-		InputFormatVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]",
jobGraph, numSubTasks);
-
-		{
-			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
-			taskConfig.addOutputShipStrategy(ShipStrategyType.BROADCAST);
-			taskConfig.setOutputSerializer(serializer);
-		}
-
-		return modelsInput;
-	}
-
-	private static JobVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?>
serializer) {
-		JobVertex pointsInput = JobGraphUtils.createTask(RegularPactTask.class, "Map[DotProducts]",
jobGraph, numSubTasks);
-
-		{
-			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
-
-			taskConfig.setStubWrapper(new UserCodeClassWrapper<DotProducts>(DotProducts.class));
-			taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			taskConfig.setOutputSerializer(serializer);
-			taskConfig.setDriver(CollectorMapDriver.class);
-			taskConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-
-			taskConfig.addInputToGroup(0);
-			taskConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
-			taskConfig.setInputSerializer(serializer, 0);
-
-			taskConfig.setBroadcastInputName("models", 0);
-			taskConfig.addBroadcastInputToGroup(0);
-			taskConfig.setBroadcastInputSerializer(serializer, 0);
-		}
-
-		return pointsInput;
-	}
-
-	private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int
numSubTasks, TypeSerializerFactory<?> serializer) {
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
-
-		{
-			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
-			taskConfig.addInputToGroup(0);
-			taskConfig.setInputSerializer(serializer, 0);
-
-			@SuppressWarnings("unchecked")
-			CsvOutputFormat outFormat = new CsvOutputFormat("\n", " ", LongValue.class, LongValue.class,
LongValue.class);
-			outFormat.setOutputFilePath(new Path(resultPath));
-			
-			taskConfig.setStubWrapper(new UserCodeObjectWrapper<CsvOutputFormat>(outFormat));
-		}
-
-		return output;
-	}
-
-	// -------------------------------------------------------------------------------------------------------------
-	// Unified solution set and workset tail update
-	// -------------------------------------------------------------------------------------------------------------
-
-	private JobGraph createJobGraphV1(String pointsPath, String centersPath, String resultPath,
int numSubTasks) {
-
-		// -- init -------------------------------------------------------------------------------------------------
-		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
-
-		JobGraph jobGraph = new JobGraph("Distance Builder");
-
-		// -- vertices ---------------------------------------------------------------------------------------------
-		InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
-		InputFormatVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer);
-		JobVertex mapper = createMapper(jobGraph, numSubTasks, serializer);
-		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-
-		// -- edges ------------------------------------------------------------------------------------------------
-		JobGraphUtils.connect(points, mapper, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(models, mapper, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(mapper, output, DistributionPattern.POINTWISE);
-
-		// -- instance sharing -------------------------------------------------------------------------------------
-		
-		SlotSharingGroup sharing = new SlotSharingGroup();
-		
-		points.setSlotSharingGroup(sharing);
-		models.setSlotSharingGroup(sharing);
-		mapper.setSlotSharingGroup(sharing);
-		output.setSlotSharingGroup(sharing);
-
-		return jobGraph;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b975058/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
deleted file mode 100644
index 61ba59a..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.broadcastvars;
-
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
-import org.apache.flink.runtime.operators.NoOpDriver;
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.nephele.JobGraphUtils;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast.PointBuilder;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast.PointOutFormat;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast.RecomputeClusterCenter;
-import org.apache.flink.test.recordJobs.kmeans.KMeansBroadcast.SelectNearestCenter;
-import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-
-public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
-
-	private static final int ITERATION_ID = 42;
-	
-	private static final int MEMORY_PER_CONSUMER = 2;
-
-	private static final int parallelism = 4;
-
-	private static final double MEMORY_FRACTION_PER_CONSUMER = (double)MEMORY_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*parallelism;
-
-	protected String dataPath;
-	protected String clusterPath;
-	protected String resultPath;
-
-	
-	public KMeansIterativeNepheleITCase() {
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		dataPath = createTempFile("datapoints.txt", KMeansData.DATAPOINTS);
-		clusterPath = createTempFile("initial_centers.txt", KMeansData.INITIAL_CENTERS);
-		resultPath = getTempDirPath("result");
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(KMeansData.CENTERS_AFTER_20_ITERATIONS_SINGLE_DIGIT, resultPath);
-	}
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		return createJobGraph(dataPath, clusterPath, this.resultPath, parallelism, 20);
-	}
-
-	// -------------------------------------------------------------------------------------------------------------
-	// Job vertex builder methods
-	// -------------------------------------------------------------------------------------------------------------
-
-	private static InputFormatVertex createPointsInput(JobGraph jobGraph, String pointsPath,
int numSubTasks, TypeSerializerFactory<?> serializer) {
-		@SuppressWarnings("unchecked")
-		CsvInputFormat pointsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class,
DoubleValue.class, DoubleValue.class);
-		InputFormatVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]",
jobGraph, numSubTasks);
-		{
-			TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration());
-			taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			taskConfig.setOutputSerializer(serializer);
-			
-			TaskConfig chainedMapper = new TaskConfig(new Configuration());
-			chainedMapper.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			chainedMapper.setStubWrapper(new UserCodeObjectWrapper<PointBuilder>(new PointBuilder()));
-			chainedMapper.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			chainedMapper.setOutputSerializer(serializer);
-			
-			taskConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapper, "Build points");
-		}
-
-		return pointsInput;
-	}
-
-	private static InputFormatVertex createCentersInput(JobGraph jobGraph, String centersPath,
int numSubTasks, TypeSerializerFactory<?> serializer) {
-		@SuppressWarnings("unchecked")
-		CsvInputFormat modelsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class,
DoubleValue.class, DoubleValue.class);
-		InputFormatVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath,
"[Models]", jobGraph, numSubTasks);
-
-		{
-			TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration());
-			taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			taskConfig.setOutputSerializer(serializer);
-
-			TaskConfig chainedMapper = new TaskConfig(new Configuration());
-			chainedMapper.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			chainedMapper.setStubWrapper(new UserCodeObjectWrapper<PointBuilder>(new PointBuilder()));
-			chainedMapper.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			chainedMapper.setOutputSerializer(serializer);
-			
-			taskConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapper, "Build centers");
-		}
-
-		return modelsInput;
-	}
-
-	private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int
numSubTasks, TypeSerializerFactory<?> serializer) {
-		
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
-
-		{
-			TaskConfig taskConfig = new TaskConfig(output.getConfiguration());
-			taskConfig.addInputToGroup(0);
-			taskConfig.setInputSerializer(serializer, 0);
-
-			PointOutFormat outFormat = new PointOutFormat();
-			outFormat.setOutputFilePath(new Path(resultPath));
-			
-			taskConfig.setStubWrapper(new UserCodeObjectWrapper<PointOutFormat>(outFormat));
-		}
-
-		return output;
-	}
-	
-	private static JobVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?>
serializer) {
-		JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head",
jobGraph, numSubTasks);
-
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		headConfig.setIterationId(ITERATION_ID);
-		
-		// initial input / partial solution
-		headConfig.addInputToGroup(0);
-		headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-		headConfig.setInputSerializer(serializer, 0);
-		
-		// back channel / iterations
-		headConfig.setRelativeBackChannelMemory(MEMORY_FRACTION_PER_CONSUMER);
-		
-		// output into iteration. broadcasting the centers
-		headConfig.setOutputSerializer(serializer);
-		headConfig.addOutputShipStrategy(ShipStrategyType.BROADCAST);
-		
-		// final output
-		TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
-		headFinalOutConfig.setOutputSerializer(serializer);
-		headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-		
-		// the sync
-		headConfig.setIterationHeadIndexOfSyncOutput(2);
-		
-		// the driver 
-		headConfig.setDriver(NoOpDriver.class);
-		headConfig.setDriverStrategy(DriverStrategy.UNARY_NO_OP);
-		
-		return head;
-	}
-	
-	private static JobVertex createMapper(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?>
inputSerializer,
-			TypeSerializerFactory<?> broadcastVarSerializer, TypeSerializerFactory<?>
outputSerializer,
-			TypeComparatorFactory<?> outputComparator)
-	{
-		JobVertex mapper = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Map (Select nearest center)", jobGraph, numSubTasks);
-		
-		TaskConfig intermediateConfig = new TaskConfig(mapper.getConfiguration());
-		intermediateConfig.setIterationId(ITERATION_ID);
-		
-		intermediateConfig.setDriver(CollectorMapDriver.class);
-		intermediateConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-		intermediateConfig.addInputToGroup(0);
-		intermediateConfig.setInputSerializer(inputSerializer, 0);
-		
-		intermediateConfig.setOutputSerializer(outputSerializer);
-		intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-		intermediateConfig.setOutputComparator(outputComparator, 0);
-
-		intermediateConfig.setBroadcastInputName("centers", 0);
-		intermediateConfig.addBroadcastInputToGroup(0);
-		intermediateConfig.setBroadcastInputSerializer(broadcastVarSerializer, 0);
-		
-		// the udf
-		intermediateConfig.setStubWrapper(new UserCodeObjectWrapper<SelectNearestCenter>(new
SelectNearestCenter()));
-		
-		return mapper;
-	}
-	
-	private static JobVertex createReducer(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?>
inputSerializer,
-			TypeComparatorFactory<?> inputComparator, TypeSerializerFactory<?> outputSerializer)
-	{
-		// ---------------- the tail (reduce) --------------------
-		
-		JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Reduce / Iteration
Tail", jobGraph,
-			numSubTasks);
-		
-		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
-		tailConfig.setIterationId(ITERATION_ID);
-		tailConfig.setIsWorksetUpdate();
-		
-		// inputs and driver
-		tailConfig.setDriver(GroupReduceDriver.class);
-		tailConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
-		tailConfig.addInputToGroup(0);
-		tailConfig.setInputSerializer(inputSerializer, 0);		
-		tailConfig.setDriverComparator(inputComparator, 0);
-
-		tailConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-		tailConfig.setInputComparator(inputComparator, 0);
-		tailConfig.setRelativeMemoryInput(0, MEMORY_FRACTION_PER_CONSUMER);
-		tailConfig.setFilehandlesInput(0, 128);
-		tailConfig.setSpillingThresholdInput(0, 0.9f);
-		
-		// output
-		tailConfig.setOutputSerializer(outputSerializer);
-		
-		// the udf
-		tailConfig.setStubWrapper(new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingReduceFunction(new
RecomputeClusterCenter())));
-		
-		return tail;
-	}
-	
-	private static JobVertex createSync(JobGraph jobGraph, int numIterations, int parallelism)
{
-		JobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
-		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setNumberOfIterations(numIterations);
-		syncConfig.setIterationId(ITERATION_ID);
-		return sync;
-	}
-
-	// -------------------------------------------------------------------------------------------------------------
-	// Unified solution set and workset tail update
-	// -------------------------------------------------------------------------------------------------------------
-
-	private static JobGraph createJobGraph(String pointsPath, String centersPath, String resultPath,
int numSubTasks, int numIterations) {
-
-		// -- init -------------------------------------------------------------------------------------------------
-		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
-		@SuppressWarnings("unchecked")
-		final TypeComparatorFactory<?> int0Comparator = new RecordComparatorFactory(new int[]
{ 0 }, new Class[] { IntValue.class });
-
-		JobGraph jobGraph = new JobGraph("KMeans Iterative");
-
-		// -- vertices ---------------------------------------------------------------------------------------------
-		InputFormatVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer);
-		InputFormatVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer);
-		
-		JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer);
-		JobVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer,
int0Comparator);
-		
-		JobVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
-		
-		JobVertex sync = createSync(jobGraph, numIterations, numSubTasks);
-		
-		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-
-		// -- edges ------------------------------------------------------------------------------------------------
-		JobGraphUtils.connect(points, mapper, DistributionPattern.POINTWISE);
-		
-		JobGraphUtils.connect(centers, head, DistributionPattern.POINTWISE);
-		
-		JobGraphUtils.connect(head, mapper, DistributionPattern.ALL_TO_ALL);
-		new TaskConfig(mapper.getConfiguration()).setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(0,
numSubTasks);
-		new TaskConfig(mapper.getConfiguration()).setInputCached(0, true);
-		new TaskConfig(mapper.getConfiguration()).setRelativeInputMaterializationMemory(0,
-				MEMORY_FRACTION_PER_CONSUMER);
-
-		JobGraphUtils.connect(mapper, reducer, DistributionPattern.ALL_TO_ALL);
-		new TaskConfig(reducer.getConfiguration()).setGateIterativeWithNumberOfEventsUntilInterrupt(0,
numSubTasks);
-		
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-		
-		JobGraphUtils.connect(head, sync, DistributionPattern.ALL_TO_ALL);
-
-		// -- instance sharing -------------------------------------------------------------------------------------
-		
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		
-		points.setSlotSharingGroup(sharingGroup);
-		centers.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		mapper.setSlotSharingGroup(sharingGroup);
-		reducer.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		
-		mapper.setStrictlyCoLocatedWith(head);
-		reducer.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-}


Mime
View raw message