OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::TextFileBufferParser Class Reference (abstract)

#include <TextFileBufferParser.h>

+ Inheritance diagram for foreign_storage::TextFileBufferParser:

Public Member Functions

virtual ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false, bool skip_dict_encoding=false) const =0
 
virtual import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const =0
 
virtual size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const =0
 
virtual void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const =0
 

Static Public Member Functions

static std::map< int,
DataBlockPtr >
convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, const bool skip_dict_encoding=false)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 
static void fillRejectedRowWithInvalidData (const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Static Public Attributes

static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Static Private Member Functions

static void processInvalidGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 

Detailed Description

Definition at line 94 of file TextFileBufferParser.h.

Member Function Documentation

std::map< int, DataBlockPtr > foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks ( const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers,
const bool  skip_dict_encoding = false 
)
static

Definition at line 59 of file TextFileBufferParser.cpp.

References DataBlockPtr::arraysPtr, threading_serial::async(), CHECK, CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), foreign_storage::RegexFileBufferParser::parseBuffer(), and foreign_storage::InternalSystemDataWrapper::populateChunkBuffers().

61  {
62  std::map<int, DataBlockPtr> result;
63  std::vector<std::pair<const size_t, std::future<int8_t*>>>
64  encoded_data_block_ptrs_futures;
65  // make all async calls to string dictionary here and then continue execution
66  for (const auto& import_buffer : import_buffers) {
67  if (import_buffer == nullptr) {
68  continue;
69  }
70  DataBlockPtr p;
71  if (import_buffer->getTypeInfo().is_number() ||
72  import_buffer->getTypeInfo().is_time() ||
73  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
74  p.numbersPtr = import_buffer->getAsBytes();
75  } else if (import_buffer->getTypeInfo().is_string()) {
76  auto string_payload_ptr = import_buffer->getStringBuffer();
77  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
78  p.stringsPtr = string_payload_ptr;
79  } else {
80  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
81  p.numbersPtr = nullptr;
82 
83  if (!skip_dict_encoding) {
84  auto column_id = import_buffer->getColumnDesc()->columnId;
85  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
86  column_id,
87  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
88  import_buffer->addDictEncodedString(*string_payload_ptr);
89  return import_buffer->getStringDictBuffer();
90  })));
91  }
92  }
93  } else if (import_buffer->getTypeInfo().is_geometry()) {
94  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
95  p.stringsPtr = geo_payload_ptr;
96  } else {
97  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
98  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
99  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
100  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
101  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
102  } else {
103  p.arraysPtr = import_buffer->getArrayBuffer();
104  }
105  }
106  result[import_buffer->getColumnDesc()->columnId] = p;
107  }
108 
109  if (!skip_dict_encoding) {
110  // wait for the async requests we made for string dictionary
111  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
112  encoded_ptr_future.second.wait();
113  }
114  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
115  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
116  }
117  }
118  return result;
119 }
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:234
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:235
future< Result > async(Fn &&fn, Args &&...args)
#define IS_STRING(T)
Definition: sqltypes.h:309
#define CHECK(condition)
Definition: Logger.h:291
int8_t * numbersPtr
Definition: sqltypes.h:233

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::TextFileBufferParser::fillRejectedRowWithInvalidData ( const std::list< const ColumnDescriptor * > &  columns,
std::list< const ColumnDescriptor * >::iterator &  cd_it,
const size_t  col_idx,
ParseBufferRequest &  request 
)
static

Fills the current row of the request with invalid (null) data, because the row will be marked as rejected.

Definition at line 171 of file TextFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::import_buffers, and processInvalidGeoColumn().

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), and foreign_storage::RegexFileBufferParser::parseBuffer().

175  {
176  size_t col_idx = starting_col_idx;
177 
178  for (; cd_it != columns.end(); cd_it++) {
179  auto cd = *cd_it;
180  const auto& col_ti = cd->columnType;
181  if (col_ti.is_geometry()) {
182  if (request.import_buffers[col_idx] != nullptr) {
183  processInvalidGeoColumn(request.import_buffers,
184  col_idx,
185  request.copy_params,
186  cd,
187  request.getCatalog());
188  } else {
189  ++col_idx;
190  col_idx += col_ti.get_physical_cols();
191  }
192  // skip remaining physical columns
193  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
194  ++cd_it;
195  }
196  } else {
197  if (request.import_buffers[col_idx] != nullptr) {
198  request.import_buffers[col_idx]->add_value(cd,
199  {},
200  true,
201  request.copy_params,
202  /*check_not_null=*/false);
203  }
204  ++col_idx;
205  }
206  }
207 }
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual size_t foreign_storage::TextFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FileReader *  file_reader 
) const
pure virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implemented in foreign_storage::RegexFileBufferParser, and foreign_storage::CsvFileBufferParser.

Referenced by foreign_storage::dispatch_scan_requests().

+ Here is the caller graph for this function:

