OmniSciDB  fe05a0c208
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ArrowForeignStorage.cpp File Reference
#include "ArrowForeignStorage.h"
#include <arrow/api.h>
#include <arrow/csv/reader.h>
#include <arrow/io/file.h>
#include <arrow/util/decimal.h>
#include <tbb/parallel_for.h>
#include <tbb/task_group.h>
#include <array>
#include <future>
#include <vector>
#include "Catalog/DataframeTableDescriptor.h"
#include "DataMgr/ForeignStorage/ForeignStorageInterface.h"
#include "DataMgr/StringNoneEncoder.h"
#include "Logger/Logger.h"
#include "QueryEngine/ArrowResultSet.h"
#include "Shared/ArrowUtil.h"
#include "Shared/measure.h"
+ Include dependency graph for ArrowForeignStorage.cpp:

Go to the source code of this file.

Classes

struct  Frag
 
struct  ArrowFragment
 
class  ArrowForeignStorageBase
 
class  ArrowForeignStorage
 
class  ArrowCsvForeignStorage
 

Functions

std::vector< Frag > calculateFragmentsOffsets (const arrow::ChunkedArray &array, size_t maxFragRows)
 
static SQLTypeInfo getOmnisciType (const arrow::DataType &type)
 
void setArrowTable (std::string name, std::shared_ptr< arrow::Table > table)
 
void releaseArrowTable (std::string name)
 
void registerArrowForeignStorage (std::shared_ptr< ForeignStorageInterface > fsi)
 
static std::shared_ptr< arrow::DataType > getArrowImportType (const SQLTypeInfo type)
 
void registerArrowCsvForeignStorage (std::shared_ptr< ForeignStorageInterface > fsi)
 

Function Documentation

std::vector<Frag> calculateFragmentsOffsets ( const arrow::ChunkedArray &  array,
size_t  maxFragRows 
)

Definition at line 284 of file ArrowForeignStorage.cpp.

References i.

Referenced by ArrowForeignStorageBase::parseArrowTable().

285  {
286  std::vector<Frag> fragments;
287  size_t sz = 0;
288  size_t offset = 0;
289  fragments.push_back({0, 0, 0, 0});
290  size_t num_chunks = (size_t)array.num_chunks();
291  for (size_t i = 0; i < num_chunks;) {
292  auto& chunk = *array.chunk(i);
293  auto& frag = *fragments.rbegin();
294  if (maxFragRows - sz > chunk.length() - offset) {
295  sz += chunk.length() - offset;
296  if (i == num_chunks - 1) {
297  fragments.rbegin()->last_chunk = num_chunks - 1;
298  fragments.rbegin()->last_chunk_size =
299  array.chunk((int)num_chunks - 1)->length() - offset;
300  }
301  offset = 0;
302  i++;
303  } else {
304  frag.last_chunk = i;
305  frag.last_chunk_size = maxFragRows - sz;
306  offset += maxFragRows - sz;
307  sz = 0;
308  fragments.push_back({i, offset, 0, 0});
309  }
310  }
311  if (fragments.rbegin()->first_chunk == fragments.rbegin()->first_chunk &&
312  fragments.rbegin()->last_chunk_size == 0) {
313  // remove empty fragment at the end if any
314  fragments.pop_back();
315  }
316  return fragments;
317 }

+ Here is the caller graph for this function:

static std::shared_ptr<arrow::DataType> getArrowImportType ( const SQLTypeInfo  type)
static

Definition at line 891 of file ArrowForeignStorage.cpp.

References CHECK, SQLTypeInfo::get_precision(), SQLTypeInfo::get_scale(), SQLTypeInfo::get_size(), SQLTypeInfo::get_type(), SQLTypeInfo::get_type_name(), IS_INTEGER, kARRAY, kBOOLEAN, kCHAR, kDECIMAL, kDOUBLE, kFLOAT, kINTERVAL_DAY_TIME, kINTERVAL_YEAR_MONTH, kNUMERIC, kTEXT, kTIME, kTIMESTAMP, kVARCHAR, and to_string().

Referenced by ArrowCsvForeignStorage::registerTable().

891  {
892  using namespace arrow;
893  auto ktype = type.get_type();
894  if (IS_INTEGER(ktype)) {
895  switch (type.get_size()) {
896  case 1:
897  return int8();
898  case 2:
899  return int16();
900  case 4:
901  return int32();
902  case 8:
903  return int64();
904  default:
905  CHECK(false);
906  }
907  }
908  switch (ktype) {
909  case kBOOLEAN:
910  return arrow::boolean();
911  case kFLOAT:
912  return float32();
913  case kDOUBLE:
914  return float64();
915  case kCHAR:
916  case kVARCHAR:
917  case kTEXT:
918  return utf8();
919  case kDECIMAL:
920  case kNUMERIC:
921  return decimal(type.get_precision(), type.get_scale());
922  case kTIME:
923  return time32(TimeUnit::SECOND);
924  // case kDATE:
925  // TODO(wamsi) : Remove date64() once date32() support is added in cuDF. date32()
926  // Currently support for date32() is missing in cuDF.Hence, if client requests for
927  // date on GPU, return date64() for the time being, till support is added.
928  // return device_type_ == ExecutorDeviceType::GPU ? date64() : date32();
929  case kTIMESTAMP:
930  switch (type.get_precision()) {
931  case 0:
932  return timestamp(TimeUnit::SECOND);
933  case 3:
934  return timestamp(TimeUnit::MILLI);
935  case 6:
936  return timestamp(TimeUnit::MICRO);
937  case 9:
938  return timestamp(TimeUnit::NANO);
939  default:
940  throw std::runtime_error("Unsupported timestamp precision for Arrow: " +
941  std::to_string(type.get_precision()));
942  }
943  case kARRAY:
944  case kINTERVAL_DAY_TIME:
946  default:
947  throw std::runtime_error(type.get_type_name() + " is not supported in Arrow.");
948  }
949  return nullptr;
950 }
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
Definition: sqltypes.h:48
HOST DEVICE int get_scale() const
Definition: sqltypes.h:319
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:314
std::string to_string(char const *&&v)
int get_precision() const
Definition: sqltypes.h:317
Definition: sqltypes.h:51
std::string get_type_name() const
Definition: sqltypes.h:417
#define IS_INTEGER(T)
Definition: sqltypes.h:239
Definition: sqltypes.h:40
#define CHECK(condition)
Definition: Logger.h:203

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

