30 #include "BESInternalError.h"
32 #include "CurlUtils.h"
34 #include "DmrppRequestHandler.h"
35 #include "CurlHandlePool.h"
36 #include "DmrppCommon.h"
37 #include "DmrppArray.h"
38 #include "DmrppNames.h"
40 #include "SuperChunk.h"
42 #define prolog std::string("SuperChunk::").append(__func__).append("() - ")
44 using std::stringstream;
51 string SuperChunk::get_curl_range_arg_string() {
52 return curl::get_range_arg_string(d_offset, d_size);
65 bool chunk_was_added =
false;
67 d_chunks.push_back(candidate_chunk);
68 d_offset = candidate_chunk->get_offset();
69 d_size = candidate_chunk->get_size();
70 d_data_url = candidate_chunk->get_data_url();
71 chunk_was_added =
true;
73 else if(is_contiguous(candidate_chunk) ){
74 this->d_chunks.push_back(candidate_chunk);
75 d_size += candidate_chunk->get_size();
76 chunk_was_added =
true;
78 return chunk_was_added;
94 bool SuperChunk::is_contiguous(
const std::shared_ptr<Chunk> candidate_chunk) {
96 bool contiguous = candidate_chunk->get_data_url() == d_data_url;
99 contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
112 void SuperChunk::map_chunks_to_buffer()
114 unsigned long long bindex = 0;
115 for(
const auto &chunk : d_chunks){
116 chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(),0,
false);
117 bindex += chunk->get_size();
120 msg <<
"ERROR The computed buffer index, " << bindex <<
" is larger than expected size of the SuperChunk. ";
121 msg <<
"d_size: " << d_size;
132 void SuperChunk::read_aggregate_bytes()
137 Chunk chunk(d_data_url,
"NOT_USED", d_size, d_offset);
139 chunk.set_read_buffer(d_read_buffer, d_size,0,
false);
141 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
143 throw BESInternalError(prolog +
"No more libcurl handles.", __FILE__, __LINE__);
147 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
150 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
155 if (d_size != chunk.get_bytes_read()) {
157 oss <<
"Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() <<
", expected: " << d_size;
170 BESDEBUG(MODULE, prolog <<
"SuperChunk (" << (
void **)
this <<
") has already been read! Returning." << endl);
177 d_read_buffer =
new char[d_size];
183 map_chunks_to_buffer();
188 read_aggregate_bytes();
194 for(
auto chunk : d_chunks){
195 chunk->set_is_read(
true);
196 chunk->set_bytes_read(chunk->get_size());
205 void SuperChunk::read_and_copy(
DmrppArray *target_array) {
206 BESDEBUG(MODULE, prolog <<
"BEGIN" << endl );
210 vector<unsigned int> constrained_array_shape = target_array->
get_shape(
true);
212 for(
auto &chunk :d_chunks){
217 vector<unsigned int> target_element_address = chunk->get_position_in_array();
218 vector<unsigned int> chunk_source_address(target_array->dimensions(), 0);
220 target_array->insert_chunk(
222 &target_element_address,
223 &chunk_source_address,
225 constrained_array_shape);
228 BESDEBUG(MODULE, prolog <<
"END" << endl );
234 void SuperChunk::read_and_copy_unconstrained(DmrppArray *target_array) {
235 BESDEBUG(MODULE, prolog <<
"BEGIN" << endl );
240 const vector<unsigned int> array_shape = target_array->get_shape(
true);
242 const vector<unsigned int> chunk_shape = target_array->get_chunk_dimension_sizes();
245 for(
auto &chunk :d_chunks){
246 if (target_array->is_deflate_compression() || target_array->is_shuffle_compression())
247 chunk->inflate_chunk(target_array->is_deflate_compression(), target_array->is_shuffle_compression(),
248 target_array->get_chunk_size_in_elements(), target_array->var()->width());
250 vector<unsigned int> target_element_address = chunk->get_position_in_array();
251 vector<unsigned int> chunk_source_address(target_array->dimensions(), 0);
253 target_array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
256 BESDEBUG(MODULE, prolog <<
"END" << endl );
267 msg <<
"[SuperChunk: " << (
void **)
this;
268 msg <<
" offset: " << d_offset;
269 msg <<
" size: " << d_size ;
270 msg <<
" chunk_count: " << d_chunks.size();
275 for (
auto chunk: d_chunks) {
276 msg << chunk->to_string() << endl;
exception thrown if internal error encountered
Extend libdap::Array so that a handler can read data using a DMR++ file.
virtual std::vector< unsigned int > get_shape(bool constrained)
Get the array shape.
virtual unsigned int get_chunk_size_in_elements() const
Get the number of elements in this chunk.
virtual bool is_shuffle_compression() const
Returns true if this object utilizes shuffle compression.
virtual bool is_deflate_compression() const
Returns true if this object utilizes deflate compression.
virtual void read()
Cause the SuperChunk and all of it's subordinate Chunks to be read.
virtual bool add_chunk(std::shared_ptr< Chunk > candidate_chunk)
Attempts to add a new Chunk to this SuperChunk.
std::string to_string(bool verbose) const
Makes a string representation of the SuperChunk.
virtual void dump(std::ostream &strm) const
Writes the to_string() output to the stream strm.