Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CC30718F2F for ; Mon, 22 Feb 2016 22:34:57 +0000 (UTC) Received: (qmail 31099 invoked by uid 500); 22 Feb 2016 22:34:57 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 31063 invoked by uid 500); 22 Feb 2016 22:34:57 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 31054 invoked by uid 99); 22 Feb 2016 22:34:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 22:34:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 201BCC12B3 for ; Mon, 22 Feb 2016 22:34:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 6eaFYoNRFkyQ for ; Mon, 22 Feb 2016 22:34:51 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 5D5795FE67 for ; Mon, 22 Feb 2016 22:34:50 +0000 (UTC) Received: (qmail 29293 invoked by uid 99); 22 Feb 2016 22:34:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 22:34:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6BD4AE0492; Mon, 22 Feb 2016 22:34:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.incubator.apache.org Date: Mon, 22 Feb 2016 22:35:04 -0000 Message-Id: <79cb9f22cb694f9c8ee3f59036d424ae@git.apache.org> In-Reply-To: <599702c4f0254acfa2e3eeee75299be2@git.apache.org> References: <599702c4f0254acfa2e3eeee75299be2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/34] incubator-asterixdb git commit: Enabled Feed Tests and Added External Library tests http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java index 06f7e72..e39b507 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/provider/TwitterFirehoseInputStreamProvider.java @@ -30,7 +30,9 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.external.api.IInputStreamProvider; +import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.stream.AInputStream; +import org.apache.asterix.external.util.FeedLogManager; import org.apache.asterix.external.util.TweetGenerator; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -80,8 +82,11 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider return true; } - public void start() { - executorService.execute(dataProvider); + public synchronized void start() { + if (!started) { + executorService.execute(dataProvider); + started = true; + } } @Override @@ -93,7 +98,6 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider public int read() throws IOException { if (!started) { start(); - started = true; } return in.read(); } @@ -106,6 +110,18 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider } return in.read(b, off, len); } + + @Override + public void configure(Map configuration) { + } + + @Override + public void setFeedLogManager(FeedLogManager logManager) { + } + + @Override + public void setController(AbstractFeedDataFlowController controller) { + } } private static class DataProvider implements Runnable { @@ -170,7 +186,7 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider break; } catch (Exception e) { if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Exception in adaptor " + e.getMessage()); + LOGGER.warning("Exception in adapter " + e.getMessage()); } } } @@ -181,4 +197,12 @@ public class TwitterFirehoseInputStreamProvider implements IInputStreamProvider } } + + @Override + public void configure(Map configuration) { + } + + @Override + public void setFeedLogManager(FeedLogManager feedLogManager) { + } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java index 3dea50c..df0ddc8 100755 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunctionProvider.java @@ -63,8 +63,8 @@ class ExternalScalarFunction extends ExternalFunction implements IExternalScalar try { setArguments(tuple); evaluate(functionHelper); - functionHelper.reset(); result.set(resultBuffer.getByteArray(), resultBuffer.getStartOffset(), resultBuffer.getLength()); + functionHelper.reset(); } catch (Exception e) { e.printStackTrace(); throw new AlgebricksException(e); @@ -73,6 +73,7 @@ class ExternalScalarFunction extends ExternalFunction implements IExternalScalar @Override public void evaluate(IFunctionHelper argumentProvider) throws Exception { + resultBuffer.reset(); ((IExternalScalarFunction) externalFunction).evaluate(argumentProvider); /* * Make sure that if "setResult" is not called, http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java index 7ad6cfa..db85e2f 100755 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java @@ -23,8 +23,14 @@ import java.util.Map; public class ExternalLibraryManager { - private static Map libraryClassLoaders = new HashMap(); + private static final Map libraryClassLoaders = new HashMap(); + /** + * Register the library class loader with the external library manager + * @param dataverseName + * @param libraryName + * @param classLoader + */ public static void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader) { String key = getKey(dataverseName, libraryName); synchronized (libraryClassLoaders) { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java index a929eec..7e28c35 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java @@ -23,11 +23,11 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation; import org.apache.asterix.external.feed.api.IFeedManager; +import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType; import org.apache.asterix.external.feed.api.IFeedSubscriptionManager; import org.apache.asterix.external.feed.api.ISubscribableRuntime; -import org.apache.asterix.external.feed.api.IFeedLifecycleListener.ConnectionLocation; -import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.management.FeedId; import org.apache.asterix.external.feed.runtime.IngestionRuntime; @@ -151,7 +151,7 @@ public class FeedCollectOperatorDescriptor extends AbstractSingleActivityOperato ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId); while (ingestionRuntime == null && waitCycleCount < 10) { try { - Thread.sleep(2000); + Thread.sleep(3000); waitCycleCount++; if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java index 20c11b3..d41179f 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java @@ -36,12 +36,14 @@ import org.apache.asterix.external.feed.runtime.FeedRuntime; import org.apache.asterix.external.feed.runtime.FeedRuntimeId; import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId; import org.apache.asterix.external.feed.runtime.SubscribableRuntime; +import org.apache.asterix.external.util.FeedUtils; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; /* @@ -91,6 +93,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp private FeedRuntimeInputHandler inputSideHandler; + private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE); + public FeedMetaComputeNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId, Map feedPolicyProperties, String operationId) throws HyracksDataException { @@ -103,6 +107,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp this.connectionId = feedConnectionId; this.feedManager = (IFeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext() .getApplicationObject()).getFeedManager(); + ctx.setSharedObject(message); } @Override @@ -126,7 +131,8 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp private void initializeNewFeedRuntime(FeedRuntimeId runtimeId) throws Exception { this.fta = new FrameTupleAccessor(recordDesc); this.inputSideHandler = new FeedRuntimeInputHandler(ctx, connectionId, runtimeId, coreOperator, - policyEnforcer.getFeedPolicyAccessor(), true, fta, recordDesc, feedManager, nPartitions); + policyEnforcer.getFeedPolicyAccessor(), policyEnforcer.getFeedPolicyAccessor().bufferingEnabled(), fta, + recordDesc, feedManager, nPartitions); DistributeFeedFrameWriter distributeWriter = new DistributeFeedFrameWriter(ctx, connectionId.getFeedId(), writer, runtimeType, partition, new FrameTupleAccessor(recordDesc), feedManager); @@ -157,6 +163,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { try { + FeedUtils.processFeedMessage(buffer, message, fta); inputSideHandler.nextFrame(buffer); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java index c596671..018aeaa 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java @@ -33,7 +33,7 @@ import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer; import org.apache.asterix.external.feed.runtime.FeedRuntime; import org.apache.asterix.external.feed.runtime.FeedRuntimeId; -import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.asterix.external.util.FeedUtils; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; @@ -41,7 +41,6 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; -import org.apache.hyracks.dataflow.common.util.IntSerDeUtils; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable { @@ -119,7 +118,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper } else { reviveOldFeedRuntime(runtimeId); } - coreOperator.open(); } catch (Exception e) { LOGGER.log(Level.WARNING, "Failed to open feed store operator", e); @@ -167,7 +165,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { try { - processFeedMessage(buffer); + FeedUtils.processFeedMessage(buffer, message, fta); inputSideHandler.nextFrame(buffer); } catch (Exception e) { e.printStackTrace(); @@ -175,18 +173,6 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper } } - private void processFeedMessage(ByteBuffer buffer) { - // read the message and reduce the number of tuples - fta.reset(buffer); - int tc = fta.getTupleCount() - 1; - int offset = fta.getTupleStartOffset(tc); - int len = fta.getTupleLength(tc); - message.clear(); - message.put(buffer.array(), offset, len); - message.flip(); - IntSerDeUtils.putInt(buffer.array(), FrameHelper.getTupleCountOffset(buffer.capacity()), tc); - } - @Override public void fail() throws HyracksDataException { if (LOGGER.isLoggable(Level.WARNING)) { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java index 93aa18b..60c80f1 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/parser/ADMDataParser.java @@ -265,7 +265,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars break; } case AdmLexer.TOKEN_INT_LITERAL: { - // For an INT value without any suffix, we return it as INT64 type value since it is the default integer type. + // For an INT value without any suffix, we return it as INT64 type value since it is + // the default integer type. parseAndCastNumeric(ATypeTag.INT64, objectType, out); break; } @@ -506,7 +507,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars } else { return null; } - // return ATypeHierarchy.canPromote(expectedTypeTag, typeTag) ? typeTag : null; + // return ATypeHierarchy.canPromote(expectedTypeTag, typeTag) ? typeTag : null; } else { // union List unionList = ((AUnionType) aObjectType).getUnionList(); for (IAType t : unionList) { @@ -531,7 +532,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars BitSet nulls = null; if (recType != null) { - //TODO: use BitSet Pool + // TODO: use BitSet Pool nulls = new BitSet(recType.getFieldNames().length); recBuilder.reset(recType); } else { @@ -569,7 +570,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars admLexer.getLastTokenImage().length() - 1); fieldId = recBuilder.getFieldId(fldName); if (fieldId < 0 && !recType.isOpen()) { - throw new ParseException("This record is closed, you can not add extra fields !!"); + throw new ParseException( + "This record is closed, you can not add extra fields! new field name: " + fldName); } else if (fieldId < 0 && recType.isOpen()) { aStringFieldName.setValue(admLexer.getLastTokenImage().substring(1, admLexer.getLastTokenImage().length() - 1)); @@ -895,7 +897,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars throw new ParseException(mismatchErrorMessage + objectType.getTypeName() + mismatchErrorMessage2 + typeTag); } - // If two type tags are not the same, either we try to promote or demote source type to the target type + // If two type tags are not the same, either we try to promote or demote source type to the + // target type if (targetTypeTag != typeTag) { if (ATypeHierarchy.canPromote(typeTag, targetTypeTag)) { // can promote typeTag to targetTypeTag @@ -907,7 +910,7 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1, castBuffer.getLength() - 1, out); } else if (ATypeHierarchy.canDemote(typeTag, targetTypeTag)) { - //can demote source type to the target type + // can demote source type to the target type ITypeConvertComputer demoteComputer = ATypeHierarchy.getTypeDemoteComputer(typeTag, targetTypeTag); if (demoteComputer == null) { throw new ParseException("Can't cast the " + typeTag + " type to the " + targetTypeTag + " type."); @@ -942,7 +945,8 @@ public class ADMDataParser extends AbstractDataParser implements IStreamDataPars if (targetTypeTag != typeTag) { ITypeConvertComputer promoteComputer = ATypeHierarchy.getTypePromoteComputer(typeTag, targetTypeTag); - // the availability if the promote computer should be consistent with the availability of a target type + // the availability if the promote computer should be consistent with + // the availability of a target type assert promoteComputer != null; // do the promotion; note that the type tag field should be skipped promoteComputer.convertType(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1, http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java index c5b39df..efbc6bf 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java @@ -57,6 +57,7 @@ public class AdapterFactoryProvider { // Compatability adapterFactories.put(ExternalDataConstants.ADAPTER_HDFS_CLASSNAME, GenericAdapterFactory.class); adapterFactories.put(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME, GenericAdapterFactory.class); + adapterFactories.put(ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER, GenericAdapterFactory.class); return adapterFactories; } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java index dfe7aed..6b4b6ba 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java @@ -28,7 +28,6 @@ import org.apache.asterix.external.api.IInputStreamProvider; import org.apache.asterix.external.api.IInputStreamProviderFactory; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.api.IRecordDataParserFactory; -import org.apache.asterix.external.api.IRecordFlowController; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.api.IStreamDataParser; @@ -39,10 +38,13 @@ import org.apache.asterix.external.dataflow.FeedStreamDataFlowController; import org.apache.asterix.external.dataflow.IndexingDataFlowController; import org.apache.asterix.external.dataflow.RecordDataFlowController; import org.apache.asterix.external.dataflow.StreamDataFlowController; +import org.apache.asterix.external.input.stream.AInputStream; import org.apache.asterix.external.util.DataflowUtils; -import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.asterix.external.util.FeedLogManager; +import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.dataflow.std.file.FileSplit; public class DataflowControllerProvider { @@ -57,52 +59,67 @@ public class DataflowControllerProvider { * else * |-a. Set stream parser * 5. start(writer) + * @param feedLogFileSplits + * @param isFeed */ + // TODO: Instead, use a factory just like data source and data parser. @SuppressWarnings({ "rawtypes", "unchecked" }) public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx, int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory, - Map configuration, boolean indexingOp) throws Exception { + Map configuration, boolean indexingOp, boolean isFeed, FileSplit[] feedLogFileSplits) + throws Exception { + FeedLogManager feedLogManager = null; + if (isFeed) { + feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits); + } switch (dataSourceFactory.getDataSourceType()) { case RECORDS: - IRecordFlowController recordDataFlowController = null; - if (indexingOp) { - recordDataFlowController = new IndexingDataFlowController(); - } else if (ExternalDataUtils.isFeed(configuration)) { - recordDataFlowController = new FeedRecordDataFlowController(); - } else { - recordDataFlowController = new RecordDataFlowController(); - } - recordDataFlowController.configure(configuration, ctx); - recordDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration)); + IDataFlowController recordDataFlowController = null; IRecordReaderFactory recordReaderFactory = (IRecordReaderFactory) dataSourceFactory; IRecordReader recordReader = recordReaderFactory.createRecordReader(ctx, partition); + recordReader.configure(configuration); IRecordDataParserFactory recordParserFactory = (IRecordDataParserFactory) dataParserFactory; IRecordDataParser dataParser = recordParserFactory.createRecordParser(ctx); dataParser.configure(configuration, recordType); - recordDataFlowController.setRecordReader(recordReader); - recordDataFlowController.setRecordParser(dataParser); + if (indexingOp) { + recordDataFlowController = new IndexingDataFlowController(dataParser, recordReader); + } else if (isFeed) { + recordDataFlowController = new FeedRecordDataFlowController(feedLogManager, dataParser, + recordReader); + } else { + recordDataFlowController = new RecordDataFlowController(dataParser, recordReader); + } + recordDataFlowController.configure(configuration, ctx); + recordDataFlowController + .setTupleForwarder(DataflowUtils.getTupleForwarder(configuration, feedLogManager)); return recordDataFlowController; case STREAM: IStreamFlowController streamDataFlowController = null; - if (ExternalDataUtils.isFeed(configuration)) { - streamDataFlowController = new FeedStreamDataFlowController(); + if (isFeed) { + streamDataFlowController = new FeedStreamDataFlowController(feedLogManager); } else { streamDataFlowController = new StreamDataFlowController(); } streamDataFlowController.configure(configuration, ctx); - streamDataFlowController.setTupleForwarder(DataflowUtils.getTupleForwarder(configuration)); + streamDataFlowController + .setTupleForwarder(DataflowUtils.getTupleForwarder(configuration, feedLogManager)); IInputStreamProviderFactory streamProviderFactory = (IInputStreamProviderFactory) dataSourceFactory; + streamProviderFactory.configure(configuration); IInputStreamProvider streamProvider = streamProviderFactory.createInputStreamProvider(ctx, partition); + streamProvider.setFeedLogManager(feedLogManager); + streamProvider.configure(configuration); IStreamDataParserFactory streamParserFactory = (IStreamDataParserFactory) dataParserFactory; streamParserFactory.configure(configuration); IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition); streamParser.configure(configuration, recordType); - streamParser.setInputStream(streamProvider.getInputStream()); + AInputStream inputStream = streamProvider.getInputStream(); + streamParser.setInputStream(inputStream); streamDataFlowController.setStreamParser(streamParser); return streamDataFlowController; default: throw new AsterixException("Unknown data source type: " + dataSourceFactory.getDataSourceType()); } } + } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index 745c653..0d65f72 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -26,11 +26,13 @@ import org.apache.asterix.external.api.IInputStreamProviderFactory; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.input.HDFSDataSourceFactory; import org.apache.asterix.external.input.record.reader.couchbase.CouchbaseReaderFactory; +import org.apache.asterix.external.input.record.reader.stream.EmptyLineSeparatedRecordReaderFactory; import org.apache.asterix.external.input.record.reader.stream.LineRecordReaderFactory; import org.apache.asterix.external.input.record.reader.stream.SemiStructuredRecordReaderFactory; import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordReaderFactory; import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamProviderFactory; import org.apache.asterix.external.input.stream.factory.SocketInputStreamProviderFactory; +import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamProviderFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; @@ -65,6 +67,9 @@ public class DatasourceFactoryProvider { case ExternalDataConstants.STREAM_SOCKET: streamFactory = new SocketInputStreamProviderFactory(); break; + case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER: + streamFactory = new TwitterFirehoseStreamProviderFactory(); + break; default: throw new AsterixException("unknown input stream factory"); } @@ -101,6 +106,11 @@ public class DatasourceFactoryProvider { case ExternalDataConstants.READER_COUCHBASE: readerFactory = new CouchbaseReaderFactory(); break; + case ExternalDataConstants.READER_LINE_SEPARATED: + readerFactory = new EmptyLineSeparatedRecordReaderFactory() + .setInputStreamFactoryProvider(DatasourceFactoryProvider.getInputStreamFactory( + ExternalDataUtils.getRecordReaderStreamName(configuration), configuration)); + break; default: throw new AsterixException("unknown record reader factory: " + reader); } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java index e604d42..cab8a69 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java @@ -43,12 +43,13 @@ public class DataflowUtils { } } - public static ITupleForwarder getTupleForwarder(Map configuration) throws AsterixException { + public static ITupleForwarder getTupleForwarder(Map configuration, FeedLogManager feedLogManager) + throws AsterixException { ITupleForwarder policy = null; ITupleForwarder.TupleForwardPolicy policyType = null; String propValue = configuration.get(ITupleForwarder.FORWARD_POLICY); if (ExternalDataUtils.isFeed(configuration)) { - //TODO pass this value in the configuration and avoid this check for feeds + // TODO pass this value in the configuration and avoid this check for feeds policyType = TupleForwardPolicy.FEED; } else if (propValue == null) { policyType = TupleForwardPolicy.FRAME_FULL; @@ -57,7 +58,7 @@ public class DataflowUtils { } switch (policyType) { case FEED: - policy = new FeedTupleForwarder(); + policy = new FeedTupleForwarder(feedLogManager); break; case FRAME_FULL: policy = new FrameFullTupleForwarder(); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java index 7bfe698..035c1c3 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java @@ -48,7 +48,7 @@ public class ExternalDataCompatibilityUtils { } } - //TODO:Add remaining aliases + // TODO:Add remaining aliases public static void addCompatabilityParameters(String adapterName, ARecordType itemType, Map configuration) throws AsterixException { // HDFS @@ -71,22 +71,29 @@ public class ExternalDataCompatibilityUtils { if (adapterName.equals(ExternalDataConstants.ALIAS_LOCALFS_ADAPTER) || adapterName.contains(ExternalDataConstants.ADAPTER_LOCALFS_CLASSNAME) || adapterName.contains(ExternalDataConstants.ALIAS_LOCALFS_PUSH_ADAPTER)) { - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { - throw new AsterixException("Unspecified format parameter for local file system adapter"); + if (configuration.get(ExternalDataConstants.KEY_READER) == null) { + if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + // If reader is specified, we will use the selected reader. If format is + // specified, we will assign a suitable reader for the format. + // TODO: better error message + throw new AsterixException( + "Unspecified (\"reader\" or \"format\") parameter for local filesystem adapter"); + } + configuration.put(ExternalDataConstants.KEY_READER, + configuration.get(ExternalDataConstants.KEY_FORMAT)); + configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_LOCALFS_ADAPTER); } - configuration.put(ExternalDataConstants.KEY_READER, configuration.get(ExternalDataConstants.KEY_FORMAT)); - configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.ALIAS_LOCALFS_ADAPTER); } // Socket - if (adapterName.equalsIgnoreCase(ExternalDataConstants.ALIAS_SOCKET_ADAPTER)) { + if (adapterName.equalsIgnoreCase(ExternalDataConstants.ALIAS_SOCKET_ADAPTER) + || adapterName.equalsIgnoreCase(ExternalDataConstants.ALIAS_SOCKET_CLIENT_ADAPTER)) { if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { throw new AsterixException("Unspecified format parameter for socket adapter"); } configuration.put(ExternalDataConstants.KEY_READER, configuration.get(ExternalDataConstants.KEY_FORMAT)); configuration.put(ExternalDataConstants.KEY_READER_STREAM, ExternalDataConstants.STREAM_SOCKET); } - // Twitter (Pull) if (adapterName.equals(ExternalDataConstants.ALIAS_TWITTER_PULL_ADAPTER)) { configuration.put(ExternalDataConstants.KEY_READER, ExternalDataConstants.READER_TWITTER_PULL); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 9bac07c..4b2826c 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -118,6 +118,7 @@ public class ExternalDataConstants { public static final String READER_DELIMITED = "delimited-text"; public static final String READER_TWITTER_PUSH = "twitter-push"; public static final String READER_TWITTER_PULL = "twitter-pull"; + public static final String READER_LINE_SEPARATED = "line-separated"; public static final String CLUSTER_LOCATIONS = "cluster-locations"; public static final String SCHEDULER = "hdfs-scheduler"; @@ -204,4 +205,5 @@ public class ExternalDataConstants { * Expected parameter values */ public static final String PARAMETER_OF_SIZE_ONE = "Value of size 1"; + public static final String LARGE_RECORD_ERROR_MESSAGE = "Record is too large"; } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 7c03e4d..c36b629 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -93,7 +93,8 @@ public class ExternalDataUtils { } public static boolean isExternal(String aString) { - return (aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) && aString.trim().length() > 1); + return (aString != null && aString.contains(ExternalDataConstants.EXTERNAL_LIBRARY_SEPARATOR) + && aString.trim().length() > 1); } public static ClassLoader getClassLoader(String dataverse, String library) { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java index 72b438d..4737727 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java @@ -37,7 +37,8 @@ public class FeedLogManager { START, // partition start END, // partition end COMMIT, // a record commit within a partition - SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored + SNAPSHOT // an identifier that partitions with identifiers before this one should be + // ignored } public static final String PROGRESS_LOG_FILE_NAME = "progress.log"; @@ -52,10 +53,15 @@ public class FeedLogManager { private BufferedWriter progressLogger; private BufferedWriter errorLogger; private BufferedWriter recordLogger; + private StringBuilder stringBuilder = new StringBuilder(); - public FeedLogManager(File file) { + public FeedLogManager(File file) throws IOException { this.dir = file.toPath(); this.completed = new TreeSet(); + if (!exists()) { + create(); + } + open(); } public void endPartition() throws IOException { @@ -124,22 +130,31 @@ public class FeedLogManager { } public void logProgress(String log) throws IOException { - progressLogger.write(log); - progressLogger.newLine(); + stringBuilder.setLength(0); + stringBuilder.append(log); + stringBuilder.append(ExternalDataConstants.LF); + progressLogger.write(stringBuilder.toString()); + progressLogger.flush(); } public void logError(String error, Throwable th) throws IOException { - errorLogger.append(error); - errorLogger.newLine(); - errorLogger.append(th.toString()); - errorLogger.newLine(); + stringBuilder.setLength(0); + stringBuilder.append(error); + stringBuilder.append(ExternalDataConstants.LF); + stringBuilder.append(th.toString()); + stringBuilder.append(ExternalDataConstants.LF); + errorLogger.write(stringBuilder.toString()); + errorLogger.flush(); } - public void logRecord(String record, Exception e) throws IOException { - recordLogger.append(record); - recordLogger.newLine(); - recordLogger.append(e.toString()); - recordLogger.newLine(); + public void logRecord(String record, String errorMessage) throws IOException { + stringBuilder.setLength(0); + stringBuilder.append(record); + stringBuilder.append(ExternalDataConstants.LF); + stringBuilder.append(errorMessage); + stringBuilder.append(ExternalDataConstants.LF); + recordLogger.write(stringBuilder.toString()); + recordLogger.flush(); } public static String getSplitId(String log) { http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index 224ee31..c128545 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -19,6 +19,8 @@ package org.apache.asterix.external.util; import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -29,8 +31,12 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.util.IntSerDeUtils; import org.apache.hyracks.dataflow.std.file.FileSplit; public class FeedUtils { @@ -38,13 +44,27 @@ public class FeedUtils { return dataverseName + File.separator + feedName; } + public static FileSplit[] splitsForAdapter(String dataverseName, String feedName, String nodeName, int partition) { + File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName)); + String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName(); + ClusterPartition nodePartition = AsterixClusterProperties.INSTANCE.getNodePartitions(nodeName)[0]; + String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName, + nodePartition.getPartitionId()); + // format: 'storage dir name'/partition_#/dataverse/feed/adapter_# + File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + + StoragePathUtil.ADAPTER_INSTANCE_PREFIX + partition); + return new FileSplit[] { StoragePathUtil.getFileSplitForClusterPartition(nodePartition, f) }; + } + public static FileSplit[] splitsForAdapter(String dataverseName, String feedName, AlgebricksPartitionConstraint partitionConstraints) throws Exception { File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName)); + String[] locations = null; if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) { throw new AlgebricksException("Can't create file splits for adapter with count partitioning constraints"); + } else { + locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations(); } - String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations(); List splits = new ArrayList(); String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName(); int i = 0; @@ -66,4 +86,22 @@ public class FeedUtils { return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath); } + public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition, + FileSplit[] feedLogFileSplits) throws IOException { + return new FeedLogManager( + FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(), + feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager()).getFile()); + } + + public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) { + // read the message and reduce the number of tuples + fta.reset(input); + int tc = fta.getTupleCount() - 1; + int offset = fta.getTupleStartOffset(tc); + int len = fta.getTupleLength(tc); + message.clear(); + message.put(input.array(), offset, len); + message.flip(); + IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc); + } } http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java index 631eef4..2e3b8ec 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FileSystemWatcher.java @@ -47,23 +47,25 @@ public class FileSystemWatcher { private final LinkedList files = new LinkedList(); private Iterator it; private final String expression; - private final FeedLogManager logManager; + private FeedLogManager logManager; private final Path path; private final boolean isFeed; private boolean done; private File current; private AbstractFeedDataFlowController controller; - public FileSystemWatcher(FeedLogManager logManager, Path inputResource, String expression, boolean isFeed) - throws IOException { + public FileSystemWatcher(Path inputResource, String expression, boolean isFeed) throws IOException { this.watcher = isFeed ? FileSystems.getDefault().newWatchService() : null; this.keys = isFeed ? new HashMap() : null; - this.logManager = logManager; this.expression = expression; this.path = inputResource; this.isFeed = isFeed; } + public void setFeedLogManager(FeedLogManager feedLogManager) { + this.logManager = feedLogManager; + } + public void init() throws IOException { LinkedList dirs = null; dirs = new LinkedList(); @@ -91,13 +93,6 @@ public class FileSystemWatcher { if (logManager == null) { return; } - if (logManager.exists()) { - logManager.open(); - } else { - logManager.create(); - logManager.open(); - return; - } /* * Done processing the progress log file. We now have: * the files that were completed. @@ -210,6 +205,9 @@ public class FileSystemWatcher { return false; } files.clear(); + if (keys.isEmpty()) { + return false; + } // Read new Events (Polling first to add all available files) WatchKey key; key = watcher.poll(); http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java new file mode 100644 index 0000000..6722a83 --- /dev/null +++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableCharArrayString.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.classad; + +public class AMutableCharArrayString implements Comparable, CharSequence { + private char[] value; + private int length; + private int increment = 64; + + public AMutableCharArrayString(String str) { + this.value = str.toCharArray(); + this.length = value.length; + } + + public void decrementLength() { + length--; + } + + public AMutableCharArrayString() { + length = 0; + value = new char[increment]; + } + + @Override + public char charAt(int i) { + return value[i]; + } + + @Override + public String toString() { + return String.valueOf(value, 0, length); + } + + public AMutableCharArrayString(char[] value, int length) { + this.value = value; + this.length = length; + } + + public AMutableCharArrayString(AMutableCharArrayString aMutableCharArrayString) { + this.value = new char[aMutableCharArrayString.length]; + setValue(aMutableCharArrayString); + } + + public AMutableCharArrayString(int iniitialSize) { + this.value = new char[iniitialSize]; + this.length = 0; + } + + private void expand() { + char[] tmpValue = new char[length + increment]; + System.arraycopy(value, 0, tmpValue, 0, length); + value = tmpValue; + } + + private void copyAndExpand(int newSize) { + char[] tmpValue = new char[newSize]; + System.arraycopy(value, 0, tmpValue, 0, length); + value = tmpValue; + } + + public void appendChar(char aChar) { + if (length == value.length) { + expand(); + } + value[length] = aChar; + length++; + } + + public void erase(int position) { + if (position != length - 1) { + System.arraycopy(value, position + 1, value, position, length - (position + 1)); + } + length--; + } + + public void setLength(int l) { + this.length = l; + } + + public void setValue(AMutableCharArrayString otherString) { + if (otherString.length > value.length) { + // need to reallocate + value = new char[otherString.length]; + } + System.arraycopy(otherString.value, 0, value, 0, otherString.length); + this.length = otherString.length; + } + + public void setValue(String format) { + reset(); + appendString(format); + } + + public void reset() { + this.length = 0; + } + + public void setValue(AMutableCharArrayString otherString, int length) { + if (length > value.length) { + // need to reallocate + value = new char[length]; + } + System.arraycopy(otherString.value, 0, value, 0, length); + this.length = length; + } + + public void setValue(String otherString, int length) { + if (length > value.length) { + // need to reallocate + value = new char[length]; + } + otherString.getChars(0, otherString.length(), value, 0); + this.length = length; + } + + public void copyValue(char[] value, int length) { + if (length > this.value.length) { + // need to reallocate + this.value = new char[length]; + } + System.arraycopy(value, 0, this.value, 0, length); + this.length = length; + } + + public void setString(char[] value, int length) { + this.value = value; + this.length = length; + } + + public void setChar(int i, char ch) { + value[i] = ch; + } + + public void incrementLength() { + if (value.length == length) { + expand(); + } + length++; + } + + public boolean isEqualsIgnoreCaseLower(char[] compareTo) { + if (length == compareTo.length) { + for (int i = 0; i < length; i++) { + if (compareTo[i] != Character.toLowerCase(value[i])) { + return false; + } + } + return true; + } + return false; + } + + public void appendString(String aString) { + if (value.length - length < aString.length()) { + copyAndExpand(value.length + aString.length()); + } + aString.getChars(0, aString.length(), value, length); + length += aString.length(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof AMutableCharArrayString) { + AMutableCharArrayString s = (AMutableCharArrayString) o; + if (length == s.length) { + for (int i = 0; i < length; i++) { + if (value[i] != s.value[i]) { + return false; + } + } + return true; + } + } + return false; + } + + public boolean equalsIgnoreCase(Object o) { + if (o instanceof AMutableCharArrayString) { + AMutableCharArrayString s = (AMutableCharArrayString) o; + if (length == s.length) { + for (int i = 0; i < length; i++) { + if (Character.toLowerCase(value[i]) != Character.toLowerCase(s.value[i])) { + return false; + } + } + return true; + } + } + return false; + } + + public boolean equalsString(String aString) { + if (length == aString.length()) { + for (int i = 0; i < length; i++) { + if (value[i] != aString.charAt(i)) { + return false; + } + } + return true; + } + return false; + } + + public void erase(int position, int length) { + if (length + position >= this.length) { + this.length -= length; + } else { + System.arraycopy(value, position + length, value, position, this.length - (position + length)); + this.length -= length; + } + } + + public String substr(int i, int len) { + return String.copyValueOf(value, i, len); + } + + public int firstNonDigitChar() { + for (int i = 0; i < length; i++) { + if (!Character.isDigit(value[i])) { + return i; + } + } + return -1; + } + + public int fistNonDoubleDigitChar() { + boolean inFraction = false; + boolean prevCharIsPoint = false; + for (int i = 0; i < length; i++) { + if (!Character.isDigit(value[i])) { + if (inFraction) { + if (prevCharIsPoint) { + return i - 1; + } else { + return i; + } + } else { + if (value[i] == '.') { + inFraction = true; + prevCharIsPoint = true; + } + } + } else { + prevCharIsPoint = false; + } + } + return -1; + } + + @Override + public int compareTo(AMutableCharArrayString o) { + return toString().compareTo(o.toString()); + } + + public int compareTo(String o) { + return toString().compareTo(o); + } + + public int compareToIgnoreCase(AMutableCharArrayString o) { + return toString().compareToIgnoreCase(o.toString()); + } + + public void appendString(AMutableCharArrayString aString) { + if (value.length - length < aString.length()) { + copyAndExpand(value.length + aString.length()); + } + System.arraycopy(aString.value, 0, value, length, aString.length); + length += aString.length(); + } + + @Override + public int length() { + return length; + } + + public int size() { + return length; + } + + @Override + public CharSequence subSequence(int start, int end) { + return substr(start, end - start); + } + + public int firstIndexOf(char delim) { + return firstIndexOf(delim, 0); + } + + public int firstIndexOf(char delim, int startIndex) { + int position = startIndex; + while (position < length) { + if (value[position] == delim) { + return position; + } + position++; + } + return -1; + } + + public String substr(int lastIndex) { + return String.copyValueOf(value, lastIndex, length - lastIndex); + } + + public void prependChar(char c) { + if (value.length == length) { + copyAndExpand(value.length * 2); + } + System.arraycopy(value, 0, value, 1, length); + value[0] = c; + length += 1; + } + + public void insert(int i, String aString) { + if (value.length - length < aString.length()) { + copyAndExpand(value.length + aString.length()); + } + System.arraycopy(value, i, value, i + aString.length(), aString.length()); + aString.getChars(0, aString.length(), value, i); + length += aString.length(); + } + + public char[] getValue() { + return value; + } + + public int getLength() { + return length; + } + + public int getIncrement() { + return increment; + } + + public void setValue(char[] value) { + this.value = value; + } + + public void setIncrement(int increment) { + this.increment = increment; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java new file mode 100644 index 0000000..5c2a2f9 --- /dev/null +++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AMutableNumberFactor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.classad; + +import org.apache.asterix.external.classad.Value.NumberFactor; + +public class AMutableNumberFactor { + private NumberFactor factor; + + public AMutableNumberFactor() { + factor = NumberFactor.NO_FACTOR; + } + + public NumberFactor getFactor() { + return factor; + } + + public void setFactor(NumberFactor factor) { + this.factor = factor; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ac683db0/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java new file mode 100644 index 0000000..04fe6ec --- /dev/null +++ b/asterix-external-data/src/test/java/org/apache/asterix/external/classad/AttributeReference.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.classad; + +import org.apache.asterix.om.base.AMutableInt32; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class AttributeReference extends ExprTree { + + private ExprTree expr; + private boolean absolute; + private AMutableCharArrayString attributeStr; + private ClassAd current = new ClassAd(false, false); + private ExprList adList = new ExprList(); + private Value val = new Value(); + private MutableBoolean rVal = new MutableBoolean(false); + private AttributeReference tempAttrRef; + private EvalState tstate = new EvalState(); + + public ExprTree getExpr() { + return expr; + } + + public void setExpr(ExprTree expr) { + this.expr = expr == null ? null : expr.self(); + } + + public AttributeReference() { + expr = null; + attributeStr = null; + absolute = false; + } + + /// Copy Constructor + public AttributeReference(AttributeReference ref) throws HyracksDataException { + copyFrom(ref); + } + + /// Assignment operator + @Override + public boolean equals(Object o) { + if (o instanceof AttributeReference) { + AttributeReference ref = (AttributeReference) o; + return sameAs(ref); + } + return false; + } + + /// node type + @Override + public NodeKind getKind() { + return NodeKind.ATTRREF_NODE; + } + + public static AttributeReference createAttributeReference(ExprTree expr, AMutableCharArrayString attrName) { + return createAttributeReference(expr, attrName, false); + } + + /** + * Return a copy of this attribute reference. + * + * @throws HyracksDataException + */ + @Override + public ExprTree copy() throws HyracksDataException { + AttributeReference newTree = new AttributeReference(); + newTree.copyFrom(this); + return newTree; + } + + /** + * Copy from the given reference into this reference. + * + * @param ref + * The reference to copy from. + * @return true if the copy succeeded, false otherwise. + * @throws HyracksDataException + */ + public boolean copyFrom(AttributeReference ref) throws HyracksDataException { + if (attributeStr == null) { + attributeStr = new AMutableCharArrayString(ref.attributeStr); + } else { + attributeStr.setValue(ref.attributeStr); + } + if (ref.expr != null) { + expr = ref.expr.copy(); + } + super.copyFrom(ref); + this.absolute = ref.absolute; + return true; + } + + /** + * Is this attribute reference the same as another? + * + * @param tree + * The reference to compare with + * @return true if they are the same, false otherwise. + */ + @Override + public boolean sameAs(ExprTree tree) { + boolean is_same; + ExprTree pSelfTree = tree.self(); + if (this == pSelfTree) { + is_same = true; + } else if (pSelfTree.getKind() != NodeKind.ATTRREF_NODE) { + is_same = false; + } else { + AttributeReference other_ref = (AttributeReference) pSelfTree; + if (absolute != other_ref.absolute || !attributeStr.equals(other_ref.attributeStr)) { + is_same = false; + } else if ((expr == null && other_ref.expr == null) || (expr.equals(other_ref.expr)) + || (expr != null && other_ref.expr != null && ((AttributeReference) expr).sameAs(other_ref.expr))) { + // Will this check result in infinite recursion? How do I stop it? + is_same = true; + } else { + is_same = false; + } + } + return is_same; + } + + // a private ctor for use in significant expr identification + private AttributeReference(ExprTree tree, AMutableCharArrayString attrname, boolean absolut) { + attributeStr = attrname; + expr = tree == null ? null : tree.self(); + absolute = absolut; + } + + @Override + public void privateSetParentScope(ClassAd parent) { + if (expr != null) { + expr.setParentScope(parent); + } + } + + public void getComponents(ExprTreeHolder tree, AMutableCharArrayString attr, MutableBoolean abs) + throws HyracksDataException { + tree.copyFrom(expr); + attr.setValue(attributeStr); + abs.setValue(absolute); + } + + public EvalResult findExpr(EvalState state, ExprTreeHolder tree, ExprTreeHolder sig, boolean wantSig) + throws HyracksDataException { + // establish starting point for search + if (expr == null) { + // "attr" and ".attr" + current = absolute ? state.getRootAd() : state.getCurAd(); + if (absolute && (current == null)) { // NAC - circularity so no root + return EvalResult.EVAL_FAIL; // NAC + } // NAC + } else { + // "expr.attr" + rVal.setValue(wantSig ? expr.publicEvaluate(state, val, sig) : expr.publicEvaluate(state, val)); + if (!rVal.booleanValue()) { + return (EvalResult.EVAL_FAIL); + } + + if (val.isUndefinedValue()) { + return (EvalResult.EVAL_UNDEF); + } else if (val.isErrorValue()) { + return (EvalResult.EVAL_ERROR); + } + + if (!val.isClassAdValue(current) && !val.isListValue(adList)) { + return (EvalResult.EVAL_ERROR); + } + } + + if (val.isListValue()) { + ExprList eList = new ExprList(); + // + // iterate through exprList and apply attribute reference + // to each exprTree + for (ExprTree currExpr : adList.getExprList()) { + if (currExpr == null) { + return (EvalResult.EVAL_FAIL); + } else { + if (tempAttrRef == null) { + tempAttrRef = new AttributeReference(); + } else { + tempAttrRef.reset(); + } + createAttributeReference(currExpr.copy(), attributeStr, false, tempAttrRef); + val.clear(); + // Create new EvalState, within this scope, because + // attrRef is only temporary, so we do not want to + // cache the evaluated result in the outer state object. + tstate.reset(); + tstate.setScopes(state.getCurAd()); + rVal.setValue(wantSig ? tempAttrRef.publicEvaluate(tstate, val, sig) + : tempAttrRef.publicEvaluate(tstate, val)); + if (!rVal.booleanValue()) { + return (EvalResult.EVAL_FAIL); + } + + ClassAd evaledAd = new ClassAd(); + ExprList evaledList = new ExprList(); + if (val.isClassAdValue(evaledAd)) { + eList.add(evaledAd); + continue; + } else if (val.isListValue(evaledList)) { + eList.add(evaledList.copy()); + continue; + } else { + eList.add(Literal.createLiteral(val)); + } + } + } + tree.setInnerTree(ExprList.createExprList(eList)); + ClassAd newRoot = new ClassAd(); + tree.setParentScope(newRoot); + return EvalResult.EVAL_OK; + } + // lookup with scope; this may side-affect state + + /* ClassAd::alternateScope is intended for transitioning Condor from + * old to new ClassAds. It allows unscoped attribute references + * in expressions that can't be found in the local scope to be + * looked for in an alternate scope. In Condor, the alternate + * scope is the Target ad in matchmaking. + * Expect alternateScope to be removed from a future release. + */ + if (current == null) { + return EvalResult.EVAL_UNDEF; + } + int rc = current.lookupInScope(attributeStr.toString(), tree, state); + if (expr == null && !absolute && rc == EvalResult.EVAL_UNDEF.ordinal() && current.getAlternateScope() != null) { + rc = current.getAlternateScope().lookupInScope(attributeStr.toString(), tree, state); + } + return EvalResult.values()[rc]; + } + + @Override + public boolean publicEvaluate(EvalState state, Value val) throws HyracksDataException { + ExprTreeHolder tree = new ExprTreeHolder(); + ExprTreeHolder dummy = new ExprTreeHolder(); + ClassAd curAd = new ClassAd(state.getCurAd()); + boolean rval; + // find the expression and the evalstate + switch (findExpr(state, tree, dummy, false)) { + case EVAL_FAIL: + return false; + case EVAL_ERROR: + val.setErrorValue(); + state.setCurAd(curAd); + return true; + case EVAL_UNDEF: + val.setUndefinedValue(); + state.setCurAd(curAd); + return true; + case EVAL_OK: { + if (state.getDepthRemaining() <= 0) { + val.setErrorValue(); + state.setCurAd(curAd); + return false; + } + state.decrementDepth(); + rval = tree.publicEvaluate(state, val); + state.incrementDepth(); + state.getCurAd().setValue(curAd); + return rval; + } + default: + throw new HyracksDataException("ClassAd: Should not reach here"); + } + } + + @Override + public boolean privateEvaluate(EvalState state, Value val, ExprTreeHolder sig) throws HyracksDataException { + ExprTreeHolder tree = new ExprTreeHolder(); + ExprTreeHolder exprSig = new ExprTreeHolder(); + ClassAd curAd = new ClassAd(state.getCurAd()); + MutableBoolean rval = new MutableBoolean(true); + switch (findExpr(state, tree, exprSig, true)) { + case EVAL_FAIL: + rval.setValue(false); + break; + case EVAL_ERROR: + val.setErrorValue(); + break; + case EVAL_UNDEF: + val.setUndefinedValue(); + break; + case EVAL_OK: { + if (state.getDepthRemaining() <= 0) { + val.setErrorValue(); + state.getCurAd().setValue(curAd); + return false; + } + state.decrementDepth(); + rval.setValue(tree.publicEvaluate(state, val)); + state.incrementDepth(); + break; + } + default: + throw new HyracksDataException("ClassAd: Should not reach here"); + } + sig.setInnerTree((new AttributeReference(exprSig, attributeStr, absolute))); + state.getCurAd().setValue(curAd); + return rval.booleanValue(); + } + + @Override + public boolean privateFlatten(EvalState state, Value val, ExprTreeHolder ntree, AMutableInt32 op) + throws HyracksDataException { + ExprTreeHolder tree = new ExprTreeHolder(); + ExprTreeHolder dummy = new ExprTreeHolder(); + ClassAd curAd; + boolean rval; + ntree.setInnerTree(null); // Just to be safe... wenger 2003-12-11. + // find the expression and the evalstate + curAd = state.getCurAd(); + switch (findExpr(state, tree, dummy, false)) { + case EVAL_FAIL: + return false; + case EVAL_ERROR: + val.setErrorValue(); + state.getCurAd().setValue(curAd); + return true; + case EVAL_UNDEF: + if (expr != null && state.isFlattenAndInline()) { + ExprTreeHolder expr_ntree = new ExprTreeHolder(); + Value expr_val = new Value(); + if (state.getDepthRemaining() <= 0) { + val.setErrorValue(); + state.getCurAd().setValue(curAd); + return false; + } + state.decrementDepth(); + rval = expr.publicFlatten(state, expr_val, expr_ntree); + state.incrementDepth(); + if (rval && expr_ntree.getInnerTree() != null) { + ntree.setInnerTree(createAttributeReference(expr_ntree, attributeStr)); + if (ntree.getInnerTree() != null) { + state.getCurAd().setValue(curAd); + return true; + } + } + } + ntree.setInnerTree(copy()); + state.getCurAd().setValue(curAd); + return true; + case EVAL_OK: { + // Don't flatten or inline a classad that's referred to + // by an attribute. + if (tree.getKind() == NodeKind.CLASSAD_NODE) { + ntree.setInnerTree(copy()); + val.setUndefinedValue(); + return true; + } + + if (state.getDepthRemaining() <= 0) { + val.setErrorValue(); + state.getCurAd().setValue(curAd); + return false; + } + state.decrementDepth(); + + rval = tree.publicFlatten(state, val, ntree); + state.incrementDepth(); + + // don't inline if it didn't flatten to a value, and clear cache + // do inline if FlattenAndInline was called + if (ntree.getInnerTree() != null) { + if (state.isFlattenAndInline()) { // NAC + return true; // NAC + } // NAC + ntree.setInnerTree(copy()); + val.setUndefinedValue(); + } + + state.getCurAd().setValue(curAd); + return rval; + } + default: + throw new HyracksDataException("ClassAd: Should not reach here"); + } + } + + /** + * Factory method to create attribute reference nodes. + * + * @param expr + * The expression part of the reference (i.e., in + * case of expr.attr). This parameter is NULL if the reference + * is absolute (i.e., .attr) or simple (i.e., attr). + * @param attrName + * The name of the reference. This string is + * duplicated internally. + * @param absolute + * True if the reference is an absolute reference + * (i.e., in case of .attr). This parameter cannot be true if + * expr is not NULL, default value is false; + */ + public static AttributeReference createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr, + boolean absolut) { + return (new AttributeReference(tree, attrStr, absolut)); + } + + public void setValue(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut) { + this.absolute = absolut; + this.attributeStr = attrStr; + this.expr = tree == null ? null : tree.self(); + } + + public static void createAttributeReference(ExprTree tree, AMutableCharArrayString attrStr, boolean absolut, + AttributeReference ref) { + ref.setValue(tree, attrStr, absolut); + } + + @Override + public boolean privateEvaluate(EvalState state, Value val) throws HyracksDataException { + ExprTreeHolder tree = new ExprTreeHolder(); + ExprTreeHolder dummy = new ExprTreeHolder(); + ClassAd curAd; + boolean rval; + + // find the expression and the evalstate + curAd = state.getCurAd(); + switch (findExpr(state, tree, dummy, false)) { + case EVAL_FAIL: + return false; + case EVAL_ERROR: + val.setErrorValue(); + state.getCurAd().setValue(curAd); + return true; + case EVAL_UNDEF: + val.setUndefinedValue(); + state.getCurAd().setValue(curAd); + return true; + + case EVAL_OK: { + if (state.getDepthRemaining() <= 0) { + val.setErrorValue(); + state.getCurAd().setValue(curAd); + return false; + } + state.decrementDepth(); + rval = tree.publicEvaluate(state, val); + state.incrementDepth(); + state.getCurAd().setValue(curAd); + return rval; + } + default: + throw new HyracksDataException("ClassAd: Should not reach here"); + } + } + + @Override + public void reset() { + if (expr != null) { + expr.reset(); + } + } +}