OmniSciDB  2e3a973ef4
ArrowCsvForeignStorage.cpp File Reference
#include "ArrowCsvForeignStorage.h"
#include <arrow/api.h>
#include <arrow/csv/reader.h>
#include <arrow/io/file.h>
#include <arrow/util/bit_util.h>
#include <arrow/util/decimal.h>
#include <tbb/parallel_for.h>
#include <tbb/task_group.h>
#include <array>
#include <future>
#include "Catalog/DataframeTableDescriptor.h"
#include "DataMgr/ForeignStorage/ForeignStorageInterface.h"
#include "DataMgr/StringNoneEncoder.h"
#include "Logger/Logger.h"
#include "QueryEngine/ArrowResultSet.h"
#include "Shared/ArrowUtil.h"
#include "Shared/measure.h"
+ Include dependency graph for ArrowCsvForeignStorage.cpp:

Go to the source code of this file.

Classes

struct  Frag
 
class  ArrowCsvForeignStorage
 
struct  ArrowCsvForeignStorage::ArrowFragment
 

Functions

void registerArrowCsvForeignStorage (void)
 
template<typename T >
void setNulls (int8_t *data, int count)
 
void generateSentinelValues (int8_t *data, const SQLTypeInfo &columnType, size_t count)
 
static std::shared_ptr< arrow::DataType > getArrowImportType (const SQLTypeInfo type)
 
void getSizeAndOffset (const Frag &frag, const std::shared_ptr< arrow::Array > &chunk, int i, int &size, int &offset)
 
void generateNullValues (const std::vector< Frag > &fragments, arrow::ChunkedArray *arr_col_chunked_array, const SQLTypeInfo &columnType)
 
template<typename T >
void setNullValues (const std::vector< Frag > &fragments, arrow::ChunkedArray *arr_col_chunked_array)
 

Function Documentation

◆ generateNullValues()

void generateNullValues ( const std::vector< Frag > &  fragments,
arrow::ChunkedArray *  arr_col_chunked_array,
const SQLTypeInfo &  columnType 
)

Definition at line 800 of file ArrowCsvForeignStorage.cpp.

References CHECK, SQLTypeInfo::get_size(), and SQLTypeInfo::is_integer().

Referenced by getSizeAndOffset(), and ArrowCsvForeignStorage::registerTable().

802  {
803  const size_t typeSize = columnType.get_size();
804  if (columnType.is_integer()) {
805  switch (typeSize) {
806  case 1:
807  setNullValues<int8_t>(fragments, arr_col_chunked_array);
808  break;
809  case 2:
810  setNullValues<int16_t>(fragments, arr_col_chunked_array);
811  break;
812  case 4:
813  setNullValues<int32_t>(fragments, arr_col_chunked_array);
814  break;
815  case 8:
816  setNullValues<int64_t>(fragments, arr_col_chunked_array);
817  break;
818  default:
819  // TODO: throw unsupported integer type exception
820  CHECK(false);
821  }
822  } else {
823  if (typeSize == 4) {
824  setNullValues<float>(fragments, arr_col_chunked_array);
825  } else {
826  setNullValues<double>(fragments, arr_col_chunked_array);
827  }
828  }
829 }
bool is_integer() const
Definition: sqltypes.h:419
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ generateSentinelValues()

void generateSentinelValues ( int8_t *  data,
const SQLTypeInfo &  columnType,
size_t  count 
)

Definition at line 113 of file ArrowCsvForeignStorage.cpp.

References CHECK, SQLTypeInfo::get_size(), and SQLTypeInfo::is_integer().

Referenced by ArrowCsvForeignStorage::read().

