/*!
 * Copyright 2017 XGBoost contributors
 */
#pragma once
#include <thrust/copy.h>
#include <thrust/device_ptr.h>
#include <thrust/device_vector.h>
#include <thrust/system/cuda/error.h>
#include <thrust/system_error.h>
#include <xgboost/logging.h>

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cub/cub.cuh>
#include <iostream>
#include <iterator>
#include <limits>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>

#ifdef XGBOOST_USE_NCCL
#include "nccl.h"
#endif

// Uncomment to enable
#define TIMERS

namespace dh {

#define HOST_DEV_INLINE XGBOOST_DEVICE __forceinline__
#define DEV_INLINE __device__ __forceinline__

/*
 * Error handling functions
 */

#define safe_cuda(ans) ThrowOnCudaError((ans), __FILE__, __LINE__)

inline cudaError_t ThrowOnCudaError(cudaError_t code, const char *file,
                                    int line) {
  if (code != cudaSuccess) {
    std::stringstream ss;
    ss << file << "(" << line << ")";
    std::string file_and_line;
    ss >> file_and_line;
    throw thrust::system_error(code, thrust::cuda_category(), file_and_line);
  }
  return code;
}

#ifdef XGBOOST_USE_NCCL
#define safe_nccl(ans) ThrowOnNcclError((ans), __FILE__, __LINE__)

inline ncclResult_t ThrowOnNcclError(ncclResult_t code, const char *file,
                                     int line) {
  if (code != ncclSuccess) {
    std::stringstream ss;
    ss << "NCCL failure :" << ncclGetErrorString(code) << " ";
    ss << file << "(" << line << ")";
    throw std::runtime_error(ss.str());
  }
  return code;
}
#endif

template <typename T>
T *Raw(thrust::device_vector<T> &v) {  // NOLINT
  return raw_pointer_cast(v.data());
}

template <typename T>
const T *Raw(const thrust::device_vector<T> &v) {  // NOLINT
  return raw_pointer_cast(v.data());
}

inline int NVisibleDevices() {
  int n_visgpus = 0;
  dh::safe_cuda(cudaGetDeviceCount(&n_visgpus));
  return n_visgpus;
}

inline int NDevicesAll(int n_gpus) {
  int n_devices_visible = dh::NVisibleDevices();
  int n_devices = n_gpus < 0 ? n_devices_visible : n_gpus;
  return n_devices;
}

inline int NDevices(int n_gpus, int num_rows) {
  int n_devices = dh::NDevicesAll(n_gpus);
  // fix-up device number to be limited by number of rows
  n_devices = n_devices > num_rows ? num_rows : n_devices;
  return n_devices;
}

// if n_devices=-1, then use all visible devices
inline void SynchronizeNDevices(int n_devices, std::vector<int> dList) {
  for (int d_idx = 0; d_idx < n_devices; d_idx++) {
    int device_idx = dList[d_idx];
    safe_cuda(cudaSetDevice(device_idx));
    safe_cuda(cudaDeviceSynchronize());
  }
}

inline void SynchronizeAll() {
  for (int device_idx = 0; device_idx < NVisibleDevices(); device_idx++) {
    safe_cuda(cudaSetDevice(device_idx));
    safe_cuda(cudaDeviceSynchronize());
  }
}

inline std::string DeviceName(int device_idx) {
  cudaDeviceProp prop;
  dh::safe_cuda(cudaGetDeviceProperties(&prop, device_idx));
  return std::string(prop.name);
}

inline size_t AvailableMemory(int device_idx) {
  size_t device_free = 0;
  size_t device_total = 0;
  safe_cuda(cudaSetDevice(device_idx));
  dh::safe_cuda(cudaMemGetInfo(&device_free, &device_total));
  return device_free;
}

/**
 * \fn  inline size_t MaxSharedMemory(int device_idx)
 *
 * \brief Maximum shared memory per block on this device.
 *
 * \param device_idx  Zero-based index of the device.
 */
inline size_t MaxSharedMemory(int device_idx) {
  cudaDeviceProp prop;
  dh::safe_cuda(cudaGetDeviceProperties(&prop, device_idx));
  return prop.sharedMemPerBlock;
}

// ensure gpu_id is correct, so not dependent upon user knowing details
inline int GetDeviceIdx(int gpu_id) {
  // protect against overrun for gpu_id
  return std::abs(gpu_id) % dh::NVisibleDevices();
}

inline void CheckComputeCapability() {
  int n_devices = NVisibleDevices();
  for (int d_idx = 0; d_idx < n_devices; ++d_idx) {
    cudaDeviceProp prop;
    safe_cuda(cudaGetDeviceProperties(&prop, d_idx));
    std::ostringstream oss;
    oss << "CUDA Capability Major/Minor version number: " << prop.major << "."
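
/*
 * Usage sketch for the error handling helpers above; the function name and
 * buffers are illustrative placeholders. Any CUDA runtime call returning
 * cudaError_t can be wrapped in safe_cuda, which throws a
 * thrust::system_error tagged with the file and line on failure.
 *
 *   void CopyToDevice(const std::vector<float> &h_data, float *d_data) {
 *     dh::safe_cuda(cudaSetDevice(dh::GetDeviceIdx(0)));
 *     dh::safe_cuda(cudaMemcpy(d_data, h_data.data(),
 *                              h_data.size() * sizeof(float),
 *                              cudaMemcpyHostToDevice));
 *   }
 */
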
        << prop.minor << " is insufficient. Need >=3.5";
    int failed = prop.major < 3 || (prop.major == 3 && prop.minor < 5);
    if (failed) LOG(WARNING) << oss.str() << " for device: " << d_idx;
  }
}

/*
 * Range iterator
 */

class Range {
 public:
  class Iterator {
    friend class Range;

   public:
    XGBOOST_DEVICE int64_t operator*() const { return i_; }
    XGBOOST_DEVICE const Iterator &operator++() {
      i_ += step_;
      return *this;
    }
    XGBOOST_DEVICE Iterator operator++(int) {
      Iterator copy(*this);
      i_ += step_;
      return copy;
    }

    XGBOOST_DEVICE bool operator==(const Iterator &other) const {
      return i_ >= other.i_;
    }
    XGBOOST_DEVICE bool operator!=(const Iterator &other) const {
      return i_ < other.i_;
    }

    XGBOOST_DEVICE void Step(int s) { step_ = s; }

   protected:
    XGBOOST_DEVICE explicit Iterator(int64_t start) : i_(start) {}

   public:
    int64_t i_;
    int step_ = 1;
  };

  XGBOOST_DEVICE Iterator begin() const { return begin_; }  // NOLINT
  XGBOOST_DEVICE Iterator end() const { return end_; }      // NOLINT
  XGBOOST_DEVICE Range(int64_t begin, int64_t end) : begin_(begin), end_(end) {}
  XGBOOST_DEVICE void Step(int s) { begin_.Step(s); }

 private:
  Iterator begin_;
  Iterator end_;
};

template <typename T>
__device__ Range GridStrideRange(T begin, T end) {
  begin += blockDim.x * blockIdx.x + threadIdx.x;
  Range r(begin, end);
  r.Step(gridDim.x * blockDim.x);
  return r;
}

template <typename T>
__device__ Range BlockStrideRange(T begin, T end) {
  begin += threadIdx.x;
  Range r(begin, end);
  r.Step(blockDim.x);
  return r;
}

// Threadblock iterates over range, filling with value. Requires all threads in
// block to be active.
template <typename IterT, typename ValueT>
__device__ void BlockFill(IterT begin, size_t n, ValueT value) {
  for (auto i : BlockStrideRange(static_cast<size_t>(0), n)) {
    begin[i] = value;
  }
}

/*
 * Kernel launcher
 */

template <typename T1, typename T2>
T1 DivRoundUp(const T1 a, const T2 b) {
  return static_cast<T1>(ceil(static_cast<double>(a) / b));
}

template <typename L>
__global__ void LaunchNKernel(size_t begin, size_t end, L lambda) {
  for (auto i : GridStrideRange(begin, end)) {
    lambda(i);
  }
}

template <typename L>
__global__ void LaunchNKernel(int device_idx, size_t begin, size_t end,
                              L lambda) {
  for (auto i : GridStrideRange(begin, end)) {
    lambda(i, device_idx);
  }
}

template <int ITEMS_PER_THREAD = 8, int BLOCK_THREADS = 256, typename L>
inline void LaunchN(int device_idx, size_t n, L lambda) {
  if (n == 0) {
    return;
  }

  safe_cuda(cudaSetDevice(device_idx));
  const int GRID_SIZE =
      static_cast<int>(DivRoundUp(n, ITEMS_PER_THREAD * BLOCK_THREADS));
  LaunchNKernel<<<GRID_SIZE, BLOCK_THREADS>>>(static_cast<size_t>(0), n,
                                              lambda);
}
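
/*
 * Usage sketch for LaunchN; the pointer, length and scale factor are
 * placeholders. The functor must be a __device__ lambda, so callers need to
 * be compiled with --expt-extended-lambda.
 *
 *   void ScaleInPlace(int device_idx, float *d_data, size_t n, float factor) {
 *     dh::LaunchN(device_idx, n,
 *                 [=] __device__(size_t idx) { d_data[idx] *= factor; });
 *   }
 */
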
/*
 * Memory
 */

enum MemoryType { kDevice, kDeviceManaged };

template <MemoryType MemoryT>
class BulkAllocator;
template <typename T>
class DVec2;

template <typename T>
class DVec {
  friend class DVec2<T>;

 private:
  T *ptr_;
  size_t size_;
  int device_idx_;

 public:
  void ExternalAllocate(int device_idx, void *ptr, size_t size) {
    if (!Empty()) {
      throw std::runtime_error("Tried to allocate DVec but already allocated");
    }
    ptr_ = static_cast<T *>(ptr);
    size_ = size;
    device_idx_ = device_idx;
    safe_cuda(cudaSetDevice(device_idx_));
  }

  DVec() : ptr_(NULL), size_(0), device_idx_(-1) {}

  size_t Size() const { return size_; }
  int DeviceIdx() const { return device_idx_; }
  bool Empty() const { return ptr_ == NULL || size_ == 0; }
  T *Data() { return ptr_; }
  const T *Data() const { return ptr_; }

  std::vector<T> AsVector() const {
    std::vector<T> h_vector(Size());
    safe_cuda(cudaSetDevice(device_idx_));
    safe_cuda(cudaMemcpy(h_vector.data(), ptr_, Size() * sizeof(T),
                         cudaMemcpyDeviceToHost));
    return h_vector;
  }

  void Fill(T value) {
    auto d_ptr = ptr_;
    LaunchN(device_idx_, Size(),
            [=] __device__(size_t idx) { d_ptr[idx] = value; });
  }

  void Print() {
    auto h_vector = this->AsVector();
    for (auto e : h_vector) {
      std::cout << e << " ";
    }
    std::cout << "\n";
  }

  thrust::device_ptr<T> tbegin() { return thrust::device_pointer_cast(ptr_); }

  thrust::device_ptr<T> tend() {
    return thrust::device_pointer_cast(ptr_ + Size());
  }

  template <typename T2>
  DVec &operator=(const std::vector<T2> &other) {
    this->copy(other.begin(), other.end());
    return *this;
  }

  DVec &operator=(DVec<T> &other) {
    if (other.Size() != Size()) {
      throw std::runtime_error(
          "Cannot copy assign DVec to DVec, sizes are different");
    }
    safe_cuda(cudaSetDevice(this->DeviceIdx()));
    if (other.DeviceIdx() == this->DeviceIdx()) {
      dh::safe_cuda(cudaMemcpy(this->Data(), other.Data(),
                               other.Size() * sizeof(T),
                               cudaMemcpyDeviceToDevice));
    } else {
      std::cout << "deviceother: " << other.DeviceIdx()
                << " devicethis: " << this->DeviceIdx() << std::endl;
      std::cout << "size deviceother: " << other.Size()
                << " devicethis: " << this->DeviceIdx() << std::endl;
      throw std::runtime_error("Cannot copy to/from different devices");
    }

    return *this;
  }

  template <typename IterT>
  void copy(IterT begin, IterT end) {
    safe_cuda(cudaSetDevice(this->DeviceIdx()));
    if (end - begin != Size()) {
      throw std::runtime_error(
          "Cannot copy assign vector to DVec, sizes are different");
    }
    thrust::copy(begin, end, this->tbegin());
  }

  void copy(thrust::device_ptr<T> begin, thrust::device_ptr<T> end) {
    safe_cuda(cudaSetDevice(this->DeviceIdx()));
    if (end - begin != Size()) {
      throw std::runtime_error(
          "Cannot copy assign vector to dvec, sizes are different");
    }
    safe_cuda(cudaMemcpy(this->Data(), begin.get(), Size() * sizeof(T),
                         cudaMemcpyDefault));
  }
};

/**
 * @class DVec2 device_helpers.cuh
 * @brief wrapper for storing 2 DVec's which are needed for cub::DoubleBuffer
 */
template <typename T>
class DVec2 {
 private:
  DVec<T> d1_, d2_;
  cub::DoubleBuffer<T> buff_;
  int device_idx_;

 public:
  void ExternalAllocate(int device_idx, void *ptr1, void *ptr2, size_t size) {
    if (!Empty()) {
      throw std::runtime_error("Tried to allocate DVec2 but already allocated");
    }
    device_idx_ = device_idx;
    d1_.ExternalAllocate(device_idx_, ptr1, size);
    d2_.ExternalAllocate(device_idx_, ptr2, size);
    buff_.d_buffers[0] = static_cast<T *>(ptr1);
    buff_.d_buffers[1] = static_cast<T *>(ptr2);
    buff_.selector = 0;
  }

  DVec2() : d1_(), d2_(), buff_(), device_idx_(-1) {}

  size_t Size() const { return d1_.Size(); }
  int DeviceIdx() const { return device_idx_; }
  bool Empty() const { return d1_.Empty() || d2_.Empty(); }

  cub::DoubleBuffer<T> &buff() { return buff_; }

  DVec<T> &D1() { return d1_; }
  DVec<T> &D2() { return d2_; }

  T *Current() { return buff_.Current(); }
  DVec<T> &CurrentDVec() { return buff_.selector == 0 ? D1() : D2(); }

  T *other() { return buff_.Alternate(); }
};

template <MemoryType MemoryT>
class BulkAllocator {
  std::vector<char *> d_ptr_;
  std::vector<size_t> size_;
  std::vector<int> device_idx_;

  static const int kAlign = 256;

  size_t AlignRoundUp(size_t n) const {
    n = (n + kAlign - 1) / kAlign;
    return n * kAlign;
  }

  template <typename T>
  size_t GetSizeBytes(DVec<T> *first_vec, size_t first_size) {
    return AlignRoundUp(first_size * sizeof(T));
  }

  template <typename T, typename... Args>
  size_t GetSizeBytes(DVec<T> *first_vec, size_t first_size, Args... args) {
    return GetSizeBytes(first_vec, first_size) + GetSizeBytes(args...);
  }

  template <typename T>
  void AllocateDVec(int device_idx, char *ptr, DVec<T> *first_vec,
                    size_t first_size) {
    first_vec->ExternalAllocate(device_idx, static_cast<void *>(ptr),
                                first_size);
  }

  template <typename T, typename... Args>
  void AllocateDVec(int device_idx, char *ptr, DVec<T> *first_vec,
                    size_t first_size, Args... args) {
    AllocateDVec(device_idx, ptr, first_vec, first_size);
    ptr += AlignRoundUp(first_size * sizeof(T));
    AllocateDVec(device_idx, ptr, args...);
  }

  char *AllocateDevice(int device_idx, size_t bytes, MemoryType t) {
    char *ptr;
    safe_cuda(cudaSetDevice(device_idx));
    safe_cuda(cudaMalloc(&ptr, bytes));
    return ptr;
  }

  template <typename T>
  size_t GetSizeBytes(DVec2<T> *first_vec, size_t first_size) {
    return 2 * AlignRoundUp(first_size * sizeof(T));
  }

  template <typename T, typename... Args>
  size_t GetSizeBytes(DVec2<T> *first_vec, size_t first_size, Args... args) {
    return GetSizeBytes(first_vec, first_size) + GetSizeBytes(args...);
  }

  template <typename T>
  void AllocateDVec(int device_idx, char *ptr, DVec2<T> *first_vec,
                    size_t first_size) {
    first_vec->ExternalAllocate(
        device_idx, static_cast<void *>(ptr),
        static_cast<void *>(ptr + AlignRoundUp(first_size * sizeof(T))),
        first_size);
  }

  template <typename T, typename... Args>
  void AllocateDVec(int device_idx, char *ptr, DVec2<T> *first_vec,
                    size_t first_size, Args... args) {
    AllocateDVec(device_idx, ptr, first_vec, first_size);
    ptr += (AlignRoundUp(first_size * sizeof(T)) * 2);
    AllocateDVec(device_idx, ptr, args...);
  }

 public:
  BulkAllocator() = default;
  // prevent accidental copying, moving or assignment of this object
  BulkAllocator(const BulkAllocator&) = delete;
  BulkAllocator(BulkAllocator&&) = delete;
  void operator=(const BulkAllocator&) = delete;
  void operator=(BulkAllocator&&) = delete;

  ~BulkAllocator() {
    for (size_t i = 0; i < d_ptr_.size(); i++) {
      if (!(d_ptr_[i] == nullptr)) {
        safe_cuda(cudaSetDevice(device_idx_[i]));
        safe_cuda(cudaFree(d_ptr_[i]));
        d_ptr_[i] = nullptr;
      }
    }
  }

  // returns sum of bytes for all allocations
  size_t Size() {
    return std::accumulate(size_.begin(), size_.end(), static_cast<size_t>(0));
  }

  template <typename... Args>
  void Allocate(int device_idx, bool silent, Args... args) {
    size_t size = GetSizeBytes(args...);

    char *ptr = AllocateDevice(device_idx, size, MemoryT);

    AllocateDVec(device_idx, ptr, args...);

    d_ptr_.push_back(ptr);
    size_.push_back(size);
    device_idx_.push_back(device_idx);

    if (!silent) {
      const int mb_size = 1048576;
      LOG(CONSOLE) << "Allocated " << size / mb_size << "MB on [" << device_idx
                   << "] " << DeviceName(device_idx) << ", "
                   << AvailableMemory(device_idx) / mb_size << "MB remaining.";
    }
  }
};
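
/*
 * Usage sketch for BulkAllocator with DVec/DVec2; the names and element
 * counts are placeholders. Allocate() carves a single cudaMalloc'd block into
 * the requested vectors (aligned to kAlign bytes) and the memory is released
 * when the allocator is destroyed.
 *
 *   dh::BulkAllocator<dh::kDevice> ba;
 *   dh::DVec<int> row_ptr;
 *   dh::DVec2<float> keys;  // double-buffered, e.g. for radix sort
 *   ba.Allocate(0, true, &row_ptr, 101, &keys, 1000);
 *   row_ptr.Fill(0);
 *   keys.D1().Fill(0.0f);
 */
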
// Keep track of cub library device allocation
struct CubMemory {
  void *d_temp_storage;
  size_t temp_storage_bytes;

  // Thrust
  using value_type = char;  // NOLINT

  CubMemory() : d_temp_storage(nullptr), temp_storage_bytes(0) {}

  ~CubMemory() { Free(); }

  template <typename T>
  T *Pointer() {
    return static_cast<T *>(d_temp_storage);
  }

  void Free() {
    if (this->IsAllocated()) {
      safe_cuda(cudaFree(d_temp_storage));
    }
  }

  void LazyAllocate(size_t num_bytes) {
    if (num_bytes > temp_storage_bytes) {
      Free();
      safe_cuda(cudaMalloc(&d_temp_storage, num_bytes));
      temp_storage_bytes = num_bytes;
    }
  }

  // Thrust
  char *allocate(std::ptrdiff_t num_bytes) {  // NOLINT
    LazyAllocate(num_bytes);
    return reinterpret_cast<char *>(d_temp_storage);
  }

  // Thrust
  void deallocate(char *ptr, size_t n) {  // NOLINT
    // Do nothing
  }

  bool IsAllocated() { return d_temp_storage != nullptr; }
};

/*
 * Utility functions
 */

template <typename T>
void Print(const DVec<T> &v, size_t max_items = 10) {
  std::vector<T> h = v.AsVector();
  for (size_t i = 0; i < std::min(max_items, h.size()); i++) {
    std::cout << " " << h[i];
  }
  std::cout << "\n";
}

/**
 * @brief Helper macro to measure timing on GPU
 * @param call the GPU call
 * @param name name used to track later
 */
#define TIMEIT(call, name)    \
  do {                        \
    dh::Timer t1234;          \
    call;                     \
    t1234.printElapsed(name); \
  } while (0)
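
/*
 * Usage sketch for CubMemory with cub's two-phase temporary-storage pattern;
 * d_in, d_out and num_items are placeholders. The first call only queries the
 * required temporary-storage size, LazyAllocate grows the cached buffer if
 * needed, and the second call does the actual work.
 *
 *   void ExclusiveSum(dh::CubMemory *tmp, const int *d_in, int *d_out,
 *                     int num_items) {
 *     size_t bytes = 0;
 *     dh::safe_cuda(cub::DeviceScan::ExclusiveSum(nullptr, bytes, d_in, d_out,
 *                                                 num_items));
 *     tmp->LazyAllocate(bytes);
 *     dh::safe_cuda(cub::DeviceScan::ExclusiveSum(tmp->d_temp_storage, bytes,
 *                                                 d_in, d_out, num_items));
 *   }
 */
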
// Load balancing search

template <typename CoordinateT, typename SegmentT, typename OffsetT>
void FindMergePartitions(int device_idx, CoordinateT *d_tile_coordinates,
                         size_t num_tiles, int tile_size, SegmentT segments,
                         OffsetT num_rows, OffsetT num_elements) {
  dh::LaunchN(device_idx, num_tiles + 1, [=] __device__(int idx) {
    OffsetT diagonal = idx * tile_size;
    CoordinateT tile_coordinate;
    cub::CountingInputIterator<OffsetT> nonzero_indices(0);

    // Search the merge path
    // Cast to signed integer as this function can have negatives
    cub::MergePathSearch(static_cast<int64_t>(diagonal), segments + 1,
                         nonzero_indices, static_cast<int64_t>(num_rows),
                         static_cast<int64_t>(num_elements), tile_coordinate);

    // Output starting offset
    d_tile_coordinates[idx] = tile_coordinate;
  });
}

template <int TILE_SIZE, int ITEMS_PER_THREAD, int BLOCK_THREADS,
          typename OffsetT, typename CoordinateT, typename FunctionT,
          typename SegmentIterT>
__global__ void LbsKernel(CoordinateT *d_coordinates,
                          SegmentIterT segment_end_offsets, FunctionT f,
                          OffsetT num_segments) {
  int tile = blockIdx.x;
  CoordinateT tile_start_coord = d_coordinates[tile];
  CoordinateT tile_end_coord = d_coordinates[tile + 1];
  int64_t tile_num_rows = tile_end_coord.x - tile_start_coord.x;
  int64_t tile_num_elements = tile_end_coord.y - tile_start_coord.y;

  cub::CountingInputIterator<OffsetT> tile_element_indices(tile_start_coord.y);
  CoordinateT thread_start_coord;

  typedef typename std::iterator_traits<SegmentIterT>::value_type SegmentT;
  __shared__ struct {
    SegmentT tile_segment_end_offsets[TILE_SIZE + 1];
    SegmentT output_segment[TILE_SIZE];
  } temp_storage;

  for (auto item : dh::BlockStrideRange(int(0), int(tile_num_rows + 1))) {
    temp_storage.tile_segment_end_offsets[item] =
        segment_end_offsets[min(static_cast<int64_t>(tile_start_coord.x + item),
                                static_cast<int64_t>(num_segments - 1))];
  }
  __syncthreads();

  int64_t diag = threadIdx.x * ITEMS_PER_THREAD;

  // Cast to signed integer as this function can have negatives
  cub::MergePathSearch(diag,                                   // Diagonal
                       temp_storage.tile_segment_end_offsets,  // List A
                       tile_element_indices,                   // List B
                       tile_num_rows, tile_num_elements, thread_start_coord);

  CoordinateT thread_current_coord = thread_start_coord;
#pragma unroll
  for (int ITEM = 0; ITEM < ITEMS_PER_THREAD; ++ITEM) {
    if (tile_element_indices[thread_current_coord.y] <
        temp_storage.tile_segment_end_offsets[thread_current_coord.x]) {
      temp_storage.output_segment[thread_current_coord.y] =
          thread_current_coord.x + tile_start_coord.x;
      ++thread_current_coord.y;
    } else {
      ++thread_current_coord.x;
    }
  }
  __syncthreads();

  for (auto item : dh::BlockStrideRange(int(0), int(tile_num_elements))) {
    f(tile_start_coord.y + item, temp_storage.output_segment[item]);
  }
}

template <typename FunctionT, typename SegmentIterT, typename OffsetT>
void SparseTransformLbs(int device_idx, dh::CubMemory *temp_memory,
                        OffsetT count, SegmentIterT segments,
                        OffsetT num_segments, FunctionT f) {
  typedef typename cub::CubVector<OffsetT, 2>::Type CoordinateT;
  dh::safe_cuda(cudaSetDevice(device_idx));
  const int BLOCK_THREADS = 256;
  const int ITEMS_PER_THREAD = 1;
  const int TILE_SIZE = BLOCK_THREADS * ITEMS_PER_THREAD;
  auto num_tiles = dh::DivRoundUp(count + num_segments, BLOCK_THREADS);
  CHECK(num_tiles < std::numeric_limits<uint32_t>::max());

  temp_memory->LazyAllocate(sizeof(CoordinateT) * (num_tiles + 1));
  CoordinateT *tmp_tile_coordinates =
      reinterpret_cast<CoordinateT *>(temp_memory->d_temp_storage);

  FindMergePartitions(device_idx, tmp_tile_coordinates, num_tiles,
                      BLOCK_THREADS, segments, num_segments, count);

  LbsKernel<TILE_SIZE, ITEMS_PER_THREAD, BLOCK_THREADS, OffsetT>
      <<<uint32_t(num_tiles), BLOCK_THREADS>>>(tmp_tile_coordinates,
                                               segments + 1, f, num_segments);
}

template <typename FunctionT, typename OffsetT>
void DenseTransformLbs(int device_idx, OffsetT count, OffsetT num_segments,
                       FunctionT f) {
  CHECK(count % num_segments == 0) << "Data is not dense.";

  LaunchN(device_idx, count, [=] __device__(OffsetT idx) {
    OffsetT segment = idx / (count / num_segments);
    f(idx, segment);
  });
}

/**
 * \fn  template <typename FunctionT, typename SegmentIterT, typename OffsetT>
 *      void TransformLbs(int device_idx, dh::CubMemory *temp_memory,
 *                        OffsetT count, SegmentIterT segments,
 *                        OffsetT num_segments, bool is_dense, FunctionT f)
 *
 * \brief Load balancing search function. Reads a CSR type matrix description
 *        and allows a function to be executed on each element. Search 'modern
 *        GPU load balancing search' for more information.
 *
 * \author  Rory
 * \date  7/9/2017
 *
 * \tparam  FunctionT     Type of the function.
 * \tparam  SegmentIterT  Type of the segments iterator.
 * \tparam  OffsetT       Type of the offset.
 * \param device_idx              Zero-based index of the device.
 * \param [in,out]  temp_memory   Temporary memory allocator.
 * \param count                   Number of elements.
 * \param segments                Device pointer to segments.
 * \param num_segments            Number of segments.
 * \param is_dense                True if this object is dense.
 * \param f                       Lambda to be executed on matrix elements.
 */
template <typename FunctionT, typename SegmentIterT, typename OffsetT>
void TransformLbs(int device_idx, dh::CubMemory *temp_memory, OffsetT count,
                  SegmentIterT segments, OffsetT num_segments, bool is_dense,
                  FunctionT f) {
  if (is_dense) {
    DenseTransformLbs(device_idx, count, num_segments, f);
  } else {
    SparseTransformLbs(device_idx, temp_memory, count, segments, num_segments,
                       f);
  }
}
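
/*
 * Usage sketch for TransformLbs on a small CSR matrix; d_row_ptr, nnz and
 * d_out are placeholders. d_row_ptr points to num_rows + 1 row offsets with a
 * leading zero, and the lambda receives each element index together with the
 * segment (row) it belongs to.
 *
 *   void RowIndices(int device_idx, dh::CubMemory *tmp,
 *                   const size_t *d_row_ptr, size_t num_rows, size_t nnz,
 *                   int *d_out) {
 *     dh::TransformLbs(device_idx, tmp, nnz, d_row_ptr, num_rows, false,
 *                      [=] __device__(size_t idx, size_t row) {
 *                        d_out[idx] = static_cast<int>(row);
 *                      });
 *   }
 */
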
/**
 * @brief Helper function to sort the pairs using cub's segmented RadixSortPairs
 * @param tmp_mem cub temporary memory info
 * @param keys keys double-buffer array
 * @param vals the values double-buffer array
 * @param nVals number of elements in the array
 * @param nSegs number of segments
 * @param offsets the segments
 */
template <typename T1, typename T2>
void SegmentedSort(dh::CubMemory *tmp_mem, dh::DVec2<T1> *keys,
                   dh::DVec2<T2> *vals, int nVals, int nSegs,
                   const dh::DVec<int> &offsets, int start = 0,
                   int end = sizeof(T1) * 8) {
  size_t tmpSize;
  dh::safe_cuda(cub::DeviceSegmentedRadixSort::SortPairs(
      NULL, tmpSize, keys->buff(), vals->buff(), nVals, nSegs, offsets.Data(),
      offsets.Data() + 1, start, end));
  tmp_mem->LazyAllocate(tmpSize);
  dh::safe_cuda(cub::DeviceSegmentedRadixSort::SortPairs(
      tmp_mem->d_temp_storage, tmpSize, keys->buff(), vals->buff(), nVals,
      nSegs, offsets.Data(), offsets.Data() + 1, start, end));
}

/**
 * @brief Helper function to perform device-wide sum-reduction
 * @param tmp_mem cub temporary memory info
 * @param in the input array to be reduced
 * @param out the output reduced value
 * @param nVals number of elements in the input array
 */
template <typename T>
void SumReduction(dh::CubMemory &tmp_mem, dh::DVec<T> &in, dh::DVec<T> &out,
                  int nVals) {
  size_t tmpSize;
  dh::safe_cuda(
      cub::DeviceReduce::Sum(NULL, tmpSize, in.Data(), out.Data(), nVals));
  tmp_mem.LazyAllocate(tmpSize);
  dh::safe_cuda(cub::DeviceReduce::Sum(tmp_mem.d_temp_storage, tmpSize,
                                       in.Data(), out.Data(), nVals));
}

/**
 * @brief Helper function to perform device-wide sum-reduction, returns to the
 *        host
 * @param tmp_mem cub temporary memory info
 * @param in the input array to be reduced
 * @param nVals number of elements in the input array
 */
template <typename T>
typename std::iterator_traits<T>::value_type SumReduction(
    dh::CubMemory &tmp_mem, T in, int nVals) {
  using ValueT = typename std::iterator_traits<T>::value_type;
  size_t tmpSize;
  dh::safe_cuda(cub::DeviceReduce::Sum(nullptr, tmpSize, in, in, nVals));
  // Allocate small extra memory for the return value
  tmp_mem.LazyAllocate(tmpSize + sizeof(ValueT));
  auto ptr = reinterpret_cast<ValueT *>(tmp_mem.d_temp_storage) + 1;
  dh::safe_cuda(cub::DeviceReduce::Sum(
      reinterpret_cast<void *>(ptr), tmpSize, in,
      reinterpret_cast<ValueT *>(tmp_mem.d_temp_storage), nVals));
  ValueT sum;
  dh::safe_cuda(cudaMemcpy(&sum, tmp_mem.d_temp_storage, sizeof(ValueT),
                           cudaMemcpyDeviceToHost));
  return sum;
}
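
/*
 * Usage sketch for the host-returning SumReduction overload; the pointer and
 * length are placeholders. Any writable iterator accepted by
 * cub::DeviceReduce::Sum works, and the reduced value is copied back to the
 * host before returning.
 *
 *   float SumWeights(dh::CubMemory *tmp, float *d_weights, int n) {
 *     return dh::SumReduction(*tmp, d_weights, n);
 *   }
 */
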
/**
 * @brief Fill a given constant value across all elements in the buffer
 * @param out the buffer to be filled
 * @param len number of elements in the buffer
 * @param def default value to be filled
 */
template <typename T>
void FillConst(int device_idx, T *out, int len, T def) {
  dh::LaunchN(device_idx, len, [=] __device__(int i) { out[i] = def; });
}

/**
 * @brief gather elements
 * @param out1 output gathered array for the first buffer
 * @param in1 first input buffer
 * @param out2 output gathered array for the second buffer
 * @param in2 second input buffer
 * @param instId gather indices
 * @param nVals length of the buffers
 */
template <typename T1, typename T2>
void Gather(int device_idx, T1 *out1, const T1 *in1, T2 *out2, const T2 *in2,
            const int *instId, int nVals) {
  dh::LaunchN(device_idx, nVals, [=] __device__(int i) {
    int iid = instId[i];
    T1 v1 = in1[iid];
    T2 v2 = in2[iid];
    out1[i] = v1;
    out2[i] = v2;
  });
}

/**
 * @brief gather elements
 * @param out output gathered array
 * @param in input buffer
 * @param instId gather indices
 * @param nVals length of the buffers
 */
template <typename T>
void Gather(int device_idx, T *out, const T *in, const int *instId, int nVals) {
  dh::LaunchN(device_idx, nVals, [=] __device__(int i) {
    int iid = instId[i];
    out[i] = in[iid];
  });
}

/**
 * \class AllReducer
 *
 * \brief All reducer class that manages its own communication group and
 *        streams. Must be initialised before use. If XGBoost is compiled
 *        without NCCL this is a dummy class that will error if used with more
 *        than one GPU.
 */
class AllReducer {
  bool initialised;
#ifdef XGBOOST_USE_NCCL
  std::vector<ncclComm_t> comms;
  std::vector<cudaStream_t> streams;
  std::vector<int> device_ordinals;
#endif

 public:
  AllReducer() : initialised(false) {}

  /**
   * \fn  void Init(const std::vector<int> &device_ordinals)
   *
   * \brief Initialise with the desired device ordinals for this communication
   *        group.
   *
   * \param device_ordinals The device ordinals.
   */
  void Init(const std::vector<int> &device_ordinals) {
#ifdef XGBOOST_USE_NCCL
    this->device_ordinals = device_ordinals;
    comms.resize(device_ordinals.size());
    dh::safe_nccl(ncclCommInitAll(comms.data(),
                                  static_cast<int>(device_ordinals.size()),
                                  device_ordinals.data()));
    streams.resize(device_ordinals.size());
    for (size_t i = 0; i < device_ordinals.size(); i++) {
      safe_cuda(cudaSetDevice(device_ordinals[i]));
      safe_cuda(cudaStreamCreate(&streams[i]));
    }
    initialised = true;
#else
    CHECK_EQ(device_ordinals.size(), 1)
        << "XGBoost must be compiled with NCCL to use more than one GPU.";
#endif
  }

  ~AllReducer() {
#ifdef XGBOOST_USE_NCCL
    if (initialised) {
      for (auto &stream : streams) {
        dh::safe_cuda(cudaStreamDestroy(stream));
      }
      for (auto &comm : comms) {
        ncclCommDestroy(comm);
      }
    }
#endif
  }

  /**
   * \brief Allreduce. Use in exactly the same way as NCCL but without needing
   *        streams or comms.
   *
   * \param communication_group_idx Zero-based index of the communication
   *                                group.
   * \param sendbuff                The sendbuff.
   * \param recvbuff                The recvbuff.
   * \param count                   Number of elements.
   */
  void AllReduceSum(int communication_group_idx, const double *sendbuff,
                    double *recvbuff, int count) {
#ifdef XGBOOST_USE_NCCL
    CHECK(initialised);
    dh::safe_cuda(cudaSetDevice(device_ordinals[communication_group_idx]));
    dh::safe_nccl(ncclAllReduce(sendbuff, recvbuff, count, ncclDouble, ncclSum,
                                comms[communication_group_idx],
                                streams[communication_group_idx]));
#endif
  }

  /**
   * \brief Allreduce. Use in exactly the same way as NCCL but without needing
   *        streams or comms.
   *
   * \param communication_group_idx Zero-based index of the communication
   *                                group.
   * \param sendbuff                The sendbuff.
   * \param recvbuff                The recvbuff.
   * \param count                   Number of elements.
   */
  void AllReduceSum(int communication_group_idx, const int64_t *sendbuff,
                    int64_t *recvbuff, int count) {
#ifdef XGBOOST_USE_NCCL
    CHECK(initialised);
    dh::safe_cuda(cudaSetDevice(device_ordinals[communication_group_idx]));
    dh::safe_nccl(ncclAllReduce(sendbuff, recvbuff, count, ncclInt64, ncclSum,
                                comms[communication_group_idx],
                                streams[communication_group_idx]));
#endif
  }

  /**
   * \fn  void Synchronize()
   *
   * \brief Synchronizes the entire communication group.
   */
  void Synchronize() {
#ifdef XGBOOST_USE_NCCL
    for (size_t i = 0; i < device_ordinals.size(); i++) {
      dh::safe_cuda(cudaSetDevice(device_ordinals[i]));
      dh::safe_cuda(cudaStreamSynchronize(streams[i]));
    }
#endif
  }
};
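
/*
 * Usage sketch for AllReducer across two GPUs; the device ordinals, buffers
 * and count are placeholders. Each AllReduceSum issues an asynchronous NCCL
 * allreduce on that device's stream and Synchronize() waits for the whole
 * group.
 *
 *   dh::AllReducer reducer;
 *   reducer.Init({0, 1});
 *   // d_partial[i] / d_total[i] are device buffers of `count` doubles on GPU i
 *   for (int i = 0; i < 2; ++i) {
 *     reducer.AllReduceSum(i, d_partial[i], d_total[i], count);
 *   }
 *   reducer.Synchronize();
 */
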
/**
 * \brief Executes some operation on each element of the input vector, using a
 *        single controlling thread for each element.
 *
 * \tparam T         Generic type parameter.
 * \tparam FunctionT Type of the function.
 * \param shards The shards.
 * \param f      The function to process.
 */
template <typename T, typename FunctionT>
void ExecuteShards(std::vector<T> *shards, FunctionT f) {
#pragma omp parallel for schedule(static, 1)
  for (int shard = 0; shard < shards->size(); ++shard) {
    f(shards->at(shard));
  }
}

/**
 * \brief Executes some operation on each element of the input vector, using a
 *        single controlling thread for each element. In addition, passes the
 *        shard index into the function.
 *
 * \tparam T         Generic type parameter.
 * \tparam FunctionT Type of the function.
 * \param shards The shards.
 * \param f      The function to process.
 */
template <typename T, typename FunctionT>
void ExecuteIndexShards(std::vector<T> *shards, FunctionT f) {
#pragma omp parallel for schedule(static, 1)
  for (int shard = 0; shard < shards->size(); ++shard) {
    f(shard, shards->at(shard));
  }
}

/**
 * \brief Executes some operation on each element of the input vector, using a
 *        single controlling thread for each element, returns the sum of the
 *        results.
 *
 * \tparam ReduceT   Type of the reduced value.
 * \tparam T         Generic type parameter.
 * \tparam FunctionT Type of the function.
 * \param shards The shards.
 * \param f      The function to process.
 *
 * \return The reduced value.
 */
template <typename ReduceT, typename T, typename FunctionT>
ReduceT ReduceShards(std::vector<T> *shards, FunctionT f) {
  std::vector<ReduceT> sums(shards->size());
#pragma omp parallel for schedule(static, 1)
  for (int shard = 0; shard < shards->size(); ++shard) {
    sums[shard] = f(shards->at(shard));
  }
  return std::accumulate(sums.begin(), sums.end(), ReduceT());
}
}  // namespace dh