Fawkes API  Fawkes Development Version
openprs_comm.cpp
1 
2 /***************************************************************************
3  * openprs_comm.cpp - OpenPRS communication wrapper for Fawkes
4  *
5  * Created: Mon Aug 18 14:55:51 2014
6  * Copyright 2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version. A runtime exception applies to
13  * this software (see LICENSE.GPL_WRE file mentioned below for details).
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
21  */
22 
23 #include "openprs_comm.h"
24 
25 #include "openprs_server_proxy.h"
26 
27 #include <core/exception.h>
28 #include <core/exceptions/system.h>
29 #include <utils/sub_process/proc.h>
30 
31 // clang-format off
32 // The OpenPRS headers have an include problem and require this ordering.
33 #include <opaque-pub.h>
34 #include <mp-pub.h>
35 // clang-format on
36 #include <unistd.h>
37 
38 // these exist in libExtMP and are exported, but not mentioned in the header
39 extern "C" {
40 void send_message_string_socket(int socket, Symbol rec, PString message);
41 void broadcast_message_string_socket(int socket, PString message);
42 void
43 multicast_message_string_socket(int socket, unsigned int nb_recs, Symbol *recs, PString message);
44 }
45 
46 namespace fawkes {
47 
48 /** @class OpenPRSComm <plugins/openprs/utils/openprs_comm.h>
49  * OpenPRS communication wrapper.
50  * This class provides communication facilities via the OpenPRS message passer
51  * as well as through the OpenPRS server proxy.
52  * @author Tim Niemueller
53  */
54 
55 /** Constructor.
56  * @param local_name the local name with which the communication wrapper will be
57  * registered to the message parser. This can be used to send messages from the
58  * kernel.
59  * @param hostname host where the message passer runs
60  * @param port TCP port where the message passer listens
61  * @param server_proxy server proxy to use to send commands to kernels
62  * @param logger logger for informational messages (optional)
63  */
64 OpenPRSComm::OpenPRSComm(const char * local_name,
65  const char * hostname,
66  unsigned short port,
67  OpenPRSServerProxy *server_proxy,
68  Logger * logger)
69 : name_(local_name),
70  server_proxy_(server_proxy),
71  logger_(logger),
72  io_service_work_(io_service_),
73  sd_mp_socket_(io_service_)
74 
75 {
76  // Protocol MUST be STRINGS_PT or otherwise our message reception code will break
77  mp_socket_ = external_register_to_the_mp_host_prot(local_name, hostname, port, STRINGS_PT);
78  if (mp_socket_ == -1) {
79  throw Exception("Failed to connect to OpenPRS as '%s'", local_name);
80  }
81  io_service_thread_ = std::thread([this]() { this->io_service_.run(); });
82  sd_mp_socket_.assign(dup(mp_socket_));
83  start_recv();
84 }
85 
86 /** Destructor. */
88 {
89  io_service_.stop();
90  io_service_thread_.join();
91  if (mp_socket_ >= 0) {
92  ::close(mp_socket_);
93  }
94 }
95 
96 /** Send a message to an OpenPRS kernel.
97  * @param recipient OpenPRS kernel name to send to
98  * @param message message to send, cf. OpenPRS manual for valid messages
99  */
100 void
101 OpenPRSComm::send_message(const char *recipient, const char *message)
102 {
103  send_message_string_socket(mp_socket_, recipient, (char *)message);
104 }
105 
106 /** Send a message to all OpenPRS kernels.
107  * @param message message to send, cf. OpenPRS manual for valid messages
108  */
109 void
110 OpenPRSComm::broadcast_message(const char *message)
111 {
112  broadcast_message_string_socket(mp_socket_, (char *)message);
113 }
114 
115 /** Send a message to multiple OpenPRS kernel.
116  * @param recipients Vector of OpenPRS kernel names to send to
117  * @param message message to send, cf. OpenPRS manual for valid messages
118  */
119 void
120 OpenPRSComm::multicast_message(std::vector<const char *> &recipients, const char *message)
121 {
122  multicast_message_string_socket(mp_socket_, recipients.size(), &(recipients[0]), (char *)message);
123 }
124 
125 /** Send a message to an OpenPRS kernel.
126  * @param recipient OpenPRS kernel name to send to
127  * @param message message to send, cf. OpenPRS manual for valid messages
128  */
129 void
130 OpenPRSComm::send_message(const std::string &recipient, const std::string &message)
131 {
132  send_message_string_socket(mp_socket_, recipient.c_str(), (char *)message.c_str());
133 }
134 
135 /** Send a message to all OpenPRS kernels.
136  * @param message message to send, cf. OpenPRS manual for valid messages
137  */
138 void
139 OpenPRSComm::broadcast_message(const std::string &message)
140 {
141  broadcast_message_string_socket(mp_socket_, (char *)message.c_str());
142 }
143 
144 /** Send a message to multiple OpenPRS kernel.
145  * @param recipients Vector of OpenPRS kernel names to send to
146  * @param message message to send, cf. OpenPRS manual for valid messages
147  */
148 void
149 OpenPRSComm::multicast_message(const std::vector<std::string> &recipients,
150  const std::string & message)
151 {
152  std::vector<const char *> recs;
153  recs.resize(recipients.size());
154  for (size_t i = 0; i < recipients.size(); ++i) {
155  recs[i] = recipients[i].c_str();
156  }
157  multicast_message_string_socket(mp_socket_, recs.size(), &(recs[0]), (char *)message.c_str());
158 }
159 
160 /** Send a formatted message to an OpenPRS kernel.
161  * @param recipient OpenPRS kernel name to send to
162  * @param format format for message to send according to sprintf()
163  */
164 void
165 OpenPRSComm::send_message_f(const std::string &recipient, const char *format, ...)
166 {
167  va_list arg;
168  va_start(arg, format);
169  char *msg;
170  if (vasprintf(&msg, format, arg) == -1) {
171  va_end(arg);
172  throw OutOfMemoryException("Cannot format OpenPRS client command string");
173  }
174  va_end(arg);
175  send_message_string_socket(mp_socket_, recipient.c_str(), msg);
176  free(msg);
177 }
178 
179 /** Send a formatted message to all OpenPRS kernels.
180  * @param format format for message to send according to sprintf()
181  */
182 void
183 OpenPRSComm::broadcast_message_f(const char *format, ...)
184 {
185  va_list arg;
186  va_start(arg, format);
187  char *msg;
188  if (vasprintf(&msg, format, arg) == -1) {
189  va_end(arg);
190  throw OutOfMemoryException("Cannot format OpenPRS client command string");
191  }
192  va_end(arg);
193  broadcast_message_string_socket(mp_socket_, msg);
194  free(msg);
195 }
196 
197 /** Send a message to multiple OpenPRS kernel.
198  * @param recipients Vector of OpenPRS kernel names to send to
199  * @param format format for message to send according to sprintf()
200  */
201 void
202 OpenPRSComm::multicast_message_f(const std::vector<std::string> &recipients,
203  const char * format,
204  ...)
205 {
206  std::vector<const char *> recs;
207  recs.resize(recipients.size());
208  for (size_t i = 0; i < recipients.size(); ++i) {
209  recs[i] = recipients[i].c_str();
210  }
211  va_list arg;
212  va_start(arg, format);
213  char *msg;
214  if (vasprintf(&msg, format, arg) == -1) {
215  va_end(arg);
216  throw OutOfMemoryException("Cannot format OpenPRS client command string");
217  }
218  va_end(arg);
219  multicast_message_string_socket(mp_socket_, recs.size(), &(recs[0]), msg);
220  free(msg);
221 }
222 
223 /** Transmit a command to an OpenPRS kernel.
224  * This works equivalent to the transmit oprs-server console command.
225  * @param recipient OpenPRS kernel name to send to
226  * @param message command to send, cf. OpenPRS manual for valid commands
227  */
228 void
229 OpenPRSComm::transmit_command(const char *recipient, const char *message)
230 {
231  server_proxy_->transmit_command(recipient, message);
232 }
233 
234 /** Transmit a command to an OpenPRS kernel.
235  * This works equivalent to the transmit oprs-server console command.
236  * @param recipient OpenPRS kernel name to send to
237  * @param message command to send, cf. OpenPRS manual for valid commands
238  */
239 void
240 OpenPRSComm::transmit_command(const std::string &recipient, const std::string &message)
241 {
242  server_proxy_->transmit_command(recipient, message);
243 }
244 
245 /** Transmit a command to an OpenPRS kernel.
246  * This works equivalent to the transmit oprs-server console command.
247  * This function allows to pass a format according to the sprintf()
248  * format and its arguments.
249  * @param recipient OpenPRS kernel name to send to
250  * @param format format string for the command, must be followed by the
251  * appropriate number and types of arguments.
252  */
253 void
254 OpenPRSComm::transmit_command_f(const std::string &recipient, const char *format, ...)
255 {
256  va_list arg;
257  va_start(arg, format);
258  server_proxy_->transmit_command_v(recipient, format, arg);
259  va_end(arg);
260 }
261 
262 void
263 OpenPRSComm::start_recv()
264 {
265  sd_mp_socket_.async_read_some(boost::asio::null_buffers(),
266  boost::bind(&OpenPRSComm::handle_recv,
267  this,
268  boost::asio::placeholders::error));
269 }
270 
271 void
272 OpenPRSComm::handle_recv(const boost::system::error_code &err)
273 {
274  if (!err) {
275  try {
276  std::string sender = read_string_from_socket(sd_mp_socket_);
277  std::string message = read_string_from_socket(sd_mp_socket_);
278 
279  sig_rcvd_(sender, message);
280  } catch (Exception &e) {
281  if (logger_) {
282  logger_->log_warn(name_.c_str(), "Failed to receive message: %s", e.what_no_backtrace());
283  }
284  }
285  } else if (logger_) {
286  logger_->log_warn(name_.c_str(), "Failed to receive message: %s", err.message().c_str());
287  }
288  start_recv();
289 }
290 
291 std::string
292 OpenPRSComm::read_string_from_socket(boost::asio::posix::stream_descriptor &socket)
293 {
294  uint32_t s_size = 0;
295  boost::system::error_code ec;
296  boost::asio::read(socket, boost::asio::buffer(&s_size, sizeof(s_size)), ec);
297  if (ec) {
298  throw Exception("Failed to read string size from socket: %s", ec.message().c_str());
299  }
300  s_size = ntohl(s_size);
301 
302  char s[s_size + 1];
303  boost::asio::read(socket, boost::asio::buffer(s, s_size), ec);
304  if (ec) {
305  throw Exception("Failed to read string content from socket: %s", ec.message().c_str());
306  }
307  s[s_size] = 0;
308 
309  return s;
310 }
311 
312 } // end namespace fawkes
Base class for exceptions in Fawkes.
Definition: exception.h:36
Interface for logging.
Definition: logger.h:42
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
void transmit_command(const char *recipient, const char *message)
Transmit a command to an OpenPRS kernel.
void multicast_message(std::vector< const char * > &recipients, const char *message)
Send a message to multiple OpenPRS kernel.
void multicast_message_f(const std::vector< std::string > &recipients, const char *format,...)
Send a message to multiple OpenPRS kernel.
virtual ~OpenPRSComm()
Destructor.
void broadcast_message_f(const char *format,...)
Send a formatted message to all OpenPRS kernels.
void transmit_command_f(const std::string &recipient, const char *format,...)
Transmit a command to an OpenPRS kernel.
void broadcast_message(const char *message)
Send a message to all OpenPRS kernels.
void send_message_f(const std::string &recipient, const char *format,...)
Send a formatted message to an OpenPRS kernel.
OpenPRSComm(const char *local_name, const char *hostname, unsigned short port, OpenPRSServerProxy *server_proxy, Logger *logger=NULL)
Constructor.
void send_message(const char *recipient, const char *message)
Send a message to an OpenPRS kernel.
Proxy for the OpenPRS server communication.
void transmit_command(const std::string &client_name, const std::string &command)
Transmit a command to an OpenPRS kernel.
void transmit_command_v(const std::string &client_name, const char *format, va_list arg)
Transmit a command to an OpenPRS kernel.
System ran out of memory and desired operation could not be fulfilled.
Definition: system.h:32
Fawkes library namespace.