Open3D (C++ API)  0.12.0
HashmapCPU.h
Go to the documentation of this file.
1 // ----------------------------------------------------------------------------
2 // - Open3D: www.open3d.org -
3 // ----------------------------------------------------------------------------
4 // The MIT License (MIT)
5 //
6 // Copyright (c) 2018 www.open3d.org
7 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
9 // of this software and associated documentation files (the "Software"), to deal
10 // in the Software without restriction, including without limitation the rights
11 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 // copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
14 //
15 // The above copyright notice and this permission notice shall be included in
16 // all copies or substantial portions of the Software.
17 //
18 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
24 // IN THE SOFTWARE.
25 // ----------------------------------------------------------------------------
26 
27 #pragma once
28 
29 #include <tbb/concurrent_unordered_map.h>
30 
31 #include <unordered_map>
32 
35 
36 namespace open3d {
37 namespace core {
38 template <typename Hash, typename KeyEq>
39 class CPUHashmap : public DeviceHashmap<Hash, KeyEq> {
40 public:
41  CPUHashmap(int64_t init_buckets,
42  int64_t init_capacity,
43  int64_t dsize_key,
44  int64_t dsize_value,
45  const Device& device);
46 
47  ~CPUHashmap();
48 
49  void Rehash(int64_t buckets) override;
50 
51  void Insert(const void* input_keys,
52  const void* input_values,
53  addr_t* output_addrs,
54  bool* output_masks,
55  int64_t count) override;
56 
57  void Activate(const void* input_keys,
58  addr_t* output_addrs,
59  bool* output_masks,
60  int64_t count) override;
61 
62  void Find(const void* input_keys,
63  addr_t* output_addrs,
64  bool* output_masks,
65  int64_t count) override;
66 
67  void Erase(const void* input_keys,
68  bool* output_masks,
69  int64_t count) override;
70 
71  int64_t GetActiveIndices(addr_t* output_indices) override;
72 
73  int64_t Size() const override;
74 
75  std::vector<int64_t> BucketSizes() const override;
76  float LoadFactor() const override;
77 
78 protected:
79  std::shared_ptr<tbb::concurrent_unordered_map<void*, addr_t, Hash, KeyEq>>
81 
82  std::shared_ptr<CPUHashmapBufferContext> buffer_ctx_;
83 
84  void InsertImpl(const void* input_keys,
85  const void* input_values,
86  addr_t* output_addrs,
87  bool* output_masks,
88  int64_t count);
89 
90  void Allocate(int64_t capacity, int64_t buckets);
91 };
92 
93 template <typename Hash, typename KeyEq>
95  int64_t init_capacity,
96  int64_t dsize_key,
97  int64_t dsize_value,
98  const Device& device)
99  : DeviceHashmap<Hash, KeyEq>(
100  init_buckets,
101  init_capacity,
102  dsize_key,
104  dsize_value,
105  device) {
106  Allocate(init_capacity, init_buckets);
107 }
108 
109 template <typename Hash, typename KeyEq>
111 
112 template <typename Hash, typename KeyEq>
114  return impl_->size();
115 }
116 
117 template <typename Hash, typename KeyEq>
118 void CPUHashmap<Hash, KeyEq>::Insert(const void* input_keys,
119  const void* input_values,
120  addr_t* output_addrs,
121  bool* output_masks,
122  int64_t count) {
123  int64_t new_size = Size() + count;
124  if (new_size > this->capacity_) {
125  float avg_capacity_per_bucket =
126  float(this->capacity_) / float(this->bucket_count_);
127  int64_t expected_buckets = std::max(
128  this->bucket_count_ * 2,
129  int64_t(std::ceil(new_size / avg_capacity_per_bucket)));
130  Rehash(expected_buckets);
131  }
132  InsertImpl(input_keys, input_values, output_addrs, output_masks, count);
133 }
134 
135 template <typename Hash, typename KeyEq>
136 void CPUHashmap<Hash, KeyEq>::Activate(const void* input_keys,
137  addr_t* output_addrs,
138  bool* output_masks,
139  int64_t count) {
140  int64_t new_size = Size() + count;
141  if (new_size > this->capacity_) {
142  float avg_capacity_per_bucket =
143  float(this->capacity_) / float(this->bucket_count_);
144  int64_t expected_buckets = std::max(
145  this->bucket_count_ * 2,
146  int64_t(std::ceil(new_size / avg_capacity_per_bucket)));
147  Rehash(expected_buckets);
148  }
149  InsertImpl(input_keys, nullptr, output_addrs, output_masks, count);
150 }
151 
152 template <typename Hash, typename KeyEq>
153 void CPUHashmap<Hash, KeyEq>::Find(const void* input_keys,
154  addr_t* output_addrs,
155  bool* output_masks,
156  int64_t count) {
157 #pragma omp parallel for
158  for (int64_t i = 0; i < count; ++i) {
159  uint8_t* key = const_cast<uint8_t*>(
160  static_cast<const uint8_t*>(input_keys) + this->dsize_key_ * i);
161 
162  auto iter = impl_->find(key);
163  bool flag = (iter != impl_->end());
164  output_masks[i] = flag;
165  output_addrs[i] = flag ? iter->second : 0;
166  }
167 }
168 
169 template <typename Hash, typename KeyEq>
170 void CPUHashmap<Hash, KeyEq>::Erase(const void* input_keys,
171  bool* output_masks,
172  int64_t count) {
173  for (int64_t i = 0; i < count; ++i) {
174  uint8_t* key = const_cast<uint8_t*>(
175  static_cast<const uint8_t*>(input_keys) + this->dsize_key_ * i);
176 
177  auto iter = impl_->find(key);
178  bool flag = (iter != impl_->end());
179  output_masks[i] = flag;
180  if (flag) {
181  buffer_ctx_->DeviceFree(iter->second);
182  impl_->unsafe_erase(iter);
183  }
184  }
185  this->bucket_count_ = impl_->unsafe_bucket_count();
186 }
187 
188 template <typename Hash, typename KeyEq>
190  int64_t count = impl_->size();
191  int64_t i = 0;
192  for (auto iter = impl_->begin(); iter != impl_->end(); ++iter, ++i) {
193  output_indices[i] = static_cast<int64_t>(iter->second);
194  }
195 
196  return count;
197 }
198 
199 template <typename Hash, typename KeyEq>
200 void CPUHashmap<Hash, KeyEq>::Rehash(int64_t buckets) {
201  int64_t iterator_count = Size();
202 
203  Tensor active_keys;
204  Tensor active_values;
205 
206  if (iterator_count > 0) {
207  Tensor active_addrs({iterator_count}, Dtype::Int32, this->device_);
208  GetActiveIndices(static_cast<addr_t*>(active_addrs.GetDataPtr()));
209 
210  Tensor active_indices = active_addrs.To(Dtype::Int64);
211  active_keys = this->GetKeyBuffer().IndexGet({active_indices});
212  active_values = this->GetValueBuffer().IndexGet({active_indices});
213  }
214 
215  float avg_capacity_per_bucket =
216  float(this->capacity_) / float(this->bucket_count_);
217 
218  int64_t new_capacity =
219  int64_t(std::ceil(buckets * avg_capacity_per_bucket));
220  Allocate(new_capacity, buckets);
221 
222  if (iterator_count > 0) {
223  Tensor output_addrs({iterator_count}, Dtype::Int32, this->device_);
224  Tensor output_masks({iterator_count}, Dtype::Bool, this->device_);
225 
226  InsertImpl(active_keys.GetDataPtr(), active_values.GetDataPtr(),
227  static_cast<addr_t*>(output_addrs.GetDataPtr()),
228  static_cast<bool*>(output_masks.GetDataPtr()),
229  iterator_count);
230  }
231 
232  impl_->rehash(buckets);
233  this->bucket_count_ = impl_->unsafe_bucket_count();
234 }
235 
236 template <typename Hash, typename KeyEq>
237 std::vector<int64_t> CPUHashmap<Hash, KeyEq>::BucketSizes() const {
238  int64_t bucket_count = impl_->unsafe_bucket_count();
239  std::vector<int64_t> ret;
240  for (int64_t i = 0; i < bucket_count; ++i) {
241  ret.push_back(impl_->unsafe_bucket_size(i));
242  }
243  return ret;
244 }
245 
246 template <typename Hash, typename KeyEq>
248  return impl_->load_factor();
249 }
250 
251 template <typename Hash, typename KeyEq>
252 void CPUHashmap<Hash, KeyEq>::InsertImpl(const void* input_keys,
253  const void* input_values,
254  addr_t* output_addrs,
255  bool* output_masks,
256  int64_t count) {
257 #pragma omp parallel for
258  for (int64_t i = 0; i < count; ++i) {
259  const uint8_t* src_key =
260  static_cast<const uint8_t*>(input_keys) + this->dsize_key_ * i;
261 
262  addr_t dst_kv_addr = buffer_ctx_->DeviceAllocate();
263  auto dst_kv_iter = buffer_ctx_->ExtractIterator(dst_kv_addr);
264 
265  uint8_t* dst_key = static_cast<uint8_t*>(dst_kv_iter.first);
266  uint8_t* dst_value = static_cast<uint8_t*>(dst_kv_iter.second);
267  std::memcpy(dst_key, src_key, this->dsize_key_);
268 
269  if (input_values != nullptr) {
270  const uint8_t* src_value =
271  static_cast<const uint8_t*>(input_values) +
272  this->dsize_value_ * i;
273  std::memcpy(dst_value, src_value, this->dsize_value_);
274  } else {
275  std::memset(dst_value, 0, this->dsize_value_);
276  }
277 
278  // Try insertion.
279  auto res = impl_->insert({dst_key, dst_kv_addr});
280 
281  output_addrs[i] = dst_kv_addr;
282  output_masks[i] = res.second;
283  }
284 
285 #pragma omp parallel for
286  for (int64_t i = 0; i < count; ++i) {
287  if (!output_masks[i]) {
288  buffer_ctx_->DeviceFree(output_addrs[i]);
289  }
290  }
291 
292  this->bucket_count_ = impl_->unsafe_bucket_count();
293 }
294 
295 template <typename Hash, typename KeyEq>
296 void CPUHashmap<Hash, KeyEq>::Allocate(int64_t capacity, int64_t buckets) {
297  this->capacity_ = capacity;
298 
299  this->buffer_ =
300  std::make_shared<HashmapBuffer>(this->capacity_, this->dsize_key_,
301  this->dsize_value_, this->device_);
302 
303  buffer_ctx_ = std::make_shared<CPUHashmapBufferContext>(
304  this->capacity_, this->dsize_key_, this->dsize_value_,
305  this->buffer_->GetKeyBuffer(), this->buffer_->GetValueBuffer(),
306  this->buffer_->GetHeap());
307  buffer_ctx_->Reset();
308 
309  impl_ = std::make_shared<
310  tbb::concurrent_unordered_map<void*, addr_t, Hash, KeyEq>>(
311  buckets, Hash(this->dsize_key_), KeyEq(this->dsize_key_));
312 }
313 
314 } // namespace core
315 } // namespace open3d
int64_t Size() const override
Definition: HashmapCPU.h:113
~CPUHashmap()
Definition: HashmapCPU.h:110
Tensor & GetKeyBuffer()
Definition: DeviceHashmap.h:161
int64_t dsize_value_
Definition: DeviceHashmap.h:175
void * GetDataPtr()
Definition: Tensor.h:961
void Allocate(int64_t capacity, int64_t buckets)
Definition: HashmapCPU.h:296
void Insert(const void *input_keys, const void *input_values, addr_t *output_addrs, bool *output_masks, int64_t count) override
Parallel insert contiguous arrays of keys and values.
Definition: HashmapCPU.h:118
Tensor & GetValueBuffer()
Definition: DeviceHashmap.h:162
std::shared_ptr< CPUHashmapBufferContext > buffer_ctx_
Definition: HashmapCPU.h:82
float LoadFactor() const override
Return size / bucket_count.
Definition: HashmapCPU.h:247
FN_SPECIFIERS MiniVec< float, N > ceil(const MiniVec< float, N > &a)
Definition: MiniVec.h:108
Base class: shared interface.
Definition: DeviceHashmap.h:101
int64_t capacity_
Definition: DeviceHashmap.h:173
void InsertImpl(const void *input_keys, const void *input_values, addr_t *output_addrs, bool *output_masks, int64_t count)
Definition: HashmapCPU.h:252
static const Dtype Int32
Definition: Dtype.h:44
Tensor IndexGet(const std::vector< Tensor > &index_tensors) const
Advanced indexing getter.
Definition: Tensor.cpp:689
std::shared_ptr< HashmapBuffer > buffer_
Definition: DeviceHashmap.h:179
Tensor To(Dtype dtype, bool copy=false) const
Definition: Tensor.cpp:453
void Activate(const void *input_keys, addr_t *output_addrs, bool *output_masks, int64_t count) override
Definition: HashmapCPU.h:136
Definition: Device.h:39
int64_t dsize_key_
Definition: DeviceHashmap.h:174
int count
Definition: FilePCD.cpp:61
std::vector< int64_t > BucketSizes() const override
Definition: HashmapCPU.h:237
Device device_
Definition: DeviceHashmap.h:177
static const Dtype Int64
Definition: Dtype.h:45
Definition: PinholeCameraIntrinsic.cpp:35
Definition: Tensor.h:48
const char const char value recording_handle imu_sample recording_handle uint8_t size_t data_size k4a_record_configuration_t config target_format k4a_capture_t capture_handle k4a_imu_sample_t imu_sample playback_handle k4a_logging_message_cb_t void min_level device_handle k4a_imu_sample_t timeout_in_ms capture_handle capture_handle capture_handle image_handle float
Definition: K4aPlugin.cpp:465
uint32_t addr_t
Definition: HashmapBuffer.h:58
void Find(const void *input_keys, addr_t *output_addrs, bool *output_masks, int64_t count) override
Parallel find a contiguous array of keys.
Definition: HashmapCPU.h:153
int64_t GetActiveIndices(addr_t *output_indices) override
Parallel collect all iterators in the hash table.
Definition: HashmapCPU.h:189
void Erase(const void *input_keys, bool *output_masks, int64_t count) override
Parallel erase a contiguous array of keys.
Definition: HashmapCPU.h:170
Definition: HashmapCPU.h:39
static const Dtype Bool
Definition: Dtype.h:48
CPUHashmap(int64_t init_buckets, int64_t init_capacity, int64_t dsize_key, int64_t dsize_value, const Device &device)
Definition: HashmapCPU.h:94
void Rehash(int64_t buckets) override
Definition: HashmapCPU.h:200
int64_t bucket_count_
Definition: DeviceHashmap.h:172
std::shared_ptr< tbb::concurrent_unordered_map< void *, addr_t, Hash, KeyEq > > impl_
Definition: HashmapCPU.h:80