bes  Updated for version 3.20.8
DmrppD4Opaque.cc
1 
2 // -*- mode: c++; c-basic-offset:4 -*-
3 
4 // This file is part of the BES
5 
6 // Copyright (c) 2016 OPeNDAP, Inc.
7 // Author: James Gallagher <jgallagher@opendap.org>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
22 //
23 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
24 
25 #include "config.h"
26 
27 #include <cstring>
28 
29 #include <string>
30 #include <queue>
31 
32 #include <BESLog.h>
33 #include <BESInternalError.h>
34 #include <BESDebug.h>
35 
36 #include "CurlHandlePool.h"
37 #include "DmrppRequestHandler.h"
38 #include "DmrppD4Opaque.h"
39 
40 using namespace libdap;
41 using namespace std;
42 
43 namespace dmrpp {
44 
45 void
46 DmrppD4Opaque::_duplicate(const DmrppD4Opaque &)
47 {
48 }
49 
50 DmrppD4Opaque::DmrppD4Opaque(const string &n) : D4Opaque(n), DmrppCommon()
51 {
52 }
53 
54 DmrppD4Opaque::DmrppD4Opaque(const string &n, const string &d) : D4Opaque(n, d), DmrppCommon()
55 {
56 }
57 
58 BaseType *
59 DmrppD4Opaque::ptr_duplicate()
60 {
61  return new DmrppD4Opaque(*this);
62 }
63 
64 DmrppD4Opaque::DmrppD4Opaque(const DmrppD4Opaque &rhs) : D4Opaque(rhs), DmrppCommon(rhs)
65 {
66  _duplicate(rhs);
67 }
68 
69 DmrppD4Opaque &
70 DmrppD4Opaque::operator=(const DmrppD4Opaque &rhs)
71 {
72  if (this == &rhs)
73  return *this;
74 
75  dynamic_cast<D4Opaque &>(*this) = rhs; // run Constructor=
76 
77  _duplicate(rhs);
78  DmrppCommon::m_duplicate_common(rhs);
79 
80  return *this;
81 }
82 
83 void DmrppD4Opaque::insert_chunk(shared_ptr<Chunk> chunk)
84 {
85  // The size, in elements, of each of the chunk's dimensions.
86  const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
87  if (chunk_shape.size() != 1) throw BESInternalError("Opaque variables' chunks can only have one dimension.", __FILE__, __LINE__);
88 
89  // The chunk's origin point a.k.a. its "position in array".
90  const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
91 
92  char *source_buffer = chunk->get_rbuf();
93  unsigned char *target_buffer = get_buf();
94 
95  memcpy(target_buffer + chunk_origin[0], source_buffer, chunk_shape[0]);
96 }
97 
#if 0
// Disabled (never compiled). This is the old code based on DmrppArray,
// kept for reference only. jhrg 9/15/20
void DmrppD4Opaque::read_chunks_parallel()
{
    vector<Chunk> &all_chunks = get_chunk_vec();
    if (all_chunks.empty())
        throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);

    // A queue is not needed here - Opaque is never constrained - but using one
    // means the logic of DmrppArray::read_chunks_parallel() can be reused.
    // TODO Replace with a more efficient version. jhrg 5/3/18
    queue<Chunk*> pending;

    // Every chunk is queued for reading.
    for (auto &c : all_chunks) {
        pending.push(&c);
    }

#if !HAVE_CURL_MULTI_API
    if (DmrppRequestHandler::d_use_parallel_transfers)
        LOG("The DMR++ handler is configured to use parallel transfers, but the libcurl Multi API is not present, defaulting to serial transfers");
#endif

    if (!DmrppRequestHandler::d_use_parallel_transfers || !have_curl_multi_api) {
        // The 'serial' version of the code: read a chunk, insert it, read the
        // next one, and so on.
        while (!pending.empty()) {
            Chunk *current = pending.front();
            pending.pop();

            current->read_chunk();

            current->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 /*elem width*/);

            insert_chunk(current);
        }
    }
    else {
        // The parallel version of the code. It reads a set of chunks in parallel
        // using the multi curl API, then inserts them, then reads the next set, ... jhrg 5/1/18
        unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
        dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();

        while (!pending.empty()) {
            // The batch of chunks handed to the multi handle in this round.
            queue<Chunk*> batch;

            unsigned int used = 0;
            while (used < max_handles && !pending.empty()) {
                Chunk *current = pending.front();
                pending.pop();

                current->set_rbuf_to_size();
                dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(current);
                if (!handle) throw BESInternalError("No more libcurl handles.", __FILE__, __LINE__);

                mhandle->add_easy_handle(handle);

                batch.push(current);
                ++used;
            }

            mhandle->read_data(); // read and decompress chunks, then remove the easy_handles

            while (!batch.empty()) {
                Chunk *current = batch.front();
                batch.pop();

                current->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 /*elem width*/);

                insert_chunk(current);
            }
        }
    }

    set_read_p(true);
}
#endif
172 
173 void DmrppD4Opaque::read_chunks()
174 {
175  for (auto chunk : get_chunks()) {
176  chunk->read_chunk();
177  chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), 1 /*elem width*/);
178  insert_chunk(chunk);
179  }
180 
181  set_read_p(true);
182 }
183 
193 bool
194 DmrppD4Opaque::read()
195 {
196  if (read_p()) return true;
197 
198  // if there are no chunks, use read a single contiguous block of data
199  // and store it in the object. Note that DmrppCommon uses a single Chunk
200  // instance to hold 'contiguous' data.
201  if (get_chunk_dimension_sizes().empty()) {
202  // read_atomic() returns a pointer to the Chunk data. When the Chunk
203  // instance is freed, this memory goes away.
204  char *data = read_atomic(name());
205  val2buf(data); // yes, it's not type-safe
206  }
207  else {
208  // Handle the more complex case where the data is chunked.
209  read_chunks();
210  }
211 
212  return true;
213 }
214 
215 void DmrppD4Opaque::dump(ostream & strm) const
216 {
217  strm << BESIndent::LMarg << "DmrppD4Opaque::dump - (" << (void *) this << ")" << endl;
218  BESIndent::Indent();
219  DmrppCommon::dump(strm);
220  D4Opaque::dump(strm);
221  strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/ endl;
222  BESIndent::UnIndent();
223 }
224 
225 } // namespace dmrpp
226 
exception thrown if internal error encountered