00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "lux.h"
00024 #include "scene.h"
00025 #include "api.h"
00026 #include "error.h"
00027 #include "paramset.h"
00028 #include "renderfarm.h"
00029 #include "camera.h"
00030
00031 #include <fstream>
00032 #include <boost/asio.hpp>
00033
00034 using namespace boost::posix_time;
00035 using namespace lux;
00036 using boost::asio::ip::tcp;
00037
00038 void FilmUpdaterThread::updateFilm(FilmUpdaterThread *filmUpdaterThread) {
00039
00040
00041 boost::xtime reft;
00042 boost::xtime_get(&reft, boost::TIME_UTC);
00043
00044 while (filmUpdaterThread->signal == SIG_NONE) {
00045
00046
00047 for(;;) {
00048
00049 boost::xtime xt;
00050 boost::xtime_get(&xt, boost::TIME_UTC);
00051 xt.sec += 1;
00052 boost::thread::sleep(xt);
00053
00054 if (filmUpdaterThread->signal == SIG_EXIT)
00055 break;
00056
00057 if (xt.sec - reft.sec > filmUpdaterThread->renderFarm->serverUpdateInterval) {
00058 reft = xt;
00059 break;
00060 }
00061 }
00062
00063 if (filmUpdaterThread->signal == SIG_EXIT)
00064 break;
00065
00066 filmUpdaterThread->renderFarm->updateFilm(filmUpdaterThread->scene);
00067 }
00068 }
00069
00070
00071 void RenderFarm::startFilmUpdater(Scene *scene) {
00072 if (filmUpdateThread == NULL) {
00073 filmUpdateThread = new FilmUpdaterThread(this, scene);
00074 filmUpdateThread->thread = new boost::thread(boost::bind(
00075 FilmUpdaterThread::updateFilm, filmUpdateThread));
00076 } else {
00077 luxError(LUX_ILLSTATE,LUX_ERROR,"RenderFarm::startFilmUpdater() called but update thread already started.");
00078 }
00079 }
00080
00081 void RenderFarm::stopFilmUpdater() {
00082 if (filmUpdateThread != NULL) {
00083 filmUpdateThread->interrupt();
00084 delete filmUpdateThread;
00085 filmUpdateThread = NULL;
00086 }
00087
00088
00089
00090
00091
00092 }
00093
00094 void RenderFarm::decodeServerName(const string &serverName, string &name, string &port) {
00095
00096 size_t idx = serverName.find_last_of(':');
00097 if (idx != string::npos) {
00098
00099
00100 name = serverName.substr(0, idx);
00101 port = serverName.substr(idx + 1);
00102 } else {
00103 name = serverName;
00104 port = "18018";
00105 }
00106 }
00107
00108 bool RenderFarm::connect(const string &serverName) {
00109 {
00110 boost::mutex::scoped_lock lock(serverListMutex);
00111
00112
00113 std::stringstream ss;
00114 try {
00115 ss.str("");
00116 ss << "Connecting server: " << serverName;
00117 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00118
00119 string name, port;
00120 decodeServerName(serverName, name, port);
00121
00122 tcp::iostream stream(name, port);
00123 stream << "ServerConnect" << std::endl;
00124
00125
00126
00127 string result;
00128 if (!getline(stream, result)) {
00129 ss.str("");
00130 ss << "Unable to connect server: " << serverName;
00131 luxError(LUX_SYSTEM, LUX_ERROR, ss.str().c_str());
00132
00133 return false;
00134 }
00135
00136 ss.str("");
00137 ss << "Server connect result: " << result;
00138 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00139
00140 string sid;
00141 if ("OK" != result) {
00142 ss.str("");
00143 ss << "Unable to connect server: " << serverName;
00144 luxError(LUX_SYSTEM, LUX_ERROR, ss.str().c_str());
00145
00146 return false;
00147 } else {
00148
00149 if (!getline(stream, result)) {
00150 ss.str("");
00151 ss << "Unable read session ID from server: " << serverName;
00152 luxError(LUX_SYSTEM, LUX_ERROR, ss.str().c_str());
00153
00154 return false;
00155 }
00156
00157 sid = result;
00158 ss.str("");
00159 ss << "Server session ID: " << sid;
00160 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00161 }
00162
00163 serverInfoList.push_back(ExtRenderingServerInfo(name, port, sid));
00164 } catch (std::exception& e) {
00165 ss.str("");
00166 ss << "Unable to connect server: " << serverName;
00167 luxError(LUX_SYSTEM, LUX_ERROR, ss.str().c_str());
00168
00169 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00170 return false;
00171 }
00172 }
00173
00174 if (netBuffer.rdbuf()->in_avail() > 0)
00175 flush();
00176
00177 return true;
00178 }
00179
00180 void RenderFarm::disconnectAll() {
00181 boost::mutex::scoped_lock lock(serverListMutex);
00182
00183 for (size_t i = 0; i < serverInfoList.size(); i++)
00184 disconnect(serverInfoList[i]);
00185 serverInfoList.clear();
00186 }
00187
00188 void RenderFarm::disconnect(const string &serverName) {
00189 boost::mutex::scoped_lock lock(serverListMutex);
00190
00191 string name, port;
00192 decodeServerName(serverName, name, port);
00193
00194 for (vector<ExtRenderingServerInfo>::iterator it = serverInfoList.begin(); it < serverInfoList.end(); it++ ) {
00195 if (name.compare(it->name) == 0 && port.compare(it->port) == 0) {
00196 disconnect(*it);
00197 serverInfoList.erase(it);
00198 break;
00199 }
00200 }
00201 }
00202
00203 void RenderFarm::disconnect(const ExtRenderingServerInfo &serverInfo) {
00204 std::stringstream ss;
00205 try {
00206 ss.str("");
00207 ss << "Disconnect from server: " <<
00208 serverInfo.name << ":" << serverInfo.port;
00209 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00210
00211 tcp::iostream stream(serverInfo.name, serverInfo.port);
00212 stream << "ServerDisconnect" << std::endl;
00213 stream << serverInfo.sid << std::endl;
00214 } catch (std::exception& e) {
00215 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00216 }
00217 }
00218
00219 void RenderFarm::flush() {
00220 boost::mutex::scoped_lock lock(serverListMutex);
00221
00222 std::stringstream ss;
00223
00224 string commands = netBuffer.str();
00225
00226 ss.str("");
00227 ss << "Compiled scene size: " << (commands.size() / 1024) << "KBytes";
00228 luxError(LUX_NOERROR, LUX_DEBUG, ss.str().c_str());
00229
00230
00231 for (size_t i = 0; i < serverInfoList.size(); i++) {
00232 if(serverInfoList[i].flushed == false) {
00233 try {
00234 ss.str("");
00235 ss << "Sending commands to server: " <<
00236 serverInfoList[i].name << ":" << serverInfoList[i].port;
00237 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00238
00239 tcp::iostream stream(serverInfoList[i].name, serverInfoList[i].port);
00240 stream << commands << std::endl;
00241
00242 serverInfoList[i].flushed = true;
00243 } catch (std::exception& e) {
00244 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00245 }
00246 }
00247 }
00248
00249
00250 if (serverInfoList.size() > 0) {
00251 ss.str("");
00252 ss << "All servers are aligned";
00253 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00254 }
00255 }
00256
00257 void RenderFarm::updateFilm(Scene *scene) {
00258
00259
00260 boost::mutex::scoped_lock lock(serverListMutex);
00261
00262
00263 Film *film = scene->camera->film;
00264
00265 std::stringstream ss;
00266 for (size_t i = 0; i < serverInfoList.size(); i++) {
00267 try {
00268 ss.str("");
00269 ss << "Getting samples from: " <<
00270 serverInfoList[i].name << ":" << serverInfoList[i].port;
00271 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00272
00273
00274
00275 boost::asio::io_service ioService;
00276 tcp::resolver resolver(ioService);
00277 tcp::resolver::query query(serverInfoList[i].name, serverInfoList[i].port);
00278 tcp::resolver::iterator iterator = resolver.resolve(query);
00279
00280
00281 tcp::socket slaveSocket(ioService);
00282 slaveSocket.connect(*iterator);
00283
00284
00285 boost::asio::socket_base::keep_alive option(true);
00286 slaveSocket.set_option(option);
00287 #if defined(__linux__) || defined(__MACOSX__)
00288
00289 const int nativeSocket = static_cast<int>(slaveSocket.native());
00290 int optval = 3;
00291 const socklen_t optlen = sizeof(optval);
00292 setsockopt(nativeSocket, SOL_TCP, TCP_KEEPCNT, &optval, optlen);
00293 optval = 30;
00294 setsockopt(nativeSocket, SOL_TCP, TCP_KEEPIDLE, &optval, optlen);
00295 optval = 5;
00296 setsockopt(nativeSocket, SOL_TCP, TCP_KEEPINTVL, &optval, optlen);
00297 #endif
00298
00299
00300 ss.str("");
00301 ss << "luxGetFilm" << std::endl;
00302 boost::asio::write(slaveSocket, boost::asio::buffer(ss.str().c_str(), ss.str().size()));
00303 ss.str("");
00304 ss << serverInfoList[i].sid << std::endl;
00305 boost::asio::write(slaveSocket, boost::asio::buffer(ss.str().c_str(), ss.str().size()));
00306
00307
00308 std::stringstream compressedStream(std::stringstream::in |
00309 std::stringstream::out | std::stringstream::binary);
00310 boost::array<char, 16 * 1024 > buf;
00311 std::streamsize compressedSize = 0;
00312 for (bool done = false; !done;) {
00313 boost::system::error_code error;
00314 std::size_t count = boost::asio::read(
00315 slaveSocket, boost::asio::buffer(buf),
00316 boost::asio::transfer_all(), error);
00317 if (error == boost::asio::error::eof)
00318 done = true;
00319 else if (error)
00320 throw boost::system::system_error(error);
00321
00322 compressedStream.write(buf.c_array(), count);
00323 compressedSize += count;
00324 }
00325
00326 slaveSocket.close();
00327
00328
00329 serverInfoList[i].numberOfSamplesReceived += film->UpdateFilm(compressedStream);
00330
00331 ss.str("");
00332 ss << "Samples received from '" <<
00333 serverInfoList[i].name << ":" << serverInfoList[i].port << "' (" <<
00334 (compressedSize / 1024) << " Kbytes)";
00335 luxError(LUX_NOERROR, LUX_INFO, ss.str().c_str());
00336
00337 serverInfoList[i].timeLastContact = second_clock::local_time();
00338 } catch (std::exception& e) {
00339 ss.str("");
00340 ss << "Error while communicating with server: " <<
00341 serverInfoList[i].name << ":" << serverInfoList[i].port;
00342 luxError(LUX_SYSTEM, LUX_ERROR, ss.str().c_str());
00343 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00344 }
00345 }
00346 }
00347
00348 void RenderFarm::send(const std::string &command) {
00349 netBuffer << command << std::endl;
00350 }
00351
00352 void RenderFarm::sendFile(std::string file) {
00353 std::string s;
00354 std::ifstream in(file.c_str(), std::ios::in | std::ios::binary);
00355
00356
00357 in.seekg(0, std::ifstream::end);
00358
00359 const int len = static_cast<int> (in.tellg());
00360 in.seekg(0, std::ifstream::beg);
00361
00362 if (in.fail()) {
00363 std::stringstream ss;
00364 ss << "There was an error while checking the size of file '" << file;
00365 luxError(LUX_SYSTEM, LUX_ERROR, ss.str().c_str());
00366
00367
00368 netBuffer << "0\n";
00369 } else {
00370
00371 char *buf = new char[len];
00372
00373 in.read(buf, len);
00374
00375 if (in.fail()) {
00376 std::stringstream ss;
00377 ss << "There was an error while reading file '" << file << "'";
00378 luxError(LUX_SYSTEM, LUX_ERROR, ss.str().c_str());
00379
00380
00381 netBuffer << "0\n";
00382 } else {
00383
00384 netBuffer << len << "\n";
00385
00386
00387 netBuffer.write(buf, len);
00388 }
00389
00390 delete buf;
00391 }
00392
00393 in.close();
00394 }
00395
00396 void RenderFarm::send(const std::string &command, const std::string &name,
00397 const ParamSet ¶ms) {
00398 try {
00399 netBuffer << command << std::endl << name << std::endl;
00400 boost::archive::text_oarchive oa(netBuffer);
00401 oa << params;
00402 netBuffer << "\n";
00403
00404
00405 std::string file;
00406 file = "";
00407 file = params.FindOneString(std::string("mapname"), file);
00408 if (file.size())
00409 sendFile(file);
00410
00411 file = "";
00412 file = params.FindOneString(std::string("iesname"), file);
00413 if (file.size())
00414 sendFile(file);
00415 } catch (std::exception& e) {
00416 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00417 }
00418 }
00419
00420 void RenderFarm::send(const std::string &command, const std::string &name) {
00421 try {
00422 netBuffer << command << std::endl << name << std::endl;
00423 } catch (std::exception& e) {
00424 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00425 }
00426 }
00427
00428 void RenderFarm::send(const string &command, float x, float y) {
00429 try {
00430 netBuffer << command << std::endl << x << ' ' << y << std::endl;
00431 } catch (std::exception& e) {
00432 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00433 }
00434 }
00435
00436 void RenderFarm::send(const std::string &command, float x, float y, float z) {
00437 try {
00438 netBuffer << command << std::endl << x << ' ' << y << ' ' << z << std::endl;
00439 } catch (std::exception& e) {
00440 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00441 }
00442 }
00443
00444 void RenderFarm::send(const std::string &command, float a, float x, float y,
00445 float z) {
00446 try {
00447 netBuffer << command << std::endl << a << ' ' << x << ' ' << y << ' ' << z << std::endl;
00448 } catch (std::exception& e) {
00449 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00450 }
00451 }
00452
00453 void RenderFarm::send(const std::string &command, float ex, float ey, float ez,
00454 float lx, float ly, float lz, float ux, float uy, float uz) {
00455 try {
00456 netBuffer << command << std::endl << ex << ' ' << ey << ' ' << ez << ' ' << lx << ' ' << ly << ' ' << lz << ' ' << ux << ' ' << uy << ' ' << uz << std::endl;
00457 } catch (std::exception& e) {
00458 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00459 }
00460 }
00461
00462 void RenderFarm::send(const std::string &command, float tr[16]) {
00463 try {
00464 netBuffer << command << std::endl;
00465 for (int i = 0; i < 16; i++)
00466 netBuffer << tr[i] << ' ';
00467 netBuffer << std::endl;
00468 } catch (std::exception& e) {
00469 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00470 }
00471 }
00472
00473 void RenderFarm::send(const std::string &command, const string &name,
00474 const string &type, const string &texname, const ParamSet ¶ms) {
00475 try {
00476 netBuffer << command << std::endl << name << std::endl << type << std::endl << texname << std::endl;
00477 boost::archive::text_oarchive oa(netBuffer);
00478 oa << params;
00479 netBuffer << "\n";
00480
00481
00482 std::string file = "";
00483 file = params.FindOneString(std::string("filename"), file);
00484 if (file.size())
00485 sendFile(file);
00486 } catch (std::exception& e) {
00487 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00488 }
00489 }
00490
00491 void RenderFarm::send(const std::string &command, const std::string &name, float a, float b, const std::string &transform) {
00492 try {
00493 netBuffer << command << std::endl << name << ' ' << a << ' ' << b << ' ' << transform << std::endl;
00494 } catch (std::exception& e) {
00495 luxError(LUX_SYSTEM, LUX_ERROR, e.what());
00496 }
00497 }
00498
00499 int RenderFarm::getServersStatus(RenderingServerInfo *info, int maxInfoCount) {
00500 ptime now = second_clock::local_time();
00501 for (size_t i = 0; i < min<size_t>(serverInfoList.size(), maxInfoCount); ++i) {
00502 info[i].serverIndex = i;
00503 info[i].name = serverInfoList[i].name.c_str();
00504 info[i].port = serverInfoList[i].port.c_str();
00505 info[i].sid = serverInfoList[i].sid.c_str();
00506
00507 time_duration td = now - serverInfoList[i].timeLastContact;
00508 info[i].secsSinceLastContact = td.seconds();
00509 info[i].numberOfSamplesReceived = serverInfoList[i].numberOfSamplesReceived;
00510 }
00511
00512 return serverInfoList.size();
00513 }