OmniSciDB  04ee39c94c
anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT > Class Template Reference

Public Member Functions

 Deduplicater (int8_t *row_buff, const size_t row_size, const size_t row_count, const size_t key_count)
 
size_t run ()
 

Private Member Functions

void runDispatch (std::vector< std::unordered_set< std::vector< KeyT >>> &mask_set, std::vector< std::mutex > &mutex_set, const size_t start_entry, const size_t end_entry)
 

Private Attributes

int8_t * buff_
 
const size_t entry_sz_
 
const size_t entry_cnt_
 
const size_t key_cnt_
 

Detailed Description

template<bool isColumnar, typename KeyT = int64_t>
class anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >

Definition at line 1166 of file ProfileTest.cpp.

Constructor & Destructor Documentation

◆ Deduplicater()

template<bool isColumnar, typename KeyT = int64_t>
anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >::Deduplicater ( int8_t *  row_buff,
const size_t  row_size,
const size_t  row_count,
const size_t  key_count 
)
inline

Member Function Documentation

◆ run()

template<bool isColumnar, typename KeyT = int64_t>
size_t anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >::run ( )
inline

Definition at line 1176 of file ProfileTest.cpp.

References CHECK_GE, and cpu_threads().

1176  size_t run() {
1177  std::vector<std::future<void>> child_threads;
1178  const size_t cpu_count = cpu_threads();
1179  const size_t stride = (entry_cnt_ + cpu_count - 1) / cpu_count;
1180 
1181  std::vector<std::unordered_set<std::vector<KeyT>>> mask_set(
1182  cpu_count, std::unordered_set<std::vector<KeyT>>());
1183  std::vector<std::mutex> mutex_set(cpu_count);
1184  for (size_t start_entry = 0, i = 0; start_entry < entry_cnt_;
1185  start_entry += stride, ++i) {
1186  const auto end_entry = std::min(entry_cnt_, start_entry + stride);
1187  child_threads.push_back(std::async(std::launch::async,
1188  &Deduplicater::runDispatch,
1189  this,
1190  std::ref(mask_set),
1191  std::ref(mutex_set),
1192  start_entry,
1193  end_entry));
1194  }
1195 
1196  for (auto& child : child_threads) {
1197  child.get();
1198  }
1199 
1200  size_t row_count = 0;
1201  for (auto& mask : mask_set) {
1202  row_count += mask.size();
1203  }
1204  CHECK_GE(entry_cnt_, row_count);
1205  return row_count;
1206  }
#define CHECK_GE(x, y)
Definition: Logger.h:200
void runDispatch(std::vector< std::unordered_set< std::vector< KeyT >>> &mask_set, std::vector< std::mutex > &mutex_set, const size_t start_entry, const size_t end_entry)
int cpu_threads()
Definition: thread_count.h:23
+ Here is the call graph for this function:

◆ runDispatch()

template<bool isColumnar, typename KeyT = int64_t>
void anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >::runDispatch ( std::vector< std::unordered_set< std::vector< KeyT >>> &  mask_set,
std::vector< std::mutex > &  mutex_set,
const size_t  start_entry,
const size_t  end_entry 
)
inlineprivate

Definition at line 1214 of file ProfileTest.cpp.

References CHECK_EQ, and anonymous_namespace{ProfileTest.cpp}::reset_entry().

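1214  void runDispatch(std::vector<std::unordered_set<std::vector<KeyT>>>& mask_set,
1215  std::vector<std::mutex>& mutex_set,
1216  const size_t start_entry,
1217  const size_t end_entry) {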
1218  CHECK_EQ(mask_set.size(), mutex_set.size());
1219  const size_t set_size = mask_set.size();
1220  for (size_t i = start_entry; i < end_entry; ++i) {
1221  std::vector<KeyT> keys(key_cnt_);
1222  auto key_buffers = reinterpret_cast<KeyT*>(buff_);
1223  if (isColumnar) {
1224  for (size_t k = 0; k < key_cnt_; ++k) {
1225  keys[k] = key_buffers[i + k * entry_cnt_];
1226  }
1227  } else {
1228  for (size_t k = 0; k < key_cnt_; ++k) {
1229  keys[k] = reinterpret_cast<const KeyT*>(buff_ + i * entry_sz_)[k];
1230  }
1231  }
1232  CHECK_EQ(keys.size(), key_cnt_);
1233  const size_t mask_idx = std::hash<decltype(keys)>()(keys) % set_size;
1234  const bool inserted = [&]() {
1235  std::lock_guard<std::mutex> mask_lock(mutex_set[mask_idx]);
1236  auto it_ok = mask_set[mask_idx].insert(keys);
1237  return it_ok.second;
1238  }();
1239  if (!inserted) {
1240  if (isColumnar) {
1241  for (size_t k = 0; k < key_cnt_; ++k) {
1242  reset_entry(key_buffers + i + k * entry_cnt_);
1243  }
1244  } else {
1245  for (size_t k = 0; k < key_cnt_; ++k) {
1246  reset_entry(reinterpret_cast<KeyT*>(buff_ + i * entry_sz_) + k);
1247  }
1248  }
1249  }
1250  }
1251  }
#define CHECK_EQ(x, y)
Definition: Logger.h:195
+ Here is the call graph for this function:

Member Data Documentation

◆ buff_

template<bool isColumnar, typename KeyT = int64_t>
int8_t* anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >::buff_
private

Definition at line 1209 of file ProfileTest.cpp.

◆ entry_cnt_

template<bool isColumnar, typename KeyT = int64_t>
const size_t anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >::entry_cnt_
private

Definition at line 1211 of file ProfileTest.cpp.

◆ entry_sz_

template<bool isColumnar, typename KeyT = int64_t>
const size_t anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >::entry_sz_
private

Definition at line 1210 of file ProfileTest.cpp.

◆ key_cnt_

template<bool isColumnar, typename KeyT = int64_t>
const size_t anonymous_namespace{ProfileTest.cpp}::Deduplicater< isColumnar, KeyT >::key_cnt_
private

Definition at line 1212 of file ProfileTest.cpp.


The documentation for this class was generated from the following file: