Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E20F6200CFC for ; Wed, 23 Aug 2017 19:09:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DF93D1638BE; Wed, 23 Aug 2017 17:09:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F0EA016328F for ; Wed, 23 Aug 2017 19:09:16 +0200 (CEST) Received: (qmail 27469 invoked by uid 500); 23 Aug 2017 17:09:15 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 23956 invoked by uid 99); 23 Aug 2017 17:09:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Aug 2017 17:09:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9280BF5EEB; Wed, 23 Aug 2017 17:09:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: iemejia@apache.org To: commits@beam.apache.org Date: Wed, 23 Aug 2017 17:09:13 -0000 Message-Id: <03afac225f7e47e2bbc92ae41aec267a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/55] [abbrv] beam git commit: NexMark archived-at: Wed, 23 Aug 2017 17:09:20 -0000 http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java 
b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java new file mode 100644 index 0000000..98f4f00 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Generator.java @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.values.TimestampedValue; +import com.google.common.base.Preconditions; + +import org.joda.time.Instant; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +/** + * A generator for synthetic events. We try to make the data vaguely reasonable. We also ensure + * most primary key/foreign key relations are correct. 
Eg: a {@link Bid} event will usually have + * valid auction and bidder ids which can be joined to already-generated Auction and Person events. + * + *

To help with testing, we generate timestamps relative to a given {@code baseTime}. Each new + * event is given a timestamp advanced from the previous timestamp by {@code interEventDelayUs} + * (in microseconds). The event stream is thus fully deterministic and does not depend on + * wallclock time. + * + *

This class implements {@link org.apache.beam.sdk.io.UnboundedSource.CheckpointMark} + * so that we can resume generating events from a saved snapshot. + */ +public class Generator implements Iterator>, Serializable { + /** + * Keep the number of categories small so the example queries will find results even with + * a small batch of events. + */ + private static final int NUM_CATEGORIES = 5; + + /** Smallest random string size. */ + private static final int MIN_STRING_LENGTH = 3; + + /** + * Keep the number of states small so that the example queries will find results even with + * a small batch of events. + */ + private static final List US_STATES = Arrays.asList(("AZ,CA,ID,OR,WA,WY").split(",")); + + private static final List US_CITIES = + Arrays.asList( + ("Phoenix,Los Angeles,San Francisco,Boise,Portland,Bend,Redmond,Seattle,Kent,Cheyenne") + .split(",")); + + private static final List FIRST_NAMES = + Arrays.asList(("Peter,Paul,Luke,John,Saul,Vicky,Kate,Julie,Sarah,Deiter,Walter").split(",")); + + private static final List LAST_NAMES = + Arrays.asList(("Shultz,Abrams,Spencer,White,Bartels,Walton,Smith,Jones,Noris").split(",")); + + /** + * Number of yet-to-be-created people and auction ids allowed. + */ + private static final int PERSON_ID_LEAD = 10; + private static final int AUCTION_ID_LEAD = 10; + + /** + * Fraction of people/auctions which may be 'hot' sellers/bidders/auctions are 1 + * over these values. + */ + private static final int HOT_AUCTION_RATIO = 100; + private static final int HOT_SELLER_RATIO = 100; + private static final int HOT_BIDDER_RATIO = 100; + + /** + * Just enough state to be able to restore a generator back to where it was checkpointed. + */ + public static class Checkpoint implements UnboundedSource.CheckpointMark { + private static final Coder LONG_CODER = VarLongCoder.of(); + + /** Coder for this class. 
*/ + public static final Coder CODER_INSTANCE = + new AtomicCoder() { + @Override + public void encode( + Checkpoint value, + OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.numEvents, outStream, Context.NESTED); + LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED); + } + + @Override + public Checkpoint decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long numEvents = LONG_CODER.decode(inStream, Context.NESTED); + long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED); + return new Checkpoint(numEvents, wallclockBaseTime); + } + }; + + private long numEvents; + private long wallclockBaseTime; + + private Checkpoint(long numEvents, long wallclockBaseTime) { + this.numEvents = numEvents; + this.wallclockBaseTime = wallclockBaseTime; + } + + public Generator toGenerator(GeneratorConfig config) { + return new Generator(config, numEvents, wallclockBaseTime); + } + + @Override + public void finalizeCheckpoint() throws IOException { + // Nothing to finalize. + } + + @Override + public String toString() { + return String.format("Generator.Checkpoint{numEvents:%d;wallclockBaseTime:%d}", + numEvents, wallclockBaseTime); + } + } + + /** + * The next event and its various timestamps. Ordered by increasing wallclock timestamp, then + * (arbitrary but stable) event hash order. + */ + public static class NextEvent implements Comparable { + /** When, in wallclock time, should this event be emitted? */ + public final long wallclockTimestamp; + + /** When, in event time, should this event be considered to have occured? */ + public final long eventTimestamp; + + /** The event itself. */ + public final Event event; + + /** The minimum of this and all future event timestamps. 
*/ + public final long watermark; + + public NextEvent(long wallclockTimestamp, long eventTimestamp, Event event, long watermark) { + this.wallclockTimestamp = wallclockTimestamp; + this.eventTimestamp = eventTimestamp; + this.event = event; + this.watermark = watermark; + } + + /** + * Return a deep clone of next event with delay added to wallclock timestamp and + * event annotate as 'LATE'. + */ + public NextEvent withDelay(long delayMs) { + return new NextEvent( + wallclockTimestamp + delayMs, eventTimestamp, event.withAnnotation("LATE"), watermark); + } + + @Override + public int compareTo(NextEvent other) { + int i = Long.compare(wallclockTimestamp, other.wallclockTimestamp); + if (i != 0) { + return i; + } + return Integer.compare(event.hashCode(), other.event.hashCode()); + } + } + + /** + * Configuration to generate events against. Note that it may be replaced by a call to + * {@link #splitAtEventId}. + */ + private GeneratorConfig config; + + /** Number of events generated by this generator. */ + private long numEvents; + + /** + * Wallclock time at which we emitted the first event (ms since epoch). Initially -1. + */ + private long wallclockBaseTime; + + private Generator(GeneratorConfig config, long numEvents, long wallclockBaseTime) { + Preconditions.checkNotNull(config); + this.config = config; + this.numEvents = numEvents; + this.wallclockBaseTime = wallclockBaseTime; + } + + /** + * Create a fresh generator according to {@code config}. + */ + public Generator(GeneratorConfig config) { + this(config, 0, -1); + } + + /** + * Return a checkpoint for the current generator. + */ + public Checkpoint toCheckpoint() { + return new Checkpoint(numEvents, wallclockBaseTime); + } + + /** + * Return a deep clone of this generator. + */ + @Override + public Generator clone() { + return new Generator(config.clone(), numEvents, wallclockBaseTime); + } + + /** + * Return the current config for this generator. 
Note that configs may be replaced by {@link + * #splitAtEventId}. + */ + public GeneratorConfig getCurrentConfig() { + return config; + } + + /** + * Mutate this generator so that it will only generate events up to but not including + * {@code eventId}. Return a config to represent the events this generator will no longer yield. + * The generators will run in on a serial timeline. + */ + public GeneratorConfig splitAtEventId(long eventId) { + long newMaxEvents = eventId - (config.firstEventId + config.firstEventNumber); + GeneratorConfig remainConfig = config.cloneWith(config.firstEventId, + config.maxEvents - newMaxEvents, config.firstEventNumber + newMaxEvents); + config = config.cloneWith(config.firstEventId, newMaxEvents, config.firstEventNumber); + return remainConfig; + } + + /** + * Return the next 'event id'. Though events don't have ids we can simulate them to + * help with bookkeeping. + */ + public long getNextEventId() { + return config.firstEventId + config.nextAdjustedEventNumber(numEvents); + } + + /** + * Return the last valid person id (ignoring FIRST_PERSON_ID). Will be the current person id if + * due to generate a person. + */ + private long lastBase0PersonId() { + long eventId = getNextEventId(); + long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; + long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; + if (offset >= GeneratorConfig.PERSON_PROPORTION) { + // About to generate an auction or bid. + // Go back to the last person generated in this epoch. + offset = GeneratorConfig.PERSON_PROPORTION - 1; + } + // About to generate a person. + return epoch * GeneratorConfig.PERSON_PROPORTION + offset; + } + + /** + * Return the last valid auction id (ignoring FIRST_AUCTION_ID). Will be the current auction id if + * due to generate an auction. 
+ */ + private long lastBase0AuctionId() { + long eventId = getNextEventId(); + long epoch = eventId / GeneratorConfig.PROPORTION_DENOMINATOR; + long offset = eventId % GeneratorConfig.PROPORTION_DENOMINATOR; + if (offset < GeneratorConfig.PERSON_PROPORTION) { + // About to generate a person. + // Go back to the last auction in the last epoch. + epoch--; + offset = GeneratorConfig.AUCTION_PROPORTION - 1; + } else if (offset >= GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { + // About to generate a bid. + // Go back to the last auction generated in this epoch. + offset = GeneratorConfig.AUCTION_PROPORTION - 1; + } else { + // About to generate an auction. + offset -= GeneratorConfig.PERSON_PROPORTION; + } + return epoch * GeneratorConfig.AUCTION_PROPORTION + offset; + } + + /** return a random US state. */ + private static String nextUSState(Random random) { + return US_STATES.get(random.nextInt(US_STATES.size())); + } + + /** Return a random US city. */ + private static String nextUSCity(Random random) { + return US_CITIES.get(random.nextInt(US_CITIES.size())); + } + + /** Return a random person name. */ + private static String nextPersonName(Random random) { + return FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size())) + " " + + LAST_NAMES.get(random.nextInt(LAST_NAMES.size())); + } + + /** Return a random string of up to {@code maxLength}. */ + private static String nextString(Random random, int maxLength) { + int len = MIN_STRING_LENGTH + random.nextInt(maxLength - MIN_STRING_LENGTH); + StringBuilder sb = new StringBuilder(); + while (len-- > 0) { + if (random.nextInt(13) == 0) { + sb.append(' '); + } else { + sb.append((char) ('a' + random.nextInt(26))); + } + } + return sb.toString().trim(); + } + + /** Return a random string of exactly {@code length}. 
*/ + private static String nextExactString(Random random, int length) { + StringBuilder sb = new StringBuilder(); + while (length-- > 0) { + sb.append((char) ('a' + random.nextInt(26))); + } + return sb.toString(); + } + + /** Return a random email address. */ + private static String nextEmail(Random random) { + return nextString(random, 7) + "@" + nextString(random, 5) + ".com"; + } + + /** Return a random credit card number. */ + private static String nextCreditCard(Random random) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 4; i++) { + if (i > 0) { + sb.append(' '); + } + sb.append(String.format("%04d", random.nextInt(10000))); + } + return sb.toString(); + } + + /** Return a random price. */ + private static long nextPrice(Random random) { + return Math.round(Math.pow(10.0, random.nextDouble() * 6.0) * 100.0); + } + + /** Return a random time delay, in milliseconds, for length of auctions. */ + private long nextAuctionLengthMs(Random random, long timestamp) { + // What's our current event number? + long currentEventNumber = config.nextAdjustedEventNumber(numEvents); + // How many events till we've generated numInFlightAuctions? + long numEventsForAuctions = + (config.configuration.numInFlightAuctions * GeneratorConfig.PROPORTION_DENOMINATOR) + / GeneratorConfig.AUCTION_PROPORTION; + // When will the auction numInFlightAuctions beyond now be generated? + long futureAuction = + config.timestampAndInterEventDelayUsForEvent(currentEventNumber + numEventsForAuctions) + .getKey(); + // System.out.printf("*** auction will be for %dms (%d events ahead) ***\n", + // futureAuction - timestamp, numEventsForAuctions); + // Choose a length with average horizonMs. + long horizonMs = futureAuction - timestamp; + return 1L + nextLong(random, Math.max(horizonMs * 2, 1L)); + } + + /** + * Return a random {@code string} such that {@code currentSize + string.length()} is on average + * {@code averageSize}. 
+ */ + private static String nextExtra(Random random, int currentSize, int desiredAverageSize) { + if (currentSize > desiredAverageSize) { + return ""; + } + desiredAverageSize -= currentSize; + int delta = (int) Math.round(desiredAverageSize * 0.2); + int minSize = desiredAverageSize - delta; + int desiredSize = minSize + (delta == 0 ? 0 : random.nextInt(2 * delta)); + return nextExactString(random, desiredSize); + } + + /** Return a random long from {@code [0, n)}. */ + private static long nextLong(Random random, long n) { + if (n < Integer.MAX_VALUE) { + return random.nextInt((int) n); + } else { + // TODO: Very skewed distribution! Bad! + return Math.abs(random.nextLong()) % n; + } + } + + /** + * Generate and return a random person with next available id. + */ + private Person nextPerson(Random random, long timestamp) { + long id = lastBase0PersonId() + GeneratorConfig.FIRST_PERSON_ID; + String name = nextPersonName(random); + String email = nextEmail(random); + String creditCard = nextCreditCard(random); + String city = nextUSCity(random); + String state = nextUSState(random); + int currentSize = + 8 + name.length() + email.length() + creditCard.length() + city.length() + state.length(); + String extra = nextExtra(random, currentSize, config.configuration.avgPersonByteSize); + return new Person(id, name, email, creditCard, city, state, timestamp, extra); + } + + /** + * Return a random person id (base 0). + */ + private long nextBase0PersonId(Random random) { + // Choose a random person from any of the 'active' people, plus a few 'leads'. + // By limiting to 'active' we ensure the density of bids or auctions per person + // does not decrease over time for long running jobs. + // By choosing a person id ahead of the last valid person id we will make + // newPerson and newAuction events appear to have been swapped in time. 
+ long numPeople = lastBase0PersonId() + 1; + long activePeople = Math.min(numPeople, config.configuration.numActivePeople); + long n = nextLong(random, activePeople + PERSON_ID_LEAD); + return numPeople - activePeople + n; + } + + /** + * Return a random auction id (base 0). + */ + private long nextBase0AuctionId(Random random) { + // Choose a random auction for any of those which are likely to still be in flight, + // plus a few 'leads'. + // Note that ideally we'd track non-expired auctions exactly, but that state + // is difficult to split. + long minAuction = Math.max(lastBase0AuctionId() - config.configuration.numInFlightAuctions, 0); + long maxAuction = lastBase0AuctionId(); + return minAuction + nextLong(random, maxAuction - minAuction + 1 + AUCTION_ID_LEAD); + } + + /** + * Generate and return a random auction with next available id. + */ + private Auction nextAuction(Random random, long timestamp) { + long id = lastBase0AuctionId() + GeneratorConfig.FIRST_AUCTION_ID; + + long seller; + // Here P(auction will be for a hot seller) = 1 - 1/hotSellersRatio. + if (random.nextInt(config.configuration.hotSellersRatio) > 0) { + // Choose the first person in the batch of last HOT_SELLER_RATIO people. 
+ seller = (lastBase0PersonId() / HOT_SELLER_RATIO) * HOT_SELLER_RATIO; + } else { + seller = nextBase0PersonId(random); + } + seller += GeneratorConfig.FIRST_PERSON_ID; + + long category = GeneratorConfig.FIRST_CATEGORY_ID + random.nextInt(NUM_CATEGORIES); + long initialBid = nextPrice(random); + long dateTime = timestamp; + long expires = timestamp + nextAuctionLengthMs(random, timestamp); + String name = nextString(random, 20); + String desc = nextString(random, 100); + long reserve = initialBid + nextPrice(random); + int currentSize = 8 + name.length() + desc.length() + 8 + 8 + 8 + 8 + 8; + String extra = nextExtra(random, currentSize, config.configuration.avgAuctionByteSize); + return new Auction(id, name, desc, initialBid, reserve, dateTime, expires, seller, category, + extra); + } + + /** + * Generate and return a random bid with next available id. + */ + private Bid nextBid(Random random, long timestamp) { + long auction; + // Here P(bid will be for a hot auction) = 1 - 1/hotAuctionRatio. + if (random.nextInt(config.configuration.hotAuctionRatio) > 0) { + // Choose the first auction in the batch of last HOT_AUCTION_RATIO auctions. + auction = (lastBase0AuctionId() / HOT_AUCTION_RATIO) * HOT_AUCTION_RATIO; + } else { + auction = nextBase0AuctionId(random); + } + auction += GeneratorConfig.FIRST_AUCTION_ID; + + long bidder; + // Here P(bid will be by a hot bidder) = 1 - 1/hotBiddersRatio + if (random.nextInt(config.configuration.hotBiddersRatio) > 0) { + // Choose the second person (so hot bidders and hot sellers don't collide) in the batch of + // last HOT_BIDDER_RATIO people. 
+ bidder = (lastBase0PersonId() / HOT_BIDDER_RATIO) * HOT_BIDDER_RATIO + 1; + } else { + bidder = nextBase0PersonId(random); + } + bidder += GeneratorConfig.FIRST_PERSON_ID; + + long price = nextPrice(random); + int currentSize = 8 + 8 + 8 + 8; + String extra = nextExtra(random, currentSize, config.configuration.avgBidByteSize); + return new Bid(auction, bidder, price, timestamp, extra); + } + + @Override + public boolean hasNext() { + return numEvents < config.maxEvents; + } + + /** + * Return the next event. The outer timestamp is in wallclock time and corresponds to + * when the event should fire. The inner timestamp is in event-time and represents the + * time the event is purported to have taken place in the simulation. + */ + public NextEvent nextEvent() { + if (wallclockBaseTime < 0) { + wallclockBaseTime = System.currentTimeMillis(); + } + // When, in event time, we should generate the event. Monotonic. + long eventTimestamp = + config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)).getKey(); + // When, in event time, the event should say it was generated. Depending on outOfOrderGroupSize + // may have local jitter. + long adjustedEventTimestamp = + config.timestampAndInterEventDelayUsForEvent(config.nextAdjustedEventNumber(numEvents)) + .getKey(); + // The minimum of this and all future adjusted event timestamps. Accounts for jitter in + // the event timestamp. + long watermark = + config.timestampAndInterEventDelayUsForEvent(config.nextEventNumberForWatermark(numEvents)) + .getKey(); + // When, in wallclock time, we should emit the event. + long wallclockTimestamp = wallclockBaseTime + (eventTimestamp - getCurrentConfig().baseTime); + + // Seed the random number generator with the next 'event id'. 
+ Random random = new Random(getNextEventId()); + long rem = getNextEventId() % GeneratorConfig.PROPORTION_DENOMINATOR; + + Event event; + if (rem < GeneratorConfig.PERSON_PROPORTION) { + event = new Event(nextPerson(random, adjustedEventTimestamp)); + } else if (rem < GeneratorConfig.PERSON_PROPORTION + GeneratorConfig.AUCTION_PROPORTION) { + event = new Event(nextAuction(random, adjustedEventTimestamp)); + } else { + event = new Event(nextBid(random, adjustedEventTimestamp)); + } + + numEvents++; + return new NextEvent(wallclockTimestamp, adjustedEventTimestamp, event, watermark); + } + + @Override + public TimestampedValue next() { + NextEvent next = nextEvent(); + return TimestampedValue.of(next.event, new Instant(next.eventTimestamp)); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Return how many microseconds till we emit the next event. + */ + public long currentInterEventDelayUs() { + return config.timestampAndInterEventDelayUsForEvent(config.nextEventNumber(numEvents)) + .getValue(); + } + + /** + * Return an estimate of fraction of output consumed. 
+ */ + public double getFractionConsumed() { + return (double) numEvents / config.maxEvents; + } + + @Override + public String toString() { + return String.format("Generator{config:%s; numEvents:%d; wallclockBaseTime:%d}", config, + numEvents, wallclockBaseTime); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java new file mode 100644 index 0000000..59aaf49 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/GeneratorConfig.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.values.KV; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * Parameters controlling how {@link Generator} synthesizes {@link Event} elements. 
+ */ +class GeneratorConfig implements Serializable { + /** + * We start the ids at specific values to help ensure the queries find a match even on + * small synthesized dataset sizes. + */ + public static final long FIRST_AUCTION_ID = 1000L; + public static final long FIRST_PERSON_ID = 1000L; + public static final long FIRST_CATEGORY_ID = 10L; + + /** + * Proportions of people/auctions/bids to synthesize. + */ + public static final int PERSON_PROPORTION = 1; + public static final int AUCTION_PROPORTION = 3; + public static final int BID_PROPORTION = 46; + public static final int PROPORTION_DENOMINATOR = + PERSON_PROPORTION + AUCTION_PROPORTION + BID_PROPORTION; + + /** + * Environment options. + */ + public final NexmarkConfiguration configuration; + + /** + * Delay between events, in microseconds. If the array has more than one entry then + * the rate is changed every {@link #stepLengthSec}, and wraps around. + */ + public final long[] interEventDelayUs; + + /** + * Delay before changing the current inter-event delay. + */ + public final long stepLengthSec; + + /** + * Time for first event (ms since epoch). + */ + public final long baseTime; + + /** + * Event id of first event to be generated. Event ids are unique over all generators, and + * are used as a seed to generate each event's data. + */ + public final long firstEventId; + + /** + * Maximum number of events to generate. + */ + public final long maxEvents; + + /** + * First event number. Generators running in parallel time may share the same event number, + * and the event number is used to determine the event timestamp. + */ + public final long firstEventNumber; + + /** + * True period of epoch in milliseconds. Derived from above. + * (Ie time to run through cycle for all interEventDelayUs entries). + */ + public final long epochPeriodMs; + + /** + * Number of events per epoch. Derived from above. + * (Ie number of events to run through cycle for all interEventDelayUs entries). 
+ */ + public final long eventsPerEpoch; + + public GeneratorConfig( + NexmarkConfiguration configuration, long baseTime, long firstEventId, + long maxEventsOrZero, long firstEventNumber) { + this.configuration = configuration; + this.interEventDelayUs = configuration.rateShape.interEventDelayUs( + configuration.firstEventRate, configuration.nextEventRate, + configuration.rateUnit, configuration.numEventGenerators); + this.stepLengthSec = configuration.rateShape.stepLengthSec(configuration.ratePeriodSec); + this.baseTime = baseTime; + this.firstEventId = firstEventId; + if (maxEventsOrZero == 0) { + // Scale maximum down to avoid overflow in getEstimatedSizeBytes. + this.maxEvents = + Long.MAX_VALUE / (PROPORTION_DENOMINATOR + * Math.max( + Math.max(configuration.avgPersonByteSize, configuration.avgAuctionByteSize), + configuration.avgBidByteSize)); + } else { + this.maxEvents = maxEventsOrZero; + } + this.firstEventNumber = firstEventNumber; + + long eventsPerEpoch = 0; + long epochPeriodMs = 0; + if (interEventDelayUs.length > 1) { + for (int i = 0; i < interEventDelayUs.length; i++) { + long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; + eventsPerEpoch += numEventsForThisCycle; + epochPeriodMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; + } + } + this.eventsPerEpoch = eventsPerEpoch; + this.epochPeriodMs = epochPeriodMs; + } + + /** + * Return a clone of this config. + */ + @Override + public GeneratorConfig clone() { + return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); + } + + /** + * Return clone of this config except with given parameters. + */ + public GeneratorConfig cloneWith(long firstEventId, long maxEvents, long firstEventNumber) { + return new GeneratorConfig(configuration, baseTime, firstEventId, maxEvents, firstEventNumber); + } + + /** + * Split this config into {@code n} sub-configs with roughly equal number of + * possible events, but distinct value spaces. 
The generators will run on parallel timelines. + * This config should no longer be used. + */ + public List split(int n) { + List results = new ArrayList<>(); + if (n == 1) { + // No split required. + results.add(this); + } else { + long subMaxEvents = maxEvents / n; + long subFirstEventId = firstEventId; + for (int i = 0; i < n; i++) { + if (i == n - 1) { + // Don't loose any events to round-down. + subMaxEvents = maxEvents - subMaxEvents * (n - 1); + } + results.add(cloneWith(subFirstEventId, subMaxEvents, firstEventNumber)); + subFirstEventId += subMaxEvents; + } + } + return results; + } + + /** + * Return an estimate of the bytes needed by {@code numEvents}. + */ + public long estimatedBytesForEvents(long numEvents) { + long numPersons = + (numEvents * GeneratorConfig.PERSON_PROPORTION) / GeneratorConfig.PROPORTION_DENOMINATOR; + long numAuctions = (numEvents * AUCTION_PROPORTION) / PROPORTION_DENOMINATOR; + long numBids = (numEvents * BID_PROPORTION) / PROPORTION_DENOMINATOR; + return numPersons * configuration.avgPersonByteSize + + numAuctions * configuration.avgAuctionByteSize + + numBids * configuration.avgBidByteSize; + } + + /** + * Return an estimate of the byte-size of all events a generator for this config would yield. + */ + public long getEstimatedSizeBytes() { + return estimatedBytesForEvents(maxEvents); + } + + /** + * Return the first 'event id' which could be generated from this config. Though events don't + * have ids we can simulate them to help bookkeeping. + */ + public long getStartEventId() { + return firstEventId + firstEventNumber; + } + + /** + * Return one past the last 'event id' which could be generated from this config. + */ + public long getStopEventId() { + return firstEventId + firstEventNumber + maxEvents; + } + + /** + * Return the next event number for a generator which has so far emitted {@code numEvents}. 
+ */ + public long nextEventNumber(long numEvents) { + return firstEventNumber + numEvents; + } + + /** + * Return the next event number for a generator which has so far emitted {@code numEvents}, + * but adjusted to account for {@code outOfOrderGroupSize}. + */ + public long nextAdjustedEventNumber(long numEvents) { + long n = configuration.outOfOrderGroupSize; + long eventNumber = nextEventNumber(numEvents); + long base = (eventNumber / n) * n; + long offset = (eventNumber * 953) % n; + return base + offset; + } + + /** + * Return the event number who's event time will be a suitable watermark for + * a generator which has so far emitted {@code numEvents}. + */ + public long nextEventNumberForWatermark(long numEvents) { + long n = configuration.outOfOrderGroupSize; + long eventNumber = nextEventNumber(numEvents); + return (eventNumber / n) * n; + } + + /** + * What timestamp should the event with {@code eventNumber} have for this generator? And + * what inter-event delay (in microseconds) is current? 
+ */ + public KV timestampAndInterEventDelayUsForEvent(long eventNumber) { + if (interEventDelayUs.length == 1) { + long timestamp = baseTime + (eventNumber * interEventDelayUs[0]) / 1000L; + return KV.of(timestamp, interEventDelayUs[0]); + } + + long epoch = eventNumber / eventsPerEpoch; + long n = eventNumber % eventsPerEpoch; + long offsetInEpochMs = 0; + for (int i = 0; i < interEventDelayUs.length; i++) { + long numEventsForThisCycle = (stepLengthSec * 1_000_000L) / interEventDelayUs[i]; + if (n < numEventsForThisCycle) { + long offsetInCycleUs = n * interEventDelayUs[i]; + long timestamp = + baseTime + epoch * epochPeriodMs + offsetInEpochMs + (offsetInCycleUs / 1000L); + return KV.of(timestamp, interEventDelayUs[i]); + } + n -= numEventsForThisCycle; + offsetInEpochMs += (numEventsForThisCycle * interEventDelayUs[i]) / 1000L; + } + throw new RuntimeException("internal eventsPerEpoch incorrect"); // can't reach + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("GeneratorConfig"); + sb.append("{configuration:"); + sb.append(configuration.toString()); + sb.append(";interEventDelayUs=["); + for (int i = 0; i < interEventDelayUs.length; i++) { + if (i > 0) { + sb.append(","); + } + sb.append(interEventDelayUs[i]); + } + sb.append("]"); + sb.append(";stepLengthSec:"); + sb.append(stepLengthSec); + sb.append(";baseTime:"); + sb.append(baseTime); + sb.append(";firstEventId:"); + sb.append(firstEventId); + sb.append(";maxEvents:"); + sb.append(maxEvents); + sb.append(";firstEventNumber:"); + sb.append(firstEventNumber); + sb.append(";epochPeriodMs:"); + sb.append(epochPeriodMs); + sb.append(";eventsPerEpoch:"); + sb.append(eventsPerEpoch); + sb.append("}"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java ---------------------------------------------------------------------- diff --git 
a/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java new file mode 100644 index 0000000..c72b76a --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/IdNameReserve.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result type of {@link Query8}. 
+ */ +public class IdNameReserve implements KnownSize, Serializable { + private static final Coder LONG_CODER = VarLongCoder.of(); + private static final Coder STRING_CODER = StringUtf8Coder.of(); + + public static final Coder CODER = new AtomicCoder() { + @Override + public void encode(IdNameReserve value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + LONG_CODER.encode(value.id, outStream, Context.NESTED); + STRING_CODER.encode(value.name, outStream, Context.NESTED); + LONG_CODER.encode(value.reserve, outStream, Context.NESTED); + } + + @Override + public IdNameReserve decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + long id = LONG_CODER.decode(inStream, Context.NESTED); + String name = STRING_CODER.decode(inStream, Context.NESTED); + long reserve = LONG_CODER.decode(inStream, Context.NESTED); + return new IdNameReserve(id, name, reserve); + } + }; + + @JsonProperty + public final long id; + + @JsonProperty + public final String name; + + /** Reserve price in cents. */ + @JsonProperty + public final long reserve; + + // For Avro only. 
+ @SuppressWarnings("unused") + private IdNameReserve() { + id = 0; + name = null; + reserve = 0; + } + + public IdNameReserve(long id, String name, long reserve) { + this.id = id; + this.name = name; + this.reserve = reserve; + } + + @Override + public long sizeInBytes() { + return 8 + name.length() + 1 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java new file mode 100644 index 0000000..394b6db --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/KnownSize.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +/** + * Interface for elements which can quickly estimate their encoded byte size. + */ +public interface KnownSize { + long sizeInBytes(); +} + http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java new file mode 100644 index 0000000..6874578 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Monitor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Max.MaxLongFn; +import org.apache.beam.sdk.transforms.Min.MinLongFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum.SumLongFn; +import org.apache.beam.sdk.values.PCollection; + +import java.io.Serializable; + +/** + * A monitor of elements with support for later retrieving their aggregators. + * + * @param Type of element we are monitoring. + */ +public class Monitor implements Serializable { + private class MonitorDoFn extends DoFn { + public final Aggregator elementCounter = + createAggregator(counterNamePrefix + "_elements", new SumLongFn()); + public final Aggregator bytesCounter = + createAggregator(counterNamePrefix + "_bytes", new SumLongFn()); + public final Aggregator startTime = + createAggregator(counterNamePrefix + "_startTime", new MinLongFn()); + public final Aggregator endTime = + createAggregator(counterNamePrefix + "_endTime", new MaxLongFn()); + public final Aggregator startTimestamp = + createAggregator("startTimestamp", new MinLongFn()); + public final Aggregator endTimestamp = + createAggregator("endTimestamp", new MaxLongFn()); + + @Override + public void processElement(ProcessContext c) { + elementCounter.addValue(1L); + bytesCounter.addValue(c.element().sizeInBytes()); + long now = System.currentTimeMillis(); + startTime.addValue(now); + endTime.addValue(now); + startTimestamp.addValue(c.timestamp().getMillis()); + endTimestamp.addValue(c.timestamp().getMillis()); + c.output(c.element()); + } + } + + final MonitorDoFn doFn; + final PTransform, PCollection> transform; + private String counterNamePrefix; + + public Monitor(String name, String counterNamePrefix) { + this.counterNamePrefix = counterNamePrefix; + doFn = new MonitorDoFn(); + transform = ParDo.named(name 
+ ".Monitor").of(doFn); + } + + public PTransform, PCollection> getTransform() { + return transform; + } + + public Aggregator getElementCounter() { + return doFn.elementCounter; + } + + public Aggregator getBytesCounter() { + return doFn.bytesCounter; + } + + public Aggregator getStartTime() { + return doFn.startTime; + } + + public Aggregator getEndTime() { + return doFn.endTime; + } + + public Aggregator getStartTimestamp() { + return doFn.startTimestamp; + } + + public Aggregator getEndTimestamp() { + return doFn.endTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java new file mode 100644 index 0000000..2753d2e --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NameCityStateId.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Result of {@link Query3}. + */ +public class NameCityStateId implements KnownSize, Serializable { + private static final Coder LONG_CODER = VarLongCoder.of(); + private static final Coder STRING_CODER = StringUtf8Coder.of(); + + public static final Coder CODER = new AtomicCoder() { + @Override + public void encode(NameCityStateId value, OutputStream outStream, + Coder.Context context) + throws CoderException, IOException { + STRING_CODER.encode(value.name, outStream, Context.NESTED); + STRING_CODER.encode(value.city, outStream, Context.NESTED); + STRING_CODER.encode(value.state, outStream, Context.NESTED); + LONG_CODER.encode(value.id, outStream, Context.NESTED); + } + + @Override + public NameCityStateId decode( + InputStream inStream, Coder.Context context) + throws CoderException, IOException { + String name = STRING_CODER.decode(inStream, Context.NESTED); + String city = STRING_CODER.decode(inStream, Context.NESTED); + String state = STRING_CODER.decode(inStream, Context.NESTED); + long id = LONG_CODER.decode(inStream, Context.NESTED); + return new NameCityStateId(name, city, state, id); + } + }; + + @JsonProperty + public final String name; + + @JsonProperty + public final String city; + + @JsonProperty + public final String state; + + @JsonProperty + public final long id; + + // For Avro only. 
+ @SuppressWarnings("unused") + private NameCityStateId() { + name = null; + city = null; + state = null; + id = 0; + } + + public NameCityStateId(String name, String city, String state, long id) { + this.name = name; + this.city = city; + this.state = state; + this.id = id; + } + + @Override + public long sizeInBytes() { + return name.length() + 1 + city.length() + 1 + state.length() + 1 + 8; + } + + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java new file mode 100644 index 0000000..2292ba5 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkConfiguration.java @@ -0,0 +1,662 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; + + +/** + * Configuration controlling how a query is run. May be supplied by command line or + * programmatically. We only capture properties which may influence the resulting + * pipeline performance, as captured by {@link NexmarkPerf}. + */ +class NexmarkConfiguration implements Serializable { + public static final NexmarkConfiguration DEFAULT = new NexmarkConfiguration(); + + /** If {@literal true}, include additional debugging and monitoring stats. */ + @JsonProperty + public boolean debug = true; + + /** Which query to run, in [0,9]. */ + @JsonProperty + public int query = 0; + + /** Where events come from. */ + @JsonProperty + public NexmarkUtils.SourceType sourceType = NexmarkUtils.SourceType.DIRECT; + + /** Where results go to. */ + @JsonProperty + public NexmarkUtils.SinkType sinkType = NexmarkUtils.SinkType.DEVNULL; + + /** + * Control whether pub/sub publishing is done in a stand-alone pipeline or is integrated + * into the overall query pipeline. + */ + @JsonProperty + public NexmarkUtils.PubSubMode pubSubMode = NexmarkUtils.PubSubMode.COMBINED; + + /** + * Number of events to generate. If zero, generate as many as possible without overflowing + * internal counters etc. + */ + @JsonProperty + public long numEvents = 100000; + + /** + * Number of event generators to use. Each generates events in its own timeline. + */ + @JsonProperty + public int numEventGenerators = 100; + + /** + * Shape of event rate curve. + */ + @JsonProperty + public NexmarkUtils.RateShape rateShape = NexmarkUtils.RateShape.SINE; + + /** + * Initial overall event rate (in {@link #rateUnit}). + */ + @JsonProperty + public int firstEventRate = 10000; + + /** + * Next overall event rate (in {@link #rateUnit}). 
+ */ + @JsonProperty + public int nextEventRate = 10000; + + /** + * Unit for rates. + */ + @JsonProperty + public NexmarkUtils.RateUnit rateUnit = NexmarkUtils.RateUnit.PER_SECOND; + + /** + * Overall period of rate shape, in seconds. + */ + @JsonProperty + public int ratePeriodSec = 600; + + /** + * Time in seconds to preload the subscription with data, at the initial input rate of the + * pipeline. + */ + @JsonProperty + public int preloadSeconds = 0; + + /** + * If true, and in streaming mode, generate events only when they are due according to their + * timestamp. + */ + @JsonProperty + public boolean isRateLimited = false; + + /** + * If true, use wallclock time as event time. Otherwise, use a deterministic + * time in the past so that multiple runs will see exactly the same event streams + * and should thus have exactly the same results. + */ + @JsonProperty + public boolean useWallclockEventTime = false; + + /** Average idealized size of a 'new person' event, in bytes. */ + @JsonProperty + public int avgPersonByteSize = 200; + + /** Average idealized size of a 'new auction' event, in bytes. */ + @JsonProperty + public int avgAuctionByteSize = 500; + + /** Average idealized size of a 'bid' event, in bytes. */ + @JsonProperty + public int avgBidByteSize = 100; + + /** Ratio of bids to 'hot' auctions compared to all other auctions. */ + @JsonProperty + public int hotAuctionRatio = 1; + + /** Ratio of auctions for 'hot' sellers compared to all other people. */ + @JsonProperty + public int hotSellersRatio = 1; + + /** Ratio of bids for 'hot' bidders compared to all other people. */ + @JsonProperty + public int hotBiddersRatio = 1; + + /** Window size, in seconds, for queries 3, 5, 7 and 8. */ + @JsonProperty + public long windowSizeSec = 10; + + /** Sliding window period, in seconds, for query 5. */ + @JsonProperty + public long windowPeriodSec = 5; + + /** Number of seconds to hold back events according to their reported timestamp. 
*/ + @JsonProperty + public long watermarkHoldbackSec = 0; + + /** Average number of auction which should be inflight at any time, per generator. */ + @JsonProperty + public int numInFlightAuctions = 100; + + /** Maximum number of people to consider as active for placing auctions or bids. */ + @JsonProperty + public int numActivePeople = 1000; + + /** Coder strategy to follow. */ + @JsonProperty + public NexmarkUtils.CoderStrategy coderStrategy = NexmarkUtils.CoderStrategy.HAND; + + /** + * Delay, in milliseconds, for each event. This will peg one core for this number + * of milliseconds to simulate CPU-bound computation. + */ + @JsonProperty + public long cpuDelayMs = 0; + + /** + * Extra data, in bytes, to save to persistent state for each event. This will force + * i/o all the way to durable storage to simulate an I/O-bound computation. + */ + @JsonProperty + public long diskBusyBytes = 0; + + /** + * Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction. + */ + @JsonProperty + public int auctionSkip = 123; + + /** + * Fanout for queries 4 (groups by category id), 5 and 7 (find a global maximum). + */ + @JsonProperty + public int fanout = 5; + + /** + * Length of occasional delay to impose on events (in seconds). + */ + @JsonProperty + public long occasionalDelaySec = 0; + + /** + * Probability that an event will be delayed by delayS. + */ + @JsonProperty + public double probDelayedEvent = 0.0; + + /** + * Maximum size of each log file (in events). For Query10 only. + */ + @JsonProperty + public int maxLogEvents = 100_000; + + /** + * If true, use pub/sub publish time instead of event time. + */ + @JsonProperty + public boolean usePubsubPublishTime = false; + + /** + * Number of events in out-of-order groups. 1 implies no out-of-order events. 1000 implies + * every 1000 events per generator are emitted in pseudo-random order. 
+ */ + @JsonProperty + public long outOfOrderGroupSize = 1; + + /** + * Replace any properties of this configuration which have been supplied by the command line. + */ + public void overrideFromOptions(Options options) { + if (options.getDebug() != null) { + debug = options.getDebug(); + } + if (options.getQuery() != null) { + query = options.getQuery(); + } + if (options.getSourceType() != null) { + sourceType = options.getSourceType(); + } + if (options.getSinkType() != null) { + sinkType = options.getSinkType(); + } + if (options.getPubSubMode() != null) { + pubSubMode = options.getPubSubMode(); + } + if (options.getNumEvents() != null) { + numEvents = options.getNumEvents(); + } + if (options.getNumEventGenerators() != null) { + numEventGenerators = options.getNumEventGenerators(); + } + if (options.getRateShape() != null) { + rateShape = options.getRateShape(); + } + if (options.getFirstEventRate() != null) { + firstEventRate = options.getFirstEventRate(); + } + if (options.getNextEventRate() != null) { + nextEventRate = options.getNextEventRate(); + } + if (options.getRateUnit() != null) { + rateUnit = options.getRateUnit(); + } + if (options.getRatePeriodSec() != null) { + ratePeriodSec = options.getRatePeriodSec(); + } + if (options.getPreloadSeconds() != null) { + preloadSeconds = options.getPreloadSeconds(); + } + if (options.getIsRateLimited() != null) { + isRateLimited = options.getIsRateLimited(); + } + if (options.getUseWallclockEventTime() != null) { + useWallclockEventTime = options.getUseWallclockEventTime(); + } + if (options.getAvgPersonByteSize() != null) { + avgPersonByteSize = options.getAvgPersonByteSize(); + } + if (options.getAvgAuctionByteSize() != null) { + avgAuctionByteSize = options.getAvgAuctionByteSize(); + } + if (options.getAvgBidByteSize() != null) { + avgBidByteSize = options.getAvgBidByteSize(); + } + if (options.getHotAuctionRatio() != null) { + hotAuctionRatio = options.getHotAuctionRatio(); + } + if 
(options.getHotSellersRatio() != null) { + hotSellersRatio = options.getHotSellersRatio(); + } + if (options.getHotBiddersRatio() != null) { + hotBiddersRatio = options.getHotBiddersRatio(); + } + if (options.getWindowSizeSec() != null) { + windowSizeSec = options.getWindowSizeSec(); + } + if (options.getWindowPeriodSec() != null) { + windowPeriodSec = options.getWindowPeriodSec(); + } + if (options.getWatermarkHoldbackSec() != null) { + watermarkHoldbackSec = options.getWatermarkHoldbackSec(); + } + if (options.getNumInFlightAuctions() != null) { + numInFlightAuctions = options.getNumInFlightAuctions(); + } + if (options.getNumActivePeople() != null) { + numActivePeople = options.getNumActivePeople(); + } + if (options.getCoderStrategy() != null) { + coderStrategy = options.getCoderStrategy(); + } + if (options.getCpuDelayMs() != null) { + cpuDelayMs = options.getCpuDelayMs(); + } + if (options.getDiskBusyBytes() != null) { + diskBusyBytes = options.getDiskBusyBytes(); + } + if (options.getAuctionSkip() != null) { + auctionSkip = options.getAuctionSkip(); + } + if (options.getFanout() != null) { + fanout = options.getFanout(); + } + if (options.getOccasionalDelaySec() != null) { + occasionalDelaySec = options.getOccasionalDelaySec(); + } + if (options.getProbDelayedEvent() != null) { + probDelayedEvent = options.getProbDelayedEvent(); + } + if (options.getMaxLogEvents() != null) { + maxLogEvents = options.getMaxLogEvents(); + } + if (options.getUsePubsubPublishTime() != null) { + usePubsubPublishTime = options.getUsePubsubPublishTime(); + } + if (options.getOutOfOrderGroupSize() != null) { + outOfOrderGroupSize = options.getOutOfOrderGroupSize(); + } + } + + /** + * Return clone of configuration with given label. 
+ */ + @Override + public NexmarkConfiguration clone() { + NexmarkConfiguration result = new NexmarkConfiguration(); + result.debug = debug; + result.query = query; + result.sourceType = sourceType; + result.sinkType = sinkType; + result.pubSubMode = pubSubMode; + result.numEvents = numEvents; + result.numEventGenerators = numEventGenerators; + result.rateShape = rateShape; + result.firstEventRate = firstEventRate; + result.nextEventRate = nextEventRate; + result.rateUnit = rateUnit; + result.ratePeriodSec = ratePeriodSec; + result.preloadSeconds = preloadSeconds; + result.isRateLimited = isRateLimited; + result.useWallclockEventTime = useWallclockEventTime; + result.avgPersonByteSize = avgPersonByteSize; + result.avgAuctionByteSize = avgAuctionByteSize; + result.avgBidByteSize = avgBidByteSize; + result.hotAuctionRatio = hotAuctionRatio; + result.hotSellersRatio = hotSellersRatio; + result.hotBiddersRatio = hotBiddersRatio; + result.windowSizeSec = windowSizeSec; + result.windowPeriodSec = windowPeriodSec; + result.watermarkHoldbackSec = watermarkHoldbackSec; + result.numInFlightAuctions = numInFlightAuctions; + result.numActivePeople = numActivePeople; + result.coderStrategy = coderStrategy; + result.cpuDelayMs = cpuDelayMs; + result.diskBusyBytes = diskBusyBytes; + result.auctionSkip = auctionSkip; + result.fanout = fanout; + result.occasionalDelaySec = occasionalDelaySec; + result.probDelayedEvent = probDelayedEvent; + result.maxLogEvents = maxLogEvents; + result.usePubsubPublishTime = usePubsubPublishTime; + result.outOfOrderGroupSize = outOfOrderGroupSize; + return result; + } + + /** + * Return short description of configuration (suitable for use in logging). We only render + * the core fields plus those which do not have default values. 
+ */ + public String toShortString() { + StringBuilder sb = new StringBuilder(); + sb.append(String.format("query:%d", query)); + if (debug != DEFAULT.debug) { + sb.append(String.format("; debug:%s", debug)); + } + if (sourceType != DEFAULT.sourceType) { + sb.append(String.format("; sourceType:%s", sourceType)); + } + if (sinkType != DEFAULT.sinkType) { + sb.append(String.format("; sinkType:%s", sinkType)); + } + if (pubSubMode != DEFAULT.pubSubMode) { + sb.append(String.format("; pubSubMode:%s", pubSubMode)); + } + if (numEvents != DEFAULT.numEvents) { + sb.append(String.format("; numEvents:%d", numEvents)); + } + if (numEventGenerators != DEFAULT.numEventGenerators) { + sb.append(String.format("; numEventGenerators:%d", numEventGenerators)); + } + if (rateShape != DEFAULT.rateShape) { + sb.append(String.format("; rateShape:%s", rateShape)); + } + if (firstEventRate != DEFAULT.firstEventRate || nextEventRate != DEFAULT.nextEventRate) { + sb.append(String.format("; firstEventRate:%d", firstEventRate)); + sb.append(String.format("; nextEventRate:%d", nextEventRate)); + } + if (rateUnit != DEFAULT.rateUnit) { + sb.append(String.format("; rateUnit:%s", rateUnit)); + } + if (ratePeriodSec != DEFAULT.ratePeriodSec) { + sb.append(String.format("; ratePeriodSec:%d", ratePeriodSec)); + } + if (preloadSeconds != DEFAULT.preloadSeconds) { + sb.append(String.format("; preloadSeconds:%d", preloadSeconds)); + } + if (isRateLimited != DEFAULT.isRateLimited) { + sb.append(String.format("; isRateLimited:%s", isRateLimited)); + } + if (useWallclockEventTime != DEFAULT.useWallclockEventTime) { + sb.append(String.format("; useWallclockEventTime:%s", useWallclockEventTime)); + } + if (avgPersonByteSize != DEFAULT.avgPersonByteSize) { + sb.append(String.format("; avgPersonByteSize:%d", avgPersonByteSize)); + } + if (avgAuctionByteSize != DEFAULT.avgAuctionByteSize) { + sb.append(String.format("; avgAuctionByteSize:%d", avgAuctionByteSize)); + } + if (avgBidByteSize != 
DEFAULT.avgBidByteSize) { + sb.append(String.format("; avgBidByteSize:%d", avgBidByteSize)); + } + if (hotAuctionRatio != DEFAULT.hotAuctionRatio) { + sb.append(String.format("; hotAuctionRatio:%d", hotAuctionRatio)); + } + if (hotSellersRatio != DEFAULT.hotSellersRatio) { + sb.append(String.format("; hotSellersRatio:%d", hotSellersRatio)); + } + if (hotBiddersRatio != DEFAULT.hotBiddersRatio) { + sb.append(String.format("; hotBiddersRatio:%d", hotBiddersRatio)); + } + if (windowSizeSec != DEFAULT.windowSizeSec) { + sb.append(String.format("; windowSizeSec:%d", windowSizeSec)); + } + if (windowPeriodSec != DEFAULT.windowPeriodSec) { + sb.append(String.format("; windowPeriodSec:%d", windowPeriodSec)); + } + if (watermarkHoldbackSec != DEFAULT.watermarkHoldbackSec) { + sb.append(String.format("; watermarkHoldbackSec:%d", watermarkHoldbackSec)); + } + if (numInFlightAuctions != DEFAULT.numInFlightAuctions) { + sb.append(String.format("; numInFlightAuctions:%d", numInFlightAuctions)); + } + if (numActivePeople != DEFAULT.numActivePeople) { + sb.append(String.format("; numActivePeople:%d", numActivePeople)); + } + if (coderStrategy != DEFAULT.coderStrategy) { + sb.append(String.format("; coderStrategy:%s", coderStrategy)); + } + if (cpuDelayMs != DEFAULT.cpuDelayMs) { + sb.append(String.format("; cpuSlowdownMs:%d", cpuDelayMs)); + } + if (diskBusyBytes != DEFAULT.diskBusyBytes) { + sb.append(String.format("; diskBuysBytes:%d", diskBusyBytes)); + } + if (auctionSkip != DEFAULT.auctionSkip) { + sb.append(String.format("; auctionSkip:%d", auctionSkip)); + } + if (fanout != DEFAULT.fanout) { + sb.append(String.format("; fanout:%d", fanout)); + } + if (occasionalDelaySec != DEFAULT.occasionalDelaySec) { + sb.append(String.format("; occasionalDelaySec:%d", occasionalDelaySec)); + } + if (probDelayedEvent != DEFAULT.probDelayedEvent) { + sb.append(String.format("; probDelayedEvent:%f", probDelayedEvent)); + } + if (maxLogEvents != DEFAULT.maxLogEvents) { + 
sb.append(String.format("; maxLogEvents:%d", maxLogEvents)); + } + if (usePubsubPublishTime != DEFAULT.usePubsubPublishTime) { + sb.append(String.format("; usePubsubPublishTime:%s", usePubsubPublishTime)); + } + if (outOfOrderGroupSize != DEFAULT.outOfOrderGroupSize) { + sb.append(String.format("; outOfOrderGroupSize:%d", outOfOrderGroupSize)); + } + return sb.toString(); + } + + /** + * Return full description as a string. + */ + @Override + public String toString() { + try { + return NexmarkUtils.MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + /** + * Parse an object from {@code string}. + * + * @throws IOException + */ + public static NexmarkConfiguration fromString(String string) { + try { + return NexmarkUtils.MAPPER.readValue(string, NexmarkConfiguration.class); + } catch (IOException e) { + throw new RuntimeException("Unable to parse nexmark configuration: ", e); + } + } + + @Override + public int hashCode() { + return Objects.hash(debug, query, sourceType, sinkType, pubSubMode, + numEvents, numEventGenerators, rateShape, firstEventRate, nextEventRate, rateUnit, + ratePeriodSec, preloadSeconds, isRateLimited, useWallclockEventTime, avgPersonByteSize, + avgAuctionByteSize, avgBidByteSize, hotAuctionRatio, hotSellersRatio, hotBiddersRatio, + windowSizeSec, windowPeriodSec, watermarkHoldbackSec, numInFlightAuctions, numActivePeople, + coderStrategy, cpuDelayMs, diskBusyBytes, auctionSkip, fanout, + occasionalDelaySec, probDelayedEvent, maxLogEvents, usePubsubPublishTime, + outOfOrderGroupSize); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + NexmarkConfiguration other = (NexmarkConfiguration) obj; + if (debug != other.debug) { + return false; + } + if (auctionSkip != other.auctionSkip) { + return false; + } + if (avgAuctionByteSize != 
other.avgAuctionByteSize) { + return false; + } + if (avgBidByteSize != other.avgBidByteSize) { + return false; + } + if (avgPersonByteSize != other.avgPersonByteSize) { + return false; + } + if (coderStrategy != other.coderStrategy) { + return false; + } + if (cpuDelayMs != other.cpuDelayMs) { + return false; + } + if (diskBusyBytes != other.diskBusyBytes) { + return false; + } + if (fanout != other.fanout) { + return false; + } + if (firstEventRate != other.firstEventRate) { + return false; + } + if (hotAuctionRatio != other.hotAuctionRatio) { + return false; + } + if (hotBiddersRatio != other.hotBiddersRatio) { + return false; + } + if (hotSellersRatio != other.hotSellersRatio) { + return false; + } + if (isRateLimited != other.isRateLimited) { + return false; + } + if (maxLogEvents != other.maxLogEvents) { + return false; + } + if (nextEventRate != other.nextEventRate) { + return false; + } + if (rateUnit != other.rateUnit) { + return false; + } + if (numEventGenerators != other.numEventGenerators) { + return false; + } + if (numEvents != other.numEvents) { + return false; + } + if (numInFlightAuctions != other.numInFlightAuctions) { + return false; + } + if (numActivePeople != other.numActivePeople) { + return false; + } + if (occasionalDelaySec != other.occasionalDelaySec) { + return false; + } + if (preloadSeconds != other.preloadSeconds) { + return false; + } + if (Double.doubleToLongBits(probDelayedEvent) + != Double.doubleToLongBits(other.probDelayedEvent)) { + return false; + } + if (pubSubMode != other.pubSubMode) { + return false; + } + if (ratePeriodSec != other.ratePeriodSec) { + return false; + } + if (rateShape != other.rateShape) { + return false; + } + if (query != other.query) { + return false; + } + if (sinkType != other.sinkType) { + return false; + } + if (sourceType != other.sourceType) { + return false; + } + if (useWallclockEventTime != other.useWallclockEventTime) { + return false; + } + if (watermarkHoldbackSec != 
other.watermarkHoldbackSec) { + return false; + } + if (windowPeriodSec != other.windowPeriodSec) { + return false; + } + if (windowSizeSec != other.windowSizeSec) { + return false; + } + if (usePubsubPublishTime != other.usePubsubPublishTime) { + return false; + } + if (outOfOrderGroupSize != other.outOfOrderGroupSize) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java new file mode 100644 index 0000000..dbc1ce2 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkDriver.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.integration.nexmark; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * An implementation of the 'NEXMark queries' for Google Dataflow. + * These are 11 queries over a three table schema representing on online auction system: + *

    + *
  • {@link Person} represents a person submitting an item for auction and/or making a bid + * on an auction. + *
  • {@link Auction} represents an item under auction. + *
  • {@link Bid} represents a bid for an item under auction. + *
+ * The queries exercise many aspects of streaming dataflow. + *

+ *

We synthesize the creation of people, auctions and bids in real-time. The data is not + * particularly sensible. + *

+ *

See + * + * http://datalab.cs.pdx.edu/niagaraST/NEXMark/ + */ +public class NexmarkDriver { + + /** + * Entry point. + */ + public void runAll(OptionT options, NexmarkRunner runner) { + Instant start = Instant.now(); + Map baseline = loadBaseline(options.getBaselineFilename()); + Map actual = new LinkedHashMap<>(); + Iterable configurations = options.getSuite().getConfigurations(options); + + boolean successful = true; + try { + // Run all the configurations. + for (NexmarkConfiguration configuration : configurations) { + NexmarkPerf perf = runner.run(configuration); + if (perf != null) { + if (perf.errors == null || perf.errors.size() > 0) { + successful = false; + } + appendPerf(options.getPerfFilename(), configuration, perf); + actual.put(configuration, perf); + // Summarize what we've run so far. + saveSummary(null, configurations, actual, baseline, start); + } + } + } finally { + if (options.getMonitorJobs()) { + // Report overall performance. + saveSummary(options.getSummaryFilename(), configurations, actual, baseline, start); + saveJavascript(options.getJavascriptFilename(), configurations, actual, baseline, start); + } + } + + if (!successful) { + System.exit(1); + } + } + + /** + * Append the pair of {@code configuration} and {@code perf} to perf file. 
+ */ + private void appendPerf( + @Nullable String perfFilename, NexmarkConfiguration configuration, + NexmarkPerf perf) { + if (perfFilename == null) { + return; + } + List lines = new ArrayList<>(); + lines.add(""); + lines.add(String.format("# %s", Instant.now())); + lines.add(String.format("# %s", configuration.toShortString())); + lines.add(configuration.toString()); + lines.add(perf.toString()); + try { + Files.write(Paths.get(perfFilename), lines, StandardCharsets.UTF_8, StandardOpenOption.CREATE, + StandardOpenOption.APPEND); + } catch (IOException e) { + throw new RuntimeException("Unable to write perf file: ", e); + } + NexmarkUtils.console("appended results to perf file %s.", perfFilename); + } + + /** + * Load the baseline perf. + */ + @Nullable + private static Map loadBaseline( + @Nullable String baselineFilename) { + if (baselineFilename == null) { + return null; + } + Map baseline = new LinkedHashMap<>(); + List lines; + try { + lines = Files.readAllLines(Paths.get(baselineFilename), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Unable to read baseline perf file: ", e); + } + for (int i = 0; i < lines.size(); i++) { + if (lines.get(i).startsWith("#") || lines.get(i).trim().isEmpty()) { + continue; + } + NexmarkConfiguration configuration = NexmarkConfiguration.fromString(lines.get(i++)); + NexmarkPerf perf = NexmarkPerf.fromString(lines.get(i)); + baseline.put(configuration, perf); + } + NexmarkUtils.console("loaded %d entries from baseline file %s.", baseline.size(), + baselineFilename); + return baseline; + } + + private static final String LINE = + "=========================================================================================="; + + /** + * Print summary of {@code actual} vs (if non-null) {@code baseline}. 
+ * + * @throws IOException + */ + private static void saveSummary( + @Nullable String summaryFilename, + Iterable configurations, Map actual, + @Nullable Map baseline, Instant start) { + List lines = new ArrayList<>(); + + lines.add(""); + lines.add(LINE); + + lines.add( + String.format("Run started %s and ran for %s", start, new Duration(start, Instant.now()))); + lines.add(""); + + lines.add("Default configuration:"); + lines.add(NexmarkConfiguration.DEFAULT.toString()); + lines.add(""); + + lines.add("Configurations:"); + lines.add(" Conf Description"); + int conf = 0; + for (NexmarkConfiguration configuration : configurations) { + lines.add(String.format(" %04d %s", conf++, configuration.toShortString())); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf != null && actualPerf.jobId != null) { + lines.add(String.format(" %4s [Ran as job %s]", "", actualPerf.jobId)); + } + } + + lines.add(""); + lines.add("Performance:"); + lines.add(String.format(" %4s %12s %12s %12s %12s %12s %12s", "Conf", "Runtime(sec)", + "(Baseline)", "Events(/sec)", "(Baseline)", "Results", "(Baseline)")); + conf = 0; + for (NexmarkConfiguration configuration : configurations) { + String line = String.format(" %04d ", conf++); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf == null) { + line += "*** not run ***"; + } else { + NexmarkPerf baselinePerf = baseline == null ? 
null : baseline.get(configuration); + double runtimeSec = actualPerf.runtimeSec; + line += String.format("%12.1f ", runtimeSec); + if (baselinePerf == null) { + line += String.format("%12s ", ""); + } else { + double baselineRuntimeSec = baselinePerf.runtimeSec; + double diff = ((runtimeSec - baselineRuntimeSec) / baselineRuntimeSec) * 100.0; + line += String.format("%+11.2f%% ", diff); + } + + double eventsPerSec = actualPerf.eventsPerSec; + line += String.format("%12.1f ", eventsPerSec); + if (baselinePerf == null) { + line += String.format("%12s ", ""); + } else { + double baselineEventsPerSec = baselinePerf.eventsPerSec; + double diff = ((eventsPerSec - baselineEventsPerSec) / baselineEventsPerSec) * 100.0; + line += String.format("%+11.2f%% ", diff); + } + + long numResults = actualPerf.numResults; + line += String.format("%12d ", numResults); + if (baselinePerf == null) { + line += String.format("%12s", ""); + } else { + long baselineNumResults = baselinePerf.numResults; + long diff = numResults - baselineNumResults; + line += String.format("%+12d", diff); + } + } + lines.add(line); + + if (actualPerf != null) { + List errors = actualPerf.errors; + if (errors == null) { + errors = new ArrayList(); + errors.add("NexmarkGoogleRunner returned null errors list"); + } + for (String error : errors) { + lines.add(String.format(" %4s *** %s ***", "", error)); + } + } + } + + lines.add(LINE); + lines.add(""); + + for (String line : lines) { + System.out.println(line); + } + + if (summaryFilename != null) { + try { + Files.write(Paths.get(summaryFilename), lines, StandardCharsets.UTF_8, + StandardOpenOption.CREATE, StandardOpenOption.APPEND); + } catch (IOException e) { + throw new RuntimeException("Unable to save summary file: ", e); + } + NexmarkUtils.console("appended summary to summary file %s.", summaryFilename); + } + } + + /** + * Write all perf data and any baselines to a javascript file which can be used by + * graphing page etc. 
+ */ + private static void saveJavascript( + @Nullable String javascriptFilename, + Iterable configurations, Map actual, + @Nullable Map baseline, Instant start) { + if (javascriptFilename == null) { + return; + } + + List lines = new ArrayList<>(); + lines.add(String.format( + "// Run started %s and ran for %s", start, new Duration(start, Instant.now()))); + lines.add("var all = ["); + + for (NexmarkConfiguration configuration : configurations) { + lines.add(" {"); + lines.add(String.format(" config: %s", configuration)); + NexmarkPerf actualPerf = actual.get(configuration); + if (actualPerf != null) { + lines.add(String.format(" ,perf: %s", actualPerf)); + } + NexmarkPerf baselinePerf = baseline == null ? null : baseline.get(configuration); + if (baselinePerf != null) { + lines.add(String.format(" ,baseline: %s", baselinePerf)); + } + lines.add(" },"); + } + + lines.add("];"); + + try { + Files.write(Paths.get(javascriptFilename), lines, StandardCharsets.UTF_8, + StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING); + } catch (IOException e) { + throw new RuntimeException("Unable to save javascript file: ", e); + } + NexmarkUtils.console("saved javascript to file %s.", javascriptFilename); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java new file mode 100644 index 0000000..0029a36 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkDriver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * Run NexMark queries using Beam-on-Flink runner. + */ +public class NexmarkFlinkDriver extends NexmarkDriver { + /** + * Command line flags. + */ + public interface NexmarkFlinkOptions extends Options, FlinkPipelineOptions { + } + + /** + * Entry point. + */ + public static void main(String[] args) { + // Gather command line args, baseline, configurations, etc. 
+ NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(NexmarkFlinkOptions.class); + options.setRunner(FlinkPipelineRunner.class); + NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options); + new NexmarkFlinkDriver().runAll(options, runner); + } +} + + http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java new file mode 100644 index 0000000..569aef6 --- /dev/null +++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.integration.nexmark; + +import javax.annotation.Nullable; + +/** + * Run a specific Nexmark query using the Bean-on-Flink runner. 
+ *
+ * <p>Job monitoring and publisher preloading are not supported by this runner.
+ */
+public class NexmarkFlinkRunner extends NexmarkRunner {
+
+  /** Creates a runner driven by the given Flink Nexmark options. */
+  public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
+    super(options);
+  }
+
+  /** Streaming vs. batch mode comes straight from the pipeline options. */
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  /** Fixed at 4 cores per worker. */
+  @Override
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  /** Fixed at a maximum of 5 workers. */
+  @Override
+  protected int maxNumWorkers() {
+    return 5;
+  }
+
+  /** Monitoring is not available for this runner. */
+  @Override
+  protected boolean canMonitor() {
+    return false;
+  }
+
+  /** There is no monitoring, so no perf snapshot is ever produced. */
+  @Override
+  @Nullable
+  protected NexmarkPerf monitor(NexmarkQuery query) {
+    return null;
+  }
+
+  /** Builds the publish-only pipeline directly from this runner's options. */
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+    builder.build(options);
+  }
+
+  /** Publisher preloading is not supported. */
+  @Override
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException();
+  }
+}