bool foreign_storage::TextFileBufferParser::isCoordinateScalar ( const std::string_view  datum)
static

Definition at line 121 of file TextFileBufferParser.cpp.

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), processGeoColumn(), and foreign_storage::anonymous_namespace{TextFileBufferParser.cpp}::set_coordinates_from_separate_lon_lat_columns().

121  {
122  // field looks like a scalar numeric value (and not a hex blob)
123  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
124  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
125 }

+ Here is the caller graph for this function:

bool foreign_storage::TextFileBufferParser::isNullDatum ( const std::string_view  datum,
const ColumnDescriptor *  column,
const std::string &  null_indicator 
)
static

Definition at line 332 of file TextFileBufferParser.cpp.

References ColumnDescriptor::columnName, ColumnDescriptor::columnType, SQLTypeInfo::get_notnull(), is_null(), ImportHelpers::is_null_datum(), and SQLTypeInfo::is_string().

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), and foreign_storage::RegexFileBufferParser::parseBuffer().

334  {
335  bool is_null = ImportHelpers::is_null_datum(datum, null_indicator);
336 
337  // Treating empty as NULL
338  if (!column->columnType.is_string() && datum.empty()) {
339  is_null = true;
340  }
341 
342  if (is_null && column->columnType.get_notnull()) {
343  throw std::runtime_error("NULL value provided for column (" + column->columnName +
344  ") with NOT NULL constraint.");
345  }
346  return is_null;
347 }
bool is_null_datum(const DatumStringType &datum, const std::string &null_indicator)
CONSTEXPR DEVICE bool is_null(const T &value)
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:559
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:398
std::string columnName

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual ParseBufferResult foreign_storage::TextFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false,
bool  skip_dict_encoding = false 
) const
pure virtual

Parses a given file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Parameters
convert_data_blocks - convert import buffers to data blocks
columns_are_pre_filtered - the file buffer passed into parseBuffer contains only the columns that are being requested, not all columns.
skip_dict_encoding - skip dictionary encoding for encoded strings; the encoding must then happen later in processing

Implemented in foreign_storage::RegexFileBufferParser, and foreign_storage::CsvFileBufferParser.

Referenced by foreign_storage::parse_file_regions(), foreign_storage::populate_chunks(), and foreign_storage::scan_metadata().

+ Here is the caller graph for this function:

void foreign_storage::TextFileBufferParser::processGeoColumn ( std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
const import_export::CopyParams &  copy_params,
std::list< const ColumnDescriptor * >::iterator &  cd_it,
std::vector< std::string_view > &  row,
size_t &  import_idx,
bool  is_null,
size_t  first_row_index,
size_t  row_index_plus_one,
std::shared_ptr< Catalog_Namespace::Catalog >  catalog 
)
static

Definition at line 244 of file TextFileBufferParser.cpp.

References CHECK, geo_promoted_type_match(), import_export::CopyParams::geo_validate_geometry, Geospatial::GeoTypesFactory::getGeoColumns(), Geospatial::GeoTypesFactory::getNullGeoColumns(), IS_GEO, isCoordinateScalar(), kPOINT, import_export::CopyParams::lonlat, import_export::CopyParams::null_str, foreign_storage::anonymous_namespace{TextFileBufferParser.cpp}::set_coordinates_from_separate_lon_lat_columns(), import_export::Importer::set_geo_physical_import_buffer(), import_export::CopyParams::source_srid, and to_string().

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), and foreign_storage::RegexFileBufferParser::parseBuffer().

254  {
255  auto cd = *cd_it;
256  auto col_ti = cd->columnType;
257  SQLTypes col_type = col_ti.get_type();
258  CHECK(IS_GEO(col_type));
259 
260  auto starting_col_idx = col_idx;
261 
262  auto const& geo_string = row[import_idx];
263  ++import_idx;
264  ++col_idx;
265 
266  std::vector<double> coords;
267  std::vector<double> bounds;
268  std::vector<int> ring_sizes;
269  std::vector<int> poly_rings;
270 
271  // prepare to transform from another SRID
272  SQLTypeInfo import_ti{col_ti};
273  if (import_ti.get_output_srid() == 4326) {
274  auto srid0 = copy_params.source_srid;
275  if (srid0 > 0) {
276  // srid0 -> 4326 transform is requested on import
277  import_ti.set_input_srid(srid0);
278  }
279  }
280 
281  if (!is_null && col_type == kPOINT && isCoordinateScalar(geo_string)) {
282  if (!set_coordinates_from_separate_lon_lat_columns(
283  geo_string, row[import_idx], import_ti, coords, copy_params.lonlat)) {
284  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
285  cd->columnName);
286  }
287  ++import_idx;
288  } else {
289  if (is_null || geo_string.empty() || geo_string == "NULL") {
290  Geospatial::GeoTypesFactory::getNullGeoColumns(
291  import_ti, coords, bounds, ring_sizes, poly_rings);
292  is_null = true;
293  } else {
294  // extract geometry directly from WKT
295  if (!Geospatial::GeoTypesFactory::getGeoColumns(
296  std::string(geo_string),
297  import_ti,
298  coords,
299  bounds,
300  ring_sizes,
301  poly_rings,
302  copy_params.geo_validate_geometry)) {
303  std::string msg = "Failed to extract valid geometry from row " +
304  std::to_string(first_row_index + row_index_plus_one) +
305  " for column " + cd->columnName;
306  throw std::runtime_error(msg);
307  }
308 
309  // validate types
310  if (!geo_promoted_type_match(import_ti.get_type(), col_type)) {
311  throw std::runtime_error("Imported geometry doesn't match the type of column " +
312  cd->columnName);
313  }
314  }
315  }
316 
317  // allowed to be null?
318  if (is_null && col_ti.get_notnull()) {
319  throw std::runtime_error("NULL value provided for column (" + cd->columnName +
320  ") with NOT NULL constraint.");
321  }
322 
323  // import extracted geo
324  import_export::Importer::set_geo_physical_import_buffer(
325  *catalog, cd, import_buffers, col_idx, coords, bounds, ring_sizes, poly_rings);
326 
327  // store null string in the base column
328  import_buffers[starting_col_idx]->add_value(
329  cd, copy_params.null_str, true, copy_params);
330 }
bool geo_promoted_type_match(const SQLTypes a, const SQLTypes b)
Definition: sqltypes.h:2029
SQLTypes
Definition: sqltypes.h:65
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings)
Definition: Types.cpp:1342
std::string to_string(char const *&&v)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
Definition: Importer.cpp:1636
CONSTEXPR DEVICE bool is_null(const T &value)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool validate_with_geos_if_available)
Definition: Types.cpp:1121
#define CHECK(condition)
Definition: Logger.h:291
static bool isCoordinateScalar(const std::string_view datum)
#define IS_GEO(T)
Definition: sqltypes.h:310

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::TextFileBufferParser::processInvalidGeoColumn ( std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
const import_export::CopyParams &  copy_params,
const ColumnDescriptor *  cd,
std::shared_ptr< Catalog_Namespace::Catalog >  catalog 
)
static private

Definition at line 209 of file TextFileBufferParser.cpp.

References CHECK, ColumnDescriptor::columnType, SQLTypeInfo::get_type(), Geospatial::GeoTypesFactory::getNullGeoColumns(), IS_GEO, import_export::CopyParams::null_str, and import_export::Importer::set_geo_physical_import_buffer().

Referenced by fillRejectedRowWithInvalidData().

214  {
215  auto col_ti = cd->columnType;
216  SQLTypes col_type = col_ti.get_type();
217  CHECK(IS_GEO(col_type));
218 
219  // store null string in the base column
220  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
221  ++col_idx;
222 
223  std::vector<double> coords;
224  std::vector<double> bounds;
225  std::vector<int> ring_sizes;
226  std::vector<int> poly_rings;
227 
228  SQLTypeInfo import_ti{col_ti};
229  Geospatial::GeoTypesFactory::getNullGeoColumns(
230  import_ti, coords, bounds, ring_sizes, poly_rings);
231 
232  // import extracted geo
233  import_export::Importer::set_geo_physical_import_buffer(*catalog,
234  cd,
235  import_buffers,
236  col_idx,
237  coords,
238  bounds,
239  ring_sizes,
240  poly_rings,
241  /*force_null=*/true);
242 }
SQLTypes
Definition: sqltypes.h:65
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings)
Definition: Types.cpp:1342
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:391
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool force_null=false)
Definition: Importer.cpp:1636
#define CHECK(condition)
Definition: Logger.h:291
SQLTypeInfo columnType
#define IS_GEO(T)
Definition: sqltypes.h:310

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual import_export::CopyParams foreign_storage::TextFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
pure virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implemented in foreign_storage::RegexFileBufferParser, and foreign_storage::CsvFileBufferParser.

Referenced by foreign_storage::AbstractTextFileDataWrapper::iterativeFileScan(), foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), foreign_storage::AbstractTextFileDataWrapper::populateChunks(), and foreign_storage::AbstractTextFileDataWrapper::restoreDataWrapperInternals().

+ Here is the caller graph for this function:

virtual void foreign_storage::TextFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
pure virtual

Performs basic validation of files to be parsed.

Implemented in foreign_storage::CsvFileBufferParser, and foreign_storage::RegexFileBufferParser.

Referenced by foreign_storage::anonymous_namespace{AbstractTextFileDataWrapper.cpp}::initialize_non_append_mode_scan().

+ Here is the caller graph for this function:

Member Data Documentation

const std::string foreign_storage::TextFileBufferParser::BUFFER_SIZE_KEY = "BUFFER_SIZE"
inline static

The documentation for this class was generated from the following files: