OmniSciDB  085a039ca4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
foreign_storage::TextFileBufferParser Class Reference (abstract)

#include <TextFileBufferParser.h>

+ Inheritance diagram for foreign_storage::TextFileBufferParser:

Public Member Functions

virtual ParseBufferResult parseBuffer (ParseBufferRequest &request, bool convert_data_blocks, bool columns_are_pre_filtered=false) const =0
 
virtual import_export::CopyParams validateAndGetCopyParams (const ForeignTable *foreign_table) const =0
 
virtual size_t findRowEndPosition (size_t &alloc_size, std::unique_ptr< char[]> &buffer, size_t &buffer_size, const import_export::CopyParams &copy_params, const size_t buffer_first_row_index, unsigned int &num_rows_in_buffer, FileReader *file_reader) const =0
 
virtual void validateFiles (const FileReader *file_reader, const ForeignTable *foreign_table) const =0
 

Static Public Member Functions

static std::map< int, DataBlockPtr > convertImportBuffersToDataBlocks (const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers)
 
static bool isCoordinateScalar (const std::string_view datum)
 
static void processGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, std::list< const ColumnDescriptor * >::iterator &cd_it, std::vector< std::string_view > &row, size_t &import_idx, bool is_null, size_t first_row_index, size_t row_index_plus_one, std::shared_ptr< Catalog_Namespace::Catalog > catalog, const RenderGroupAnalyzerMap *render_group_analyzer_map)
 
static void fillRejectedRowWithInvalidData (const std::list< const ColumnDescriptor * > &columns, std::list< const ColumnDescriptor * >::iterator &cd_it, const size_t col_idx, ParseBufferRequest &request)
 
static bool isNullDatum (const std::string_view datum, const ColumnDescriptor *column, const std::string &null_indicator)
 

Static Public Attributes

static const std::string THREADS_KEY = "THREADS"
 
static const std::string BUFFER_SIZE_KEY = "BUFFER_SIZE"
 

Static Private Member Functions

static void processInvalidGeoColumn (std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)
 

Detailed Description

Definition at line 93 of file TextFileBufferParser.h.

Member Function Documentation

std::map< int, DataBlockPtr > foreign_storage::TextFileBufferParser::convertImportBuffersToDataBlocks ( const std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers)
static

Definition at line 61 of file TextFileBufferParser.cpp.

References DataBlockPtr::arraysPtr, threading_serial::async(), CHECK, CHECK_EQ, IS_STRING, kARRAY, kBOOLEAN, kENCODING_DICT, kENCODING_NONE, DataBlockPtr::numbersPtr, run_benchmark_import::result, and DataBlockPtr::stringsPtr.

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), foreign_storage::RegexFileBufferParser::parseBuffer(), and foreign_storage::InternalSystemDataWrapper::populateChunkBuffers().

63  {
64  std::map<int, DataBlockPtr> result;
65  std::vector<std::pair<const size_t, std::future<int8_t*>>>
66  encoded_data_block_ptrs_futures;
67  // make all async calls to string dictionary here and then continue execution
68  for (const auto& import_buffer : import_buffers) {
69  if (import_buffer == nullptr) {
70  continue;
71  }
72  DataBlockPtr p;
73  if (import_buffer->getTypeInfo().is_number() ||
74  import_buffer->getTypeInfo().is_time() ||
75  import_buffer->getTypeInfo().get_type() == kBOOLEAN) {
76  p.numbersPtr = import_buffer->getAsBytes();
77  } else if (import_buffer->getTypeInfo().is_string()) {
78  auto string_payload_ptr = import_buffer->getStringBuffer();
79  if (import_buffer->getTypeInfo().get_compression() == kENCODING_NONE) {
80  p.stringsPtr = string_payload_ptr;
81  } else {
82  CHECK_EQ(kENCODING_DICT, import_buffer->getTypeInfo().get_compression());
83  p.numbersPtr = nullptr;
84 
85  auto column_id = import_buffer->getColumnDesc()->columnId;
86  encoded_data_block_ptrs_futures.emplace_back(std::make_pair(
87  column_id,
88  std::async(std::launch::async, [&import_buffer, string_payload_ptr] {
89  import_buffer->addDictEncodedString(*string_payload_ptr);
90  return import_buffer->getStringDictBuffer();
91  })));
92  }
93  } else if (import_buffer->getTypeInfo().is_geometry()) {
94  auto geo_payload_ptr = import_buffer->getGeoStringBuffer();
95  p.stringsPtr = geo_payload_ptr;
96  } else {
97  CHECK(import_buffer->getTypeInfo().get_type() == kARRAY);
98  if (IS_STRING(import_buffer->getTypeInfo().get_subtype())) {
99  CHECK(import_buffer->getTypeInfo().get_compression() == kENCODING_DICT);
100  import_buffer->addDictEncodedStringArray(*import_buffer->getStringArrayBuffer());
101  p.arraysPtr = import_buffer->getStringArrayDictBuffer();
102  } else {
103  p.arraysPtr = import_buffer->getArrayBuffer();
104  }
105  }
106  result[import_buffer->getColumnDesc()->columnId] = p;
107  }
108 
109  // wait for the async requests we made for string dictionary
110  for (auto& encoded_ptr_future : encoded_data_block_ptrs_futures) {
111  result[encoded_ptr_future.first].numbersPtr = encoded_ptr_future.second.get();
112  }
113  return result;
114 }
#define CHECK_EQ(x, y)
Definition: Logger.h:231
std::vector< std::string > * stringsPtr
Definition: sqltypes.h:227
std::vector< ArrayDatum > * arraysPtr
Definition: sqltypes.h:228
future< Result > async(Fn &&fn, Args &&...args)
#define IS_STRING(T)
Definition: sqltypes.h:250
#define CHECK(condition)
Definition: Logger.h:223
int8_t * numbersPtr
Definition: sqltypes.h:226

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::TextFileBufferParser::fillRejectedRowWithInvalidData ( const std::list< const ColumnDescriptor * > &  columns,
std::list< const ColumnDescriptor * >::iterator &  cd_it,
const size_t  col_idx,
ParseBufferRequest &  request 
)
static

Fills the current row of the request with invalid (null) data, because the row will be marked as rejected.

Definition at line 167 of file TextFileBufferParser.cpp.

References foreign_storage::ParseBufferRequest::copy_params, foreign_storage::ParseBufferRequest::getCatalog(), foreign_storage::ParseBufferRequest::import_buffers, and processInvalidGeoColumn().

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), and foreign_storage::RegexFileBufferParser::parseBuffer().

171  {
172  size_t col_idx = starting_col_idx;
173 
174  for (; cd_it != columns.end(); cd_it++) {
175  auto cd = *cd_it;
176  const auto& col_ti = cd->columnType;
177  if (col_ti.is_geometry()) {
178  if (request.import_buffers[col_idx] != nullptr) {
179  processInvalidGeoColumn(request.import_buffers,
180  col_idx,
181  request.copy_params,
182  cd,
183  request.getCatalog());
184  } else {
185  ++col_idx;
186  col_idx += col_ti.get_physical_cols();
187  }
188  // skip remaining physical columns
189  for (int i = 0; i < cd->columnType.get_physical_cols(); ++i) {
190  ++cd_it;
191  }
192  } else {
193  if (request.import_buffers[col_idx] != nullptr) {
194  request.import_buffers[col_idx]->add_value(cd,
195  {},
196  true,
197  request.copy_params,
198  /*check_not_null=*/false);
199  }
200  ++col_idx;
201  }
202  }
203 }
static void processInvalidGeoColumn(std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &import_buffers, size_t &col_idx, const import_export::CopyParams &copy_params, const ColumnDescriptor *cd, std::shared_ptr< Catalog_Namespace::Catalog > catalog)

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual size_t foreign_storage::TextFileBufferParser::findRowEndPosition ( size_t &  alloc_size,
std::unique_ptr< char[]> &  buffer,
size_t &  buffer_size,
const import_export::CopyParams &  copy_params,
const size_t  buffer_first_row_index,
unsigned int &  num_rows_in_buffer,
FileReader file_reader 
) const
pure virtual

Finds and returns the offset of the end of the last row in the given buffer. If the buffer does not contain at least one row, the buffer is extended with more content from the file until a row is read. An exception is thrown if the buffer is extended to a maximum threshold and at least one row has still not been read.

Implemented in foreign_storage::RegexFileBufferParser, and foreign_storage::CsvFileBufferParser.

Referenced by foreign_storage::dispatch_metadata_scan_requests().

+ Here is the caller graph for this function:

bool foreign_storage::TextFileBufferParser::isCoordinateScalar ( const std::string_view  datum)
static

Definition at line 116 of file TextFileBufferParser.cpp.

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), processGeoColumn(), and foreign_storage::anonymous_namespace{TextFileBufferParser.cpp}::set_coordinates_from_separate_lon_lat_columns().

116  {
117  // field looks like a scalar numeric value (and not a hex blob)
118  return datum.size() > 0 && (datum[0] == '.' || isdigit(datum[0]) || datum[0] == '-') &&
119  datum.find_first_of("ABCDEFabcdef") == std::string_view::npos;
120 }

+ Here is the caller graph for this function:

bool foreign_storage::TextFileBufferParser::isNullDatum ( const std::string_view  datum,
const ColumnDescriptor *  column,
const std::string &  null_indicator 
)
static

Definition at line 356 of file TextFileBufferParser.cpp.

References ColumnDescriptor::columnName, ColumnDescriptor::columnType, SQLTypeInfo::get_notnull(), is_null(), and SQLTypeInfo::is_string().

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), and foreign_storage::RegexFileBufferParser::parseBuffer().

358  {
359  bool is_null = (datum == null_indicator);
360 
361  // Treating empty as NULL
362  if (!column->columnType.is_string() && datum.empty()) {
363  is_null = true;
364  }
365 
366  if (is_null && column->columnType.get_notnull()) {
367  throw std::runtime_error("NULL value provided for column (" + column->columnName +
368  ") with NOT NULL constraint.");
369  }
370  return is_null;
371 }
CONSTEXPR DEVICE bool is_null(const T &value)
SQLTypeInfo columnType
bool is_string() const
Definition: sqltypes.h:510
HOST DEVICE bool get_notnull() const
Definition: sqltypes.h:336
std::string columnName

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual ParseBufferResult foreign_storage::TextFileBufferParser::parseBuffer ( ParseBufferRequest &  request,
bool  convert_data_blocks,
bool  columns_are_pre_filtered = false 
) const
pure virtual

Parses a given file buffer and returns data blocks for each column in the file along with metadata related to rows and row offsets within the buffer.

Parameters
convert_data_blocks - convert import buffers to data blocks
columns_are_pre_filtered - the file buffer passed into parseBuffer contains only the columns that are being requested, not all columns.

Implemented in foreign_storage::RegexFileBufferParser, and foreign_storage::CsvFileBufferParser.

Referenced by foreign_storage::parse_file_regions(), and foreign_storage::scan_metadata().

+ Here is the caller graph for this function:

void foreign_storage::TextFileBufferParser::processGeoColumn ( std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
const import_export::CopyParams &  copy_params,
std::list< const ColumnDescriptor * >::iterator &  cd_it,
std::vector< std::string_view > &  row,
size_t &  import_idx,
bool  is_null,
size_t  first_row_index,
size_t  row_index_plus_one,
std::shared_ptr< Catalog_Namespace::Catalog >  catalog,
const RenderGroupAnalyzerMap *  render_group_analyzer_map 
)
static

Definition at line 242 of file TextFileBufferParser.cpp.

References CHECK, Geospatial::GeoTypesFactory::getGeoColumns(), Geospatial::GeoTypesFactory::getNullGeoColumns(), IS_GEO, IS_GEO_POLY, isCoordinateScalar(), kMULTIPOLYGON, kPOINT, kPOLYGON, import_export::CopyParams::lonlat, import_export::CopyParams::null_str, foreign_storage::anonymous_namespace{TextFileBufferParser.cpp}::PROMOTE_POLYGON_TO_MULTIPOLYGON, foreign_storage::anonymous_namespace{TextFileBufferParser.cpp}::set_coordinates_from_separate_lon_lat_columns(), import_export::Importer::set_geo_physical_import_buffer(), import_export::CopyParams::source_srid, and to_string().

Referenced by foreign_storage::CsvFileBufferParser::parseBuffer(), and foreign_storage::RegexFileBufferParser::parseBuffer().

253  {
254  auto cd = *cd_it;
255  auto col_ti = cd->columnType;
256  SQLTypes col_type = col_ti.get_type();
257  CHECK(IS_GEO(col_type));
258 
259  auto starting_col_idx = col_idx;
260 
261  auto const& geo_string = row[import_idx];
262  ++import_idx;
263  ++col_idx;
264 
265  std::vector<double> coords;
266  std::vector<double> bounds;
267  std::vector<int> ring_sizes;
268  std::vector<int> poly_rings;
269  int render_group = 0;
270 
271  // prepare to transform from another SRID
272  SQLTypeInfo import_ti{col_ti};
273  if (import_ti.get_output_srid() == 4326) {
274  auto srid0 = copy_params.source_srid;
275  if (srid0 > 0) {
276  // srid0 -> 4326 transform is requested on import
277  import_ti.set_input_srid(srid0);
278  }
279  }
280 
281  if (!is_null && col_type == kPOINT && isCoordinateScalar(geo_string)) {
282  if (!set_coordinates_from_separate_lon_lat_columns(
283  geo_string, row[import_idx], import_ti, coords, copy_params.lonlat)) {
284  throw std::runtime_error("Cannot read lon/lat to insert into POINT column " +
285  cd->columnName);
286  }
287  ++import_idx;
288  } else {
289  if (is_null || geo_string.empty() || geo_string == "NULL") {
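290  Geospatial::GeoTypesFactory::getNullGeoColumns(import_ti,
291  coords,
292  bounds,
293  ring_sizes,
294  poly_rings,
295  PROMOTE_POLYGON_TO_MULTIPOLYGON);
296  is_null = true;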
297  } else {
298  // extract geometry directly from WKT
299  if (!Geospatial::GeoTypesFactory::getGeoColumns(std::string(geo_string),
300  import_ti,
301  coords,
302  bounds,
303  ring_sizes,
304  poly_rings,
305  PROMOTE_POLYGON_TO_MULTIPOLYGON)) {
306  std::string msg = "Failed to extract valid geometry from row " +
307  std::to_string(first_row_index + row_index_plus_one) +
308  " for column " + cd->columnName;
309  throw std::runtime_error(msg);
310  }
311 
312  // validate types
313  if (col_type != import_ti.get_type()) {
314  if (!PROMOTE_POLYGON_TO_MULTIPOLYGON ||
315  !(import_ti.get_type() == SQLTypes::kPOLYGON &&
316  col_type == SQLTypes::kMULTIPOLYGON)) {
317  throw std::runtime_error("Imported geometry doesn't match the type of column " +
318  cd->columnName);
319  }
320  }
321 
322  // get render group
323  if (IS_GEO_POLY(col_type) && render_group_analyzer_map &&
324  render_group_analyzer_map->size()) {
325  auto const itr = render_group_analyzer_map->find(cd->columnId);
326  if (itr != render_group_analyzer_map->end()) {
327  auto& render_group_analyzer = *itr->second;
328  render_group = render_group_analyzer.insertBoundsAndReturnRenderGroup(bounds);
329  }
330  }
331  }
332  }
333 
334  // allowed to be null?
335  if (is_null && col_ti.get_notnull()) {
336  throw std::runtime_error("NULL value provided for column (" + cd->columnName +
337  ") with NOT NULL constraint.");
338  }
339 
340  // import extracted geo
341  import_export::Importer::set_geo_physical_import_buffer(*catalog,
342  cd,
343  import_buffers,
344  col_idx,
345  coords,
346  bounds,
347  ring_sizes,
348  poly_rings,
349  render_group);
350 
351  // store null string in the base column
352  import_buffers[starting_col_idx]->add_value(
353  cd, copy_params.null_str, true, copy_params);
354 }
SQLTypes
Definition: sqltypes.h:38
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1144
std::string to_string(char const *&&v)
bool set_coordinates_from_separate_lon_lat_columns(const std::string_view lon_str, const std::string_view lat_str, SQLTypeInfo &ti, std::vector< double > &coords, const bool is_lon_lat_order)
CONSTEXPR DEVICE bool is_null(const T &value)
static bool getGeoColumns(const std::string &wkt_or_wkb_hex, SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:937
#define CHECK(condition)
Definition: Logger.h:223
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
Definition: Importer.cpp:1653
static bool isCoordinateScalar(const std::string_view datum)
#define IS_GEO(T)
Definition: sqltypes.h:251
#define IS_GEO_POLY(T)
Definition: sqltypes.h:255

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

void foreign_storage::TextFileBufferParser::processInvalidGeoColumn ( std::vector< std::unique_ptr< import_export::TypedImportBuffer >> &  import_buffers,
size_t &  col_idx,
const import_export::CopyParams &  copy_params,
const ColumnDescriptor *  cd,
std::shared_ptr< Catalog_Namespace::Catalog >  catalog 
)
static private

Definition at line 205 of file TextFileBufferParser.cpp.

References CHECK, ColumnDescriptor::columnType, SQLTypeInfo::get_type(), Geospatial::GeoTypesFactory::getNullGeoColumns(), IS_GEO, import_export::CopyParams::null_str, foreign_storage::anonymous_namespace{TextFileBufferParser.cpp}::PROMOTE_POLYGON_TO_MULTIPOLYGON, and import_export::Importer::set_geo_physical_import_buffer().

Referenced by fillRejectedRowWithInvalidData().

210  {
211  auto col_ti = cd->columnType;
212  SQLTypes col_type = col_ti.get_type();
213  CHECK(IS_GEO(col_type));
214 
215  // store null string in the base column
216  import_buffers[col_idx]->add_value(cd, copy_params.null_str, true, copy_params);
217  ++col_idx;
218 
219  std::vector<double> coords;
220  std::vector<double> bounds;
221  std::vector<int> ring_sizes;
222  std::vector<int> poly_rings;
223  int render_group = 0;
224 
225  SQLTypeInfo import_ti{col_ti};
226  Geospatial::GeoTypesFactory::getNullGeoColumns(
227  import_ti, coords, bounds, ring_sizes, poly_rings, PROMOTE_POLYGON_TO_MULTIPOLYGON);
228 
229  // import extracted geo
230  import_export::Importer::set_geo_physical_import_buffer(*catalog,
231  cd,
232  import_buffers,
233  col_idx,
234  coords,
235  bounds,
236  ring_sizes,
237  poly_rings,
238  render_group,
239  /*force_null=*/true);
240 }
SQLTypes
Definition: sqltypes.h:38
static void getNullGeoColumns(SQLTypeInfo &ti, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, const bool promote_poly_to_mpoly=false)
Definition: Types.cpp:1144
HOST DEVICE SQLTypes get_type() const
Definition: sqltypes.h:329
#define CHECK(condition)
Definition: Logger.h:223
static void set_geo_physical_import_buffer(const Catalog_Namespace::Catalog &catalog, const ColumnDescriptor *cd, std::vector< std::unique_ptr< TypedImportBuffer >> &import_buffers, size_t &col_idx, std::vector< double > &coords, std::vector< double > &bounds, std::vector< int > &ring_sizes, std::vector< int > &poly_rings, int render_group, const bool force_null=false)
Definition: Importer.cpp:1653
SQLTypeInfo columnType
#define IS_GEO(T)
Definition: sqltypes.h:251

+ Here is the call graph for this function:

+ Here is the caller graph for this function:

virtual import_export::CopyParams foreign_storage::TextFileBufferParser::validateAndGetCopyParams ( const ForeignTable *  foreign_table) const
pure virtual

Validates foreign table parse options and returns a CopyParams object upon successful validation. An exception is thrown if validation fails.

Implemented in foreign_storage::RegexFileBufferParser, and foreign_storage::CsvFileBufferParser.

Referenced by foreign_storage::AbstractTextFileDataWrapper::populateChunkMetadata(), foreign_storage::AbstractTextFileDataWrapper::populateChunks(), and foreign_storage::AbstractTextFileDataWrapper::restoreDataWrapperInternals().

+ Here is the caller graph for this function:

virtual void foreign_storage::TextFileBufferParser::validateFiles ( const FileReader *  file_reader,
const ForeignTable *  foreign_table 
) const
pure virtual

Performs basic validation of files to be parsed.

Implemented in foreign_storage::CsvFileBufferParser, and foreign_storage::RegexFileBufferParser.

Member Data Documentation

const std::string foreign_storage::TextFileBufferParser::BUFFER_SIZE_KEY = "BUFFER_SIZE"
inline static

The documentation for this class was generated from the following files: