OmniSciDB  baf940c279
BloscCompressor Class Reference

#include <Compressor.h>

+ Collaboration diagram for BloscCompressor:

Public Member Functions

size_t getScratchSpaceSize (const size_t len) const
 
int64_t compress (const uint8_t *buffer, const size_t buffer_size, uint8_t *compressed_buffer, const size_t compressed_buffer_size, const size_t min_compressor_bytes)
 
std::string compress (const std::string &buffer)
 
size_t decompress (const uint8_t *compressed_buffer, uint8_t *decompressed_buffer, const size_t decompressed_size)
 
std::string decompress (const std::string &buffer, const size_t decompressed_size)
 
size_t compressOrMemcpy (const uint8_t *input_buffer, uint8_t *output_buffer, const size_t uncompressed_size, const size_t min_compressor_bytes)
 
bool decompressOrMemcpy (const uint8_t *compressed_buffer, const size_t compressed_buffer_size, uint8_t *decompressed_buffer, const size_t decompressed_size)
 
void getBloscBufferSizes (const uint8_t *data_ptr, size_t *num_bytes_compressed, size_t *num_bytes_uncompressed, size_t *block_size)
 
int setThreads (size_t num_threads)
 
int setCompressor (std::string &compressor)
 
 ~BloscCompressor ()
 

Static Public Member Functions

static BloscCompressor * getCompressor ()
 

Private Member Functions

 BloscCompressor ()
 

Private Attributes

std::mutex compressor_lock
 

Static Private Attributes

static BloscCompressor * instance = NULL
 

Detailed Description

Definition at line 39 of file Compressor.h.

Constructor & Destructor Documentation

◆ ~BloscCompressor()

BloscCompressor::~BloscCompressor ( )

Definition at line 52 of file Compressor.cpp.

References compressor_lock.

52  {
53  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
54  blosc_destroy();
55 }
std::mutex compressor_lock
Definition: Compressor.h:88

◆ BloscCompressor()

BloscCompressor::BloscCompressor ( )
private

Definition at line 38 of file Compressor.cpp.

References compressor_lock.

Referenced by getCompressor().

38  {
39  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
40  blosc_init();
41  // We use the maximum number of threads here since in tests we found that compression
42  // speed scales linearly with the number of threads being used.
43 
44  blosc_set_nthreads(std::thread::hardware_concurrency());
45 
46  // We choose a faster compressor, accepting a slightly lower compression ratio
47  // https://lz4.github.io/lz4/
48 
49  blosc_set_compressor(BLOSC_LZ4HC_COMPNAME);
50 }
std::mutex compressor_lock
Definition: Compressor.h:88
+ Here is the caller graph for this function:

Member Function Documentation

◆ compress() [1/2]

int64_t BloscCompressor::compress ( const uint8_t *  buffer,
const size_t  buffer_size,
uint8_t *  compressed_buffer,
const size_t  compressed_buffer_size,
const size_t  min_compressor_bytes = g_compression_limit_bytes 
)

Definition at line 57 of file Compressor.cpp.

References compressor_lock, and to_string().

Referenced by compress(), and compressOrMemcpy().

62  {
63  if (compressed_buffer_size < BLOSC_MIN_HEADER_LENGTH) {
64  // Blosc compressor checks this condition during the initialization
65  // and throw "Output buffer size should be larger than 16 bytes" error
66  // if compressed_buffer_size < 16 (BLOSC_MIN_HEADER_LENGTH)
67  // but after sending interrupt signal, blosc compress function hangs until
68  // thrift timed out error and could not check this code.
69  // here, we can early return by explicitly checking this condition
70  // so as to avoid hangs in query runtime
71  return 0;
72  }
73 
74  if (buffer_size < min_compressor_bytes && min_compressor_bytes != 0) {
75  return 0;
76  }
77  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
78  const auto compressed_len = blosc_compress(5,
79  1,
80  sizeof(unsigned char),
81  buffer_size,
82  buffer,
83  &compressed_buffer[0],
84  compressed_buffer_size);
85 
86  if (compressed_len <= 0) {
87  // something went wrong. blosc return codes simply don't provide enough information
88  // for us to decide what.
89  throw CompressionFailedError(std::string("failed to compress result set of length ") +
90  std::to_string(buffer_size));
91  }
92  // we need to tell the other endpoint the size of the actual data so it can
93  // decide whether it should decompress data or not. So we pass the original
94  // data length. and only send the compressed result if the output of the
95  // compressed result is smaller than the original
96  return compressed_len;
97 }
std::string to_string(char const *&&v)
std::mutex compressor_lock
Definition: Compressor.h:88
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ compress() [2/2]

