OmniSciDB  b24e664e58
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
Compressor.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2018 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
25 #include "Compressor.h"
26 #include <blosc.h>
27 #include <string>
28 #include <thread>
29 #include "Logger.h"
30 
31 // by default we only compress data if the payload size is at least 512 MB
32 size_t g_compression_limit_bytes{512 * 1024 * 1024};
33 
35  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
36  blosc_init();
37  // We use the maximum number of threads here since in tests we found that compression
38  // speed scales linearly with the number of threads being used.
39 
40  blosc_set_nthreads(std::thread::hardware_concurrency());
41 
42  // We choose a faster compressor, accepting a slightly lower compression ratio
43  // https://lz4.github.io/lz4/
44 
45  blosc_set_compressor(BLOSC_LZ4HC_COMPNAME);
46 }
47 
49  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
50  blosc_destroy();
51 }
52 
54  const uint8_t* buffer,
55  const size_t buffer_size,
56  uint8_t* compressed_buffer,
57  const size_t compressed_buffer_size,
58  const size_t min_compressor_bytes = g_compression_limit_bytes) {
59  if (buffer_size < min_compressor_bytes && min_compressor_bytes != 0) {
60  return 0;
61  }
62  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
63  const auto compressed_len = blosc_compress(5,
64  1,
65  sizeof(unsigned char),
66  buffer_size,
67  buffer,
68  &compressed_buffer[0],
69  compressed_buffer_size);
70 
71  if (compressed_len <= 0) {
72  // something went wrong. blosc return codes simply don't provide enough information
73  // for us to decide what.
74 
75  throw CompressionFailedError(std::string("failed to compress result set of length ") +
76  std::to_string(buffer_size));
77  }
78  // we need to tell the other endpoint the size of the actual data so it can
79  // decide whether it should decompress data or not. So we pass the original
80  // data length. and only send the compressed result if the output of the
81  // compressed result is smaller than the original
82  return compressed_len;
83 }
84 
85 std::string BloscCompressor::compress(const std::string& buffer) {
86  const auto buffer_size = buffer.size();
87  std::vector<uint8_t> compressed_buffer(getScratchSpaceSize(buffer_size));
88  try {
89  const size_t compressed_len = compress((uint8_t*)buffer.c_str(),
90  buffer_size,
91  &compressed_buffer[0],
92  getScratchSpaceSize(buffer_size));
93  if (compressed_len > 0 && compressed_len < buffer_size) {
94  // we need to tell the other endpoint the size of the acctual data so it can
95  // decide whether it should decompress data or not. So we pass the original
96  // data length. and only send the compressed result if the output of the
97  // compressed result is smaller than the original
98  compressed_buffer.resize(compressed_len);
99  return {compressed_buffer.begin(), compressed_buffer.end()};
100  }
101  } catch (const CompressionFailedError& e) {
102  }
103 
104  return buffer;
105 }
106 
107 size_t BloscCompressor::decompress(const uint8_t* compressed_buffer,
108  uint8_t* decompressed_buffer,
109  const size_t decompressed_size) {
110  size_t decompressed_buf_len, compressed_buf_len, block_size, decompressed_len = 0;
112  &compressed_buffer[0], &compressed_buf_len, &decompressed_buf_len, &block_size);
113  // check compressed buffer is a blosc compressed buffer.
114  if (compressed_buf_len > 0 && decompressed_size == decompressed_buf_len) {
115  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
116  decompressed_len =
117  blosc_decompress(&compressed_buffer[0], decompressed_buffer, decompressed_size);
118  }
119 
120  if (decompressed_len == 0) {
122  std::string("failed to decompress buffer for compressed size: ") +
123  std::to_string(compressed_buf_len));
124  }
125  if (decompressed_len != decompressed_size) {
127  std::string("decompression buffer size mismatch. Decompressed buffer length: ") +
128  std::to_string(decompressed_len));
129  }
130  return decompressed_len;
131 }
132 
133 std::string BloscCompressor::decompress(const std::string& buffer,
134  const size_t decompressed_size) {
135  std::vector<uint8_t> decompressed_buffer(decompressed_size);
136  if (buffer.size() == decompressed_size) {
137  return buffer;
138  }
139  try {
140  decompress(
141  (uint8_t*)&buffer[0], (uint8_t*)&decompressed_buffer[0], decompressed_size);
142  return {decompressed_buffer.begin(), decompressed_buffer.end()};
143  } catch (const CompressionFailedError& e) {
144  }
145  return buffer;
146 }
147 
148 size_t BloscCompressor::compressOrMemcpy(const uint8_t* input_buffer,
149  uint8_t* output_buffer,
150  size_t uncompressed_size,
151  const size_t min_compressor_bytes) {
152  try {
153  const auto compressed_size = compress(input_buffer,
154  uncompressed_size,
155  output_buffer,
156  uncompressed_size,
157  min_compressor_bytes);
158  if (compressed_size > 0) {
159  return compressed_size;
160  }
161  } catch (const CompressionFailedError& e) {
162  // catch exceptions from blosc
163  // we copy regardless what happens in compressor
164  if (uncompressed_size > min_compressor_bytes) {
165  LOG(WARNING) << "Compressor failed for byte size of " << uncompressed_size;
166  }
167  }
168  memcpy(output_buffer, input_buffer, uncompressed_size);
169  return uncompressed_size;
170 }
171 
172 bool BloscCompressor::decompressOrMemcpy(const uint8_t* compressed_buffer,
173  const size_t compressed_size,
174  uint8_t* decompressed_buffer,
175  const size_t decompressed_size) {
176  try {
177  decompress(compressed_buffer, decompressed_buffer, decompressed_size);
178  return true;
179  } catch (const CompressionFailedError& e) {
180  // we will memcpy if we find that the buffer is not compressed
181 
182  if (compressed_size > decompressed_size) {
183  throw std::runtime_error(
184  "compressed buffer size is greater than decompressed buffer size.");
185  }
186  }
187  memcpy(decompressed_buffer, compressed_buffer, decompressed_size);
188  return false;
189 }
190 
191 void BloscCompressor::getBloscBufferSizes(const uint8_t* data_ptr,
192  size_t* num_bytes_compressed,
193  size_t* num_bytes_uncompressed,
194  size_t* block_size) {
195  blosc_cbuffer_sizes(data_ptr, num_bytes_uncompressed, num_bytes_compressed, block_size);
196 }
197 
199 
201  static std::mutex compressor_singleton_lock;
202  std::lock_guard<std::mutex> singleton_lock(compressor_singleton_lock);
203 
204  if (instance == NULL) {
205  instance = new BloscCompressor();
206  }
207 
208  return instance;
209 }
210 
211 int BloscCompressor::setThreads(size_t num_threads) {
212  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
213  return blosc_set_nthreads(num_threads);
214 }
215 
216 int BloscCompressor::setCompressor(std::string& compressor_name) {
217  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
218  // Blosc is resilent enough to detect that the comprressor that was provided to it was
219  // supported or not. If the compressor is invalid or not supported it will simply keep
220  // current compressor.
221  return blosc_set_compressor(compressor_name.c_str());
222 }
int setThreads(size_t num_threads)
Definition: Compressor.cpp:211
size_t decompress(const uint8_t *compressed_buffer, uint8_t *decompressed_buffer, const size_t decompressed_size)
Definition: Compressor.cpp:107
#define LOG(tag)
Definition: Logger.h:185
size_t getScratchSpaceSize(const size_t len) const
Definition: Compressor.h:47
int setCompressor(std::string &compressor)
Definition: Compressor.cpp:216
size_t g_compression_limit_bytes
Definition: Compressor.cpp:32
size_t compressOrMemcpy(const uint8_t *input_buffer, uint8_t *output_buffer, const size_t uncompressed_size, const size_t min_compressor_bytes)
Definition: Compressor.cpp:148
std::string to_string(char const *&&v)
Singleton class to handle concurrency and state for the blosc library. A C++ wrapper over a pure C library...
std::mutex compressor_lock
Definition: Compressor.h:86
bool decompressOrMemcpy(const uint8_t *compressed_buffer, const size_t compressed_buffer_size, uint8_t *decompressed_buffer, const size_t decompressed_size)
Definition: Compressor.cpp:172
void getBloscBufferSizes(const uint8_t *data_ptr, size_t *num_bytes_compressed, size_t *num_bytes_uncompressed, size_t *block_size)
Definition: Compressor.cpp:191
static BloscCompressor * getCompressor()
Definition: Compressor.cpp:200
int64_t compress(const uint8_t *buffer, const size_t buffer_size, uint8_t *compressed_buffer, const size_t compressed_buffer_size, const size_t min_compressor_bytes)
Definition: Compressor.cpp:53
static BloscCompressor * instance
Definition: Compressor.h:87