OmniSciDB  8fa3bf436f
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
CsvShared.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "CsvShared.h"
17 #include "CsvDataWrapper.h"
19 #include "FsiJsonUtils.h"
21 #include "Utils/DdlUtils.h"
22 
23 namespace foreign_storage {
24 
25 // Serialization functions for FileRegion
26 void set_value(rapidjson::Value& json_val,
27  const FileRegion& file_region,
28  rapidjson::Document::AllocatorType& allocator) {
29  json_val.SetObject();
31  json_val, file_region.first_row_file_offset, "first_row_file_offset", allocator);
33  json_val, file_region.first_row_index, "first_row_index", allocator);
35  json_val, file_region.region_size, "region_size", allocator);
37  json_val, file_region.row_count, "row_count", allocator);
38  if (file_region.filename.size()) {
40  json_val, file_region.filename, "filename", allocator);
41  }
42 }
43 
44 void get_value(const rapidjson::Value& json_val, FileRegion& file_region) {
45  CHECK(json_val.IsObject());
47  json_val, file_region.first_row_file_offset, "first_row_file_offset");
49  json_val, file_region.first_row_index, "first_row_index");
50  json_utils::get_value_from_object(json_val, file_region.region_size, "region_size");
51  json_utils::get_value_from_object(json_val, file_region.row_count, "row_count");
52  if (json_val.HasMember("filename")) {
53  json_utils::get_value_from_object(json_val, file_region.filename, "filename");
54  }
55 }
56 
57 namespace Csv {
58 namespace {
59 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
60  const std::string& option_name,
61  const size_t expected_num_chars) {
62  if (auto it = foreign_table->options.find(option_name);
63  it != foreign_table->options.end()) {
64  if (it->second.length() != expected_num_chars) {
65  throw std::runtime_error{"Value of \"" + option_name +
66  "\" foreign table option has the wrong number of "
67  "characters. Expected " +
68  std::to_string(expected_num_chars) + " character(s)."};
69  }
70  return it->second;
71  }
72  return "";
73 }
74 
75 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
76  const std::string& option_name) {
77  if (auto it = foreign_table->options.find(option_name);
78  it != foreign_table->options.end()) {
79  if (boost::iequals(it->second, "TRUE")) {
80  return true;
81  } else if (boost::iequals(it->second, "FALSE")) {
82  return false;
83  } else {
84  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
85  "\" foreign table option. "
86  "Value must be either 'true' or 'false'."};
87  }
88  }
89  return std::nullopt;
90 }
91 } // namespace
92 
93 bool validate_and_get_is_s3_select(const ForeignTable* foreign_table) {
94  static constexpr const char* S3_DIRECT = "S3_DIRECT";
95  static constexpr const char* S3_SELECT = "S3_SELECT";
96  static constexpr const char* S3_ACCESS_TYPE = "S3_ACCESS_TYPE";
97  auto access_type = foreign_table->options.find(S3_ACCESS_TYPE);
98 
99  if (access_type != foreign_table->options.end()) {
100  auto& server_options = foreign_table->foreign_server->options;
101  if (server_options.find(AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)->second !=
103  throw std::runtime_error{
104  "The \"" + std::string{S3_ACCESS_TYPE} +
105  "\" option is only valid for foreign tables using servers with \"" +
106  AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY + "\" option value of \"" +
108  }
109  if (access_type->second != S3_DIRECT && access_type->second != S3_SELECT) {
110  throw std::runtime_error{
111  "Invalid value provided for the \"" + std::string{S3_ACCESS_TYPE} +
112  "\" option. Value must be one of the following: " + S3_DIRECT + ", " +
113  S3_SELECT + "."};
114  }
115  return (access_type->second == S3_SELECT);
116  } else {
117  return false;
118  }
119 }
120 
121 void validate_options(const ForeignTable* foreign_table) {
122  validate_and_get_copy_params(foreign_table);
123  validate_and_get_is_s3_select(foreign_table);
124 }
125 
127  const ForeignTable* foreign_table) {
128  import_export::CopyParams copy_params{};
129  copy_params.plain_text = true;
130  if (const auto& value =
131  validate_and_get_string_with_length(foreign_table, "ARRAY_DELIMITER", 1);
132  !value.empty()) {
133  copy_params.array_delim = value[0];
134  }
135  if (const auto& value =
136  validate_and_get_string_with_length(foreign_table, "ARRAY_MARKER", 2);
137  !value.empty()) {
138  copy_params.array_begin = value[0];
139  copy_params.array_end = value[1];
140  }
141  if (auto it = foreign_table->options.find("BUFFER_SIZE");
142  it != foreign_table->options.end()) {
143  copy_params.buffer_size = std::stoi(it->second);
144  }
145  if (const auto& value =
146  validate_and_get_string_with_length(foreign_table, "DELIMITER", 1);
147  !value.empty()) {
148  copy_params.delimiter = value[0];
149  }
150  if (const auto& value = validate_and_get_string_with_length(foreign_table, "ESCAPE", 1);
151  !value.empty()) {
152  copy_params.escape = value[0];
153  }
154  auto has_header = validate_and_get_bool_value(foreign_table, "HEADER");
155  if (has_header.has_value()) {
156  if (has_header.value()) {
157  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
158  } else {
159  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
160  }
161  }
162  if (const auto& value =
163  validate_and_get_string_with_length(foreign_table, "LINE_DELIMITER", 1);
164  !value.empty()) {
165  copy_params.line_delim = value[0];
166  }
167  copy_params.lonlat =
168  validate_and_get_bool_value(foreign_table, "LONLAT").value_or(copy_params.lonlat);
169 
170  if (auto it = foreign_table->options.find("NULLS");
171  it != foreign_table->options.end()) {
172  copy_params.null_str = it->second;
173  }
174  if (const auto& value = validate_and_get_string_with_length(foreign_table, "QUOTE", 1);
175  !value.empty()) {
176  copy_params.quote = value[0];
177  }
178  copy_params.quoted =
179  validate_and_get_bool_value(foreign_table, "QUOTED").value_or(copy_params.quoted);
180  return copy_params;
181 }
182 
184  const ChunkKey& chunk_key,
185  std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map,
186  const std::map<ChunkKey, AbstractBuffer*>& buffers) {
187  auto catalog =
189  CHECK(catalog);
190 
191  ChunkKey data_chunk_key = chunk_key;
192  AbstractBuffer* data_buffer = nullptr;
193  AbstractBuffer* index_buffer = nullptr;
194  const auto column = catalog->getMetadataForColumnUnlocked(
195  chunk_key[CHUNK_KEY_TABLE_IDX], chunk_key[CHUNK_KEY_COLUMN_IDX]);
196 
197  if (column->columnType.is_varlen_indeed()) {
198  data_chunk_key.push_back(1);
199  ChunkKey index_chunk_key = chunk_key;
200  index_chunk_key.push_back(2);
201 
202  CHECK(buffers.find(data_chunk_key) != buffers.end());
203  CHECK(buffers.find(index_chunk_key) != buffers.end());
204 
205  data_buffer = buffers.find(data_chunk_key)->second;
206  index_buffer = buffers.find(index_chunk_key)->second;
207  CHECK_EQ(data_buffer->size(), static_cast<size_t>(0));
208  CHECK_EQ(index_buffer->size(), static_cast<size_t>(0));
209 
210  size_t index_offset_size{0};
211  if (column->columnType.is_string() || column->columnType.is_geometry()) {
212  index_offset_size = sizeof(StringOffsetT);
213  } else if (column->columnType.is_array()) {
214  index_offset_size = sizeof(ArrayOffsetT);
215  } else {
216  UNREACHABLE();
217  }
218  CHECK(chunk_metadata_map.find(data_chunk_key) != chunk_metadata_map.end());
219  index_buffer->reserve(index_offset_size *
220  (chunk_metadata_map.at(data_chunk_key)->numElements + 1));
221  } else {
222  data_chunk_key = chunk_key;
223  CHECK(buffers.find(data_chunk_key) != buffers.end());
224  data_buffer = buffers.find(data_chunk_key)->second;
225  }
226  data_buffer->reserve(chunk_metadata_map.at(data_chunk_key)->numBytes);
227 
228  auto retval = Chunk_NS::Chunk{column};
229  retval.setBuffer(data_buffer);
230  retval.setIndexBuffer(index_buffer);
231  retval.initEncoder();
232  return retval;
233 }
234 
235 std::shared_ptr<ChunkMetadata> get_placeholder_metadata(const ColumnDescriptor* column,
236  size_t num_elements) {
237  ForeignStorageBuffer empty_buffer;
238  // Use default encoder metadata as in parquet wrapper
239  empty_buffer.initEncoder(column->columnType);
240  auto chunk_metadata = empty_buffer.getEncoder()->getMetadata(column->columnType);
241  chunk_metadata->numElements = num_elements;
242 
243  if (!column->columnType.is_varlen_indeed()) {
244  chunk_metadata->numBytes = column->columnType.get_size() * num_elements;
245  }
246  // min/max not set by default for arrays, so get from elem type encoder
247  if (column->columnType.is_array()) {
248  ForeignStorageBuffer scalar_buffer;
249  scalar_buffer.initEncoder(column->columnType.get_elem_type());
250  auto scalar_metadata =
251  scalar_buffer.getEncoder()->getMetadata(column->columnType.get_elem_type());
252  chunk_metadata->chunkStats.min = scalar_metadata->chunkStats.min;
253  chunk_metadata->chunkStats.max = scalar_metadata->chunkStats.max;
254  }
255  chunk_metadata->chunkStats.has_nulls = true;
256  return chunk_metadata;
257 }
258 } // namespace Csv
259 
260 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:211
std::vector< int > ChunkKey
Definition: types.h:37
HOST DEVICE int get_size() const
Definition: sqltypes.h:324
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
Definition: CsvShared.cpp:75
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:126
#define CHUNK_KEY_DB_IDX
Definition: types.h:39
#define UNREACHABLE()
Definition: Logger.h:247
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
Definition: CsvShared.cpp:44
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string to_string(char const *&&v)
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:109
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
Definition: CsvShared.cpp:235
void validate_options(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:121
int32_t StringOffsetT
Definition: sqltypes.h:937
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:227
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: FsiJsonUtils.h:126
static SysCatalog & instance()
Definition: SysCatalog.h:292
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:40
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: FsiJsonUtils.h:111
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
Definition: CsvShared.cpp:59
int32_t ArrayOffsetT
Definition: sqltypes.h:938
const ForeignServer * foreign_server
Definition: ForeignTable.h:53
#define CHECK(condition)
Definition: Logger.h:203
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:520
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:41
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:713
virtual void reserve(size_t num_bytes)=0
bool validate_and_get_is_s3_select(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:93
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
Definition: CsvShared.cpp:26
bool is_array() const
Definition: sqltypes.h:497
Chunk_NS::Chunk make_chunk_for_column(const ChunkKey &chunk_key, std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers)
Definition: CsvShared.cpp:183