OmniSciDB  0264ff685a
ArrowForeignStorage.cpp File Reference
#include "ArrowForeignStorage.h"
#include <arrow/api.h>
#include <arrow/csv/reader.h>
#include <arrow/io/file.h>
#include <arrow/util/decimal.h>
#include <tbb/parallel_for.h>
#include <tbb/task_group.h>
#include <array>
#include <future>
#include <vector>
#include "Catalog/DataframeTableDescriptor.h"
#include "DataMgr/ForeignStorage/ForeignStorageInterface.h"
#include "DataMgr/StringNoneEncoder.h"
#include "Logger/Logger.h"
#include "QueryEngine/ArrowResultSet.h"
#include "Shared/ArrowUtil.h"
#include "Shared/measure.h"
+ Include dependency graph for ArrowForeignStorage.cpp:

Go to the source code of this file.

Classes

struct  Frag
 
struct  ArrowFragment
 
class  ArrowForeignStorageBase
 
class  ArrowForeignStorage
 
class  ArrowCsvForeignStorage
 

Functions

std::vector< Frag > calculateFragmentsOffsets (const arrow::ChunkedArray &array, size_t maxFragRows)
 
static SQLTypeInfo getOmnisciType (const arrow::DataType &type)
 
void setArrowTable (std::string name, std::shared_ptr< arrow::Table > table)
 
void releaseArrowTable (std::string name)
 
void registerArrowForeignStorage ()
 
static std::shared_ptr< arrow::DataType > getArrowImportType (const SQLTypeInfo type)
 
void registerArrowCsvForeignStorage ()
 

Function Documentation

◆ calculateFragmentsOffsets()

std::vector<Frag> calculateFragmentsOffsets ( const arrow::ChunkedArray &  array,
size_t  maxFragRows 
)

Definition at line 284 of file ArrowForeignStorage.cpp.

Referenced by ArrowForeignStorageBase::parseArrowTable().

285  {
286  std::vector<Frag> fragments;
287  size_t sz = 0;
288  size_t offset = 0;
289  fragments.push_back({0, 0, 0, 0});
290  size_t num_chunks = (size_t)array.num_chunks();
291  for (size_t i = 0; i < num_chunks;) {
292  auto& chunk = *array.chunk(i);
293  auto& frag = *fragments.rbegin();
294  if (maxFragRows - sz > chunk.length() - offset) {
295  sz += chunk.length() - offset;
296  if (i == num_chunks - 1) {
297  fragments.rbegin()->last_chunk = num_chunks - 1;
298  fragments.rbegin()->last_chunk_size =
299  array.chunk((int)num_chunks - 1)->length() - offset;
300  }
301  offset = 0;
302  i++;
303  } else {
304  frag.last_chunk = i;
305  frag.last_chunk_size = maxFragRows - sz;
306  offset += maxFragRows - sz;
307  sz = 0;
308  fragments.push_back({i, offset, 0, 0});
309  }
310  }
311  if (fragments.rbegin()->first_chunk == fragments.rbegin()->first_chunk &&
312  fragments.rbegin()->last_chunk_size == 0) {
313  // remove empty fragment at the end if any
314  fragments.pop_back();
315  }
316  return fragments;
317 }
+ Here is the caller graph for this function:

◆ getArrowImportType()

static std::shared_ptr<arrow::DataType> getArrowImportType ( const SQLTypeInfo  type)
static

Definition at line 892 of file ArrowForeignStorage.cpp.

References CHECK, SQLTypeInfo::get_precision(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), IS_INTEGER, kARRAY, kBOOLEAN, kCHAR, kDECIMAL, kDOUBLE, kFLOAT, kINTERVAL_DAY_TIME, kINTERVAL_YEAR_MONTH, kNUMERIC, kTEXT, kTIME, kTIMESTAMP, kVARCHAR, and to_string().

Referenced by ArrowCsvForeignStorage::registerTable().

892  {
893  using namespace arrow;
894  auto ktype = type.get_type();
895  if (IS_INTEGER(ktype)) {
896  switch (type.get_size()) {
897  case 1:
898  return int8();
899  case 2:
900  return int16();
901  case 4:
902  return int32();
903  case 8:
904  return int64();
905  default:
906  CHECK(false);
907  }
908  }
909  switch (ktype) {
910  case kBOOLEAN:
911  return boolean();
912  case kFLOAT:
913  return float32();
914  case kDOUBLE:
915  return float64();
916  case kCHAR:
917  case kVARCHAR:
918  case kTEXT:
919  return utf8();
920  case kDECIMAL:
921  case kNUMERIC:
922  return decimal(type.get_precision(), type.get_scale());
923  case kTIME:
924  return time32(TimeUnit::SECOND);
925  // case kDATE:
926  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
927  // Currently support for date32() is missing in cuDF.Hence, if client requests for
928  // date on GPU, return date64() for the time being, till support is added.
929  // return device_type_ == ExecutorDeviceType::GPU ? date64() : date32();
930  case kTIMESTAMP:
931  switch (type.get_precision()) {
932  case 0:
933  return timestamp(TimeUnit::SECOND);
934  case 3:
935  return timestamp(TimeUnit::MILLI);
936  case 6:
937  return timestamp(TimeUnit::MICRO);
938  case 9:
939  return timestamp(TimeUnit::NANO);
940  default:
941  throw std::runtime_error("Unsupported timestamp precision for Arrow: " +
942  std::to_string(type.get_precision()));
943  }
944  case kARRAY:
945  case kINTERVAL_DAY_TIME:
947  default:
948  throw std::runtime_error(type.get_type_name() + " is not supported in Arrow.");
949  }
950  return nullptr;
951 }
int get_precision() const
Definition: sqltypes.h:314
Definition: sqltypes.h:48
HOST DEVICE int get_size() const
Definition: sqltypes.h:321
HOST DEVICE int get_scale() const
Definition: sqltypes.h:316
std::string to_string(char const *&&v)
Definition: sqltypes.h:51
Definition: Importer.h:62
std::string get_type_name() const
Definition: sqltypes.h:414
#define IS_INTEGER(T)
Definition: sqltypes.h:236
Definition: sqltypes.h:40
#define CHECK(condition)
Definition: Logger.h:197
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:311
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ getOmnisciType()

