17 #ifndef NONE_ENCODER_H
18 #define NONE_ENCODER_H
26 #include <tbb/parallel_for.h>
27 #include <tbb/parallel_reduce.h>
32 return std::is_integral<T>::value ? inline_int_null_value<T>()
33 : inline_fp_null_value<T>();
44 const std::vector<size_t>& selected_idx,
45 const size_t byte_limit)
override {
47 <<
"getNumElemsForBytesEncodedDataAtIndices unexpectedly called for non varlen"
55 const std::vector<size_t>& selected_idx)
override {
56 std::shared_ptr<ChunkMetadata> chunk_metadata;
61 selected_idx, [&](
const size_t start_pos,
const size_t end_pos) {
62 size_t elem_count = end_pos - start_pos;
63 auto data_ptr = data +
sizeof(
T) * selected_idx[start_pos];
67 return chunk_metadata;
72 const size_t start_idx,
73 const size_t num_elements)
override {
74 auto current_data = data +
sizeof(
T) * start_idx;
78 std::shared_ptr<ChunkMetadata>
appendData(int8_t*& src_data,
79 const size_t num_elems_to_append,
81 const bool replicating =
false,
82 const int64_t offset = -1)
override {
83 if (offset == 0 && num_elems_to_append >=
num_elems_) {
86 T* unencodedData =
reinterpret_cast<T*
>(src_data);
87 std::vector<T> encoded_data;
89 if (num_elems_to_append > 0) {
90 encoded_data.resize(num_elems_to_append);
92 std::fill(encoded_data.begin(), encoded_data.end(), data);
100 replicating ? reinterpret_cast<int8_t*>(encoded_data.data()) : src_data,
101 num_elems_to_append *
sizeof(
T));
103 src_data += num_elems_to_append *
sizeof(
T);
110 src_data, num_elems_to_append *
sizeof(
T), static_cast<size_t>(offset));
112 auto chunk_metadata = std::make_shared<ChunkMetadata>();
114 return chunk_metadata;
117 void getMetadata(
const std::shared_ptr<ChunkMetadata>& chunkMetadata)
override {
124 auto chunk_metadata = std::make_shared<ChunkMetadata>(ti, 0, 0,
ChunkStats{});
126 return chunk_metadata;
134 const auto data =
static_cast<T>(val);
145 const auto data =
static_cast<T>(val);
151 void updateStats(
const int8_t*
const src_data,
const size_t num_elements)
override {
152 const T* unencoded_data =
reinterpret_cast<const T*
>(src_data);
153 for (
size_t i = 0; i < num_elements; ++i) {
159 const size_t num_elements)
override {
160 const T* data =
reinterpret_cast<const T*
>(dst_data);
163 tbb::blocked_range(
size_t(0), num_elements),
165 [&](
const auto& range,
auto init) {
166 auto [min, max, nulls] =
init;
167 for (
size_t i = range.begin(); i < range.end(); i++) {
168 if (data[i] != none_encoded_null_value<T>()) {
170 min = std::min(min, data[i]);
171 max = std::max(max, data[i]);
176 return std::tuple(min, max, nulls);
178 [&](
auto lhs,
auto rhs) {
179 const auto [lhs_min, lhs_max, lhs_nulls] = lhs;
180 const auto [rhs_min, rhs_max, rhs_nulls] = rhs;
181 return std::tuple(std::min(lhs_min, rhs_min),
182 std::max(lhs_max, rhs_max),
183 lhs_nulls || rhs_nulls);
188 const size_t start_idx,
189 const size_t num_elements)
override {
194 const size_t start_idx,
195 const size_t num_elements)
override {
201 const auto that_typed =
static_cast<const NoneEncoder&
>(that);
202 if (that_typed.has_nulls) {
211 fwrite((int8_t*)&
num_elems_,
sizeof(
size_t), 1, f);
212 fwrite((int8_t*)&
dataMin,
sizeof(
T), 1, f);
213 fwrite((int8_t*)&
dataMax,
sizeof(
T), 1, f);
214 fwrite((int8_t*)&
has_nulls,
sizeof(
bool), 1, f);
219 fread((int8_t*)&
num_elems_,
sizeof(
size_t), 1, f);
220 fread((int8_t*)&
dataMin,
sizeof(
T), 1, f);
221 fread((int8_t*)&
dataMax,
sizeof(
T), 1, f);
222 fread((int8_t*)&
has_nulls,
sizeof(
bool), 1, f);
226 const auto new_min = DatumFetcher::getDatumVal<T>(stats.
min);
227 const auto new_max = DatumFetcher::getDatumVal<T>(stats.
max);
241 auto castedEncoder =
reinterpret_cast<const NoneEncoder<T>*
>(copyFromEncoder);
242 dataMin = castedEncoder->dataMin;
243 dataMax = castedEncoder->dataMax;
248 dataMin = std::numeric_limits<T>::max();
249 dataMax = std::numeric_limits<T>::lowest();
259 if (unencoded_data == none_encoded_null_value<T>()) {
266 return unencoded_data;
270 #endif // NONE_ENCODER_H
void updateStats(const int8_t *const src_data, const size_t num_elements) override
void writeMetadata(FILE *f) override
DecimalOverflowValidator decimal_overflow_validator_
void updateStats(const int64_t val, const bool is_null) override
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
void resetChunkStats() override
void execute_over_contiguous_indices(const std::vector< size_t > &indices, std::function< void(const size_t, const size_t)> to_execute)
std::shared_ptr< ChunkMetadata > appendEncodedDataAtIndices(const int8_t *, int8_t *data, const std::vector< size_t > &selected_idx) override
void updateStats(const std::vector< std::string > *const src_data, const size_t start_idx, const size_t num_elements) override
DEVICE void fill(ARGS &&...args)
CONSTEXPR DEVICE bool is_null(const T &value)
Data_Namespace::AbstractBuffer * buffer_
std::shared_ptr< ChunkMetadata > appendData(int8_t *&src_data, const size_t num_elems_to_append, const SQLTypeInfo &, const bool replicating=false, const int64_t offset=-1) override
void init(LogOptions const &log_opts)
size_t getNumElems() const
An AbstractBuffer is a unit of data management for a data manager.
void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata) override
virtual void write(int8_t *src, const size_t num_bytes, const size_t offset=0, const MemoryLevel src_buffer_type=CPU_LEVEL, const int src_device_id=-1)=0
Value parallel_reduce(const blocked_range< Int > &range, const Value &identity, const RealBody &real_body, const Reduction &reduction, const Partitioner &p=Partitioner())
Parallel iteration with reduction.
T none_encoded_null_value()
bool resetChunkStats(const ChunkStats &stats) override
: Reset chunk level stats (min, max, nulls) using new values from the argument.
size_t getNumElemsForBytesEncodedDataAtIndices(const int8_t *index_data, const std::vector< size_t > &selected_idx, const size_t byte_limit) override
void updateStats(const double val, const bool is_null) override
void updateStats(const std::vector< ArrayDatum > *const src_data, const size_t start_idx, const size_t num_elements) override
std::shared_ptr< ChunkMetadata > getMetadata(const SQLTypeInfo &ti) override
virtual void append(int8_t *src, const size_t num_bytes, const MemoryLevel src_buffer_type=CPU_LEVEL, const int device_id=-1)=0
std::shared_ptr< ChunkMetadata > appendEncodedData(const int8_t *, int8_t *data, const size_t start_idx, const size_t num_elements) override
NoneEncoder(Data_Namespace::AbstractBuffer *buffer)
void updateStatsEncoded(const int8_t *const dst_data, const size_t num_elements) override
void reduceStats(const Encoder &that) override
void copyMetadata(const Encoder *copyFromEncoder) override
T validateDataAndUpdateStats(const T &unencoded_data)
void validate(T value) const
void readMetadata(FILE *f) override