static SQLTypeInfo getOmnisciType ( const arrow::DataType &  type)
static

Definition at line 763 of file ArrowForeignStorage.cpp.

References DECIMAL, DOUBLE, FLOAT, kBIGINT, kBOOLEAN, kDECIMAL, kDOUBLE, kENCODING_DICT, kFLOAT, kINT, kSMALLINT, kTEXT, kTIME, kTIMESTAMP, kTINYINT, STRING, TIMESTAMP, and run_benchmark_import::type.

Referenced by ArrowForeignStorage::prepareTable().

763  {
764  using namespace arrow;
765  switch (type.id()) {
766  case Type::INT8:
767  return SQLTypeInfo(kTINYINT, false);
768  case Type::INT16:
769  return SQLTypeInfo(kSMALLINT, false);
770  case Type::INT32:
771  return SQLTypeInfo(kINT, false);
772  case Type::INT64:
773  return SQLTypeInfo(kBIGINT, false);
774  case Type::BOOL:
775  return SQLTypeInfo(kBOOLEAN, false);
776  case Type::FLOAT:
777  return SQLTypeInfo(kFLOAT, false);
778  case Type::DOUBLE:
779  return SQLTypeInfo(kDOUBLE, false);
780  // uncomment when arrow 2.0 will be released and modin support for dictionary types
781  // in read_csv would be implemented
782 
783  // case Type::DICTIONARY: {
784  // auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
785  // // this is needed because createTable forces type.size to be equal to
786  // // comp_param / 8, no matter what type.size you set here
787  // type.set_comp_param(sizeof(uint32_t) * 8);
788  // return type;
789  // }
790  // case Type::STRING:
791  // return SQLTypeInfo(kTEXT, false, kENCODING_NONE);
792 
793  case Type::STRING: {
794  auto type = SQLTypeInfo(kTEXT, false, kENCODING_DICT);
795  // this is needed because createTable forces type.size to be equal to
796  // comp_param / 8, no matter what type.size you set here
797  type.set_comp_param(sizeof(uint32_t) * 8);
798  return type;
799  }
800  case Type::DECIMAL: {
801  const auto& decimal_type = static_cast<const arrow::DecimalType&>(type);
802  return SQLTypeInfo(kDECIMAL, decimal_type.precision(), decimal_type.scale(), false);
803  }
804  case Type::TIME32:
805  return SQLTypeInfo(kTIME, false);
806  case Type::TIMESTAMP:
807  switch (static_cast<const arrow::TimestampType&>(type).unit()) {
808  case TimeUnit::SECOND:
809  return SQLTypeInfo(kTIMESTAMP, 0, 0);
810  case TimeUnit::MILLI:
811  return SQLTypeInfo(kTIMESTAMP, 3, 0);
812  case TimeUnit::MICRO:
813  return SQLTypeInfo(kTIMESTAMP, 6, 0);
814  case TimeUnit::NANO:
815  return SQLTypeInfo(kTIMESTAMP, 9, 0);
816  }
817  default:
818  throw std::runtime_error(type.ToString() + " is not yet supported.");
819  }
820 }
Definition: sqltypes.h:48
#define DOUBLE
Definition: sqltypes.h:51
#define TIMESTAMP
#define STRING
#define DECIMAL
Definition: sqltypes.h:44
#define FLOAT

+ Here is the caller graph for this function:

void registerArrowCsvForeignStorage ( std::shared_ptr< ForeignStorageInterface >  fsi)

Definition at line 1024 of file ArrowForeignStorage.cpp.

Referenced by PersistentStorageMgr::PersistentStorageMgr().

1024  {
1025  fsi->registerPersistentStorageInterface(std::make_unique<ArrowCsvForeignStorage>());
1026 }

+ Here is the caller graph for this function:

void registerArrowForeignStorage ( std::shared_ptr< ForeignStorageInterface >  fsi)

Definition at line 860 of file ArrowForeignStorage.cpp.

Referenced by PersistentStorageMgr::PersistentStorageMgr().

860  {
861  fsi->registerPersistentStorageInterface(std::make_unique<ArrowForeignStorage>());
862 }

+ Here is the caller graph for this function:

void releaseArrowTable ( std::string  name)

Definition at line 856 of file ArrowForeignStorage.cpp.

References ArrowForeignStorage::tables.

Referenced by EmbeddedDatabase::DBEngineImpl::importArrowTable().

856  {
858 }
string name
Definition: setup.in.py:72
static std::map< std::string, std::shared_ptr< arrow::Table > > tables

+ Here is the caller graph for this function:

void setArrowTable ( std::string  name,
std::shared_ptr< arrow::Table >  table 
)

Definition at line 852 of file ArrowForeignStorage.cpp.

References setup::name, test_readcsv::table, and ArrowForeignStorage::tables.

Referenced by EmbeddedDatabase::DBEngineImpl::importArrowTable().

852  {
854 }
string name
Definition: setup.in.py:72
static std::map< std::string, std::shared_ptr< arrow::Table > > tables

+ Here is the caller graph for this function: