OmniSciDB  72c90bc290
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
Compressor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
24 #include "Shared/Compressor.h"
25 
26 #include <cstdint>
27 #include <memory>
28 #include <thread>
29 
30 #include <blosc.h>
31 
32 #include "Logger/Logger.h"
33 
// Default compression threshold: 512 MB expressed in bytes. It is the default
// minimum payload size for BloscCompressor::compress(); smaller payloads are
// left uncompressed.
size_t g_compression_limit_bytes = 512 * 1024 * 1024;
36 
38  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
39  blosc_init();
40  // We use maximum number of threads here since with tests we found that compression
41  // speed gets lear scalling with corresponding to the number of threads being used.
42 
43  blosc_set_nthreads(std::thread::hardware_concurrency());
44 
45  // We chosse faster compressor, accepting slightly lower compression ratio
46  // https://lz4.github.io/lz4/
47 
48  blosc_set_compressor(BLOSC_LZ4HC_COMPNAME);
49 }
50 
52  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
53  blosc_destroy();
54 }
55 
57  const uint8_t* buffer,
58  const size_t buffer_size,
59  uint8_t* compressed_buffer,
60  const size_t compressed_buffer_size,
61  const size_t min_compressor_bytes = g_compression_limit_bytes) {
62  if (compressed_buffer_size < BLOSC_MIN_HEADER_LENGTH) {
63  // Blosc compressor checks this condition during the initialization
64  // and throw "Output buffer size should be larger than 16 bytes" error
65  // if compressed_buffer_size < 16 (BLOSC_MIN_HEADER_LENGTH)
66  // but after sending interrupt signal, blosc compress function hangs until
67  // thrift timed out error and could not check this code.
68  // here, we can early return by explicitly checking this condition
69  // so as to avoid hangs in query runtime
70  return 0;
71  }
72 
73  if (buffer_size < min_compressor_bytes && min_compressor_bytes != 0) {
74  return 0;
75  }
76  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
77  const auto compressed_len = blosc_compress(5,
78  1,
79  sizeof(unsigned char),
80  buffer_size,
81  buffer,
82  &compressed_buffer[0],
83  compressed_buffer_size);
84 
85  if (compressed_len <= 0) {
86  // something went wrong. blosc retrun codes simply don't provide enough information
87  // for us to decide what.
88  throw CompressionFailedError(std::string("failed to compress result set of length ") +
89  std::to_string(buffer_size));
90  }
91  // we need to tell the other endpoint the size of the acctual data so it can
92  // decide whether it should decompress data or not. So we pass the original
93  // data length. and only send the compressed result if the output of the
94  // compressed result is smaller than the original
95  return compressed_len;
96 }
97 
98 std::string BloscCompressor::compress(const std::string& buffer) {
99  const auto buffer_size = buffer.size();
100  std::vector<uint8_t> compressed_buffer(getScratchSpaceSize(buffer_size));
101  try {
102  const size_t compressed_len = compress((uint8_t*)buffer.c_str(),
103  buffer_size,
104  &compressed_buffer[0],
105  getScratchSpaceSize(buffer_size));
106  if (compressed_len > 0 && compressed_len < buffer_size) {
107  // we need to tell the other endpoint the size of the acctual data so it can
108  // decide whether it should decompress data or not. So we pass the original
109  // data length. and only send the compressed result if the output of the
110  // compressed result is smaller than the original
111  compressed_buffer.resize(compressed_len);
112  return {compressed_buffer.begin(), compressed_buffer.end()};
113  }
114  } catch (const CompressionFailedError&) {
115  }
116  return buffer;
117 }
118 
119 size_t BloscCompressor::decompress(const uint8_t* compressed_buffer,
120  uint8_t* decompressed_buffer,
121  const size_t decompressed_size) {
122  size_t decompressed_buf_len, compressed_buf_len, block_size, decompressed_len = 0;
124  &compressed_buffer[0], &compressed_buf_len, &decompressed_buf_len, &block_size);
125  // check compressed buffer is a blosc compressed buffer.
126  if (compressed_buf_len > 0 && decompressed_size == decompressed_buf_len) {
127  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
128  decompressed_len =
129  blosc_decompress(&compressed_buffer[0], decompressed_buffer, decompressed_size);
130  }
131 
132  if (decompressed_len == 0) {
134  std::string("failed to decompress buffer for compressed size: ") +
135  std::to_string(compressed_buf_len));
136  }
137  if (decompressed_len != decompressed_size) {
139  std::string("decompression buffer size mismatch. Decompressed buffer length: ") +
140  std::to_string(decompressed_len));
141  }
142  return decompressed_len;
143 }
144 
145 std::string BloscCompressor::decompress(const std::string& buffer,
146  const size_t decompressed_size) {
147  std::vector<uint8_t> decompressed_buffer(decompressed_size);
148  if (buffer.size() == decompressed_size) {
149  return buffer;
150  }
151  try {
152  decompress(
153  (uint8_t*)&buffer[0], (uint8_t*)&decompressed_buffer[0], decompressed_size);
154  return {decompressed_buffer.begin(), decompressed_buffer.end()};
155  } catch (const CompressionFailedError&) {
156  }
157  return buffer;
158 }
159 
160 size_t BloscCompressor::compressOrMemcpy(const uint8_t* input_buffer,
161  uint8_t* output_buffer,
162  size_t uncompressed_size,
163  const size_t min_compressor_bytes) {
164  try {
165  const auto compressed_size = compress(input_buffer,
166  uncompressed_size,
167  output_buffer,
168  uncompressed_size,
169  min_compressor_bytes);
170  if (compressed_size > 0) {
171  return compressed_size;
172  }
173  } catch (const CompressionFailedError&) {
174  // catch exceptions from blosc
175  // we copy regardless what happens in compressor
176  if (uncompressed_size > min_compressor_bytes) {
177  LOG(WARNING) << "Compressor failed for byte size of " << uncompressed_size;
178  }
179  }
180  memcpy(output_buffer, input_buffer, uncompressed_size);
181  return uncompressed_size;
182 }
183 
184 bool BloscCompressor::decompressOrMemcpy(const uint8_t* compressed_buffer,
185  const size_t compressed_size,
186  uint8_t* decompressed_buffer,
187  const size_t decompressed_size) {
188  try {
189  decompress(compressed_buffer, decompressed_buffer, decompressed_size);
190  return true;
191  } catch (const CompressionFailedError&) {
192  // we will memcpy if we find that the buffer is not compressed
193 
194  if (compressed_size > decompressed_size) {
195  throw std::runtime_error(
196  "compressed buffer size is greater than decompressed buffer size.");
197  }
198  }
199  memcpy(decompressed_buffer, compressed_buffer, decompressed_size);
200  return false;
201 }
202 
203 void BloscCompressor::getBloscBufferSizes(const uint8_t* data_ptr,
204  size_t* num_bytes_compressed,
205  size_t* num_bytes_uncompressed,
206  size_t* block_size) {
207  blosc_cbuffer_sizes(data_ptr, num_bytes_uncompressed, num_bytes_compressed, block_size);
208 }
209 
211 
213  static std::mutex compressor_singleton_lock;
214  std::lock_guard<std::mutex> singleton_lock(compressor_singleton_lock);
215  if (instance == NULL) {
216  instance = new BloscCompressor();
217  }
218 
219  return instance;
220 }
221 
222 int BloscCompressor::setThreads(size_t num_threads) {
223  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
224  return blosc_set_nthreads(static_cast<int>(num_threads));
225 }
226 
227 int BloscCompressor::setCompressor(std::string& compressor_name) {
228  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
229  // Blosc is resilent enough to detect that the comprressor that was provided to it was
230  // supported or not. If the compressor is invalid or not supported it will simply keep
231  // current compressor.
232  return blosc_set_compressor(compressor_name.c_str());
233 }
int setThreads(size_t num_threads)
Definition: Compressor.cpp:222
size_t decompress(const uint8_t *compressed_buffer, uint8_t *decompressed_buffer, const size_t decompressed_size)
Definition: Compressor.cpp:119
#define LOG(tag)
Definition: Logger.h:285
size_t getScratchSpaceSize(const size_t len) const
Definition: Compressor.h:46
int setCompressor(std::string &compressor)
Definition: Compressor.cpp:227
size_t g_compression_limit_bytes
Definition: Compressor.cpp:35
size_t compressOrMemcpy(const uint8_t *input_buffer, uint8_t *output_buffer, const size_t uncompressed_size, const size_t min_compressor_bytes)
Definition: Compressor.cpp:160
std::string to_string(char const *&&v)
Singleton class to handle concurrency and state for the blosc library. A C++ wrapper over a pure C librar...
std::mutex compressor_lock
Definition: Compressor.h:87
bool decompressOrMemcpy(const uint8_t *compressed_buffer, const size_t compressed_buffer_size, uint8_t *decompressed_buffer, const size_t decompressed_size)
Definition: Compressor.cpp:184
void getBloscBufferSizes(const uint8_t *data_ptr, size_t *num_bytes_compressed, size_t *num_bytes_uncompressed, size_t *block_size)
Definition: Compressor.cpp:203
static BloscCompressor * getCompressor()
Definition: Compressor.cpp:212
int64_t compress(const uint8_t *buffer, const size_t buffer_size, uint8_t *compressed_buffer, const size_t compressed_buffer_size, const size_t min_compressor_bytes)
Definition: Compressor.cpp:56
static BloscCompressor * instance
Definition: Compressor.h:88