singa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wang...@apache.org
Subject [07/12] incubator-singa git commit: Transfer code from nusinga repo to singa apache repo. New commuinication framework is implemented to unify the frameworks of existing distributed deep learning systems. Communication is now implmented using ZeroMQ. API
Date Sun, 03 May 2015 14:04:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/mshadow/tensor_random.h
----------------------------------------------------------------------
diff --git a/include/mshadow/tensor_random.h b/include/mshadow/tensor_random.h
new file mode 100644
index 0000000..b3f0b84
--- /dev/null
+++ b/include/mshadow/tensor_random.h
@@ -0,0 +1,299 @@
+#ifndef MSHADOW_TENSOR_RANDOM_H
+#define MSHADOW_TENSOR_RANDOM_H
+/*!
+ *  \file tensor_random.h
+ *  \brief Random inline functions for tensor.
+ *  \author Bing Xu, Tianqi Chen
+ *   Based on curand|MKL|stdlib
+ */
+#include <cstdlib>
+#include "tensor.h"
+#include "tensor_container.h"
+
+namespace mshadow {
+    /*! 
+     * \brief random number generator 
+     * \tparam Device the device of random number generator
+     */
+    template<typename Device>
+    class Random {};
+
+    /*! \brief CPU random number generator */
+    template<>
+    class Random<cpu> {
+    public:
+        /*!
+         * \brief constructor of random engine
+         * \param seed random number seed
+         */
+        Random<cpu>( int seed ){
+            #if MSHADOW_USE_MKL
+            int status = vslNewStream(&vStream_, VSL_BRNG_MT19937, seed);
+            utils::Assert( status == VSL_STATUS_OK, "MKL VSL Random engine failed to be initialized.\n" );
+            #else
+            srand(seed);
+            #endif
+            buffer_.Resize( Shape1( kRandBufferSize ) );
+        }
+        ~Random<cpu>() {
+            #if MSHADOW_USE_MKL
+            vslDeleteStream(&vStream_);
+            #endif
+        }
+        /*!
+         * \brief seed random number generator using this seed
+         * \param seed seed of prng
+         */
+        inline void Seed( int seed ){
+            #if MSHADOW_USE_MKL
+            int status = vslDeleteStream(&vStream_);
+            utils::Assert(status == VSL_STATUS_OK);
+            status = vslNewStream(&vStream_, VSL_BRNG_MT19937, seed);
+            utils::Assert(status == VSL_STATUS_OK);
+            #else
+            srand( seed );
+            #endif
+        }
+        /*!
+         * \brief generate data from uniform [a,b)
+         * \param dst destination
+         * \param a lower bound of uniform
+         * \param b upper bound of uniform
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline void SampleUniform( Tensor<cpu, dim> &dst, real_t a=0.0f, real_t b=1.0f ) {
+            Tensor<cpu, 2> mat = dst.FlatTo2D();
+            for ( index_t i = 0; i < mat.shape[1]; ++i ) {
+                #if MSHADOW_USE_MKL
+                #if MSHADOW_SINGLE_PRECISION
+                int status = vsRngUniform( 0, vStream_, mat.shape[0], mat[i].dptr, a, b );
+                #else
+                int status = vdRngUniform( 0, vStream_, mat.shape[0], mat[i].dptr, a, b );
+                #endif
+                utils::Assert(status == VSL_STATUS_OK, "Failed to generate random number by MKL.\n" );
+                #else
+                // use stdlib
+                for ( index_t j = 0; j < mat.shape[0]; ++j ) {
+                    mat[i][j] = this->RandNext()*(b-a) + a;
+                }
+                #endif
+            }
+        }
+        /*!
+         * \brief generate data from standard gaussian
+         * \param dst destination
+         * \param mu mean variable
+         * \param sigma standard deviation
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline void SampleGaussian( Tensor<cpu, dim> &dst, real_t mu = 0.0f, real_t sigma = 1.0f ) {
+            if( sigma <= 0.0f ) {
+                dst = mu; return;
+            }
+            Tensor<cpu, 2> mat = dst.FlatTo2D();
+            for (index_t i = 0; i < mat.shape[1]; ++i) {
+                #if MSHADOW_USE_MKL
+                #if MSHADOW_SINGLE_PRECISION
+                int status = vsRngGaussian( 0, vStream_, mat.shape[0], mat[i].dptr, mu, sigma );
+                #else
+                int status = vdRngGaussian( 0, vStream_, mat.shape[0], mat[i].dptr, mu, sigma );
+                #endif
+                utils::Assert(status == VSL_STATUS_OK, "Failed to generate random number by MKL.\n" );
+                #else
+                real_t g1 = 0.0f, g2 = 0.0f;
+                for (index_t j = 0; j < mat.shape[0]; ++j) {
+                    if( (j & 1) == 0 ){
+                        this->SampleNormal2D( g1, g2 );
+                        mat[i][j] = mu + g1 * sigma;
+                    }else{
+                        mat[i][j] = mu + g2 * sigma;
+                    }
+                }
+                #endif
+            }
+        }
+        /*!
+         * \brief return a temporal expression storing standard gaussian random variables
+         *        the temporal tensor is only valid before next call of gaussian or uniform
+         *        can be used as part of expression
+         *  Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result,
+         *           since second call of gaussian(s2) makes gaussian(s1) invalid
+         *           A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression
+         * \param shape shape of the tensor
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline expr::ReshapeExp<Tensor<cpu,1>,dim,1> gaussian( Shape<dim> shape ){
+            buffer_.Resize( Shape1( shape.Size() ) );
+            this->SampleGaussian( buffer_, 0.0f, 1.0f );
+            return expr::reshape( buffer_, shape );
+        }
+        /*!
+         * \brief return a temporal expression storing standard uniform [0,1)
+         *        the temporal tensor is only valid before next call of gaussian or uniform
+         *        can be used as part of expression
+         *  Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result,
+         *           since second call of gaussian(s2) makes gaussian(s1) invalid
+         *           A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression
+         * \param shape shape of the tensor
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline expr::ReshapeExp<Tensor<cpu,1>,dim,1> uniform( Shape<dim> shape ){
+            buffer_.Resize( Shape1( shape.Size() ) );
+            this->SampleUniform( buffer_, 0.0f, 1.0f );
+            return expr::reshape( buffer_, shape );
+        }
+    private:
+        /*! \brief get next random number from rand */
+        inline real_t RandNext( void ){
+            return static_cast<real_t>(rand()) / (static_cast<real_t>(RAND_MAX)+1.0f);
+        }
+        /*! \brief return a real numer uniform in (0,1) */
+        inline real_t RandNext2( void ){
+            return (static_cast<real_t>( rand() ) + 1.0 ) / (static_cast<real_t>(RAND_MAX) + 2.0);
+        }
+        /*!
+         * \brief sample iid xx,yy ~N(0,1)
+         * \param xx first  gaussian output
+         * \param yy second gaussian output
+         */
+        inline void SampleNormal2D( real_t &xx, real_t &yy ){
+            real_t x,y,s;
+            do{
+                x = 2.0f * RandNext2() - 1.0f;
+                y = 2.0f * RandNext2() - 1.0f;
+                s = x*x + y*y;
+            }while( s >= 1.0f || s == 0.0f );
+            real_t t = std::sqrt( -2.0f * std::log( s ) / s ) ;
+            xx = x * t; yy = y * t;
+        }
+    private:
+        #if MSHADOW_USE_MKL
+        /*! \brief stream used by MKL VSL */
+        VSLStreamStatePtr vStream_;
+        #endif
+        /*! \brief temporal space used to store random numbers */
+        TensorContainer<cpu,1> buffer_;
+    }; // class Random<cpu>
+
+#ifdef __CUDACC__
+
+    /*! \brief GPU random number generator */
+    template<>
+    class Random<gpu> {
+    public:
+        /*!
+         * \brief constructor of random engine
+         * \param seed random number seed
+         */
+        Random<gpu>(int seed) {
+            curandStatus_t status;
+            status = curandCreateGenerator(&gen_, CURAND_RNG_PSEUDO_DEFAULT);
+            utils::Assert(status == CURAND_STATUS_SUCCESS, "Can not create CURAND Generator");
+            this->Seed( seed );
+            buffer_.Resize( Shape1(kRandBufferSize) );
+        }
+
+        ~Random<gpu>() {
+            curandStatus_t status;
+            status = curandDestroyGenerator(gen_);
+            utils::Assert(status == CURAND_STATUS_SUCCESS, "Destory CURAND Gen failed");
+        }
+        /*!
+         * \brief seed random number generator using this seed
+         * \param seed seed of prng
+         */
+        inline void Seed( int seed ){
+            curandStatus_t status;
+            status = curandSetPseudoRandomGeneratorSeed(gen_, seed);
+            utils::Assert(status == CURAND_STATUS_SUCCESS, "Set CURAND seed failed.");
+        }
+        /*!
+         * \brief generate data from uniform [a,b)
+         * \param dst destination
+         * \param a lower bound of uniform
+         * \param b upper bound of uniform
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline void SampleUniform(Tensor<gpu, dim> &dst, real_t a=0.0f, real_t b=1.0f) {
+            if( a == 0.0f && b == 1.0f ){
+                dst = this->uniform( dst.shape );
+            }else{
+                dst = this->uniform( dst.shape ) *(b-a) + a;
+            }
+        }
+        /*!
+         * \brief generate data from standard gaussian
+         * \param dst destination
+         * \param mu mean variable
+         * \param sigma standard deviation
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline void SampleGaussian(Tensor<gpu, dim> &dst, real_t mu = 0.0f, real_t sigma = 1.0f) {
+            dst = this->gaussian( dst.shape, mu, sigma );
+        }
+        /*!
+         * \brief return a temporal expression storing standard gaussian random variables
+         *        the temporal tensor is only valid before next call of gaussian or uniform
+         *        can be used as part of expression
+         *  Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result,
+         *           since second call of gaussian(s2) makes gaussian(s1) invalid
+         *           A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression
+         * \param shape shape of the tensor
+         * \param mu mean
+         * \param sigma variance
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline expr::ReshapeExp<Tensor<gpu,1>,dim,1> gaussian( Shape<dim> shape, real_t mu=0.0f, real_t sigma=1.0f){
+            size_t aligned_sz = ((shape.Size() + 1UL)>>1)<<1;
+            // allocate alligned size
+            buffer_.Resize( Shape1( aligned_sz ) );
+            buffer_.Resize( Shape1( shape.Size() ) );
+            curandStatus_t status;
+            #if MSHADOW_SINGLE_PRECISION
+            status = curandGenerateNormal(gen_, buffer_.dptr, aligned_sz , mu, sigma);
+            #else
+            status = curandGenerateNormalDouble(gen_, buffer_.dptr, buffer_.shape[0], mu, sigma);
+            #endif
+            utils::Assert(status == CURAND_STATUS_SUCCESS, "CURAND Gen Uniform failed\n");
+            return expr::reshape( buffer_, shape );
+        }
+        /*!
+         * \brief return a temporal expression storing standard uniform [0,1)
+         *        the temporal tensor is only valid before next call of gaussian or uniform
+         *        can be used as part of expression
+         *  Caution: this means expression such as A = gaussian(s1) * gaussian(s2) will give invalid result,
+         *           since second call of gaussian(s2) makes gaussian(s1) invalid
+         *           A = gaussian(s1)*B+C; is correct; use one gaussian/uniform in each expression
+         * \param shape shape of the tensor
+         * \tparam dim dimension of tensor
+         */
+        template<int dim>
+        inline expr::ReshapeExp<Tensor<gpu,1>,dim,1> uniform(Shape<dim> shape) {
+            buffer_.Resize( Shape1( shape.Size() ) );
+            curandStatus_t status;
+            #if MSHADOW_SINGLE_PRECISION
+            status = curandGenerateUniform(gen_, buffer_.dptr, buffer_.shape[0] );
+            #else
+            status = curandGenerateUniformDouble(gen_, buffer_.dptr, buffer_.shape[0] );
+            #endif
+            utils::Assert(status == CURAND_STATUS_SUCCESS, "CURAND Gen Uniform failed\n");
+            return expr::reshape( buffer_, shape );
+        }
+    private:
+        /*! \brief random numbeer generator */
+        curandGenerator_t gen_;
+        /*! \brief templ buffer */
+        TensorContainer<gpu, 1> buffer_;
+    }; // class Random<gpu>
+    #endif
+
+}; // namespace mshadow
+
+#endif // MSHADOW_TENSOR_RANDOM_H

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/mshadow/tensor_sse-inl.hpp
----------------------------------------------------------------------
diff --git a/include/mshadow/tensor_sse-inl.hpp b/include/mshadow/tensor_sse-inl.hpp
new file mode 100644
index 0000000..b98383e
--- /dev/null
+++ b/include/mshadow/tensor_sse-inl.hpp
@@ -0,0 +1,431 @@
+#ifndef MSHADOW_TENSOR_SSE_INL_HPP
+#define MSHADOW_TENSOR_SSE_INL_HPP
+/*!
+ * \file tensor_sse-inl.hpp
+ * \brief support of sse2 optimization of some operations
+ * \author Tianqi Chen
+ */
+#ifdef __APPLE__
+#include <stdlib.h>
+#else
+#include <malloc.h>
+#endif
+
+#include "tensor_expr.h"
+#include "tensor.h"
+
+namespace mshadow {
+    /*! \brief namespace to support sse2 vectorization */
+    namespace sse2{
+        /*! 
+         * \brief analog to cudaMallocPitch, allocate a aligned space with num_line * lspace cells
+         * \param pitch output parameter, the actuall space allocated for each line
+         * \param lspace number of cells required for each line
+         * \param num_line number of lines to be allocated
+         */
+        inline void* AlignedMallocPitch( size_t &pitch, size_t lspace, size_t num_line ){
+            pitch = ((lspace+15) >> 4) << 4;
+            #ifdef _MSC_VER
+            void * res = _aligned_malloc( pitch*num_line, 16 ); 
+            #else
+            #ifdef __APPLE__
+            void *res = malloc( pitch * num_line );
+            #else
+            void * res = memalign( 16, pitch*num_line ); 
+            #endif
+            #endif
+            utils::Assert( res != NULL, "AlignedMallocPitch failed" );
+            return res;
+        }
+        /*! 
+         * \brief free aligned space 
+         * \param ptr pointer to space to be freed
+         */
+        inline void AlignedFree( void *ptr ){
+            #ifdef _MSC_VER
+            _aligned_free( ptr );
+            #else
+            free( ptr );
+            #endif
+        }
+        /*! \brief check if a pointer is aligned */
+        inline bool CheckAlign( size_t pitch ){
+            return !(pitch & ((1<<4)-1));
+        }
+        /*! \brief check if a pointer is aligned */
+        inline bool CheckAlign( void *ptr ){
+            return CheckAlign( (size_t)ptr );
+        }
+        /*! 
+         * \brief get upper bound of aligned index of size 
+         * \param size size of the array
+         * \param fsize size of float
+         */
+        inline index_t UpperAlign( index_t size, size_t fsize ){
+            return (( (size*fsize+15) >> 4 ) << 4) / fsize;
+        }
+        /*! 
+         * \brief get lower bound of aligned index of size 
+         * \param size size of the array
+         * \param fsize size of float
+         */
+        inline index_t LowerAlign( index_t size, size_t fsize ){
+            return (( (size*fsize) >> 4 ) << 4) / fsize;
+        }
+    }; // namespace sse2
+}; // namespace  mshadow
+
+#if MSHADOW_USE_SSE
+// sse types are not compatible with nvcc, only use them in cpu mode
+#include <emmintrin.h>
+
+namespace mshadow{
+    namespace sse2{
+        /*! 
+         * \brief float vector real type, used for vectorization 
+         * \tparam FloatType double or float
+         */
+        template<typename FloatType> struct FVec{};
+        
+        /*! \brief vector real type for float */
+        template<> 
+        struct FVec<float> {
+        public:
+            typedef __m128 DType;
+            /*! \brief number of float in vector */
+            const static index_t kSize = 4;
+            /*! \brief data content */
+            DType data_;
+        public:
+            /* constructors */
+            FVec( void ){}
+            FVec( DType data ):data_(data){}
+            /* set the float */
+            FVec( const float &s ){
+                data_ = _mm_set1_ps( s );
+            }
+            /*!\brief load from pointer src */
+            FVec( const float *src ){
+                data_ = _mm_load_ps( src );                
+            } 
+        public:
+            /*! \brief store data into dst space */
+            inline void Store( float *dst ) const{
+                return _mm_store_ps( dst, data_ );
+            }
+            /*! \brief sum of all content */
+            inline float Sum( void ) const{
+                DType ans  = _mm_add_ps( data_, _mm_movehl_ps( data_, data_ ) );
+                DType rst  = _mm_add_ss( ans, _mm_shuffle_ps( ans, ans, 1 ) );
+                #if defined(_MSC_VER) && ( _MSC_VER <= 1500 ) && defined(_WIN64)
+                return rst.m128_f32[ 0 ];
+                #else
+                float rr = _mm_cvtss_f32( rst ) ;
+                return rr;
+                #endif
+            }
+        };
+
+        /*! \brief vector real type for float */
+        template<> 
+        struct FVec<double> {
+        public:
+            typedef __m128d DType;
+            /*! \brief number of float in vector */
+            const static index_t kSize = 2;
+            /*! \brief data content */
+            DType data_;
+        public:
+            /* constructors */
+            FVec( void ){}
+            FVec( DType data ):data_(data){}
+            /* set the float */
+            FVec( const double &s ){
+                data_ = _mm_set1_pd( s );
+            }
+            /*!\brief load from pointer src */
+            FVec( const double *src ){
+                data_ = _mm_load_pd( src );                
+            } 
+        public:
+            /*! \brief store data into dst space */
+            inline void Store( double *dst ) const{
+                return _mm_store_pd( dst, data_ );
+            }
+            /*! \brief sum of all content */
+            inline double Sum( void ) const{
+                DType tmp =  _mm_add_sd( data_, _mm_unpackhi_pd( data_,data_ ) ) ;
+                #if defined(_MSC_VER) && ( _MSC_VER <= 1500 ) && defined(_WIN64)
+                return tmp.m128d_f64[0];
+                #else
+                double ans = _mm_cvtsd_f64( tmp );
+                return ans;
+                #endif
+            }
+        };
+    };
+
+    namespace sse2{
+        /*! \brief sse2 operator type of certain operator */
+        template<typename OP>
+        struct SSEOp{
+            const static bool kEnabled = false;
+        };        
+        template<>
+        struct SSEOp<op::plus>{
+            const static bool kEnabled = true;
+            MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){
+                return FVec<float>( _mm_add_ps( lhs.data_, rhs.data_ ) );
+            }
+            MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){
+                return FVec<double>( _mm_add_pd( lhs.data_, rhs.data_ ) );
+            }
+        };
+        template<>
+        struct SSEOp<op::minus>{
+            const static bool kEnabled = true;
+            MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){
+                return FVec<float>( _mm_sub_ps( lhs.data_, rhs.data_ ) );
+            }
+            MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){
+                return FVec<double>( _mm_sub_pd( lhs.data_, rhs.data_ ) );
+            }
+        };
+        template<>
+        struct SSEOp<op::mul>{
+            const static bool kEnabled = true;
+            MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){
+                return FVec<float>( _mm_mul_ps( lhs.data_, rhs.data_ ) );
+            }
+            MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){
+                return FVec<double>( _mm_mul_pd( lhs.data_, rhs.data_ ) );
+            }
+        };
+        template<>
+        struct SSEOp<op::div>{
+            const static bool kEnabled = true;
+            MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &lhs, const FVec<float> &rhs ){
+                return FVec<float>( _mm_div_ps( lhs.data_, rhs.data_ ) );
+            }
+            MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &lhs, const FVec<double> &rhs ){
+                return FVec<double>( _mm_div_pd( lhs.data_, rhs.data_ ) );
+            }
+        };
+
+        template<>
+        struct SSEOp<op::identity>{
+            const static bool kEnabled = true;
+            MSHADOW_CINLINE static FVec<float> Map( const FVec<float> &src ){
+                return src;
+            }
+            MSHADOW_CINLINE static FVec<double> Map( const FVec<double> &src ){
+                return src;
+            }
+        };
+    }; // namespace sse2
+    
+    namespace sse2{
+        // savers to do storage
+        template<typename SV, typename TFloat>
+        struct Saver{
+            MSHADOW_CINLINE static void Save( TFloat *dst, const FVec<TFloat> &src ){
+                FVec<TFloat> lhs( dst );
+                FVec<TFloat> ans = SSEOp<typename SV::OPType>::Map( lhs, src );
+                ans.Store( dst );
+            }
+        };
+        template<typename TFloat>
+        struct Saver<sv::saveto,TFloat>{
+            MSHADOW_CINLINE static void Save( TFloat *dst, const FVec<TFloat> &src ){
+                src.Store( dst );
+            }
+        };        
+    }; // namespace sse2
+}; // namespace mshadow
+
+namespace mshadow{
+    namespace expr{
+        // same as plan, but use sse2
+        template<typename ExpType>
+        class SSEPlan {
+        public:
+            /*!
+             * \brief evaluate the expression at index [y][x], x will be aligned to 4
+             *        to be implemented by SubType
+             */
+            MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const;
+            MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const;
+        };
+
+        template <typename Device, int dim>
+        class SSEPlan< Tensor<Device,dim> >{
+        public:
+            SSEPlan( const Tensor<Device,dim> &t )
+                :dptr_(t.dptr),stride_(t.shape.stride_){}
+            MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{
+                return sse2::FVec<real_t>( &dptr_[ y*stride_+x ] );
+            }
+            MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{
+                return dptr_[ y * stride_ + x ];
+            }
+        private:
+            const real_t  *dptr_;
+            index_t stride_;
+        };
+
+        template<>
+        class SSEPlan<ScalarExp>{
+        public:
+            SSEPlan( real_t scalar ):scalar_(scalar){}
+            MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{
+                return sse2::FVec<real_t>( scalar_ );
+            }
+            MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{
+                return scalar_;
+            }
+        private:
+            real_t scalar_;
+        };
+
+        template<typename OP, typename TA, typename TB,int etype>
+        class SSEPlan< BinaryMapExp<OP,TA,TB,etype> >{
+        public:
+            SSEPlan( const SSEPlan<TA> &lhs, const SSEPlan<TB> &rhs )
+                :lhs_(lhs), rhs_(rhs){}
+            MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{
+                return sse2::SSEOp<OP>::Map( lhs_.EvalSSE( y, x ), rhs_.EvalSSE( y, x ) );
+            }
+            MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{
+                return OP::Map( lhs_.Eval( y, x ), rhs_.Eval( y, x ) );
+            }
+        private:
+            SSEPlan<TA> lhs_;
+            SSEPlan<TB> rhs_;
+        };
+
+        template<typename OP, typename TA, int etype>
+        class SSEPlan< UnaryMapExp<OP,TA,etype> >{
+        public:
+            SSEPlan( const SSEPlan<TA> &src ):src_(src){}
+            MSHADOW_CINLINE sse2::FVec<real_t> EvalSSE( index_t y, index_t x ) const{
+                return sse2::SSEOp<OP>::Map( src_.EvalSSE( y, x ) );
+            }
+            MSHADOW_CINLINE real_t Eval( index_t y, index_t x ) const{
+                return OP::Map( src_.Eval( y, x ) );
+            }
+        private:
+            SSEPlan<TA> src_;
+        };
+
+        template<typename OP, typename TA, typename TB, int etype>
+        inline SSEPlan< BinaryMapExp<OP,TA,TB,etype> > MakeSSEPlan( const BinaryMapExp<OP,TA,TB,etype> &e );
+
+        inline SSEPlan<ScalarExp> MakeSSEPlan( const ScalarExp &e ){
+            return SSEPlan<ScalarExp>( e.scalar_ );
+        }
+
+        template<typename T>
+        inline SSEPlan<T> MakeSSEPlan( const ContainerExp<T> &e ){
+            return SSEPlan<T>( e.self() );
+        }
+
+        template<typename T,int dim>
+        inline SSEPlan<T> MakeSSEPlan( const MakeTensorExp<T,cpu,dim> &e ){
+            return SSEPlan<T>( e.real_self() );
+        }
+
+        template<typename OP, typename TA, int etype>
+        inline SSEPlan< UnaryMapExp<OP,TA,etype> > MakeSSEPlan( const UnaryMapExp<OP,TA,etype> &e ){
+            return SSEPlan< UnaryMapExp<OP,TA,etype> >( MakeSSEPlan(e.src_) );
+        }
+
+        template<typename OP, typename TA, typename TB, int etype>
+        inline SSEPlan< BinaryMapExp<OP,TA,TB,etype> > MakeSSEPlan( const BinaryMapExp<OP,TA,TB,etype> &e ){
+                return SSEPlan< BinaryMapExp<OP,TA,TB,etype> >( MakeSSEPlan(e.lhs_), MakeSSEPlan(e.rhs_) );
+        }
+    };
+
+    namespace expr{
+        /*!
+         * \brief static check sse enable
+         *        if a expression E can not be evaluated using sse, then kPass = false
+         * \tparam Device the type of Device
+         * \tparam dim dimension of the tensor
+         * \tparam E expression
+         */
+        template<typename E>
+        struct SSECheck{
+            const static bool kPass = false;
+        };
+        template<>
+        struct SSECheck<ScalarExp>{
+            const static bool kPass = true;
+        };
+        template<int dim>
+        struct SSECheck<Tensor<cpu,dim> >{
+            const static bool kPass = true;
+        };
+        
+        template<typename OP, typename TA, int etype>
+        struct SSECheck<UnaryMapExp<OP,TA,etype> >{
+            const static bool kPass = SSECheck<TA>::kPass && sse2::SSEOp<OP>::kEnabled;
+        };
+        template<typename OP, typename TA, typename TB, int etype>
+        struct SSECheck< BinaryMapExp<OP,TA,TB,etype> >{
+            const static bool kPass = SSECheck<TA>::kPass && SSECheck<TB>::kPass && sse2::SSEOp<OP>::kEnabled;
+        }; 
+    }; // namespace expr
+    namespace expr{
+        // check if data is aligned and allow sse operation
+        template<int dim,typename E>
+        struct SSEAlignCheck{
+            inline static bool Check( const E &exp ){
+                return false;
+            }
+        };
+        template<int dim>
+        struct SSEAlignCheck< dim, ScalarExp >{
+            inline static bool Check( const ScalarExp &exp ){
+                return true;
+            }
+        };
+        template<int dim>
+        struct SSEAlignCheck< dim,Tensor<cpu,dim> >{
+            inline static bool Check( const Tensor<cpu,dim> &t ){
+                return sse2::CheckAlign( t.dptr ) && sse2::CheckAlign( t.shape.stride_ * sizeof( real_t ) );
+            }
+        };
+        template<int dim, typename OP, typename TA, int etype>
+        struct SSEAlignCheck< dim, UnaryMapExp<OP,TA,etype> >{
+            inline static bool Check( const UnaryMapExp<OP,TA,etype> &t ){
+                return SSEAlignCheck<dim,TA>::Check( t.src_);
+            }
+        };
+        template<int dim, typename OP, typename TA, typename TB, int etype>
+        struct SSEAlignCheck< dim, BinaryMapExp<OP,TA,TB,etype> >{ 
+            inline static bool Check( const BinaryMapExp<OP,TA,TB,etype> &t ){
+                return SSEAlignCheck<dim,TA>::Check( t.lhs_ ) && 
+                    SSEAlignCheck<dim,TB>::Check( t.rhs_ );
+            }
+        };
+    }; // namespace expr
+
+    /*! 
+     * \brief use SSEPlan to compute result
+     */
+    template<typename SV, typename E, int dim>
+    inline void MapSSEPlan(Tensor<cpu,dim> _dst, const expr::SSEPlan<E> &plan){        
+        Tensor<cpu,2> dst = _dst.FlatTo2D();
+        const index_t xlen = sse2::LowerAlign( dst.shape[0], sizeof(real_t) );
+        for ( index_t y = 0; y < dst.shape[1]; y ++ ) {
+            for( index_t x = 0; x < xlen; x += sse2::FVec<real_t>::kSize ){
+                sse2::Saver<SV,real_t>::Save( &dst[y][x], plan.EvalSSE( y,x ) );
+            }
+            for( index_t x = xlen; x < dst.shape[0]; x ++ ){
+                SV::Save( dst[y][x], plan.Eval(y,x) );
+            }
+        }
+    }
+}; // namespace mshadow
+#endif // MSHADOW_USE_SSE
+#endif // MSHADOW_TENSOR_SSE_INL_HPP

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/neuralnet/base_layer.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/base_layer.h b/include/neuralnet/base_layer.h
new file mode 100644
index 0000000..863c223
--- /dev/null
+++ b/include/neuralnet/base_layer.h
@@ -0,0 +1,563 @@
+#ifndef INCLUDE_BASE_LAYER_H_
+#define INCLUDE_BASE_LAYER_H_
+
+#include <vector>
+#include <string>
+#include <map>
+#include <functional>
+#include <utility>
+#include <condition_variable>
+#include <mutex>
+#include <memory>
+#include <chrono>
+#include <algorithm>
+
+#include "proto/model.pb.h"
+#include "utils/param.h"
+#include "utils/common.h"
+#include "utils/blob.h"
+
+using std::vector;
+using std::shared_ptr;
+using std::make_shared;
+using std::string;
+using std::map;
+
+namespace singa{
+
+class Layer;
+typedef shared_ptr<Layer> SLayer;
+/**
+ * Base layer class.
+ * Children should implement at least Layer::Setup, Layer::ComputeFeature(),
+ * Layer::ComputGradient() functions for backpropagation method;
+ * TODO(wangwei) implement children layers to support contrastive divergence,
+ * The identifier of each layer is the literal string of the class name without
+ * the suffix "Layer", which is used in layer registration and creation.
+ */
+class Layer {
+ public:
+  Layer(){}
+  /**
+   * simply save the proto configuation.
+   * most initializations are done by Setup().
+   * @param layer_proto user defined layer configuration
+   */
+  virtual void Init(const LayerProto &proto);
+  /**
+   * copy layer configuration from the other Layer, and set the shape.
+   */
+  void Init(const Layer& other, const vector<int>& shape);
+  virtual ~Layer(){}
+  /**
+   * Marshal layer properties and data into google protobuf object
+   * (i.e., snapshot).
+   * Parameters are marshalled separately into another object (i.e., model).
+   * @param layer_proto
+   * @param copyData if true marshal data of DArray
+   */
+  virtual void ToProto(LayerProto *layer_proto, bool copyData);
+  /**
+   * Setup layer properties.
+   * Setup the shapes for data and parameters, also setup some properties
+   * based on the layer configuration and connected src layers.
+   * @param srclayers layers connecting to this layer
+   */
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers)=0;
+  /**
+   * \copydoc Setup(const LayerProto&, const vector<SLayer>&)
+   */
+  virtual void Setup();
+  /**
+   * Setup the layer properties except shape.
+   * the shape is already set and passed in to set other properties.
+   * perperties are set according to shapes of itself and connected layers, and
+   * configuration. this should not change the current shape_(
+   * shape check is done outside the function).
+   */
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers)=0;
+  /**
+   * \copybrief SetupAfterPartition(const LayerProto&, const vector<int> &,
+   * const vector<SLayer>& ).
+   */
+  virtual void SetupAfterPartition();
+  /**
+   * Layers that have paramters must overload this function.
+   * @return parameters associated with this layer
+   */
+  virtual vector<shared_ptr<Param>> GetParams(){
+    return vector<shared_ptr<Param>>();
+  }
+  /**
+   * Compute features of this layer based on connected layers.
+   * Implement forward propagation for BP; TODO Implement both postive phase
+   * and negative phase for CD.
+   * @param srclayers layers connecting to this layer
+   */
+  virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers)=0;
+  /**
+   * \copybrief ComputeFeature(const vector<SLayer>& srclayers)
+   */
+  virtual void ComputeFeature(bool training);
+  /**
+   * Compute gradients for parameters and connecting layers.
+   * Implement backward propagation for BP; TODO Calculate gradients for
+   * parameters for CD.
+   * @param srclayers layers connecting to this layer.
+   */
+  virtual void ComputeGradient(const vector<SLayer>& srclayers)=0;
+  /**
+   * \copybrief ComputeGradient(const vector<SLayer>& srclayers)
+   */
+  virtual void ComputeGradient();
+  /**
+   * decide on which dimension to do the partitioning.
+   * @mode kLayer, kData, kNone (no partition)
+   * @return the partition dimension, -1 for no partition
+   */
+  virtual int partition_dimension() const {
+    int ret=0;
+    if(partition_type()==kLayerPartition)
+      ret= 1;
+    else if(partition_type()==kNone)
+      ret= -1;
+    return ret;
+  }
+
+  /**
+   * return connection type between two layers.
+   * Currently support two connections: kOneToOne, and kOneToAll.
+   * kOneToOne indicates the dst neuron depends on only one neuron from src
+   * layer. kOneToAll indicates the dst neuron depends on all neurons from src
+   * layer. TODO support kOneToMany.
+   */
+  virtual ConnectionType connection_type(int k) const {
+    CHECK_LT(k, srclayers_.size());
+    return kOneToOne;
+  }
+  /**
+   * return partition type of this layer.
+   * E.g., kNone, kLayer or kData
+   */
+  virtual PartitionType partition_type() const {
+    return layer_proto_.partition_type();
+  }
+  /**
+   * location id is the execution unit (i.e., thread from the working group) ID.
+   */
+  virtual void set_locationid(int id){
+    layer_proto_.set_locationid(id);
+  }
+  virtual int locationid() const {
+    return layer_proto_.locationid();
+  }
+  /**
+   * partition id is the ID of the layer in the original layer.
+   */
+  virtual void set_partitionid(int id){
+    layer_proto_.set_partitionid(id);
+  }
+  virtual int partitiionid() const {
+    return layer_proto_.partitionid();
+  }
+  virtual void set_name(string name){
+    name_=name;
+    layer_proto_.set_name(name);
+  }
+  virtual const string type() const {
+    return layer_proto_.type();
+  }
+  /**
+   * Return name of this layer
+   */
+  const std::string &name() const {
+    return layer_proto_.name();
+  }
+  const vector<int>& shape(const Layer* layer=nullptr) const{
+    return data(layer).shape();
+  }
+
+  /**
+   * @return a const ref for Blob storing neuron values of this layer for BP
+   */
+  virtual const Blob<float>& data(const Layer* from=nullptr) const {
+    return data_;
+  }
+  virtual Blob<float>* mutable_data(const Layer* from=nullptr){
+    return &data_;
+  }
+
+  virtual const Blob<float>& grad(const Layer* from=nullptr) const {
+    return grad_;
+  }
+  /**
+   * @return a pointer to storing neuron grads of this layer for BP
+   */
+  virtual Blob<float>* mutable_grad(const Layer* from=nullptr) {
+    return &grad_;
+  }
+
+  /**
+   * return LayerS that connected to this layer
+   */
+  virtual const vector< SLayer> srclayers() const {
+    return srclayers_;
+  }
+  /**
+   * return LayerS that this layer connected to
+   */
+  virtual const vector<SLayer> dstlayers() const {
+    return dstlayers_;
+  }
+
+  virtual const int srclayers_size() const {
+    return srclayers_.size();
+  }
+  virtual const int dstlayers_size() const {
+    return dstlayers_.size();
+  }
+  virtual void ClearDstLayers() {
+    dstlayers_.clear();
+  }
+  virtual void ClearSrcLayers() {
+    srclayers_.clear();
+  }
+
+  virtual void AddSrcLayer(SLayer src){
+    srclayers_.push_back(src);
+  }
+  virtual void AddDstLayer(SLayer dst){
+    dstlayers_.push_back(dst);
+  }
+
+  virtual bool is_datalayer() const {
+    return false;
+  }
+  virtual bool is_parserlayer() const {
+    return false;
+  }
+  virtual bool is_losslayer() const {
+    return false;
+  }
+  virtual bool is_bridgesrclayer() const {
+    return false;
+  }
+  virtual bool is_bridgedstlayer() const {
+    return false;
+  }
+protected:
+  string name_;
+  //vector<shared_ptr<SyncedMem>> memblobs_;
+  Blob<float> data_, grad_;
+  // DArray pos_, neg_;//for CD
+  LayerProto layer_proto_;
+  vector<SLayer> srclayers_, dstlayers_;
+};
+
+/**
+ * For sending data to layer on other threads which may resident on other nodes
+ * due to layer/data partition.
+ */
+class BridgeSrcLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void SetupAfterPartition();
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){}
+
+  virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers);
+  virtual void ComputeGradient(const vector<SLayer>& srclayers);
+  virtual bool is_bridgesrclayer() const {
+    return true;
+  }
+
+  virtual void set_ready(bool a) {
+    ready_=a;
+  }
+  virtual bool ready() const {
+    return ready_;
+  }
+ protected:
+  bool ready_;
+};
+/**
+ * For recv data from layer on other threads which may resident on other nodes
+ * due to layer/data partiton
+ */
+class BridgeDstLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void SetupAfterPartition();
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){}
+
+  virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers);
+  virtual void ComputeGradient(const vector<SLayer>& srclayers);
+  virtual bool is_bridgedstlayer() const {
+    return true;
+  }
+  virtual void set_ready(bool a) {
+    ready_=a;
+  }
+  virtual bool ready() const {
+    return ready_;
+  }
+ protected:
+  bool ready_;
+};
+
+/**
+ * Concate src layers on one dimension
+ */
+class ConcateLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void SetupAfterPartition();
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){}
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+};
+
+
+/**
+ * base layer for prefetching records from local Shard, HDFS, lmdb, etc.
+ * cannot be partitioned, always returns kNone for partition type.
+ */
+
+class DataLayer: public Layer{
+ public:
+  virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers)=0;
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers)=0;
+  virtual bool is_datalayer() const {
+    return true;
+  }
+  virtual void ComputeGradient(const vector<SLayer>& srclayers){};
+  virtual const vector<Record>& records() const {
+    return records_;
+  }
+  virtual void Setup(){
+    vector<SLayer> dummy;
+    Setup(layer_proto_,dummy);
+    has_set_=true;
+  }
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){}
+
+  virtual void SetupAfterPartition(){
+    if(!has_set_)
+    Setup();
+  }
+  virtual PartitionType partition_type () const {
+    return kNone;
+  }
+
+  virtual int batchsize() const {
+    return layer_proto_.data_param().batchsize();
+  }
+  virtual const Record& sample() const {
+    return sample_;
+  }
+
+  virtual Blob<float>* mutable_data(const Layer* layer=nullptr) {
+    return nullptr;
+  }
+  virtual Blob<float>* mutable_grad(const Layer* layer=nullptr) {
+    return nullptr;
+  }
+  void set_prefetch(bool prefetch){
+    prefetch_=prefetch;
+  }
+
+  virtual void ComputeFeature(bool training) {
+    if(!prefetch_)
+      ComputeFeature(training, srclayers_);
+  }
+
+  virtual void Prefetching(bool training){
+    CHECK(prefetch_);
+    ComputeFeature(training, srclayers_);
+  }
+
+ protected:
+  bool has_set_;
+  bool prefetch_;
+  int random_skip_, batchsize_;
+  Record sample_;
+  vector<Record> records_;
+};
+
+/**
+ * Slice this layer into multiple dst layers on one dimension
+ */
+class SliceLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void SetupAfterPartition();
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){}
+
+
+  virtual const Blob<float>& data(const Layer* layer=nullptr) const;
+  virtual const Blob<float>& grad(const Layer* layer=nullptr) const;
+  virtual Blob<float>* mutable_data(const Layer* layer=nullptr);
+  virtual Blob<float>* mutable_grad(const Layer* layer=nullptr);
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+
+ protected:
+  int SliceID(const Layer* layer) const;
+  vector<Blob<float>> datavec_, gradvec_;
+};
+
+/**
+ * Replciate this layer into multiple dst layers
+ */
+class SplitLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void SetupAfterPartition();
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){}
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+};
+
+/**
+ * Loss layer to calculate loss and other metrics, e.g., precison.
+ */
+class LossLayer: public Layer{
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers)=0;
+
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers)=0;
+  virtual Blob<float>* mutable_grad(const Layer* layer=nullptr){
+    return nullptr;
+  }
+  virtual const Blob<float>& grad(const Layer* from=nullptr) const {
+    CHECK(false)<<"Loss layer has not gradient blob";
+    return grad_;
+  }
+  virtual bool is_losslayer() const {
+    return true;
+  }
+
+  virtual const Blob<float>& metric() const {
+    return metric_;
+  }
+ protected:
+  Blob<float> metric_;
+};
+
+/**
+ * parse the input records into Blobs.
+ */
+class ParserLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers)=0;
+  /**
+   * Parse records from DataLayer into blob.
+   * This function is called by
+   * ComputeFeature(bool, const vector<SLayer>& srclayers)  or Prefetch(bool).
+   */
+  virtual void ParseRecords(bool training, const vector<Record>& records, Blob<float>* blob)=0;
+  virtual bool is_parserlayer() const {
+    return true;
+  }
+  /**
+   * Dummy function. ParserLayer does not compute gradients.
+   */
+  virtual void ComputeGradient(const vector<SLayer>& srclayers){};
+  virtual void Setup(){
+    Setup(layer_proto_,srclayers_);
+    has_set_=true;
+    ready_=true;
+    prefetch_=false;
+  }
+  virtual void SetupAfterPartition(){
+    if(!has_set_)
+      Setup();
+  }
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers){}
+
+  virtual PartitionType partition_type () const{
+    return kNone;
+  }
+  virtual Blob<float>* mutable_grad(const Layer* layer=nullptr) {
+    return nullptr;
+  }
+  virtual const Blob<float>& grad(const Layer* from=nullptr) const {
+    CHECK(false)<<"Parser layer has not gradient blob";
+    return grad_;
+  }
+
+  virtual void ComputeFeature(bool training, const vector<SLayer>& srclayers){
+    if(!prefetch_){
+      DataLayer* datalayer=static_cast<DataLayer*>(srclayers[0].get());
+      ParseRecords(training, datalayer->records(), &data_);
+    }else{
+      std::unique_lock<std::mutex> lck(mtx_);
+      while(!ready_) cv_.wait(lck);
+      data_.CopyFrom(prefetch_data_);
+      ready_=false;
+      cv_.notify_all();
+    }
+  }
+  /**
+   * prefetching is transparent to parsing logics.
+   * users implement parsing logics in ParseRecords
+   * worker/training algorithm calls this function to do prefetching in a
+   * separate thread. Records are in fact parsed into prefetch_data_, and later
+   * copied into data_.
+   */
+  void Prefetching(bool training){
+    std::unique_lock<std::mutex> lck(mtx_);
+    while(ready_) cv_.wait(lck);
+    //data_.Swap(prefetch_data_);
+    DataLayer* datalayer=static_cast<DataLayer*>(srclayers_[0].get());
+    ParseRecords(training, datalayer->records(), &prefetch_data_);
+    ready_=true;
+    cv_.notify_all();
+  }
+
+  /**
+   * must be called before calling ComputeFeature(bool) if Prefetching runs in a
+   * separate thread
+   */
+  void set_prefetch(bool prefetch) {
+    if(prefetch){
+      if(prefetch_data_.count()==0)
+        prefetch_data_.ReshapeLike(data_);
+      ready_=false;
+    }
+    prefetch_=prefetch;
+  }
+
+ private:
+  std::mutex mtx_;
+  std::condition_variable cv_;
+  bool ready_;
+  bool has_set_;
+  bool prefetch_;
+  //!< prefetch_data_ is invisible to layer logics, i.e., parsing.
+  Blob<float> prefetch_data_;
+};
+} // singa
+
+#endif // INCLUDE_BASE_LAYER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/neuralnet/layer.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/layer.h b/include/neuralnet/layer.h
new file mode 100644
index 0000000..263d249
--- /dev/null
+++ b/include/neuralnet/layer.h
@@ -0,0 +1,297 @@
+#ifndef INCLUDE_NET_LAYER_H_
+#define INCLUDE_NET_LAYER_H_
+
+#include <vector>
+#include <string>
+#include <map>
+#include <functional>
+#include <utility>
+#include <memory>
+#include <chrono>
+#include <random>
+#include <lmdb.h>
+
+#include "proto/model.pb.h"
+#include "utils/data_shard.h"
+#include "neuralnet/base_layer.h"
+
+
+/**
+ * \file this file includes the declarations neuron layer classes that conduct
+ * the transformation of features.
+ */
+namespace singa {
+
+/**
+ * Convolution layer.
+ */
+class ConvolutionLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  /**
+   * need to reset some properties (e.g., weight matrix) according to
+   * shapes (after partition, e.g., partition is done against channel dimension)
+   */
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+  virtual vector<shared_ptr<Param>> GetParams() {
+    return vector<shared_ptr<Param>>{weight_, bias_};
+  }
+  virtual ConnectionType connection_type(int k) const {
+    CHECK_LT(k, srclayers_.size());
+    return kOneToAll;
+  }
+ protected:
+  int kernel_, pad_,  stride_ ;
+  int batchsize_,  channels_, height_,width_;
+  int col_height_, col_width_, conv_height_, conv_width_, num_filters_;
+  shared_ptr<Param> weight_, bias_;
+  Blob<float> col_data_, col_grad_;
+};
+
+class DropoutLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+ protected:
+  // drop probability
+  float pdrop_;
+  /* record which neuron is dropped, required for back propagating gradients,
+   * if mask[i]=0, then the i-th neuron is dropped.
+   */
+  Blob<float> mask_;
+};
+
+/**
+  * fully connected layer
+  */
+class InnerProductLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  /**
+   * need to reset weight matrix in case of LayerPartition
+   */
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+  virtual ConnectionType connection_type(int k) const {
+    CHECK_LT(k, srclayers_.size());
+    return kOneToAll;
+  }
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+  //virtual void ToProto(LayerProto *layer_proto, bool copyData);
+  virtual vector<shared_ptr<Param>> GetParams() {
+    return vector<shared_ptr<Param>>{weight_, bias_};
+  }
+
+ private:
+  //! dimension of the hidden layer
+  int hdim_;
+  //! dimension of the visible layer
+  int vdim_;
+  int batchsize_;
+  shared_ptr<Param> weight_, bias_;
+};
+
+class LabelLayer: public ParserLayer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void ParseRecords(bool training, const vector<Record>& records,
+      Blob<float>* blob);
+};
+
+class LRNLayer: public Layer {
+/**
+ * Local Response Normalization edge
+ * b_i=a_i/x_i^beta
+ * x_i=knorm+alpha*\sum_{j=max(0,i-n/2}^{min(N,i+n/2}(a_j)^2
+ * n is size of local response area.
+ * a_i, the activation (after ReLU) of a neuron convolved with the i-th kernel.
+ * b_i, the neuron after normalization, N is the total num of kernels
+ */
+
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+ protected:
+  //! shape of the bottom layer feature
+  int batchsize_, channels_, height_, width_;
+  //! size local response (neighbor) area
+  int lsize_;
+  //! hyper-parameter
+  float alpha_, beta_, knorm_;
+  Blob<float> norm_;
+};
+
+class MnistImageLayer: public ParserLayer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void ParseRecords(bool training, const vector<Record>& records,
+      Blob<float>* blob);
+
+ protected:
+  // height and width of the image after deformation
+  // kernel size for elastic distortion
+  // n^2 images are processed as a batch for elastic distortion
+  // conv height and conv width
+  // gauss kernel values, displacements, column image and tmp buffer
+  //float* gauss_, *displacementx_, *displacementy_, *colimg_, *tmpimg_;
+  float  gamma_, beta_, sigma_, kernel_, alpha_, norm_a_, norm_b_;
+  int resize_, elastic_freq_;
+};
+
+class PoolingLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+ protected:
+  int kernel_, pad_, stride_;
+  int batchsize_,channels_, height_, width_, pooled_height_, pooled_width_;
+  PoolingProto_PoolMethod pool_;
+};
+
+class ReLULayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+};
+
+
+class SoftmaxLossLayer: public LossLayer {
+  /*
+   * connected from the label layer and the last fc layer
+   */
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+  /**
+   * softmax is not recommendeded for partition because it requires the whole
+   * src layer for normalization.
+   */
+  virtual PartitionType partition_type() const {
+    if(layer_proto_.partition_type()==kLayerPartition)
+      return kNone;
+    else
+      return layer_proto_.partition_type();
+  }
+  virtual ConnectionType connection_type(int k) const {
+    CHECK_LT(k, srclayers_.size());
+    return kOneToAll;
+  }
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+ private:
+  int batchsize_;
+  int dim_;
+  float scale_;
+  int topk_;
+};
+
+class RGBImageLayer: public ParserLayer {
+ public:
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  virtual void ParseRecords(bool training, const vector<Record>& records,
+      Blob<float>* blob);
+
+ private:
+  float scale_;
+  int cropsize_;
+  bool mirror_;
+  Blob<float> mean_;
+};
+
+class ShardDataLayer: public DataLayer{
+ public:
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){};
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+ private:
+  shared_ptr<DataShard> shard_;
+};
+class LMDBDataLayer: public DataLayer{
+ public:
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers){};
+  virtual void Setup(const LayerProto& proto, const vector<SLayer>& srclayers);
+  void ConvertDatumToSingleLableImageRecord(const Datum& datum,
+    SingleLabelImageRecord* record);
+
+ private:
+  MDB_env* mdb_env_;
+  MDB_dbi mdb_dbi_;
+  MDB_txn* mdb_txn_;
+  MDB_cursor* mdb_cursor_;
+  MDB_val mdb_key_, mdb_value_;
+};
+
+/**
+ * This layer apply Tan function to neuron activations.
+ * f(x)=A tanh(Bx)
+ * f'(x)=B/A (A*A-f(x)*f(x))
+ */
+class TanhLayer: public Layer {
+ public:
+  virtual void Setup(const LayerProto& proto,
+      const vector<SLayer>& srclayers);
+
+  virtual void SetupAfterPartition(const LayerProto& proto,
+      const vector<int> &shape,
+      const vector<SLayer>& srclayers);
+
+
+  virtual void ComputeFeature(bool training, const vector<shared_ptr<Layer>>& srclayers);
+  virtual void ComputeGradient(const vector<shared_ptr<Layer>>& srclayers);
+ private:
+  float outer_scale_, inner_scale_;
+};
+
+
+}  // namespace singa
+
+#endif  // INCLUDE_NET_LAYER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/neuralnet/neuralnet.h
----------------------------------------------------------------------
diff --git a/include/neuralnet/neuralnet.h b/include/neuralnet/neuralnet.h
new file mode 100644
index 0000000..586a470
--- /dev/null
+++ b/include/neuralnet/neuralnet.h
@@ -0,0 +1,156 @@
+#ifndef INCLUDE_NET_NET_H_
+#define INCLUDE_NET_NET_H_
+
+#include <glog/logging.h>
+#include <vector>
+#include <map>
+#include <memory>
+
+#include "proto/model.pb.h"
+#include "neuralnet/layer.h"
+#include "utils/factory.h"
+#include "utils/graph.h"
+
+using std::vector;
+using std::string;
+using std::map;
+using std::shared_ptr;
+namespace singa {
+/**
+ * The neural network is constructed from user configured layers through google
+ * protocol buffer. TODO support constructing neural network by adding layers
+ * explicitly. E.g., users create layers and connect them manually in the code.
+ *
+ * Some layers, e.g., SplitLayer and BridgeSrcLayer/BridgeDstLayer will be added
+ * implicitly to partition the neural network.
+ */
+class NeuralNet {
+ public:
+  /**
+   * Register Layers
+   */
+  static void RegisterLayers();
+  /**
+   * Setup the neural network for training, test or validation.
+   *
+   * Parameters for test/validation net can share those from training after
+   * setup (done outside of this funcion).
+   *
+   * @param np proto for the neural network.
+   */
+  static shared_ptr<NeuralNet> SetupNeuralNet(const NetProto& np, Phase phase);
+
+ public:
+  /**
+   * construct the net structure from protocol buffer.
+   */
+  NeuralNet(NetProto net_proto, int group_size=1);
+  /**
+   * construct a json string representing the neuralnet graph.
+   * The json string can be used by other graph engine to draw a figure for
+   * displaying the neuralnet structure.
+   */
+  std::string ToString();
+  /**
+   * Print Norm1 of data and grad of each Layer and parameter.
+   * @param net, neural network
+   */
+  string DebugInfo();
+
+  /**
+   * to display the adjacency layers
+   */
+  std::string ToAdjacency();
+  /**
+   * Add layer explicitly used in manually programming/constructing neural net.
+   */
+  void AddLayer(const LayerProto &layer_proto){};
+  /**
+   * Add layer explicitly used in manually programming/constructing neural net.
+   */
+  void AddLayer(const Layer* layer){};
+  /**
+   * share weights from other neuralnet
+   */
+  void ShareParams(shared_ptr<NeuralNet> other,int flag);
+  void ToProto(NetProto *net_proto, bool copyData=false);
+  const std::vector<shared_ptr<Layer>>& layers() {
+    return layers_;
+  }
+  /**
+   * return ParserLayer of the neuralnet.
+   */
+  const std::vector<ParserLayer*>& parserlayers() {
+    if(parserlayers_.size()==0){
+      for(auto& layer: layers_)
+        if(layer->is_parserlayer())
+          parserlayers_.push_back(static_cast<ParserLayer*>(layer.get()));
+    }
+    return parserlayers_;
+  }
+  const std::vector<LossLayer*>& losslayers() {
+    if(losslayers_.size()==0){
+      for(auto& layer: layers_)
+        if(layer->is_losslayer())
+          losslayers_.push_back(static_cast<LossLayer*>(layer.get()));
+    }
+    return losslayers_;
+  }
+  const std::vector<DataLayer*>& datalayers() {
+    if(datalayers_.size()==0){
+      for(auto& layer: layers_)
+        if(layer->is_datalayer())
+          datalayers_.push_back(static_cast<DataLayer*>(layer.get()));
+    }
+    return datalayers_;
+  }
+  const std::vector<shared_ptr<Param>> &params()const {
+    return params_;
+  }
+  shared_ptr<Layer> name2layer(string name){
+    if (name2layer_.find(name)!=name2layer_.end())
+      return name2layer_[name];
+    else return nullptr;
+  }
+
+  shared_ptr<Param> paramid2param(int id) {
+    if(paramid2param_.size()==0){
+      for(auto& layer: layers_){
+        for(shared_ptr<Param> p: layer->GetParams()){
+          paramid2param_[p->id()]=p;
+        }
+      }
+    }
+    return paramid2param_[id];
+  }
+
+ protected:
+  void ConstructNeuralNet(const NetProto &net_proto);
+  void PartitionNeuralNet();
+  map<string, shared_ptr<Layer>> GetNameToLayer(
+    const vector<shared_ptr<Layer>>& layers);
+  Graph CreatePartitonedGraph(const vector<shared_ptr<Layer>>& layers,
+    const map<string, shared_ptr<Layer>>& name2layer);
+
+  /**
+   * Partition each layer according its partition type and dimension.
+   * @param layers original unpartitioned layers
+   */
+  map<string, vector<shared_ptr<Layer>>> PartitionLayers(
+      const vector<shared_ptr<Layer>>& layers);
+
+ protected:
+  vector<shared_ptr<Layer>> layers_;
+  vector<ParserLayer*> parserlayers_;
+  vector<LossLayer*> losslayers_;
+  vector<DataLayer*> datalayers_;
+  vector<shared_ptr<Param>> params_;
+  map<string, shared_ptr<Layer>> name2layer_;
+  map<int, shared_ptr<Param>> paramid2param_;
+
+  map<string, LayerProto> name2layerproto_;
+  int group_size_;
+  Graph graph_;
+};
+}  // namespace singa
+#endif  // INCLUDE_NET_NET_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/pm_server.h
----------------------------------------------------------------------
diff --git a/include/trainer/pm_server.h b/include/trainer/pm_server.h
new file mode 100644
index 0000000..b759844
--- /dev/null
+++ b/include/trainer/pm_server.h
@@ -0,0 +1,91 @@
+#ifndef INCLUDE_TRAINER_PM_SERVER_H_
+#define INCLUDE_TRAINER_PM_SERVER_H_
+
+#include <czmq.h>
+#include <memory>
+#include <vector>
+#include <map>
+#include <string.h>
+#include "proto/model.pb.h"
+#include "utils/updater.h"
+#include "utils/param.h"
+#include "communication/msg.h"
+#include "communication/socket.h"
+using std::vector;
+using std::string;
+using std::shared_ptr;
+
+namespace singa{
+
+/**
+ * Parameter manager at the server side.
+ *
+ * Repsond to worker's get/put/udpate request, and periodically syncing with
+ * other servers.
+ *
+ * Normally, the PMServer creates a response message for each request which
+ * will be sent back to the one who issued the request. However, if the request
+ * are not processed successfully, the original message will be returned. The
+ * sever does not know the returned message (response or the original message),
+ * it just sends it to the router. The router will decide to re-send the
+ * request to the server or send it to the worker.
+ *
+ */
+class PMServer{
+public:
+  typedef std::map<int, shared_ptr<Param>> ParamShard;
+
+	void Setup(int group_id, int server_id, shared_ptr<ParamShard> shard,
+       const UpdaterProto& proto);
+
+	~PMServer();
+
+	/**
+	 * Process GET request.
+   *
+   * @return the orignal message or response message
+   */
+	virtual Msg* HandleGet(Msg** msg);
+
+	/**
+	 * Process Update request.
+   *
+   * @return the orignal message or response message
+   */
+	virtual Msg* HandleUpdate(Msg** msg);
+
+	/**
+	 * Process PUT request.
+   *
+   * @return the original message or response message. If we don't want need to
+   * acknowledge the put request, then return nullptr.
+	 */
+	virtual Msg* HandlePut(Msg **msg);
+
+	/**
+   * TODO Process SYNC request.
+	 */
+	virtual Msg* HandleSyncRequest(Msg** msg);
+
+	/**
+   * TODO Process SYNC response.
+	 */
+	virtual int HandleSyncResponse(Msg** msg);
+
+  /**
+   * Scheduler for synchronizing server groups.
+   *
+   * TODO implement the Caffe's synchronization scheduler for data parallelism
+   */
+  virtual bool SyncNow();
+
+ protected:
+  int group_id_, server_id_;
+  shared_ptr<ParamShard> shard_;
+  shared_ptr<Dealer> dealer_;
+  shared_ptr<Updater> updater_;
+};
+
+} // namespace singa
+
+#endif // INCLUDE_TRAINER_PM_SERVER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/pm_worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/pm_worker.h b/include/trainer/pm_worker.h
new file mode 100644
index 0000000..198f5bd
--- /dev/null
+++ b/include/trainer/pm_worker.h
@@ -0,0 +1,171 @@
+#ifndef INCLUDE_TRAINER_PM_WORKER_H_
+#define INCLUDE_TRAINER_PM_WORKER_H_
+
+#include <memory>
+#include <vector>
+#include <map>
+#include <string>
+#include <atomic>
+#include "utils/param.h"
+#include "communication/msg.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+using std::map;
+
+namespace singa {
+
+/**
+ * Counters used to construct a parameter shard.
+ *
+ * For each worker group:
+ *   Every unique Param object is associated with a ParamCounter object whose
+ *   param field points the to Param object itself.
+ *
+ *   Param objects sharing the same values (due to data parallelism) are
+ *   associated with the same ParamCounter whose param field also shares the
+ *   same values.
+ *
+ *   Usage: we need to aggregate gradients from all workers for the shared
+ *   parameters before sending the update request. The nUpdate counter counts
+ *   the number.
+ *
+ * TODO test with different physical architectures.
+ */
+class ParamCounter{
+  public:
+  ParamCounter(shared_ptr<Param> p,int local, int owner):
+    nUpdate(0), nGet(0), nPut(0), nCollect(0), nLocal(local), nTotal(0),
+    owner_procs(owner), param(p){}
+
+  /**
+   * Associate the counter to a Param object.
+   *
+   * @param p
+   * @param local 1 if this Param object is used by workers in this procs, 0
+   *  otherwise
+   * @param owner the procs id of the worker who ownes this Param object
+   */
+  void AddParam(shared_ptr<Param> p, int local, int owner){
+    nLocal+=local;
+    nTotal+=1;
+    if(owner_procs>-1)
+      owner_procs=owner;
+    if(nLocal>1){
+      // TODO copy p->param;
+    }
+  }
+  std::atomic<int> nUpdate, nGet, nPut, nCollect; //!< all counters are atomic
+
+  int nLocal; //!< # local workers uses the shared parameter
+  int nTotal; //!< # total workers uses the shared parameter
+  int owner_procs; //!< the procs id of the worker that owns the parameter
+  shared_ptr<Param> param;
+};
+
+
+/**
+ * Parameter manager at the worker side.
+ */
+class PMWorker{
+public:
+  /**
+   * Workers from the same group resident in the same process share the same
+   * ParamShard which contains ParamCounters for Param objects used/updated by
+   * these worekrs. Shared Param objects are associated with the same
+   * ParamCounter.
+   */
+  typedef std::map<int, shared_ptr<ParamCounter>> ParamShard;
+
+
+	void Setup(int group_id, int worker_id, shared_ptr<ParamShard> shard);
+
+  void set_id(int group_id, int worker_id){
+    group_id_=group_id;
+    worker_id_=worker_id;
+  }
+
+  /**
+   * @return server id where the parameter is maintained.
+   */
+  virtual int Sharding(int param_id);
+
+	/**
+	 * Generate a request message to Get the parameter object.
+	 */
+	virtual Msg* Get(shared_ptr<Param> param, int step);
+  virtual Msg* Get(Msg** msg);
+
+	/**
+	 * Generate a request message to Update the parameter object.
+	 */
+	virtual Msg* Update(shared_ptr<Param> param, int step);
+  virtual Msg* Update(Msg** msg);
+
+	/**
+	 * Collect a Param object returned from server.
+	 */
+	virtual Msg* Collect(Msg**);
+
+	/**
+	 * Generate a request message to Put the parameter object.
+	 */
+	virtual Msg* Put(shared_ptr<Param> param, int step);
+  virtual Msg* Put(Msg** msg);
+
+ protected:
+  int group_id_, worker_id_;
+  shared_ptr<ParamShard> shard_;
+};
+
+/**
+ * Testing worker functionality.The main thread reads the config file and set up the socket.
+ *
+ * Create the shared ParamShard, then starts worker thread which basically carries out the work.
+ * Each thread creates a PMClient object.
+ *
+ * The main thread then enter the loops to forward messages.
+ *
+ * Requests from the worker thread is prepend the paramId, which is stripped by the main thread
+ * before forwarding to the correct server.
+ *
+ * The 1st thread in Client 0 populates the servers with data (PUT request). Wait
+ * for a while before starting the client thread (which does get/update
+ * continuously).
+class SingaClient {
+public:
+	SingaClient(int worker_id, Topology &topology, vector<string> &hosts);
+	void StartClient();
+
+	int id() {
+		return id_;
+	}
+	ParamShard *param_shard() {
+		return param_shard_;
+	}
+	char *backend_endpoint() {
+		return backend_endpoint_;
+	}
+
+private:
+	int id_, local_id_, group_id_;
+	char backend_endpoint_[256];
+	vector<char*> neighbors_;
+	ParamShard *param_shard_;
+
+	int param_to_server_id(int paramId);//< mapping paramId to server ID
+};
+
+//Zthread function for the worker thread, in the global namespace.
+//Basically a loop of: compute, get, update, compute, etc.
+void ClientThread(void *args, zctx_t *ctx, void *pipe);
+
+vector<Param*> gen_random_params();
+void test_get(PMClient *client);
+void test_update(PMClient *client, vector<Param*> params);
+void test_collect(PMClient *client);
+ */
+
+} // namespace singa
+#endif // INCLUDE_TRAINER_PM_WORKER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/server.h
----------------------------------------------------------------------
diff --git a/include/trainer/server.h b/include/trainer/server.h
new file mode 100644
index 0000000..d113c7d
--- /dev/null
+++ b/include/trainer/server.h
@@ -0,0 +1,22 @@
+#ifndef INCLUDE_TRAINER_SERVER_H_
+#define INCLUDE_TRAINER_SERVER_H_
+#include <memory>
+#include "trainer/pm_server.h"
+#include "communication/socket.h"
+
+using std::shared_ptr;
+namespace singa {
+class Server{
+ public:
+  Server(int group_id, int server_id);
+  void Setup(const UpdaterProto& proto, shared_ptr<PMServer::ParamShard> shard,
+    shared_ptr<Dealer> dealer);
+  void Run();
+
+ protected:
+  int group_id_, server_id_;
+  shared_ptr<PMServer> pmserver_;
+  shared_ptr<Dealer> dealer_;
+};
+} /* Server */
+#endif //INCLUDE_TRAINER_SERVER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/trainer.h
----------------------------------------------------------------------
diff --git a/include/trainer/trainer.h b/include/trainer/trainer.h
new file mode 100644
index 0000000..34d95f1
--- /dev/null
+++ b/include/trainer/trainer.h
@@ -0,0 +1,50 @@
+#ifndef INCLUDE_TRAINER_TRAINER_H_
+#define INCLUDE_TRAINER_TRAINER_H_
+#include "proto/cluster.pb.h"
+#include "proto/model.pb.h"
+#include "utils/updater.h"
+#include "utils/param.h"
+#include "utils/singleton.h"
+#include "utils/factory.h"
+#include "neuralnet/neuralnet.h"
+#include "trainer/pm_worker.h"
+#include "trainer/pm_server.h"
+#include "trainer/worker.h"
+#include "trainer/server.h"
+
+namespace singa {
+/**
+ * Every running process has a training object which launches one or more
+ * worker (and server) threads.
+ *
+ * The main thread runs a loop to forward messages between workers and servers.
+ */
+class Trainer{
+ public:
+  /**
+   * Start the training in one process
+   *
+   * @param modelproto
+   * @param clusterproto
+   */
+  void Start(const ModelProto& modelproto, const ClusterProto& clusterproto,
+    int procs_id);
+
+  // TODO add Resume() function to continue training from a previously stopped
+  // point.
+
+ protected:
+  void Run();
+  /**
+   * Register default implementations for all base classes used in the system,
+   * e.g., the Updater, BaseMsg, etc.
+   *
+   * All built-in layer implementations are
+   * registered here.
+   * For other base classes, use its base class name (string) as the key and the
+   * implementation class as the value, e.g., <"Updater" SGDUpdater>.
+   */
+  void RegisterDefaultClasses(const singa::ModelProto& proto);
+};
+} /* singa */
+#endif // INCLUDE_TRAINER_TRAINER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/trainer/worker.h
----------------------------------------------------------------------
diff --git a/include/trainer/worker.h b/include/trainer/worker.h
new file mode 100644
index 0000000..609e7dc
--- /dev/null
+++ b/include/trainer/worker.h
@@ -0,0 +1,218 @@
+#ifndef INCLUDE_TRAINER_WORKER_H_
+#define INCLUDE_TRAINER_WORKER_H_
+#include <map>
+#include <exception>
+#include "neuralnet/neuralnet.h"
+#include "proto/model.pb.h"
+#include "trainer/pm_worker.h"
+#include "utils/cluster.h"
+#include "communication/socket.h"
+#include "communication/msg.h"
+
+namespace singa {
+/**
+ * Collecting metrics, like accuracy, loss, etc.
+ */
+class Performance{
+ public:
+  /**
+   * Collect from LossLayer of net.
+   */
+  explicit Performance(shared_ptr<NeuralNet> net);
+  /**
+   * aggregate metrics from LossLayerS
+   */
+  void Update();
+  void Reset();
+  string ToString();
+ private:
+  vector<string> name_;
+  shared_ptr<NeuralNet> net_;
+  vector<vector<float>> metric_;
+  int counter_; //!< inc by 1 for every Update
+};
+
+/**
+ * The Worker class which runs the training algorithm.
+ * The first worker group will initialize parameters of the Net,
+ * and put them into the distributed memory/table.
+ */
+class Worker {
+ public:
+  Worker(int group_id, int worker_id);
+  ~Worker(){}
+  void Setup(const ModelProto& model, shared_ptr<NeuralNet> train_net,
+      shared_ptr<PMWorker::ParamShard> shard, shared_ptr<Dealer> layer_dealer,
+    shared_ptr<Dealer> param_dealer);
+  void set_test_net(shared_ptr<NeuralNet> test_net){
+    test_net_=test_net;
+  }
+  void set_validation_net(shared_ptr<NeuralNet> val_net){
+    validation_net_=val_net;
+  }
+
+  int Put(shared_ptr<Param> param, int step);
+  int Get(shared_ptr<Param> param, int step);
+  int Update(shared_ptr<Param> param, int step);
+  int Collect(shared_ptr<Param> param, int step);
+  /**
+    * check validation/test firstly, then TrainOneBatch
+    * Performance collects performance for the whole neuralnet.
+    * Hence, no need to collect performance in every thread.
+    * Only the main thread will pass none null perf.
+    */
+  void RunOneBatch(int step, Performance* perf=nullptr);
+  /**
+    * Train one mini-batch.
+    * Test/Validation is done before training.
+    */
+  virtual void TrainOneBatch(int step)=0;
+  /**
+   * Test/validate one mini-batch.
+   */
+  virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase)=0;
+  /**
+    * Test the perforance of the learned model on validation or test dataset.
+    * Test is done by the first group.
+    * @param net, neural network
+    * @param phase kValidation or kTest.
+    */
+  void Test(shared_ptr<NeuralNet> net, int nsteps, bool dispperf);
+
+  /**
+    * Main function of Worker.
+    * 1. Train the neuralnet step by step, test/validation is done periodically.
+    * 2. TODO Communicate with others, e.g., zookeeper, after every step.
+    */
+  virtual void Run();
+
+
+  /**
+   * Pull data from layers resident on other nodes due to Model Partition.
+  void Pull(zsock_t* pull, shared_ptr<NeuralNet> net);
+   */
+
+  /**
+   * Check is it time to display training info, e.g., loss and precison.
+   */
+  const bool DisplayNow(const int step) const {
+    return (modelproto_.display_frequency() > 0
+        && step >= modelproto_.display_after_steps()
+        && ((step - modelproto_.display_after_steps())
+          % modelproto_.display_frequency() == 0));
+  }
+
+  const bool DisplayDebugInfo(const int step) const {
+    return DisplayNow(step)&&modelproto_.debug()&&group_id_==0;
+  }
+
+  /**
+   * return true if the stop condition is satisfied, e.g., the maximum number
+   * of steps have been reached.
+   */
+  const bool StopNow(const int step) const{
+    return (step >= modelproto_.train_steps());
+  }
+  /**
+   * Check is it time to do checkpoint.
+   * @param step the ::Train() has been called this num times.
+   */
+  const bool CheckpointNow(const int step) const{
+    return (group_id_==0
+        && modelproto_.checkpoint_frequency() > 0
+        && step >= modelproto_.checkpoint_after_steps()
+        && ((step - modelproto_.checkpoint_after_steps())
+          % modelproto_.checkpoint_frequency() == 0));
+  }
+  /**
+   * Check is it time to do test.
+   * @param step the ::Train() has been called this num times.
+   */
+  const bool TestNow(const int step) const{
+    return (group_id_==0
+        && modelproto_.test_frequency() > 0
+        && step >= modelproto_.test_after_steps()
+        && ((step - modelproto_.test_after_steps())
+          % modelproto_.test_frequency() == 0));
+  }
+  /**
+   * Check is it time to do validation.
+   * @param step the ::Train() has been called step times.
+   */
+  const bool ValidateNow(const int step) {
+    return (group_id_==0
+        && modelproto_.validation_frequency() > 0
+        && step >= modelproto_.validation_after_steps()
+        && ((step - modelproto_.validation_after_steps())
+          % modelproto_.validation_frequency() == 0));
+  }
+
+
+  /**
+   * start training from scratch.
+   * setup training/test/validation neuralnets, then call Run().
+  void Start(ModelProto model);
+   */
+  /**
+   * TODO Resume from snapshot
+  void Resume();
+   */
+  void ReceiveBlobs(shared_ptr<NeuralNet> net);
+  void SendBlob();
+ protected:
+  int group_id_, worker_id_;
+  int step_;
+  ModelProto modelproto_;
+  shared_ptr<PMWorker> pmworker_;
+  shared_ptr<NeuralNet> train_net_, test_net_, validation_net_;
+  shared_ptr<Dealer> layer_dealer_, param_dealer_;
+  Poller layer_poller_, param_poller_;
+};
+
+class WorkerException: public std::exception{
+ public:
+  const char* what() throw(){
+    return "Worker Exception";
+  }
+};
+
+
+class BPWorker: public Worker{
+ public:
+  ~BPWorker(){}
+  BPWorker(int group_id, int worker_id):Worker(group_id, worker_id){}
+  virtual void TrainOneBatch(int step);
+  virtual void TestOneBatch(shared_ptr<NeuralNet> net, int step, Phase phase);
+  void Forward(shared_ptr<NeuralNet> net, int step, bool training);
+  void Backward(shared_ptr<NeuralNet> net, int step);
+    /**
+   * Profiling the time cost of training one batch.
+  string TimerInfo(){
+    char buf[1024];
+    float ticks=ticks_*1000;
+    float tf=tForward_/ticks, tb=tBackward_/ticks,
+          td=tSyncData_/ticks, tp=tSyncParam_/ticks;
+    float total=tf+tb+td+tp;
+    sprintf(buf,
+        "Total\t%6.2f\tforward\t%6.2f\tbackward\t%6.2f\t"
+        // syncdata\t%6.2f\tsyncparam\t%6.2f\n"
+        , total,tf,tb);
+    float gensync=Param::worker_gen_sync/ticks;
+    float handlesync=Param::worker_handle_sync/ticks;
+    sprintf(buf+strlen(buf),
+        "worker_gen_sync\t%6.2f\tworker_handle_sync\t%6.2f\n",
+        gensync, handlesync);
+    Param::worker_gen_sync=0;
+    Param::worker_handle_sync=0;
+    tForward_=0;
+    tBackward_=0;
+    tSyncData_=0;
+    tSyncData_=0;
+    ticks_=0;
+    return string(buf);
+  }
+   */
+};
+}  // namespace singa
+
+#endif  // INCLUDE_TRAINER_WORKER_H_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/blob.h
----------------------------------------------------------------------
diff --git a/include/utils/blob.h b/include/utils/blob.h
new file mode 100644
index 0000000..08068eb
--- /dev/null
+++ b/include/utils/blob.h
@@ -0,0 +1,166 @@
+/**
+ * The code is adapted from that of Caffe whose license is attached.
+ *
+ * COPYRIGHT
+ * All contributions by the University of California:
+ * Copyright (c) 2014, The Regents of the University of California (Regents)
+ * All rights reserved.
+ * All other contributions:
+ * Copyright (c) 2014, the respective contributors
+ * All rights reserved.
+ * Caffe uses a shared copyright model: each contributor holds copyright over
+ * their contributions to Caffe. The project versioning records all such
+ * contribution and copyright details. If a contributor wants to further mark
+ * their specific copyright on a particular contribution, they should indicate
+ * their copyright solely in the commit message of the change when it is
+ * committed.
+ * LICENSE
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * 1. Redistributions of source code must retain the above copyright notice, this
+ * list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+ * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * CONTRIBUTION AGREEMENT
+ * By contributing to the BVLC/caffe repository through pull-request, comment,
+ * or otherwise, the contributor releases their content to the
+ * license and copyright terms herein.
+ *
+ */
+#ifndef INCLUDE_UTILS_BLOB_
+#define INCLUDE_UTILS_BLOB_
+#include <memory>
+#include <vector>
+#include <glog/logging.h>
+#include "proto/model.pb.h"
+using std::shared_ptr;
+using std::vector;
+
+#define NOT_IMPLEMENTED LOG(FATAL) << "Not implemented function"
+inline void MallocHost(void** ptr, size_t size) {
+  *ptr = malloc(size);
+}
+
+inline void FreeHost(void* ptr) {
+  free(ptr);
+}
+
+/**
+ * @brief Manages memory allocation and synchronization between the host (CPU)
+ *        and device (GPU).
+ *
+ * TODO(dox): more thorough description.
+ */
+class SyncedMemory {
+ public:
+  SyncedMemory()
+      : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED),
+        own_cpu_data_(false) {}
+  explicit SyncedMemory(size_t size)
+      : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED),
+        own_cpu_data_(false) {}
+  ~SyncedMemory();
+  const void* cpu_data();
+  void set_cpu_data(void* data);
+  const void* gpu_data();
+  void* mutable_cpu_data();
+  void* mutable_gpu_data();
+  enum SyncedHead { UNINITIALIZED, HEAD_AT_CPU, HEAD_AT_GPU, SYNCED };
+  SyncedHead head() { return head_; }
+  size_t size() { return size_; }
+
+ private:
+  void to_cpu();
+  void to_gpu();
+  void* cpu_ptr_;
+  void* gpu_ptr_;
+  size_t size_;
+  SyncedHead head_;
+  bool own_cpu_data_;
+
+};  // class SyncedMemory
+
+
+template <typename Dtype>
+class Blob {
+ public:
+  Blob(): count_(0), capacity_(0) {}
+  Blob(const vector<int>&shape);
+  /**
+   * @brief Change the dimensions of the blob, allocating new memory if
+   *        necessary.
+   *
+   * This function can be called both to create an initial allocation
+   * of memory, and to adjust the dimensions of a top blob during Layer::Reshape
+   * or Layer::Forward. When changing the size of blob, memory will only be
+   * reallocated if sufficient memory does not already exist, and excess memory
+   * will never be freed.
+   *
+   * Note that reshaping an input blob and immediately calling Net::Backward is
+   * an error; either Net::Forward or Net::Reshape need to be called to
+   * propagate the new input shape to higher layers.
+   */
+  void Reshape(const vector<int>& shape);
+  void ReshapeLike(const Blob& other);
+  const vector<int>& shape() const{
+    return shape_;
+  }
+  inline int count() const { return count_; }
+  /**
+   * @brief Copy from a source Blob.
+   *
+   * @param source the Blob to copy from
+   * @param reshape if false, require this Blob to be pre-shaped to the shape
+   *        of other (and die otherwise); if true, Reshape this Blob to other's
+   *        shape if necessary
+   */
+  void CopyFrom(const Blob<Dtype>& source, bool reshape = false);
+
+  inline const shared_ptr<SyncedMemory>& data() const {
+    CHECK(data_);
+    return data_;
+  }
+
+  const Dtype* cpu_data() const;
+  void set_cpu_data(Dtype* data);
+  const Dtype* gpu_data() const;
+  Dtype* mutable_cpu_data();
+  Dtype* mutable_gpu_data();
+  /*
+  void FromProto(const BlobProto& proto);
+  */
+  void ToProto(singa::BlobProto* proto) const;
+
+  /// @brief Compute the sum of absolute values (L1 norm) of the data.
+  Dtype asum_data() const;
+  Dtype sum_data() const;
+
+  /**
+   * @brief Set the data_ shared_ptr to point to the SyncedMemory holding the
+   *        data_ of Blob other -- useful in Layer&s which simply perform a copy
+   *        in their Forward pass.
+   *
+   * This deallocates the SyncedMemory holding this Blob's data_, as
+   * shared_ptr calls its destructor when reset with the "=" operator.
+   */
+  void ShareData(const Blob& other);
+  void Swap(Blob& other);
+  shared_ptr<SyncedMemory> data_;
+ protected:
+  vector<int> shape_;
+  int count_;
+  int capacity_;
+};  // class Blob
+
+#endif // INCLUDE_UTILS_BLOB_

http://git-wip-us.apache.org/repos/asf/incubator-singa/blob/b2dc51d2/include/utils/cluster.h
----------------------------------------------------------------------
diff --git a/include/utils/cluster.h b/include/utils/cluster.h
new file mode 100644
index 0000000..4812987
--- /dev/null
+++ b/include/utils/cluster.h
@@ -0,0 +1,125 @@
+#ifndef INCLUDE_UTILS_CLUSTER_H_
+#define INCLUDE_UTILS_CLUSTER_H_
+#include <glog/logging.h>
+#include <string>
+#include <utility>
+#include <memory>
+#include <vector>
+#include "proto/cluster.pb.h"
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+namespace singa {
+
+/**
+ * Cluster is a singleton object, which provides cluster configuations,
+ * e.g., the topology of the cluster.
+ * All IDs start from 0.
+ */
+class Cluster {
+ public:
+  static shared_ptr<Cluster> Get();
+  static shared_ptr<Cluster> Get(const ClusterProto& cluster, int procs_id);
+
+  const int nserver_groups()const{ return cluster_.nserver_groups(); }
+  const int nworker_groups()const { return cluster_.nworker_groups(); }
+  int nworkers_per_group()const {return cluster_.nworkers_per_group();}
+  int nservers_per_group()const {return cluster_.nservers_per_group();}
+  int nworkers_per_procs()const{return cluster_.nworkers_per_procs();}
+  int nservers_per_procs()const{return cluster_.nservers_per_procs();}
+  int nworker_groups_per_server_group() const {
+    return cluster_.nworker_groups()/cluster_.nserver_groups();
+  }
+
+  /**
+   * @return true if the calling procs has server threads, otherwise false
+   */
+  bool has_server()const {
+    if(server_worker_separate()){
+      CHECK_LT(procs_id_, nprocs());
+      return procs_id_>=nworker_procs();
+    }else
+      return procs_id_<nserver_procs();
+  }
+  /**
+   * @return true if the calling procs has worker threads.
+   */
+  bool has_worker()const {
+    if(server_worker_separate()){
+      return procs_id_<nworker_procs();
+    }else
+      return procs_id_<nprocs();
+  }
+  /**
+   * @return global procs id, which starts from 0.
+   */
+  int procs_id()const {return procs_id_;}
+  bool server_worker_separate() const {
+    return cluster_.server_worker_separate();
+  }
+  int nworker_procs() const {
+    return nworker_groups()*nworkers_per_group()/nworkers_per_procs();
+  }
+  int nserver_procs() const {
+    return nserver_groups()*nservers_per_group()/nservers_per_procs();
+  }
+  int nprocs() const {
+    return cluster_.nprocs();
+  }
+
+  const string endpoint() const {
+    return endpoint(procs_id());
+  }
+  /**
+   * @return endpoint of the router of a procs with the specified id
+   */
+  const string endpoint(int procs_id) const {
+    CHECK_LT(procs_id, nprocs());
+    CHECK_GE(procs_id, 0);
+    return endpoints_.at(procs_id);
+  }
+  const string workspace() {return cluster_.workspace();}
+  const string vis_folder(){
+    return cluster_.workspace()+"/visualization";
+  }
+  const string log_folder(){
+    if(cluster_.has_log_dir()){
+      return cluster_.workspace()+"log";
+    }else
+      return "";
+  }
+
+  const int stub_timeout() const {
+    return cluster_.stub_timeout();
+  }
+  const int worker_timeout() const {
+    return cluster_.worker_timeout();
+  }
+  const int server_timeout() const {
+    return cluster_.server_timeout();
+  }
+
+  /**
+   * bandwidth MB/s
+  float bandwidth() const {
+    return cluster_.bandwidth();
+  }
+   */
+
+ private:
+  Cluster(const ClusterProto &cluster, int procs_id) ;
+  void SetupFolders(const ClusterProto &cluster);
+
+ private:
+  int procs_id_;
+  std::vector<std::string> endpoints_;
+  // cluster config proto
+  ClusterProto cluster_;
+  // make this class a singlton
+  static shared_ptr<Cluster> instance_;
+};
+}  // namespace singa
+
+#endif  // INCLUDE_UTILS_CLUSTER_H_


Mime
View raw message