std::string BloscCompressor::compress ( const std::string &  buffer)

Definition at line 99 of file Compressor.cpp.

References compress(), and getScratchSpaceSize().

99  {
100  const auto buffer_size = buffer.size();
101  std::vector<uint8_t> compressed_buffer(getScratchSpaceSize(buffer_size));
102  try {
103  const size_t compressed_len = compress((uint8_t*)buffer.c_str(),
104  buffer_size,
105  &compressed_buffer[0],
106  getScratchSpaceSize(buffer_size));
107  if (compressed_len > 0 && compressed_len < buffer_size) {
108  // we need to tell the other endpoint the size of the actual data so it can
109  // decide whether it should decompress data or not. So we pass the original
110  // data length. and only send the compressed result if the output of the
111  // compressed result is smaller than the original
112  compressed_buffer.resize(compressed_len);
113  return {compressed_buffer.begin(), compressed_buffer.end()};
114  }
115  } catch (const CompressionFailedError&) {
116  }
117  return buffer;
118 }
size_t getScratchSpaceSize(const size_t len) const
Definition: Compressor.h:47
int64_t compress(const uint8_t *buffer, const size_t buffer_size, uint8_t *compressed_buffer, const size_t compressed_buffer_size, const size_t min_compressor_bytes)
Definition: Compressor.cpp:57
+ Here is the call graph for this function:

◆ compressOrMemcpy()

size_t BloscCompressor::compressOrMemcpy ( const uint8_t *  input_buffer,
uint8_t *  output_buffer,
const size_t  uncompressed_size,
const size_t  min_compressor_bytes 
)

Definition at line 161 of file Compressor.cpp.

References compress(), LOG, and logger::WARNING.

164  {
165  try {
166  const auto compressed_size = compress(input_buffer,
167  uncompressed_size,
168  output_buffer,
169  uncompressed_size,
170  min_compressor_bytes);
171  if (compressed_size > 0) {
172  return compressed_size;
173  }
174  } catch (const CompressionFailedError&) {
175  // catch exceptions from blosc
176  // we copy regardless what happens in compressor
177  if (uncompressed_size > min_compressor_bytes) {
178  LOG(WARNING) << "Compressor failed for byte size of " << uncompressed_size;
179  }
180  }
181  memcpy(output_buffer, input_buffer, uncompressed_size);
182  return uncompressed_size;
183 }
#define LOG(tag)
Definition: Logger.h:188
int64_t compress(const uint8_t *buffer, const size_t buffer_size, uint8_t *compressed_buffer, const size_t compressed_buffer_size, const size_t min_compressor_bytes)
Definition: Compressor.cpp:57
+ Here is the call graph for this function:

◆ decompress() [1/2]

size_t BloscCompressor::decompress ( const uint8_t *  compressed_buffer,
uint8_t *  decompressed_buffer,
const size_t  decompressed_size 
)

Definition at line 120 of file Compressor.cpp.

References compressor_lock, getBloscBufferSizes(), and to_string().

Referenced by decompress(), and decompressOrMemcpy().

122  {
123  size_t decompressed_buf_len, compressed_buf_len, block_size, decompressed_len = 0;
124  getBloscBufferSizes(
125  &compressed_buffer[0], &compressed_buf_len, &decompressed_buf_len, &block_size);
126  // check compressed buffer is a blosc compressed buffer.
127  if (compressed_buf_len > 0 && decompressed_size == decompressed_buf_len) {
128  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
129  decompressed_len =
130  blosc_decompress(&compressed_buffer[0], decompressed_buffer, decompressed_size);
131  }
132 
133  if (decompressed_len == 0) {
134  throw CompressionFailedError(
135  std::string("failed to decompress buffer for compressed size: ") +
136  std::to_string(compressed_buf_len));
137  }
138  if (decompressed_len != decompressed_size) {
139  throw CompressionFailedError(
140  std::string("decompression buffer size mismatch. Decompressed buffer length: ") +
141  std::to_string(decompressed_len));
142  }
143  return decompressed_len;
144 }
std::string to_string(char const *&&v)
std::mutex compressor_lock
Definition: Compressor.h:88
void getBloscBufferSizes(const uint8_t *data_ptr, size_t *num_bytes_compressed, size_t *num_bytes_uncompressed, size_t *block_size)
Definition: Compressor.cpp:204
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ decompress() [2/2]

std::string BloscCompressor::decompress ( const std::string &  buffer,
const size_t  decompressed_size 
)

Definition at line 146 of file Compressor.cpp.

References decompress().

147  {
148  std::vector<uint8_t> decompressed_buffer(decompressed_size);
149  if (buffer.size() == decompressed_size) {
150  return buffer;
151  }
152  try {
153  decompress(
154  (uint8_t*)&buffer[0], (uint8_t*)&decompressed_buffer[0], decompressed_size);
155  return {decompressed_buffer.begin(), decompressed_buffer.end()};
156  } catch (const CompressionFailedError&) {
157  }
158  return buffer;
159 }
size_t decompress(const uint8_t *compressed_buffer, uint8_t *decompressed_buffer, const size_t decompressed_size)
Definition: Compressor.cpp:120
+ Here is the call graph for this function:

◆ decompressOrMemcpy()

bool BloscCompressor::decompressOrMemcpy ( const uint8_t *  compressed_buffer,
const size_t  compressed_buffer_size,
uint8_t *  decompressed_buffer,
const size_t  decompressed_size 
)

Definition at line 185 of file Compressor.cpp.

References decompress().

188  {
189  try {
190  decompress(compressed_buffer, decompressed_buffer, decompressed_size);
191  return true;
192  } catch (const CompressionFailedError&) {
193  // we will memcpy if we find that the buffer is not compressed
194 
195  if (compressed_size > decompressed_size) {
196  throw std::runtime_error(
197  "compressed buffer size is greater than decompressed buffer size.");
198  }
199  }
200  memcpy(decompressed_buffer, compressed_buffer, decompressed_size);
201  return false;
202 }
size_t decompress(const uint8_t *compressed_buffer, uint8_t *decompressed_buffer, const size_t decompressed_size)
Definition: Compressor.cpp:120
+ Here is the call graph for this function:

◆ getBloscBufferSizes()

void BloscCompressor::getBloscBufferSizes ( const uint8_t *  data_ptr,
size_t *  num_bytes_compressed,
size_t *  num_bytes_uncompressed,
size_t *  block_size 
)

Definition at line 204 of file Compressor.cpp.

References instance.

Referenced by decompress().

207  {
208  blosc_cbuffer_sizes(data_ptr, num_bytes_uncompressed, num_bytes_compressed, block_size);
209 }
+ Here is the caller graph for this function:

◆ getCompressor()

BloscCompressor * BloscCompressor::getCompressor ( )
static

Definition at line 213 of file Compressor.cpp.

References BloscCompressor(), and instance.

213  {
214  static std::mutex compressor_singleton_lock;
215  std::lock_guard<std::mutex> singleton_lock(compressor_singleton_lock);
216  if (instance == NULL) {
217  instance = new BloscCompressor();
218  }
219 
220  return instance;
221 }
static BloscCompressor * instance
Definition: Compressor.h:89
+ Here is the call graph for this function:

◆ getScratchSpaceSize()

size_t BloscCompressor::getScratchSpaceSize ( const size_t  len) const
inline

Definition at line 47 of file Compressor.h.

References decompress().

Referenced by compress().

47  {
48  return static_cast<size_t>(len * 1.1);
49  }
+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ setCompressor()

int BloscCompressor::setCompressor ( std::string &  compressor)

Definition at line 228 of file Compressor.cpp.

References compressor_lock.

228  {
229  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
230  // Blosc is resilient enough to detect whether the compressor that was provided to it is
231  // supported or not. If the compressor is invalid or not supported it will simply keep
232  // the current compressor.
233  return blosc_set_compressor(compressor_name.c_str());
234 }
std::mutex compressor_lock
Definition: Compressor.h:88

◆ setThreads()

int BloscCompressor::setThreads ( size_t  num_threads)

Definition at line 223 of file Compressor.cpp.

References compressor_lock.

223  {
224  std::lock_guard<std::mutex> compressor_lock_(compressor_lock);
225  return blosc_set_nthreads(static_cast<int>(num_threads));
226 }
std::mutex compressor_lock
Definition: Compressor.h:88

Member Data Documentation

◆ compressor_lock

std::mutex BloscCompressor::compressor_lock
private

◆ instance

BloscCompressor * BloscCompressor::instance = NULL
static private

Definition at line 89 of file Compressor.h.

Referenced by getBloscBufferSizes(), and getCompressor().


The documentation for this class was generated from the following files: