drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hnfgns <...@git.apache.org>
Subject [GitHub] drill pull request: DRILL-4313: Improve method of picking a random...
Date Mon, 07 Mar 2016 23:12:50 GMT
Github user hnfgns commented on a diff in the pull request:

    https://github.com/apache/drill/pull/396#discussion_r55290473
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -1392,6 +1390,206 @@ void DrillClientQueryResult::clearAndDestroy(){
         }
     }
     
    +
    +connectionStatus_t PooledDrillClientImpl::connect(const char* connStr){
    +    connectionStatus_t stat = CONN_SUCCESS;
    +    std::string pathToDrill, protocol, hostPortStr;
    +    std::string host;
    +    std::string port;
    +    m_connectStr=connStr;
    +    Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
    +    if(!strcmp(protocol.c_str(), "zk")){
    +        // Get a list of drillbits
    +        ZookeeperImpl zook;
    +        std::vector<std::string> drillbits;
    +        int err = zook.getAllDrillbits(hostPortStr.c_str(), pathToDrill.c_str(), drillbits);
    +        if(!err){
    +            Utils::shuffle(drillbits);
    +            // The original shuffled order is maintained if we shuffle first and then
add any missing elements
    +            Utils::add(m_drillbits, drillbits);
    +            exec::DrillbitEndpoint e;
    +            size_t nextIndex=0;
    +            {
    +                boost::lock_guard<boost::mutex> cLock(m_cMutex);
    +                m_lastConnection++;
    +                nextIndex = (m_lastConnection)%(getDrillbitCount());
    +            }
    +            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
    +                    << "(" << (void*)this << ")"
    +                    << ": Current counter is: " 
    +                    << m_lastConnection << std::endl;)
    +                err=zook.getEndPoint(m_drillbits, nextIndex, e);
    +            if(!err){
    +                host=boost::lexical_cast<std::string>(e.address());
    +                port=boost::lexical_cast<std::string>(e.user_port());
    +            }
    +        }
    +        if(err){
    +            return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER,
zook.getError().c_str()));
    +        }
    +        zook.close();
    +        m_bIsDirectConnection=false;
    +    }else if(!strcmp(protocol.c_str(), "local")){
    +        char tempStr[MAX_CONNECT_STR+1];
    +        strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
    +        host=strtok(tempStr, ":");
    +        port=strtok(NULL, "");
    +        m_bIsDirectConnection=true;
    +    }else{
    +        return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
    +    }
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " <<
host << ":" << port << std::endl;)
    +        DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
    +    stat =  pDrillClientImpl->connect(host.c_str(), port.c_str());
    +    if(stat == CONN_SUCCESS){
    +        boost::lock_guard<boost::mutex> lock(m_poolMutex);
    +        m_clientConnections.push_back(pDrillClientImpl);
    +    }else{
    +        DrillClientError* pErr = pDrillClientImpl->getError();
    +        handleConnError((connectionStatus_t)pErr->status, pErr->msg);
    +        delete pDrillClientImpl;
    +    }
    +    return stat;
    +}
    +
    +connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
    +    // Assume there is one valid connection to at least one drillbit
    +    connectionStatus_t stat=CONN_FAILURE;
    +    // Keep a copy of the user properties
    +    if(props!=NULL){
    +        m_pUserProperties = new DrillUserProperties;
    +        for(size_t i=0; i<props->size(); i++){
    +            m_pUserProperties->setProperty(
    +                    props->keyAt(i),
    +                    props->valueAt(i)
    +                    );
    +        }
    +    }
    +    DrillClientImpl* pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) "
<< pDrillClientImpl->m_connectedHost << std::endl;)
    +        stat=pDrillClientImpl->validateHandshake(m_pUserProperties);
    +    }
    +    else{
    +        stat =  handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
    +    }
    +    return stat;
    +}
    +
    +DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType
t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
    +    DrillClientQueryResult* pDrillClientQueryResult = NULL;
    +    DrillClientImpl* pDrillClientImpl = NULL;
    +    pDrillClientImpl = getOneConnection();
    +    if(pDrillClientImpl != NULL){
    +        pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
    +        m_queriesExecuted++;
    +    }
    +    return pDrillClientQueryResult;
    +}
    +
    +void PooledDrillClientImpl::freeQueryResources(DrillClientQueryResult* pQryResult){
    +    // Nothing to do. If this class ever keeps track of executing queries then it will
need 
    +    // to implement this call to free any query specific resources the pool might have

    +    // allocated
    +    return;
    +}
    +
    +bool PooledDrillClientImpl::Active(){
    +    boost::lock_guard<boost::mutex> lock(m_poolMutex);
    +    for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin();
it != m_clientConnections.end(); ++it){
    +        if((*it)->Active()){
    +            return true;
    +        }
    +    }
    +    return false;
    +}
    +
    +void PooledDrillClientImpl::Close() {
    +    boost::lock_guard<boost::mutex> lock(m_poolMutex);
    --- End diff --
    
    It is reasonable to assume that an instance will be reused following a call to #Close().
So we need to reset all variables here in #Close(): m_drillbits, m_lastConnection...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message