113  {
114  const size_t type_size = columnType.get_size();
115  if (columnType.is_integer()) {
116  switch (type_size) {
117  case 1:
118  setNulls<int8_t>(data, count);
119  break;
120  case 2:
121  setNulls<int16_t>(data, count);
122  break;
123  case 4:
124  setNulls<int32_t>(data, count);
125  break;
126  case 8:
127  setNulls<int64_t>(data, count);
128  break;
129  default:
130  // TODO: throw unsupported integer type exception
131  CHECK(false);
132  }
133  } else {
134  if (type_size == 4) {
135  setNulls<float>(data, count);
136  } else {
137  setNulls<double>(data, count);
138  }
139  }
140 }
bool is_integer() const
Definition: sqltypes.h:419
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
#define CHECK(condition)
Definition: Logger.h:197
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getArrowImportType()

static std::shared_ptr<arrow::DataType> getArrowImportType ( const SQLTypeInfo  type)
static

Definition at line 236 of file ArrowCsvForeignStorage.cpp.

References CHECK, SQLTypeInfo::get_precision(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), IS_INTEGER, kARRAY, kBOOLEAN, kCHAR, kDECIMAL, kDOUBLE, kFLOAT, kINTERVAL_DAY_TIME, kINTERVAL_YEAR_MONTH, kNUMERIC, kTEXT, kTIME, kTIMESTAMP, kVARCHAR, and to_string().

Referenced by ArrowCsvForeignStorage::registerTable().

236  {
237  using namespace arrow;
238  auto ktype = type.get_type();
239  if (IS_INTEGER(ktype)) {
240  switch (type.get_size()) {
241  case 1:
242  return int8();
243  case 2:
244  return int16();
245  case 4:
246  return int32();
247  case 8:
248  return int64();
249  default:
250  CHECK(false);
251  }
252  }
253  switch (ktype) {
254  case kBOOLEAN:
255  return boolean();
256  case kFLOAT:
257  return float32();
258  case kDOUBLE:
259  return float64();
260  case kCHAR:
261  case kVARCHAR:
262  case kTEXT:
263  return utf8();
264  case kDECIMAL:
265  case kNUMERIC:
266  return decimal(type.get_precision(), type.get_scale());
267  case kTIME:
268  return time32(TimeUnit::SECOND);
269  // case kDATE:
270  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
271  // Currently support for date32() is missing in cuDF.Hence, if client requests for
272  // date on GPU, return date64() for the time being, till support is added.
273  // return device_type_ == ExecutorDeviceType::GPU ? date64() : date32();
274  case kTIMESTAMP:
275  switch (type.get_precision()) {
276  case 0:
277  return timestamp(TimeUnit::SECOND);
278  case 3:
279  return timestamp(TimeUnit::MILLI);
280  case 6:
281  return timestamp(TimeUnit::MICRO);
282  case 9:
283  return timestamp(TimeUnit::NANO);
284  default:
285  throw std::runtime_error("Unsupported timestamp precision for Arrow: " +
286  std::to_string(type.get_precision()));
287  }
288  case kARRAY:
289  case kINTERVAL_DAY_TIME:
290  case kINTERVAL_YEAR_MONTH:
291  default:
292  throw std::runtime_error(type.get_type_name() + " is not supported in Arrow.");
293  }
294  return nullptr;
295 }
int get_precision() const
Definition: sqltypes.h:262
Definition: sqltypes.h:51
HOST DEVICE int get_size() const
Definition: sqltypes.h:269
HOST DEVICE int get_scale() const
Definition: sqltypes.h:264
std::string to_string(char const *&&v)
Definition: sqltypes.h:54
Definition: Importer.h:62
std::string get_type_name() const
Definition: sqltypes.h:362
#define IS_INTEGER(T)
Definition: sqltypes.h:168
Definition: sqltypes.h:43
#define CHECK(condition)
Definition: Logger.h:197
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:259
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getSizeAndOffset()

void getSizeAndOffset ( const Frag &  frag,
const std::shared_ptr< arrow::Array > &  chunk,
int  i,
int &  size,
int &  offset 
)

Definition at line 304 of file ArrowCsvForeignStorage.cpp.

References Frag::first_chunk, Frag::first_chunk_offset, generateNullValues(), Frag::last_chunk, and Frag::last_chunk_size.

Referenced by ArrowCsvForeignStorage::createDecimalColumn(), ArrowCsvForeignStorage::createDictionaryEncodedColumn(), and ArrowCsvForeignStorage::registerTable().

308  {
309  offset = (i == frag.first_chunk) ? frag.first_chunk_offset : 0;
310  size = (i == frag.last_chunk) ? frag.last_chunk_size : (chunk->length() - offset);
311 }
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ registerArrowCsvForeignStorage()

void registerArrowCsvForeignStorage ( void  )

Definition at line 95 of file ArrowCsvForeignStorage.cpp.

References ForeignStorageInterface::registerPersistentStorageInterface().

Referenced by DBHandler::DBHandler().

95  {
96  ForeignStorageInterface::registerPersistentStorageInterface(
97  std::make_unique<ArrowCsvForeignStorage>());
98 }
static void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ setNulls()

template<typename T >
void setNulls ( int8_t *  data,
int  count 
)

Definition at line 106 of file ArrowCsvForeignStorage.cpp.

106  {
107  T* dataT = reinterpret_cast<T*>(data);
108  const T null_value = std::is_signed<T>::value ? std::numeric_limits<T>::min()
109  : std::numeric_limits<T>::max();
110  std::fill(dataT, dataT + count, null_value);
111 }

◆ setNullValues()

template<typename T >
void setNullValues ( const std::vector< Frag > &  fragments,
arrow::ChunkedArray *  arr_col_chunked_array 
)

Definition at line 752 of file ArrowCsvForeignStorage.cpp.

References Frag::first_chunk, anonymous_namespace{TypedDataAccessors.h}::is_null(), Frag::last_chunk, and run_benchmark_import::res.

753  {
754  const T null_value = std::is_signed<T>::value ? std::numeric_limits<T>::min()
755  : std::numeric_limits<T>::max();
756 
757  tbb::parallel_for(
758  tbb::blocked_range<size_t>(0, fragments.size()),
759  [&](const tbb::blocked_range<size_t>& r0) {
760  for (size_t f = r0.begin(); f != r0.end(); ++f) {
761  tbb::parallel_for(
762  tbb::blocked_range<size_t>(fragments[f].first_chunk,
763  fragments[f].last_chunk + 1),
764  [&](const tbb::blocked_range<size_t>& r1) {
765  for (auto chunk_index = r1.begin(); chunk_index != r1.end();
766  ++chunk_index) {
767  auto chunk = arr_col_chunked_array->chunk(chunk_index).get();
768  if (chunk->data()->null_count == chunk->data()->length) {
769  // it means we will insert sentinel values in read function
770  continue;
771  }
772  auto data = const_cast<uint8_t*>(chunk->data()->buffers[1]->data());
773  if (data && chunk->null_bitmap()) {
774  T* dataT = reinterpret_cast<T*>(data);
775  const uint8_t* bitmap_data = chunk->null_bitmap_data();
776  const int64_t length = chunk->length();
777  const int64_t bitmap_length = chunk->null_bitmap()->size() - 1;
778  for (int64_t bitmap_idx = 0; bitmap_idx < bitmap_length;
779  ++bitmap_idx) {
780  T* res = dataT + bitmap_idx * 8;
781  for (int8_t bitmap_offset = 0; bitmap_offset < 8; ++bitmap_offset) {
782  auto is_null = (~bitmap_data[bitmap_idx] >> bitmap_offset) & 1;
783  auto val = is_null ? null_value : res[bitmap_offset];
784  res[bitmap_offset] = val;
785  }
786  }
787 
788  for (int64_t j = bitmap_length * 8; j < length; ++j) {
789  auto is_null = (~bitmap_data[bitmap_length] >> (j % 8)) & 1;
790  auto val = is_null ? null_value : dataT[j];
791  dataT[j] = val;
792  }
793  }
794  }
795  });
796  }
797  });
798 }
bool is_null(const T &v, const SQLTypeInfo &t)
+ Here is the call graph for this function: