33 #include <BESInternalError.h>
36 #include "CurlHandlePool.h"
37 #include "DmrppRequestHandler.h"
38 #include "DmrppD4Opaque.h"
// Helper that takes another DmrppD4Opaque by const reference. The parameter is
// unnamed in this signature. The body is not visible in this excerpt, so no
// statement is made here about what, if anything, it copies.
46 DmrppD4Opaque::_duplicate(
const DmrppD4Opaque &)
// Construct an instance from a name only.
// @param n Forwarded unchanged to the D4Opaque base-class constructor.
// The DmrppCommon base is default-constructed.
50 DmrppD4Opaque::DmrppD4Opaque(
const string &n) : D4Opaque(n), DmrppCommon()
// Construct an instance from two strings, both forwarded to D4Opaque(n, d).
// @param n First argument of the D4Opaque constructor (the variable name,
//          judging by the one-argument overload).
// @param d Second argument of the D4Opaque constructor - presumably the
//          dataset name; TODO confirm against the D4Opaque declaration.
// The DmrppCommon base is default-constructed.
54 DmrppD4Opaque::DmrppD4Opaque(
const string &n,
const string &d) : D4Opaque(n, d), DmrppCommon()
// Return a pointer to a new, heap-allocated copy of this object, built with
// the DmrppD4Opaque copy constructor.
// NOTE(review): the result comes from a bare `new`, so the caller presumably
// takes ownership and must delete it - confirm against the base-class contract.
59 DmrppD4Opaque::ptr_duplicate()
61 return new DmrppD4Opaque(*
this);
// Copy constructor: both base sub-objects are copy-constructed from rhs
// (D4Opaque(rhs) and DmrppCommon(rhs)).
// @param rhs The object to copy.
64 DmrppD4Opaque::DmrppD4Opaque(
const DmrppD4Opaque &rhs) : D4Opaque(rhs), DmrppCommon(rhs)
// Copy assignment.
// @param rhs The object whose state is assigned to this one.
// NOTE(review): neither a self-assignment guard nor the return statement is
// visible in this excerpt - confirm both exist in the full file.
70 DmrppD4Opaque::operator=(
const DmrppD4Opaque &rhs)
// Assign the D4Opaque part by viewing *this as a D4Opaque reference and using
// that class's assignment operator.
75 dynamic_cast<D4Opaque &
>(*this) = rhs;
// Then copy the DmrppCommon state from rhs.
78 DmrppCommon::m_duplicate_common(rhs);
// Copy one chunk's bytes from the chunk's read buffer into this variable's
// buffer.
// @param chunk The chunk to copy from; its read buffer and position are used.
// @exception BESInternalError if this variable's chunk shape does not have
//            exactly one dimension.
// NOTE(review): `chunk` is dereferenced without a null check.
83 void DmrppD4Opaque::insert_chunk(shared_ptr<Chunk> chunk)
// Chunk dimension sizes recorded for this variable; exactly one is required.
86 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
87 if (chunk_shape.size() != 1)
throw BESInternalError(
"Opaque variables' chunks can only have one dimension.", __FILE__, __LINE__);
// Where this chunk starts. Only element [0] is used below.
// NOTE(review): nothing visible here checks that chunk_origin is non-empty.
90 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
92 char *source_buffer = chunk->get_rbuf();
93 unsigned char *target_buffer = get_buf();
// Copy chunk_shape[0] bytes to byte offset chunk_origin[0] of the target.
// NOTE(review): always copies a full chunk_shape[0] bytes; no visible check
// that the source holds that many bytes or that the target has room for them
// (e.g. a short final chunk) - confirm buffers are sized by the caller.
95 memcpy(target_buffer + chunk_origin[0], source_buffer, chunk_shape[0]);
// Process every chunk of this variable, batching transfers through the libcurl
// multi handle when parallel transfers are configured and available.
// @exception BESInternalError if the variable has no chunks, or if the handle
//            pool returns a null easy handle.
// NOTE(review): this excerpt omits some lines of the function (for example the
// closing of the #if below and whatever separates the two top-level loops), so
// the comments describe only the statements that are visible.
100 void DmrppD4Opaque::read_chunks_parallel()
102 vector<Chunk> &chunk_refs = get_chunk_vec();
103 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
// Work queue of pointers into chunk_refs. The chunks are not copied, so the
// vector must not be resized while the queue is in use.
108 queue<Chunk*> chunks_to_read;
111 for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
112 chunks_to_read.push(&*c);
// When built without the libcurl Multi API, log a notice if parallel transfers
// were requested anyway.
115 #if !HAVE_CURL_MULTI_API
116 if (DmrppRequestHandler::d_use_parallel_transfers)
117 LOG(
"The DMR++ handler is configured to use parallel transfers, but the libcurl Multi API is not present, defaulting to serial transfers");
// Parallel path: taken only when parallel transfers are configured AND
// have_curl_multi_api is true.
120 if (DmrppRequestHandler::d_use_parallel_transfers && have_curl_multi_api) {
// max_handles is the upper bound on how many chunks go into one batch.
123 unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
124 dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();
// Drain the work queue in batches of at most max_handles chunks.
127 while (chunks_to_read.size() > 0) {
// Chunks belonging to the current batch, kept for post-processing.
128 queue<Chunk*> chunks_to_insert;
129 for (
unsigned int i = 0; i < max_handles && chunks_to_read.size() > 0; ++i) {
130 Chunk *chunk = chunks_to_read.front();
131 chunks_to_read.pop();
// Prepare the chunk's read buffer, then obtain an easy handle for the chunk
// from the pool.
133 chunk->set_rbuf_to_size();
134 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(chunk);
135 if (!handle)
throw BESInternalError(
"No more libcurl handles.", __FILE__, __LINE__);
137 mhandle->add_easy_handle(handle);
139 chunks_to_insert.push(chunk);
// Run the transfers for every easy handle added to the multi handle above.
142 mhandle->read_data();
// Post-process each chunk of the batch just read.
144 while (chunks_to_insert.size() > 0) {
145 Chunk *chunk = chunks_to_insert.front();
146 chunks_to_insert.pop();
// The trailing 1 is presumably the element width in bytes - TODO confirm
// against Chunk::inflate_chunk.
148 chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 );
// Second drain of chunks_to_read, one chunk at a time. NOTE(review): presumably
// the serial fallback (the else branch of the test above); the branch keyword
// and any read/insert calls are not visible in this excerpt.
157 while (chunks_to_read.size() > 0) {
158 Chunk *chunk = chunks_to_read.front();
159 chunks_to_read.pop();
163 chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 );
// Walk every element returned by get_chunks() and call inflate_chunk() on it
// with this variable's deflate/shuffle settings and chunk size in elements.
// NOTE(review): only the inflate step of the loop body is visible in this
// excerpt; any read or insert calls around it are not shown.
173 void DmrppD4Opaque::read_chunks()
// `auto chunk` takes each element by value; it is used through `->`, so the
// elements are pointer-like.
175 for (
auto chunk : get_chunks()) {
// The trailing 1 is presumably the element width in bytes - TODO confirm.
177 chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 );
// Read this variable's data.
// @return true immediately when read_p() already reports true; what is
//         returned on the other paths is not visible in this excerpt.
194 DmrppD4Opaque::read()
// Nothing to do when read_p() is already true.
196 if (read_p())
return true;
// No chunk dimension sizes recorded: fetch the data with a single
// read_atomic() call keyed by this variable's name.
201 if (get_chunk_dimension_sizes().empty()) {
// NOTE(review): how `data` is consumed, and the branch taken when chunk
// dimension sizes are present, are not visible in this excerpt.
204 char *data = read_atomic(name());
// Write a description of this object to a stream.
// @param strm Output stream that receives the dump.
215 void DmrppD4Opaque::dump(ostream & strm)
const
// Header line: left margin, class/method tag and this object's address.
217 strm << BESIndent::LMarg <<
"DmrppD4Opaque::dump - (" << (
void *)
this <<
")" << endl;
// Delegate to both base classes so their state is included.
219 DmrppCommon::dump(strm);
220 D4Opaque::dump(strm);
// The value itself is not printed; a fixed "----" placeholder is written.
221 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
// NOTE(review): the matching BESIndent::Indent() call is not visible in this
// excerpt - confirm the Indent/UnIndent pair is balanced in the full file.
222 BESIndent::UnIndent();
exception thrown if internal error encountered