static SQLTypeInfo getOmnisciType ( const arrow::DataType &  type)
static

Definition at line 763 of file ArrowForeignStorage.cpp.

References kBIGINT, kBOOLEAN, kDECIMAL, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, omnisci.dtypes::STRING, and run_benchmark_import::type.

Referenced by ArrowForeignStorage::prepareTable().

763  {
764  using namespace arrow;
765  switch (type.id()) {
766  case Type::INT8:
767  return SQLTypeInfo(kTINYINT, false);
768  case Type::INT16:
769  return SQLTypeInfo(kSMALLINT, false);
770  case Type::INT32:
771  return SQLTypeInfo(kINT, false);
772  case Type::INT64:
773  return SQLTypeInfo(kBIGINT, false);
774  case Type::BOOL:
775  return SQLTypeInfo(kBOOLEAN, false);
776  case Type::FLOAT:
777  return SQLTypeInfo(kFLOAT, false);
778  case Type::DOUBLE:
779  return SQLTypeInfo(kDOUBLE, false);
780  // uncomment when arrow 2.0 will be released and modin support for dictionary types
781  // in read_csv would be implemented
782 
783  // case Type::DICTIONARY: {
784  // auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
785  // // this is needed because createTable forces type.size to be equal to
786  // // comp_param / 8, no matter what type.size you set here
787  // type.set_comp_param(sizeof(uint32_t) * 8);
788  // return type;
789  // }
790  // case Type::STRING:
791  // return SQLTypeInfo(kTEXT, false, kENCODING_NONE);
792 
793  case Type::STRING: {
794  auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
795  // this is needed because createTable forces type.size to be equal to
796  // comp_param / 8, no matter what type.size you set here
797  type.set_comp_param(sizeof(uint32_t) * 8);
798  return type;
799  }
800  case Type::DECIMAL: {
801  const auto& decimal_type = static_cast<const arrow::DecimalType&>(type);
802  return SQLTypeInfo(kDECIMAL, decimal_type.precision(), decimal_type.scale(), false);
803  }
804  case Type::TIME32:
805  return SQLTypeInfo(kTIME, false);
806  case Type::TIMESTAMP:
807  switch (static_cast<const arrow::TimestampType&>(type).unit()) {
808  case TimeUnit::SECOND:
809  return SQLTypeInfo(kTIMESTAMP, 0, 0);
810  case TimeUnit::MILLI:
811  return SQLTypeInfo(kTIMESTAMP, 3, 0);
812  case TimeUnit::MICRO:
813  return SQLTypeInfo(kTIMESTAMP, 6, 0);
814  case TimeUnit::NANO:
815  return SQLTypeInfo(kTIMESTAMP, 9, 0);
816  }
817  default:
818  throw std::runtime_error(type.ToString() + " is not yet supported.");
819  }
820 }
Definition: sqltypes.h:48
Definition: sqltypes.h:51
Definition: Importer.h:62
Definition: sqltypes.h:44
+ Here is the caller graph for this function:

◆ registerArrowCsvForeignStorage()

void registerArrowCsvForeignStorage ( )

Definition at line 1025 of file ArrowForeignStorage.cpp.

References ForeignStorageInterface::registerPersistentStorageInterface().

Referenced by DBHandler::DBHandler().

1025  {
1027  std::make_unique<ArrowCsvForeignStorage>());
1028 }
static void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ registerArrowForeignStorage()

void registerArrowForeignStorage ( )

Definition at line 860 of file ArrowForeignStorage.cpp.

References ForeignStorageInterface::registerPersistentStorageInterface().

860  {
862  std::make_unique<ArrowForeignStorage>());
863 }
static void registerPersistentStorageInterface(std::unique_ptr< PersistentForeignStorageInterface > persistent_foreign_storage)
+ Here is the call graph for this function:

◆ releaseArrowTable()

void releaseArrowTable ( std::string  name)

Definition at line 856 of file ArrowForeignStorage.cpp.

References ArrowForeignStorage::tables.

856  {
858 }
name
Definition: setup.py:35
static std::map< std::string, std::shared_ptr< arrow::Table > > tables

◆ setArrowTable()

void setArrowTable ( std::string  name,
std::shared_ptr< arrow::Table >  table 
)

Definition at line 852 of file ArrowForeignStorage.cpp.

References setup::name, and ArrowForeignStorage::tables.

852  {
854 }
name
Definition: setup.py:35
static std::map< std::string, std::shared_ptr< arrow::Table > > tables