OmniSciDB  6686921089
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ArrowForeignStorage.cpp File Reference
#include "ArrowForeignStorage.h"
#include <arrow/api.h>
#include <arrow/csv/reader.h>
#include <arrow/io/file.h>
#include <arrow/util/decimal.h>
#include <tbb/parallel_for.h>
#include <tbb/task_group.h>
#include <array>
#include <future>
#include <vector>
#include "Catalog/DataframeTableDescriptor.h"
#include "DataMgr/ForeignStorage/ForeignStorageInterface.h"
#include "DataMgr/StringNoneEncoder.h"
#include "Logger/Logger.h"
#include "QueryEngine/ArrowResultSet.h"
#include "Shared/ArrowUtil.h"
#include "Shared/measure.h"
+ Include dependency graph for ArrowForeignStorage.cpp:

Go to the source code of this file.

Classes

struct  Frag
 
struct  ArrowFragment
 
class  ArrowForeignStorageBase
 
class  ArrowForeignStorage
 
class  ArrowCsvForeignStorage
 

Functions

std::vector< Frag > calculateFragmentsOffsets (const arrow::ChunkedArray &array, size_t maxFragRows)
 
static SQLTypeInfo getOmnisciType (const arrow::DataType &type)
 
void setArrowTable (std::string name, std::shared_ptr< arrow::Table > table)
 
void releaseArrowTable (std::string name)
 
void registerArrowForeignStorage (std::shared_ptr< ForeignStorageInterface > fsi)
 
static std::shared_ptr< arrow::DataType > getArrowImportType (const SQLTypeInfo type)
 
void registerArrowCsvForeignStorage (std::shared_ptr< ForeignStorageInterface > fsi)
 

Function Documentation

std::vector<Frag> calculateFragmentsOffsets ( const arrow::ChunkedArray &  array,
size_t  maxFragRows 
)

Definition at line 284 of file ArrowForeignStorage.cpp.

References i.

Referenced by ArrowForeignStorageBase::parseArrowTable().

// Splits `array` into consecutive fragments of at most `maxFragRows` rows and returns
// one Frag per fragment. Chunks are visited in order, and a fragment may begin or end
// in the middle of a chunk.
// Frag is brace-initialised positionally; only first_chunk, last_chunk and
// last_chunk_size are named in this excerpt. The second field presumably holds the
// start offset inside first_chunk -- confirm against the Frag declaration.
// NOTE(review): with maxFragRows == 0 and at least one chunk the else-branch below is
// taken on every iteration and `i` never advances; callers must pass a positive value.
285  {
286  std::vector<Frag> fragments;
287  size_t sz = 0;  // rows already assigned to the fragment being built
288  size_t offset = 0;  // rows of chunk i already consumed by earlier fragments
289  fragments.push_back({0, 0, 0, 0});
290  size_t num_chunks = static_cast<size_t>(array.num_chunks());
     // `i` is advanced only when a chunk has been fully consumed.
291  for (size_t i = 0; i < num_chunks;) {
292  auto& chunk = *array.chunk(i);
293  auto& frag = *fragments.rbegin();
     // The unread remainder of this chunk fits with room to spare: absorb it whole.
294  if (maxFragRows - sz > chunk.length() - offset) {
295  sz += chunk.length() - offset;
296  if (i == num_chunks - 1) {
     // Input exhausted: close the trailing fragment on the final chunk.
297  fragments.rbegin()->last_chunk = num_chunks - 1;
298  fragments.rbegin()->last_chunk_size =
299  array.chunk(static_cast<int>(num_chunks) - 1)->length() - offset;
300  }
301  offset = 0;
302  i++;
303  } else {
     // The current fragment fills up inside this chunk: close it and open a new
     // fragment that starts at the first unread row of the same chunk.
304  frag.last_chunk = i;
305  frag.last_chunk_size = maxFragRows - sz;
306  offset += maxFragRows - sz;
307  sz = 0;
308  fragments.push_back({i, offset, 0, 0});
309  }
310  }
311  // After the loop `sz` is the row count of the trailing fragment.
312  if (sz == 0) {
313  // remove empty fragment at the end if any
314  fragments.pop_back();
315  }
316  return fragments;
317 }

+ Here is the caller graph for this function:

static std::shared_ptr<arrow::DataType> getArrowImportType ( const SQLTypeInfo  type)
static

Definition at line 894 of file ArrowForeignStorage.cpp.

References CHECK, SQLTypeInfo::get_precision(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), IS_INTEGER, kARRAY, kBOOLEAN, kCHAR, kDATE, kDECIMAL, kDOUBLE, kFLOAT, kINTERVAL_DAY_TIME, kINTERVAL_YEAR_MONTH, kNUMERIC, kTEXT, kTIME, kTIMESTAMP, kVARCHAR, and to_string().

Referenced by ArrowCsvForeignStorage::registerTable().

