37 #include "oprs_protobuf.h"
39 #include <core/exception.h>
40 #include <core/threading/mutex_locker.h>
41 #include <google/protobuf/descriptor.h>
42 #include <protobuf_comm/client.h>
43 #include <protobuf_comm/peer.h>
44 #include <protobuf_comm/server.h>
47 #include <oprs_f-pub.h>
49 using namespace google::protobuf;
50 using namespace protobuf_comm;
52 namespace oprs_protobuf {
66 OpenPRSProtobuf::OpenPRSProtobuf(std::vector<std::string> &proto_path)
67 : message_register_(new
MessageRegister(proto_path)), server_(NULL), next_client_id_(0)
74 for (
auto c : clients_) {
80 message_register_.reset();
89 if ((port > 0) && !server_) {
93 boost::bind(&OpenPRSProtobuf::handle_server_client_connected,
this, _1, _2));
95 boost::bind(&OpenPRSProtobuf::handle_server_client_disconnected,
this, _1, _2));
97 boost::bind(&OpenPRSProtobuf::handle_server_client_msg,
this, _1, _2, _3, _4));
99 boost::bind(&OpenPRSProtobuf::handle_server_client_fail,
this, _1, _2, _3, _4));
123 const std::string &crypto_key,
124 const std::string &cipher)
127 recv_port = send_port;
131 address, send_port, recv_port, &*message_register_, crypto_key, cipher);
136 peer_id = ++next_client_id_;
137 peers_[peer_id] = peer;
141 boost::bind(&OpenPRSProtobuf::handle_peer_msg,
this, peer_id, _1, _2, _3, _4));
143 boost::bind(&OpenPRSProtobuf::handle_peer_recv_error,
this, peer_id, _1, _2));
145 boost::bind(&OpenPRSProtobuf::handle_peer_send_error,
this, peer_id, _1));
147 return build_long_long(peer_id);
149 return build_long_long(0);
163 const std::string &crypto_key,
164 const std::string &cipher)
198 if (peers_.find(peer_id) != peers_.end()) {
199 delete peers_[peer_id];
200 peers_.erase(peer_id);
211 const std::string &crypto_key,
212 const std::string &cipher)
214 if (peers_.find(peer_id) != peers_.end()) {
215 peers_[peer_id]->setup_crypto(crypto_key, cipher);
227 message_register_->add_message_type(full_name);
229 }
catch (std::runtime_error &e) {
240 std::shared_ptr<google::protobuf::Message> *
243 std::shared_ptr<google::protobuf::Message> m = message_register_->new_message_for(full_name);
244 return new std::shared_ptr<google::protobuf::Message>(m);
254 std::shared_ptr<google::protobuf::Message> *m =
255 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
257 return build_pointer(
new std::shared_ptr<google::protobuf::Message>());
259 return build_pointer(
new std::shared_ptr<google::protobuf::Message>(*m));
272 std::shared_ptr<google::protobuf::Message> *m =
273 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
288 TermList tl = sl_make_slist();
290 std::shared_ptr<google::protobuf::Message> *m =
291 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
293 return build_term_l_list_from_c_list(tl);
295 const Descriptor *desc = (*m)->GetDescriptor();
296 const int field_count = desc->field_count();
297 for (
int i = 0; i < field_count; ++i) {
298 tl = build_term_list(tl, build_string(desc->field(i)->name().c_str()));
300 return build_term_l_list_from_c_list(tl);
311 std::shared_ptr<google::protobuf::Message> *m =
312 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
314 return build_id(declare_atom(
"INVALID-MESSAGE"));
316 const Descriptor * desc = (*m)->GetDescriptor();
317 const FieldDescriptor *field = desc->FindFieldByName(field_name);
319 return build_id(declare_atom(
"DOES-NOT-EXIST"));
321 switch (field->type()) {
322 case FieldDescriptor::TYPE_DOUBLE:
return build_id(declare_atom(
"DOUBLE"));
323 case FieldDescriptor::TYPE_FLOAT:
return build_id(declare_atom(
"FLOAT"));
324 case FieldDescriptor::TYPE_INT64:
return build_id(declare_atom(
"INT64"));
325 case FieldDescriptor::TYPE_UINT64:
return build_id(declare_atom(
"UINT64"));
326 case FieldDescriptor::TYPE_INT32:
return build_id(declare_atom(
"INT32"));
327 case FieldDescriptor::TYPE_FIXED64:
return build_id(declare_atom(
"FIXED64"));
328 case FieldDescriptor::TYPE_FIXED32:
return build_id(declare_atom(
"FIXED32"));
329 case FieldDescriptor::TYPE_BOOL:
return build_id(declare_atom(
"BOOL"));
330 case FieldDescriptor::TYPE_STRING:
return build_id(declare_atom(
"STRING"));
331 case FieldDescriptor::TYPE_MESSAGE:
return build_id(declare_atom(
"MESSAGE"));
332 case FieldDescriptor::TYPE_BYTES:
return build_id(declare_atom(
"BYTES"));
333 case FieldDescriptor::TYPE_UINT32:
return build_id(declare_atom(
"UINT32"));
334 case FieldDescriptor::TYPE_ENUM:
return build_id(declare_atom(
"ENUM"));
335 case FieldDescriptor::TYPE_SFIXED32:
return build_id(declare_atom(
"SFIXED32"));
336 case FieldDescriptor::TYPE_SFIXED64:
return build_id(declare_atom(
"SFIXED64"));
337 case FieldDescriptor::TYPE_SINT32:
return build_id(declare_atom(
"SINT32"));
338 case FieldDescriptor::TYPE_SINT64:
return build_id(declare_atom(
"SINT64"));
339 default:
return build_id(declare_atom(
"UNKNOWN"));
352 std::shared_ptr<google::protobuf::Message> *m =
353 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
357 const Descriptor * desc = (*m)->GetDescriptor();
358 const FieldDescriptor *field = desc->FindFieldByName(field_name);
362 const Reflection *refl = (*m)->GetReflection();
364 if (field->is_repeated()) {
365 return (refl->FieldSize(**m, field) > 0);
367 return refl->HasField(**m, field);
379 std::shared_ptr<google::protobuf::Message> *m =
380 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
382 return build_id(declare_atom(
"INVALID-MESSAGE"));
384 const Descriptor * desc = (*m)->GetDescriptor();
385 const FieldDescriptor *field = desc->FindFieldByName(field_name);
387 return build_id(declare_atom(
"DOES-NOT-EXIST"));
388 switch (field->label()) {
389 case FieldDescriptor::LABEL_OPTIONAL:
return build_id(declare_atom(
"OPTIONAL"));
390 case FieldDescriptor::LABEL_REQUIRED:
return build_id(declare_atom(
"REQUIRED"));
391 case FieldDescriptor::LABEL_REPEATED:
return build_id(declare_atom(
"REPEATED"));
392 default:
return build_id(declare_atom(
"UNKNOWN"));
404 std::shared_ptr<google::protobuf::Message> *m =
405 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
407 return build_id(declare_atom(
"INVALID-MESSAGE"));
409 const Descriptor * desc = (*m)->GetDescriptor();
410 const FieldDescriptor *field = desc->FindFieldByName(field_name);
412 return build_id(declare_atom(
"DOES-NOT-EXIST"));
413 const Reflection *refl = (*m)->GetReflection();
414 if (field->type() != FieldDescriptor::TYPE_MESSAGE && !refl->HasField(**m, field)) {
417 return build_id(declare_atom(
"NOT-SET"));
419 switch (field->type()) {
420 case FieldDescriptor::TYPE_DOUBLE:
return build_float(refl->GetDouble(**m, field));
421 case FieldDescriptor::TYPE_FLOAT:
return build_float(refl->GetFloat(**m, field));
422 case FieldDescriptor::TYPE_INT64:
return build_long_long(refl->GetInt64(**m, field));
423 case FieldDescriptor::TYPE_UINT64:
return build_long_long((
long int)refl->GetUInt64(**m, field));
424 case FieldDescriptor::TYPE_INT32:
return build_integer(refl->GetInt32(**m, field));
425 case FieldDescriptor::TYPE_FIXED64:
return build_long_long((
long int)refl->GetUInt64(**m, field));
426 case FieldDescriptor::TYPE_FIXED32:
return build_long_long(refl->GetUInt32(**m, field));
427 case FieldDescriptor::TYPE_BOOL:
return refl->GetBool(**m, field) ? build_t() : build_nil();
428 case FieldDescriptor::TYPE_STRING:
return build_string(refl->GetString(**m, field).c_str());
429 case FieldDescriptor::TYPE_MESSAGE: {
430 const google::protobuf::Message &mfield = refl->GetMessage(**m, field);
431 google::protobuf::Message * mcopy = mfield.New();
432 mcopy->CopyFrom(mfield);
433 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
434 return build_pointer(ptr);
436 case FieldDescriptor::TYPE_BYTES:
return build_string((
char *)
"bytes");
437 case FieldDescriptor::TYPE_UINT32:
return build_long_long(refl->GetUInt32(**m, field));
438 case FieldDescriptor::TYPE_ENUM:
439 return build_id(declare_atom(refl->GetEnum(**m, field)->name().c_str()));
440 case FieldDescriptor::TYPE_SFIXED32:
return build_integer(refl->GetInt32(**m, field));
441 case FieldDescriptor::TYPE_SFIXED64:
return build_long_long(refl->GetInt64(**m, field));
442 case FieldDescriptor::TYPE_SINT32:
return build_integer(refl->GetInt32(**m, field));
443 case FieldDescriptor::TYPE_SINT64:
return build_long_long(refl->GetInt64(**m, field));
444 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
456 std::shared_ptr<google::protobuf::Message> *m =
457 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
461 const Descriptor * desc = (*m)->GetDescriptor();
462 const FieldDescriptor *field = desc->FindFieldByName(field_name);
467 const Reflection *refl = (*m)->GetReflection();
470 switch (field->type()) {
471 case FieldDescriptor::TYPE_DOUBLE:
472 if (value->type == TT_FLOAT) {
473 refl->SetDouble(m->get(), field, *(value->u.doubleptr));
475 throw std::logic_error(std::string(
"Invalid type, required float for ")
476 + (*m)->GetTypeName() + field_name);
479 case FieldDescriptor::TYPE_FLOAT:
480 if (value->type == TT_FLOAT) {
481 refl->SetFloat(m->get(), field, *(value->u.doubleptr));
483 throw std::logic_error(std::string(
"Invalid type, required float for ")
484 + (*m)->GetTypeName() + field_name);
487 case FieldDescriptor::TYPE_SFIXED64:
488 case FieldDescriptor::TYPE_SINT64:
489 case FieldDescriptor::TYPE_INT64:
490 if (value->type == INTEGER) {
491 refl->SetInt64(m->get(), field, value->u.intval);
492 }
else if (value->type == LONG_LONG) {
493 refl->SetInt64(m->get(), field, value->u.llintval);
495 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ")
496 + (*m)->GetTypeName() + field_name);
499 case FieldDescriptor::TYPE_FIXED64:
500 case FieldDescriptor::TYPE_UINT64:
501 if (value->type == INTEGER) {
502 refl->SetUInt64(m->get(), field, value->u.intval);
503 }
else if (value->type == LONG_LONG) {
504 refl->SetUInt64(m->get(), field, value->u.llintval);
506 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ")
507 + (*m)->GetTypeName() + field_name);
510 case FieldDescriptor::TYPE_SFIXED32:
511 case FieldDescriptor::TYPE_SINT32:
512 case FieldDescriptor::TYPE_INT32:
513 if (value->type == INTEGER) {
514 refl->SetInt32(m->get(), field, value->u.intval);
516 throw std::logic_error(std::string(
"Invalid type, required integer for ")
517 + (*m)->GetTypeName() + field_name);
520 case FieldDescriptor::TYPE_BOOL:
521 if (value->type == TT_ATOM) {
522 if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
523 refl->SetBool(m->get(), field, (value->u.id == lisp_t_sym));
525 throw std::logic_error(std::string(
"Invalid value, allowed are T or NIL for field ")
526 + (*m)->GetTypeName() + field_name);
529 throw std::logic_error(std::string(
"Invalid type, required symbol for ")
530 + (*m)->GetTypeName() + field_name);
533 case FieldDescriptor::TYPE_STRING:
534 if (value->type == STRING) {
535 refl->SetString(m->get(), field, value->u.string);
537 throw std::logic_error(std::string(
"Invalid type, required string for ")
538 + (*m)->GetTypeName() + field_name);
541 case FieldDescriptor::TYPE_MESSAGE:
542 if (value->type == U_POINTER) {
543 std::shared_ptr<google::protobuf::Message> *mfrom =
544 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value->u.u_pointer);
545 Message *mut_msg = refl->MutableMessage(m->get(), field);
546 mut_msg->CopyFrom(**mfrom);
549 throw std::logic_error(std::string(
"Invalid type, required user pointer for ")
550 + (*m)->GetTypeName() + field_name);
553 case FieldDescriptor::TYPE_BYTES:
break;
554 case FieldDescriptor::TYPE_FIXED32:
555 case FieldDescriptor::TYPE_UINT32:
556 if (value->type == INTEGER) {
557 refl->SetUInt32(m->get(), field, value->u.intval);
558 }
else if (value->type == LONG_LONG) {
559 refl->SetUInt32(m->get(), field, value->u.llintval);
561 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ")
562 + (*m)->GetTypeName() + field_name);
565 case FieldDescriptor::TYPE_ENUM: {
566 const char *sym_name = NULL;
567 if (value->type == TT_ATOM) {
568 sym_name = value->u.id;
569 }
else if (value->type == STRING) {
570 sym_name = value->u.string;
572 throw std::logic_error(std::string(
"Invalid type, required symbol or string for ")
573 + (*m)->GetTypeName() + field_name);
576 const EnumDescriptor * enumdesc = field->enum_type();
577 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
579 refl->SetEnum(m->get(), field, enumval);
581 std::string sym_str(sym_name);
582 std::transform(sym_str.begin(),
585 std::ptr_fun<int, int>(std::toupper));
587 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_str);
590 refl->SetEnum(m->get(), field, enumval);
593 "%s: cannot set invalid enum value '%s' (neither '%s') on '%s'",
594 (*m)->GetTypeName().c_str(),
602 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
604 }
catch (std::logic_error &e) {
618 std::shared_ptr<google::protobuf::Message> *m =
619 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
623 const Descriptor * desc = (*m)->GetDescriptor();
624 const FieldDescriptor *field = desc->FindFieldByName(field_name);
629 const Reflection *refl = (*m)->GetReflection();
632 switch (field->type()) {
633 case FieldDescriptor::TYPE_DOUBLE:
634 if (value->type == TT_FLOAT) {
635 refl->AddDouble(m->get(), field, *(value->u.doubleptr));
637 throw std::logic_error(std::string(
"Invalid type, required float for ")
638 + (*m)->GetTypeName() + field_name);
641 case FieldDescriptor::TYPE_FLOAT:
642 if (value->type == TT_FLOAT) {
643 refl->AddFloat(m->get(), field, *(value->u.doubleptr));
645 throw std::logic_error(std::string(
"Invalid type, required float for ")
646 + (*m)->GetTypeName() + field_name);
650 case FieldDescriptor::TYPE_SFIXED64:
651 case FieldDescriptor::TYPE_SINT64:
652 case FieldDescriptor::TYPE_INT64:
653 if (value->type == INTEGER) {
654 refl->AddInt64(m->get(), field, value->u.intval);
655 }
else if (value->type == LONG_LONG) {
656 refl->AddInt64(m->get(), field, value->u.llintval);
658 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ")
659 + (*m)->GetTypeName() + field_name);
663 case FieldDescriptor::TYPE_SFIXED32:
664 case FieldDescriptor::TYPE_SINT32:
665 case FieldDescriptor::TYPE_INT32:
666 if (value->type == INTEGER) {
667 refl->AddInt32(m->get(), field, value->u.intval);
669 throw std::logic_error(std::string(
"Invalid type, required integer for ")
670 + (*m)->GetTypeName() + field_name);
673 case FieldDescriptor::TYPE_BOOL:
674 if (value->type == TT_ATOM) {
675 if (value->u.id == lisp_t_sym || value->u.id == nil_sym) {
676 refl->AddBool(m->get(), field, (value->u.id == lisp_t_sym));
678 throw std::logic_error(std::string(
"Invalid value, allowed are T or NIL for field ")
679 + (*m)->GetTypeName() + field_name);
682 throw std::logic_error(std::string(
"Invalid type, required symbol for ")
683 + (*m)->GetTypeName() + field_name);
686 case FieldDescriptor::TYPE_STRING:
687 if (value->type == STRING) {
688 refl->AddString(m->get(), field, value->u.string);
690 throw std::logic_error(std::string(
"Invalid type, required string for ")
691 + (*m)->GetTypeName() + field_name);
694 case FieldDescriptor::TYPE_MESSAGE:
695 if (value->type == U_POINTER) {
696 std::shared_ptr<google::protobuf::Message> *mfrom =
697 static_cast<std::shared_ptr<google::protobuf::Message> *
>(value->u.u_pointer);
698 Message *mut_msg = refl->AddMessage(m->get(), field);
699 mut_msg->CopyFrom(**mfrom);
702 throw std::logic_error(std::string(
"Invalid type, required user pointer for ")
703 + (*m)->GetTypeName() + field_name);
707 case FieldDescriptor::TYPE_BYTES:
break;
709 case FieldDescriptor::TYPE_FIXED32:
710 case FieldDescriptor::TYPE_UINT32:
711 if (value->type == INTEGER) {
712 refl->AddUInt32(m->get(), field, value->u.intval);
713 }
else if (value->type == LONG_LONG) {
714 refl->AddUInt32(m->get(), field, value->u.llintval);
716 throw std::logic_error(std::string(
"Invalid type, required integer or long long for ")
717 + (*m)->GetTypeName() + field_name);
721 case FieldDescriptor::TYPE_ENUM: {
722 const char *sym_name = NULL;
723 if (value->type == TT_ATOM) {
724 sym_name = value->u.id;
725 }
else if (value->type == STRING) {
726 sym_name = value->u.string;
728 throw std::logic_error(std::string(
"Invalid type, required symbol or string for ")
729 + (*m)->GetTypeName() + field_name);
731 const EnumDescriptor * enumdesc = field->enum_type();
732 const EnumValueDescriptor *enumval = enumdesc->FindValueByName(sym_name);
734 refl->AddEnum(m->get(), field, enumval);
741 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
743 }
catch (std::logic_error &e) {
768 client_id = ++next_client_id_;
769 clients_[client_id] = client;
773 boost::bind(&OpenPRSProtobuf::handle_client_connected,
this, client_id));
774 client->
signal_disconnected().connect(boost::bind(&OpenPRSProtobuf::handle_client_disconnected,
777 boost::asio::placeholders::error));
779 boost::bind(&OpenPRSProtobuf::handle_client_msg,
this, client_id, _1, _2, _3));
781 boost::bind(&OpenPRSProtobuf::handle_client_receive_fail,
this, client_id, _1, _2, _3));
784 return build_long_long(client_id);
795 std::shared_ptr<google::protobuf::Message> *m =
796 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
805 if (server_ && server_clients_.find(client_id) != server_clients_.end()) {
807 server_->
send(server_clients_[client_id], *m);
808 sig_server_sent_(server_clients_[client_id], *m);
809 }
else if (clients_.find(client_id) != clients_.end()) {
811 clients_[client_id]->send(*m);
812 std::pair<std::string, unsigned short> &client_endpoint = client_endpoints_[client_id];
813 sig_client_sent_(client_endpoint.first, client_endpoint.second, *m);
814 }
else if (peers_.find(client_id) != peers_.end()) {
816 peers_[client_id]->send(*m);
817 sig_peer_sent_(client_id, *m);
822 }
catch (google::protobuf::FatalException &e) {
825 }
catch (std::runtime_error &e) {
838 std::shared_ptr<google::protobuf::Message> *m =
839 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
841 fprintf(stderr,
"Cannot send broadcast: invalid message");
846 if (peers_.find(peer_id) == peers_.end())
850 peers_[peer_id]->send(*m);
851 }
catch (google::protobuf::FatalException &e) {
853 "pb-broadcast: failed to broadcast message of type %s: %s\n",
854 (*m)->GetTypeName().c_str(),
858 sig_peer_sent_(peer_id, *m);
872 if (server_clients_.find(client_id) != server_clients_.end()) {
875 server_clients_.erase(client_id);
876 rev_server_clients_.erase(srv_client);
877 }
else if (clients_.find(client_id) != clients_.end()) {
878 delete clients_[client_id];
879 clients_.erase(client_id);
881 }
catch (std::runtime_error &e) {
882 throw fawkes::Exception(
"Failed to disconnect from client %li: %s", client_id, e.what());
895 std::shared_ptr<google::protobuf::Message> *m =
896 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
898 return build_id(declare_atom(
"INVALID-MESSAGE"));
900 const Descriptor * desc = (*m)->GetDescriptor();
901 const FieldDescriptor *field = desc->FindFieldByName(field_name);
903 return build_id(declare_atom(
"DOES-NOT-EXIST"));
905 TermList tl = sl_make_slist();
907 if (field->label() == FieldDescriptor::LABEL_REQUIRED
908 || field->label() == FieldDescriptor::LABEL_OPTIONAL) {
910 return build_term_l_list_from_c_list(tl);
913 const Reflection *refl = (*m)->GetReflection();
914 int field_size = refl->FieldSize(**m, field);
915 for (
int i = 0; i < field_size; ++i) {
916 switch (field->type()) {
917 case FieldDescriptor::TYPE_DOUBLE:
918 tl = build_term_list(tl, build_float(refl->GetRepeatedDouble(**m, field, i)));
920 case FieldDescriptor::TYPE_FLOAT:
921 tl = build_term_list(tl, build_float(refl->GetRepeatedFloat(**m, field, i)));
923 case FieldDescriptor::TYPE_UINT64:
924 case FieldDescriptor::TYPE_FIXED64:
925 tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt64(**m, field, i)));
927 case FieldDescriptor::TYPE_UINT32:
928 case FieldDescriptor::TYPE_FIXED32:
929 tl = build_term_list(tl, build_long_long(refl->GetRepeatedUInt32(**m, field, i)));
931 case FieldDescriptor::TYPE_BOOL:
932 tl = build_term_list(tl, refl->GetRepeatedBool(**m, field, i) ? build_t() : build_nil());
934 case FieldDescriptor::TYPE_STRING:
935 tl = build_term_list(tl, build_string(refl->GetRepeatedString(**m, field, i).c_str()));
937 case FieldDescriptor::TYPE_MESSAGE: {
938 const google::protobuf::Message &msg = refl->GetRepeatedMessage(**m, field, i);
939 google::protobuf::Message * mcopy = msg.New();
940 mcopy->CopyFrom(msg);
941 void *ptr =
new std::shared_ptr<google::protobuf::Message>(mcopy);
942 tl = build_term_list(tl, build_pointer(ptr));
944 case FieldDescriptor::TYPE_BYTES:
945 tl = build_term_list(tl, build_string((
char *)
"bytes"));
947 case FieldDescriptor::TYPE_ENUM:
948 tl = build_term_list(tl,
950 declare_atom(refl->GetRepeatedEnum(**m, field, i)->name().c_str())));
952 case FieldDescriptor::TYPE_SFIXED32:
953 case FieldDescriptor::TYPE_INT32:
954 case FieldDescriptor::TYPE_SINT32:
955 tl = build_term_list(tl, build_integer(refl->GetRepeatedInt32(**m, field, i)));
957 case FieldDescriptor::TYPE_SFIXED64:
958 case FieldDescriptor::TYPE_SINT64:
959 case FieldDescriptor::TYPE_INT64:
960 tl = build_term_list(tl, build_long_long(refl->GetRepeatedInt64(**m, field, i)));
962 default:
throw std::logic_error(
"Unknown protobuf field type encountered");
966 return build_term_l_list_from_c_list(tl);
977 std::shared_ptr<google::protobuf::Message> *m =
978 static_cast<std::shared_ptr<google::protobuf::Message> *
>(msgptr);
982 const Descriptor * desc = (*m)->GetDescriptor();
983 const FieldDescriptor *field = desc->FindFieldByName(field_name);
987 return (field->label() == FieldDescriptor::LABEL_REPEATED);
998 while (!q_server_client_.empty()) {
999 auto &sc = q_server_client_.front();
1000 oprs_assert_server_client_event(std::get<0>(sc),
1004 q_server_client_.pop();
1010 while (!q_client_.empty()) {
1011 auto &c = q_client_.front();
1012 oprs_assert_client_event(std::get<0>(c), std::get<1>(c));
1019 while (!q_msgs_.empty()) {
1020 auto &m = q_msgs_.front();
1021 oprs_assert_message(std::get<0>(m),
1043 return (!(q_server_client_.empty() && q_client_.empty() && q_msgs_.empty()));
1047 OpenPRSProtobuf::oprs_assert_server_client_event(
long int client_id,
1049 unsigned short port,
1052 TermList tl = sl_make_slist();
1053 tl = build_term_list(tl, build_long_long(client_id));
1055 tl = build_term_list(tl, build_string(host.c_str()));
1056 tl = build_term_list(tl, build_integer(port));
1057 add_external_fact((
char *)
"protobuf-server-client-connected", tl);
1059 add_external_fact((
char *)
"protobuf-server-client-disconnected", tl);
1064 OpenPRSProtobuf::oprs_assert_client_event(
long int client_id,
bool connect)
1066 TermList tl = sl_make_slist();
1067 tl = build_term_list(tl, build_long_long(client_id));
1069 add_external_fact((
char *)
"protobuf-client-connected", tl);
1071 add_external_fact((
char *)
"protobuf-client-disconnected", tl);
1076 OpenPRSProtobuf::oprs_assert_message(std::string & endpoint_host,
1077 unsigned short endpoint_port,
1080 std::shared_ptr<google::protobuf::Message> &msg,
1081 OpenPRSProtobuf::ClientType ct,
1082 unsigned int client_id)
1084 TermList tl = sl_make_slist();
1087 gettimeofday(&tv, 0);
1088 void *ptr =
new std::shared_ptr<google::protobuf::Message>(msg);
1090 tl = build_term_list(tl, build_string(msg->GetTypeName().c_str()));
1092 tl = build_term_list(tl, build_integer(comp_id));
1094 tl = build_term_list(tl, build_integer(msg_type));
1096 tl = build_term_list(tl, build_string((client_id == 0) ?
"BROADCAST" :
"STREAM"));
1098 tl = build_term_list(tl, build_long_long(tv.tv_sec));
1099 tl = build_term_list(tl, build_long_long(tv.tv_usec));
1101 tl = build_term_list(tl, build_string(endpoint_host.c_str()));
1102 tl = build_term_list(tl, build_integer(endpoint_port));
1104 tl = build_term_list(tl,
1105 build_string(ct == CT_CLIENT ?
"CLIENT"
1106 : (ct == CT_SERVER ?
"SERVER" :
"PEER")));
1108 tl = build_term_list(tl, build_integer(client_id));
1110 tl = build_term_list(tl, build_pointer(ptr));
1112 add_external_fact((
char *)
"protobuf-msg", tl);
1117 boost::asio::ip::tcp::endpoint &endpoint)
1119 long int client_id = -1;
1122 client_id = ++next_client_id_;
1123 client_endpoints_[client_id] = std::make_pair(endpoint.address().to_string(), endpoint.port());
1124 server_clients_[client_id] = client;
1125 rev_server_clients_[client] = client_id;
1129 std::make_tuple(client_id, endpoint.address().to_string(), endpoint.port(),
true));
1134 const boost::system::error_code &error)
1136 long int client_id = -1;
1139 RevServerClientMap::iterator c;
1140 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1141 client_id = c->second;
1142 rev_server_clients_.erase(c);
1143 server_clients_.erase(client_id);
1147 if (client_id >= 0) {
1148 q_server_client_.
push_locked(std::make_tuple(client_id,
"", 0,
false));
1160 uint16_t component_id,
1162 std::shared_ptr<google::protobuf::Message> msg)
1165 RevServerClientMap::iterator c;
1166 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1167 q_msgs_.
push_locked(std::make_tuple(client_endpoints_[c->second].first,
1168 client_endpoints_[c->second].second,
1185 uint16_t component_id,
1190 RevServerClientMap::iterator c;
1191 if ((c = rev_server_clients_.find(client)) != rev_server_clients_.end()) {
1211 OpenPRSProtobuf::handle_peer_msg(
long int peer_id,
1212 boost::asio::ip::udp::endpoint & endpoint,
1213 uint16_t component_id,
1215 std::shared_ptr<google::protobuf::Message> msg)
1217 q_msgs_.
push_locked(std::make_tuple(endpoint.address().to_string(),
1231 OpenPRSProtobuf::handle_peer_recv_error(
long int peer_id,
1232 boost::asio::ip::udp::endpoint &endpoint,
1236 "Failed to receive peer message from %s:%u: %s\n",
1237 endpoint.address().to_string().c_str(),
1246 OpenPRSProtobuf::handle_peer_send_error(
long int peer_id,
const std::string &msg)
1252 OpenPRSProtobuf::handle_client_connected(
long int client_id)
1254 q_client_.
push_locked(std::make_tuple(client_id,
true));
1258 OpenPRSProtobuf::handle_client_disconnected(
long int client_id,
1259 const boost::system::error_code &error)
1261 q_client_.
push_locked(std::make_tuple(client_id,
false));
1265 OpenPRSProtobuf::handle_client_msg(
long int client_id,
1268 std::shared_ptr<google::protobuf::Message> msg)
1270 q_msgs_.
push_locked(std::make_tuple(
"", 0, comp_id, msg_type, msg, CT_CLIENT, client_id));
1274 OpenPRSProtobuf::handle_client_receive_fail(
long int client_id,
1277 const std::string &msg)
Base class for exceptions in Fawkes.
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
void push_locked(const Type &x)
Push element to queue with lock protection.
Term * oprs_pb_peer_create_local_crypto(const std::string &host, int send_port, int recv_port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer with separate send and receive ports and optional encryption (crypto key and cipher).
Term * oprs_pb_destroy(void *msgptr)
Destroy given message (reference).
bool oprs_pb_register_type(std::string full_name)
Register a new message type.
void oprs_pb_broadcast(long int peer_id, void *msgptr)
Broadcast a message through a peer.
Term * oprs_pb_ref(void *msgptr)
Create new reference to message.
Term * oprs_pb_peer_create_local(const std::string &host, int send_port, int recv_port)
Enable protobuf peer with separate send and receive ports.
Term * oprs_pb_peer_create_crypto(const std::string &host, int port, const std::string &crypto_key="", const std::string &cipher="")
Enable protobuf peer on a single port with optional encryption (crypto key and cipher).
Term * oprs_pb_field_label(void *msgptr, std::string field_name)
Get a field's label.
void oprs_pb_set_field(void *msgptr, std::string field_name, Term *value)
Set a field.
std::shared_ptr< google::protobuf::Message > * oprs_create_msg(std::string full_name)
Create a new message of given type.
bool oprs_pb_field_is_list(void *msgptr, std::string field_name)
Check if a given field is a list (repeated field).
void oprs_pb_process()
Process all pending events.
bool oprs_pb_events_pending()
Check if there are pending events.
void oprs_pb_disable_server()
Disable protobuf stream server.
void oprs_pb_send(long int client_id, void *msgptr)
Send message to a specific client.
void oprs_pb_enable_server(int port)
Enable protobuf stream server.
Term * oprs_pb_client_connect(std::string host, int port)
Connect as a client to the given server.
Term * oprs_pb_peer_create(const std::string &host, int port)
Enable protobuf peer on a single port.
bool oprs_pb_has_field(void *msgptr, std::string field_name)
Check if message has a specific field.
void oprs_pb_peer_setup_crypto(long int peer_id, const std::string &crypto_key, const std::string &cipher)
Setup crypto for peer.
Term * oprs_pb_field_value(void *msgptr, std::string field_name)
Get properly typed field value.
~OpenPRSProtobuf()
Destructor.
Term * oprs_pb_field_names(void *msgptr)
Get field names of message.
void oprs_pb_add_list(void *msgptr, std::string field_name, Term *value)
Add value to a repeated field.
Term * oprs_pb_field_type(void *msgptr, std::string field_name)
Get type of a specific field.
void oprs_pb_disconnect(long int client_id)
Disconnect a given client.
Term * oprs_pb_field_list(void *msgptr, std::string field_name)
Get list of values of a given message field.
void oprs_pb_peer_destroy(long int peer_id)
Disable peer.
Register to map msg type numbers to Protobuf messages.
Communicate by broadcasting protobuf messages.
signal_received_type & signal_received()
Signal that is invoked when a message has been received.
signal_send_error_type & signal_send_error()
Signal that is invoked when sending a message failed.
signal_recv_error_type & signal_recv_error()
Signal that is invoked when receiving a message failed.
Stream client for protobuf message transmission.
void async_connect(const char *host, unsigned short port)
Asynchronous connect.
boost::signals2::signal< void()> & signal_connected()
Signal that is invoked when the connection has been established.
boost::signals2::signal< void(uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
boost::signals2::signal< void(uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
boost::signals2::signal< void(const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when the connection is closed.
Stream server for protobuf message transmission.
boost::signals2::signal< void(ClientID, const boost::system::error_code &)> & signal_disconnected()
Signal that is invoked when a client has disconnected.
unsigned int ClientID
ID to identify connected clients.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::string)> & signal_receive_failed()
Signal that is invoked when receiving a message failed.
boost::signals2::signal< void(ClientID, boost::asio::ip::tcp::endpoint &)> & signal_connected()
Signal that is invoked when a new client has connected.
void disconnect(ClientID client)
Disconnect specific client.
boost::signals2::signal< void(ClientID, uint16_t, uint16_t, std::shared_ptr< google::protobuf::Message >)> & signal_received()
Signal that is invoked when a message has been received.
void send(ClientID client, uint16_t component_id, uint16_t msg_type, google::protobuf::Message &m)
Send a message to the given client.