// Maps an OmniSci column type to the Arrow data type requested for it.
// Returns the matching arrow::DataType; throws std::runtime_error for an unsupported
// timestamp precision and for SQL types that have no mapping here.
894  {
895  using namespace arrow;
896  auto ktype = type.get_type();
     // Integer SQL types are told apart by their byte width.
897  if (IS_INTEGER(ktype)) {
898  switch (type.get_size()) {
899  case 1:
900  return int8();
901  case 2:
902  return int16();
903  case 4:
904  return int32();
905  case 8:
906  return int64();
907  default:
     // Unexpected integer width. CHECK is declared in Logger.h and presumably
     // aborts; if it returned, control would continue into the switch below.
908  CHECK(false);
909  }
910  }
911  switch (ktype) {
912  case kBOOLEAN:
913  return arrow::boolean();
914  case kFLOAT:
915  return float32();
916  case kDOUBLE:
917  return float64();
     // All string-like SQL types map to UTF-8 strings.
918  case kCHAR:
919  case kVARCHAR:
920  case kTEXT:
921  return utf8();
922  case kDECIMAL:
923  case kNUMERIC:
924  return decimal(type.get_precision(), type.get_scale());
925  case kTIME:
926  return time32(TimeUnit::SECOND);
     // date64 when built with HAVE_CUDA, date32 otherwise.
927  case kDATE:
928 #ifdef HAVE_CUDA
929  return arrow::date64();
930 #else
931  return arrow::date32();
932 #endif
     // The SQL precision (0/3/6/9) selects the Arrow time unit.
933  case kTIMESTAMP:
934  switch (type.get_precision()) {
935  case 0:
936  return timestamp(TimeUnit::SECOND);
937  case 3:
938  return timestamp(TimeUnit::MILLI);
939  case 6:
940  return timestamp(TimeUnit::MICRO);
941  case 9:
942  return timestamp(TimeUnit::NANO);
943  default:
944  throw std::runtime_error("Unsupported timestamp precision for Arrow: " +
945  std::to_string(type.get_precision()));
946  }
     // Listed explicitly, but handled the same way as any other unmapped type.
947  case kARRAY:
948  case kINTERVAL_DAY_TIME:
949  case kINTERVAL_YEAR_MONTH:
950  default:
951  throw std::runtime_error(type.get_type_name() + " is not supported in Arrow.");
952  }
     // Appears unreachable: every switch path above returns or throws.
953  return nullptr;
954 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:339
Definition: sqltypes.h:49
HOST DEVICE int get_scale() const
Definition: sqltypes.h:334
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
std::string to_string(char const *&&v)
int get_precision() const
Definition: sqltypes.h:332
Definition: sqltypes.h:52
Definition: sqltypes.h:53
std::string get_type_name() const
Definition: sqltypes.h:432
#define IS_INTEGER(T)
Definition: sqltypes.h:245
Definition: sqltypes.h:41
#define CHECK(condition)
Definition: Logger.h:209

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

static SQLTypeInfo getOmnisciType ( const arrow::DataType &  type)
static

Definition at line 763 of file ArrowForeignStorage.cpp.

References DECIMAL, DOUBLE, FLOAT, kBIGINT, kBOOLEAN, kDATE, kDECIMAL, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, STRING, TIMESTAMP, and run_benchmark_import::type.

Referenced by ArrowForeignStorage::prepareTable().

// Maps an Arrow data type to the SQLTypeInfo used for the corresponding column.
// Throws std::runtime_error for Arrow types that have no case below.
// NOTE(review): the `false` passed to most SQLTypeInfo constructors is presumably the
// not-null flag -- confirm against sqltypes.h.
763  {
764  using namespace arrow;
765  switch (type.id()) {
766  case Type::INT8:
767  return SQLTypeInfo(kTINYINT, false);
768  case Type::INT16:
769  return SQLTypeInfo(kSMALLINT, false);
770  case Type::INT32:
771  return SQLTypeInfo(kINT, false);
772  case Type::INT64:
773  return SQLTypeInfo(kBIGINT, false);
774  case Type::BOOL:
775  return SQLTypeInfo(kBOOLEAN, false);
776  case Type::FLOAT:
777  return SQLTypeInfo(kFLOAT, false);
     // Both Arrow date widths map to the same SQL type.
778  case Type::DATE32:
779  case Type::DATE64:
780  return SQLTypeInfo(kDATE, false);
781  case Type::DOUBLE:
782  return SQLTypeInfo(kDOUBLE, false);
783  // Re-enable this once Arrow 2.0 is released and modin supports dictionary types
784  // in read_csv.
785 
786  // case Type::DICTIONARY: {
787  // auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
788  // // this is needed because createTable forces type.size to be equal to
789  // // comp_param / 8, no matter what type.size you set here
790  // type.set_comp_param(sizeof(uint32_t) * 8);
791  // return type;
792  // }
793  // case Type::STRING:
794  // return SQLTypeInfo(kTEXT, false, kENCODING_NONE);
795 
     // Until then, plain Arrow strings are mapped to dictionary-encoded TEXT.
796  case Type::STRING: {
     // Note: this local `type` shadows the function parameter inside the braces.
797  auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
798  // this is needed because createTable forces type.size to be equal to
799  // comp_param / 8, no matter what type.size you set here
800  type.set_comp_param(sizeof(uint32_t) * 8);
801  return type;
802  }
803  case Type::DECIMAL: {
804  const auto& decimal_type = static_cast<const arrow::DecimalType&>(type);
805  return SQLTypeInfo(kDECIMAL, decimal_type.precision(), decimal_type.scale(), false);
806  }
807  case Type::TIME32:
808  return SQLTypeInfo(kTIME, false);
     // The Arrow time unit selects the second constructor argument (0/3/6/9),
     // presumably the timestamp precision -- confirm against sqltypes.h.
809  case Type::TIMESTAMP:
810  switch (static_cast<const arrow::TimestampType&>(type).unit()) {
811  case TimeUnit::SECOND:
812  return SQLTypeInfo(kTIMESTAMP, 0, 0);
813  case TimeUnit::MILLI:
814  return SQLTypeInfo(kTIMESTAMP, 3, 0);
815  case TimeUnit::MICRO:
816  return SQLTypeInfo(kTIMESTAMP, 6, 0);
817  case TimeUnit::NANO:
818  return SQLTypeInfo(kTIMESTAMP, 9, 0);
819  }
     // No break after the inner switch: a unit matching none of the four cases
     // falls through to the default label and throws.
820  default:
821  throw std::runtime_error(type.ToString() + " is not yet supported.");
822  }
823 }
Definition: sqltypes.h:49
#define DOUBLE
Definition: sqltypes.h:52
Definition: sqltypes.h:53
#define TIMESTAMP
#define STRING
#define DECIMAL
Definition: sqltypes.h:45
#define FLOAT

+ Here is the caller graph for this function:

void registerArrowCsvForeignStorage ( std::shared_ptr< ForeignStorageInterface > fsi)

Definition at line 1033 of file ArrowForeignStorage.cpp.

Referenced by PersistentStorageMgr::PersistentStorageMgr().

// Registers a newly constructed ArrowCsvForeignStorage with `fsi` as a persistent
// storage interface; the instance is passed as a std::unique_ptr.
// `fsi` is dereferenced without a null check.
1033  {
1034  fsi->registerPersistentStorageInterface(std::make_unique<ArrowCsvForeignStorage>());
1035 }

+ Here is the caller graph for this function:

void registerArrowForeignStorage ( std::shared_ptr< ForeignStorageInterface > fsi)

Definition at line 863 of file ArrowForeignStorage.cpp.

Referenced by PersistentStorageMgr::PersistentStorageMgr().

// Registers a newly constructed ArrowForeignStorage with `fsi` as a persistent
// storage interface; the instance is passed as a std::unique_ptr.
// `fsi` is dereferenced without a null check.
863  {
864  fsi->registerPersistentStorageInterface(std::make_unique<ArrowForeignStorage>());
865 }

+ Here is the caller graph for this function:

void releaseArrowTable ( std::string  name)

Definition at line 859 of file ArrowForeignStorage.cpp.

References ArrowForeignStorage::tables.

Referenced by EmbeddedDatabase::DBEngineImpl::importArrowTable().

// Removes the entry stored under `name` from the static ArrowForeignStorage::tables map.
// NOTE(review): line 860 was missing from this listing (numbering jumped 859 -> 861).
// It is reconstructed from the page's "References ArrowForeignStorage::tables" entry
// and the map type shown for `tables` -- confirm against the actual source file.
859  {
860  ArrowForeignStorage::tables.erase(name);
861 }
string name
Definition: setup.in.py:72
static std::map< std::string, std::shared_ptr< arrow::Table > > tables

+ Here is the caller graph for this function:

void setArrowTable ( std::string  name,
std::shared_ptr< arrow::Table >  table 
)

Definition at line 855 of file ArrowForeignStorage.cpp.

References setup::name, and ArrowForeignStorage::tables.

Referenced by EmbeddedDatabase::DBEngineImpl::importArrowTable().

// Stores `table` under `name` in the static ArrowForeignStorage::tables map, replacing
// any entry already registered under that name.
// NOTE(review): line 856 was missing from this listing (numbering jumped 855 -> 857).
// It is reconstructed from the page's "References ... ArrowForeignStorage::tables" entry
// and the map type shown for `tables` -- confirm against the actual source file.
855  {
856  ArrowForeignStorage::tables[name] = table;
857 }
string name
Definition: setup.in.py:72
static std::map< std::string, std::shared_ptr< arrow::Table > > tables

+ Here is the caller graph for this function: