GRPC C++  1.39.1
method_handler.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
21 
26 
27 namespace grpc {
28 
29 namespace internal {
30 
31 // Invoke the method handler and return the status it produces; the
32 // caller does not need to know whether an exception was involved.
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
// NOTE(review): this listing omits source line 39 (the function signature)
// and line 44 (the start of the statement that ends on line 45). The symbol
// index for this file gives the signature as
// `::grpc::Status CatchingFunctionHandler(Callable &&handler)`.
38 template <class Callable>
40 #if GRPC_ALLOW_EXCEPTIONS
    // Exceptions enabled: anything thrown by handler() is caught by the
    // catch-all below instead of propagating to the caller.
41  try {
42  return handler();
43  } catch (...) {
    // Presumably builds the error Status returned for a throwing handler; only
    // the message argument is visible in this listing -- confirm the status
    // code against the real header.
45  "Unexpected error in RPC handling");
46  }
47 #else // GRPC_ALLOW_EXCEPTIONS
    // Exceptions disabled: the handler's result is returned directly.
48  return handler();
49 #endif // GRPC_ALLOW_EXCEPTIONS
50 }
51 
55 
// Finishes a call that produces a single response: queues initial metadata,
// the response message (only when `status` is OK) and the final status with
// trailing metadata, then performs the batch on the call and plucks it from
// the call's completion queue.
// Asserts that initial metadata has not already been sent for this context.
// `status` is a non-const reference; when it is OK on entry it is overwritten
// with the result of SendMessagePtr, and that value is what gets sent.
// NOTE(review): this listing omits source lines 57, 60-62 and 66 (the start of
// the signature, the type of `ops`, and the condition opening the block that
// closes on line 68). The symbol index gives the signature as
// `void UnaryRunHandlerHelper(const ::grpc::internal::MethodHandler::
// HandlerParameter &, ResponseType *, ::grpc::Status &)`. Line 66 is
// presumably the compression_level_set() check used by the other handlers in
// this file -- confirm against the real header.
56 template <class ResponseType>
58  ResponseType* rsp, ::grpc::Status& status) {
59  GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
63  ops;
64  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
65  param.server_context->initial_metadata_flags());
67  ops.set_compression_level(param.server_context->compression_level());
68  }
    // The message is attached only for an OK status.
69  if (status.ok()) {
70  status = ops.SendMessagePtr(rsp);
71  }
72  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
73  param.call->PerformOps(&ops);
74  param.call->cq()->Pluck(&ops);
75 }
76 
78 
// A helper function with reduced templating to do deserializing.
// Wraps `req` in a byte buffer, deserializes it into `request`, and returns
// `request` when `*status` is OK. Otherwise it explicitly runs the request's
// destructor and returns nullptr.
// NOTE(review): this listing omits source lines 80, 82 and 84 (the start of
// the signature, the declaration of `buf`, and the start of the statement
// that ends on line 85). The symbol index gives the signature as
// `void * UnaryDeserializeHelper(grpc_byte_buffer *, ::grpc::Status *,
// RequestType *)`. Line 84 presumably assigns the deserialization result to
// `*status` -- confirm against the real header.
79 template <class RequestType>
81  RequestType* request) {
83  buf.set_buffer(req);
85  &buf, static_cast<RequestType*>(request));
    // Release() forgets the underlying byte buffer without destroying it
    // (per the ByteBuffer::Release description in this file's symbol index).
86  buf.Release();
87  if (status->ok()) {
88  return request;
89  }
    // Failure path: only the destructor is run here; this function does not
    // free the storage behind `request`.
90  request->~RequestType();
91  return nullptr;
92 }
93 
// A wrapper class of an application provided rpc method handler.
// Stores the handler function and the service object it is invoked on.
// BaseRequestType / BaseResponseType default to RequestType / ResponseType and
// are the types handed to the UnaryDeserializeHelper / UnaryRunHandlerHelper
// calls below.
// NOTE(review): this listing omits source lines 98, 100, 121, 124 and 131
// (class head, constructor name, start of the Deserialize signature, start of
// the allocation expression, and one line in the private section). The symbol
// index gives the constructor as `RpcMethodHandler(std::function<...> func,
// ServiceType *service)` and the method as `void * Deserialize(grpc_call
// *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final`.
95 template <class ServiceType, class RequestType, class ResponseType,
96  class BaseRequestType = RequestType,
97  class BaseResponseType = ResponseType>
99  public:
101  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
102  const RequestType*, ResponseType*)>
103  func,
104  ServiceType* service)
105  : func_(func), service_(service) {}
106 
    // Runs the application handler (only if param.status is OK) and then
    // sends the response and status through UnaryRunHandlerHelper.
107  void RunHandler(const HandlerParameter& param) final {
108  ResponseType rsp;
109  ::grpc::Status status = param.status;
110  if (status.ok()) {
111  status = CatchingFunctionHandler([this, &param, &rsp] {
112  return func_(service_,
113  static_cast<::grpc::ServerContext*>(param.server_context),
114  static_cast<RequestType*>(param.request), &rsp);
115  });
    // The request object is destroyed explicitly rather than deleted.
    // Presumably it was placement-constructed in Deserialize below (the
    // allocating line is missing from this listing) -- confirm.
116  static_cast<RequestType*>(param.request)->~RequestType();
117  }
118  UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
119  }
120 
    // Constructs a RequestType in storage of sizeof(RequestType) obtained for
    // `call`, then delegates the actual decoding to UnaryDeserializeHelper.
122  ::grpc::Status* status, void** /*handler_data*/) final {
123  auto* request =
125  call, sizeof(RequestType))) RequestType;
126  return UnaryDeserializeHelper(req, status,
127  static_cast<BaseRequestType*>(request));
128  }
129 
130  private:
132  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
133  const RequestType*, ResponseType*)>
134  func_;
135  // The class the above handler function lives in.
136  ServiceType* service_;
137 };
138 
// A wrapper class of an application provided client streaming handler.
// The handler function receives a ServerReader<RequestType>* and fills in a
// single ResponseType.
// NOTE(review): this listing omits source lines 141, 143, 151 and 161-163
// (class head, constructor name, the start of the `reader` declaration, and
// the type of `ops`). The symbol index gives the constructor as
// `ClientStreamingHandler(std::function<...> func, ServiceType *service)`.
140 template <class ServiceType, class RequestType, class ResponseType>
142  public:
144  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
145  ServerReader<RequestType>*, ResponseType*)>
146  func,
147  ServiceType* service)
148  : func_(func), service_(service) {}
149 
    // Runs the application handler with a reader bound to this call, then
    // sends initial metadata (if not yet sent), the response (if the status
    // is OK) and the final status in one batch.
    // Unlike RpcMethodHandler::RunHandler, param.status is not consulted here.
150  void RunHandler(const HandlerParameter& param) final {
152  param.call, static_cast<::grpc::ServerContext*>(param.server_context));
153  ResponseType rsp;
154  ::grpc::Status status = CatchingFunctionHandler([this, &param, &reader,
155  &rsp] {
156  return func_(service_,
157  static_cast<::grpc::ServerContext*>(param.server_context),
158  &reader, &rsp);
159  });
160 
164  ops;
    // Initial metadata is queued only when it has not already been sent.
165  if (!param.server_context->sent_initial_metadata_) {
166  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
167  param.server_context->initial_metadata_flags());
168  if (param.server_context->compression_level_set()) {
169  ops.set_compression_level(param.server_context->compression_level());
170  }
171  }
    // The response is attached only for an OK status; the SendMessagePtr
    // result then becomes the status that is sent.
172  if (status.ok()) {
173  status = ops.SendMessagePtr(&rsp);
174  }
175  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
176  param.call->PerformOps(&ops);
177  param.call->cq()->Pluck(&ops);
178  }
179 
180  private:
    // Application handler and the service object it is invoked on.
181  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
182  ServerReader<RequestType>*, ResponseType*)>
183  func_;
184  ServiceType* service_;
185 };
186 
// A wrapper class of an application provided server streaming handler.
// The handler function receives the deserialized request and a
// ServerWriter<ResponseType>* to write responses to.
// NOTE(review): this listing omits source lines 189, 191, 201, 212-213, 230,
// 235 and 238 (class head, start of the constructor, start of the `writer`
// declaration, the type of `ops`, the start of the Deserialize signature, the
// start of the allocation expression, and the right-hand side of the
// `*status =` assignment). The symbol index gives the constructor as
// `ServerStreamingHandler(std::function<...> func, ServiceType *service)` and
// the method as `void * Deserialize(grpc_call *call, grpc_byte_buffer *req,
// ::grpc::Status *status, void **) final`.
188 template <class ServiceType, class RequestType, class ResponseType>
190  public:
192  ServiceType*, ::grpc::ServerContext*,
193  const RequestType*, ServerWriter<ResponseType>*)>
194  func,
195  ServiceType* service)
196  : func_(func), service_(service) {}
197 
    // Runs the application handler (only if param.status is OK) and then
    // sends initial metadata (if not yet sent) and the final status.
198  void RunHandler(const HandlerParameter& param) final {
199  ::grpc::Status status = param.status;
200  if (status.ok()) {
202  param.call,
203  static_cast<::grpc::ServerContext*>(param.server_context));
204  status = CatchingFunctionHandler([this, &param, &writer] {
205  return func_(service_,
206  static_cast<::grpc::ServerContext*>(param.server_context),
207  static_cast<RequestType*>(param.request), &writer);
208  });
    // The request object is destroyed explicitly rather than deleted; see
    // Deserialize below for where it is constructed.
209  static_cast<RequestType*>(param.request)->~RequestType();
210  }
211 
214  ops;
215  if (!param.server_context->sent_initial_metadata_) {
216  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
217  param.server_context->initial_metadata_flags());
218  if (param.server_context->compression_level_set()) {
219  ops.set_compression_level(param.server_context->compression_level());
220  }
221  }
222  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
223  param.call->PerformOps(&ops);
    // If the context reports pending ops, pluck them before plucking the
    // final status batch.
224  if (param.server_context->has_pending_ops_) {
225  param.call->cq()->Pluck(&param.server_context->pending_ops_);
226  }
227  param.call->cq()->Pluck(&ops);
228  }
229 
    // Constructs a RequestType in storage of sizeof(RequestType) obtained for
    // `call`, and returns it when `*status` is OK. On failure the request's
    // destructor is run explicitly and nullptr is returned.
231  ::grpc::Status* status, void** /*handler_data*/) final {
232  ::grpc::ByteBuffer buf;
233  buf.set_buffer(req);
234  auto* request =
236  call, sizeof(RequestType))) RequestType();
237  *status =
    // Release() forgets the underlying byte buffer without destroying it
    // (per the ByteBuffer::Release description in this file's symbol index).
239  buf.Release();
240  if (status->ok()) {
241  return request;
242  }
243  request->~RequestType();
244  return nullptr;
245  }
246 
247  private:
    // Application handler and the service object it is invoked on.
248  std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
249  const RequestType*, ServerWriter<ResponseType>*)>
250  func_;
251  ServiceType* service_;
252 };
253 
// A wrapper class of an application provided bidi-streaming handler.
// `Streamer` is the stream object type handed to the handler; `WriteNeeded`
// is copied into write_needed_ and controls the check in RunHandler below.
// NOTE(review): this listing omits source lines 262, 264, 276-277 and 288
// (class head, constructor name, the type of `ops`, and the start of the
// statement that ends on line 289). The symbol index gives the constructor as
// `TemplatedBidiStreamingHandler(std::function<::grpc::Status(
// ::grpc::ServerContext *, Streamer *)> func)`.
261 template <class Streamer, bool WriteNeeded>
263  public:
265  std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func)
266  : func_(func), write_needed_(WriteNeeded) {}
267 
    // Runs the application handler with a Streamer bound to this call, then
    // sends initial metadata (if not yet sent) and the final status.
268  void RunHandler(const HandlerParameter& param) final {
269  Streamer stream(param.call,
270  static_cast<::grpc::ServerContext*>(param.server_context));
271  ::grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
272  return func_(static_cast<::grpc::ServerContext*>(param.server_context),
273  &stream);
274  });
275 
278  ops;
279  if (!param.server_context->sent_initial_metadata_) {
280  ops.SendInitialMetadata(&param.server_context->initial_metadata_,
281  param.server_context->initial_metadata_flags());
282  if (param.server_context->compression_level_set()) {
283  ops.set_compression_level(param.server_context->compression_level());
284  }
    // This check sits inside the "initial metadata not yet sent" branch, so
    // it only applies when the handler returned without sending any.
285  if (write_needed_ && status.ok()) {
286  // If we needed a write but never did one, we need to mark the
287  // status as a fail
    // Presumably line 288 overwrites `status` with an error carrying the
    // message below -- confirm the status code against the real header.
289  "Service did not provide response message");
290  }
291  }
292  ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
293  param.call->PerformOps(&ops);
    // If the context reports pending ops, pluck them before plucking the
    // final status batch.
294  if (param.server_context->has_pending_ops_) {
295  param.call->cq()->Pluck(&param.server_context->pending_ops_);
296  }
297  param.call->cq()->Pluck(&ops);
298  }
299 
300  private:
    // Application handler; note that it takes no service pointer.
301  std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func_;
    // Copy of the WriteNeeded template argument.
302  const bool write_needed_;
303 };
304 
// Bidi-streaming handler bound to a service object: adapts a handler taking
// (service, context, streamer) to the (context, streamer) callable expected
// by TemplatedBidiStreamingHandler<ServerReaderWriter<ResponseType,
// RequestType>, false>.
// NOTE(review): this listing omits source lines 306-307, 310, 312 and 316
// (class head, start of the constructor, part of the function type, and the
// start of the base-class initializer). The symbol index gives the
// constructor as `BidiStreamingHandler(std::function<::grpc::Status(
// ServiceType *, ::grpc::ServerContext *, ServerReaderWriter<ResponseType,
// RequestType> *)> func, ServiceType *service)`.
305 template <class ServiceType, class RequestType, class ResponseType>
308  ServerReaderWriter<ResponseType, RequestType>, false> {
309  public:
311  ServiceType*, ::grpc::ServerContext*,
313  func,
314  ServiceType* service)
315  // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
317  ServerReaderWriter<ResponseType, RequestType>, false>(
    // `func` and `service` are captured by copy, so the stored callable
    // carries its own copy of the handler and the service pointer.
318  [func, service](
319  ::grpc::ServerContext* ctx,
320  ServerReaderWriter<ResponseType, RequestType>* streamer) {
321  return func(service, ctx, streamer);
322  }) {}
323 };
324 
// Handler whose stream type is ServerUnaryStreamer<RequestType, ResponseType>.
// The base template is instantiated with WriteNeeded = true, so the
// "Service did not provide response message" check in
// TemplatedBidiStreamingHandler::RunHandler applies to it.
// NOTE(review): this listing omits source lines 326-327, 330, 332-333 and 335
// (class head, constructor name, the function type arguments, and the start
// of the base-class initializer). The symbol index gives the constructor as
// `StreamedUnaryHandler(std::function<::grpc::Status(::grpc::ServerContext *,
// ServerUnaryStreamer<RequestType, ResponseType> *)> func)`.
325 template <class RequestType, class ResponseType>
328  ServerUnaryStreamer<RequestType, ResponseType>, true> {
329  public:
331  std::function<
334  func)
336  ServerUnaryStreamer<RequestType, ResponseType>, true>(
    // The by-value `func` parameter is moved into the base class.
337  std::move(func)) {}
338 };
339 
// Handler whose stream type is ServerSplitStreamer<RequestType, ResponseType>.
// The base template is instantiated with WriteNeeded = false, so no response
// write is required of the handler.
// NOTE(review): this listing omits source lines 341-342, 345, 347-348 and 350
// (class head, constructor name, the function type arguments, and the start
// of the base-class initializer). The symbol index gives the constructor as
// `SplitServerStreamingHandler(std::function<::grpc::Status(
// ::grpc::ServerContext *, ServerSplitStreamer<RequestType, ResponseType> *)>
// func)`.
340 template <class RequestType, class ResponseType>
343  ServerSplitStreamer<RequestType, ResponseType>, false> {
344  public:
346  std::function<
349  func)
351  ServerSplitStreamer<RequestType, ResponseType>, false>(
    // The by-value `func` parameter is moved into the base class.
352  std::move(func)) {}
353 };
354 
// General method handler class for errors that prevent real method use.
// It never runs application code: it answers every call with a status built
// from the template parameter `code` and the message given at construction.
// NOTE(review): this listing omits source lines 358, 378-379 and 390 (class
// head, the type of `ops` in RunHandler, and the statement inside the
// `req != nullptr` branch of Deserialize).
357 template <::grpc::StatusCode code>
359  public:
360  explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
361 
    // Fills `ops` with everything needed to fail the call: initial metadata
    // (only if not yet sent, after which the context is marked as having sent
    // it) and a server status of (code, message) with the context's trailing
    // metadata. Static so it can be used without a handler instance.
362  template <class T>
363  static void FillOps(::grpc::ServerContextBase* context,
364  const std::string& message, T* ops) {
365  ::grpc::Status status(code, message);
366  if (!context->sent_initial_metadata_) {
367  ops->SendInitialMetadata(&context->initial_metadata_,
368  context->initial_metadata_flags());
369  if (context->compression_level_set()) {
370  ops->set_compression_level(context->compression_level());
371  }
372  context->sent_initial_metadata_ = true;
373  }
374  ops->ServerSendStatus(&context->trailing_metadata_, status);
375  }
376 
    // Sends the error status for this call and plucks the batch from the
    // call's completion queue.
377  void RunHandler(const HandlerParameter& param) final {
380  ops;
381  FillOps(param.server_context, message_, &ops);
382  param.call->PerformOps(&ops);
383  param.call->cq()->Pluck(&ops);
384  }
385 
    // Always returns nullptr: no request object is produced.
386  void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
387  ::grpc::Status* /*status*/, void** /*handler_data*/) final {
388  // We have to destroy any request payload
    // NOTE(review): the destroying statement (line 390) is missing from this
    // listing; presumably it calls grpc_byte_buffer_destroy(req), which the
    // symbol index lists -- confirm against the real header.
389  if (req != nullptr) {
391  }
392  return nullptr;
393  }
394 
395  private:
    // Message placed in the status sent by RunHandler.
396  const std::string message_;
397 };
398 
// Alias for the ErrorMethodHandler instantiation that reports UNIMPLEMENTED.
// NOTE(review): this listing omits source lines 400-402. Per the symbol index,
// line 400 names this alias `UnknownMethodHandler` and line 402 defines
// `ResourceExhaustedHandler` as ErrorMethodHandler<RESOURCE_EXHAUSTED>.
399 typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
403 
404 } // namespace internal
405 } // namespace grpc
406 
407 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
A sequence of bytes.
Definition: byte_buffer.h:60
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer *bb)=0
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
Base class of ServerContext. Experimental until callback API is final.
Definition: server_context.h:126
bool compression_level_set() const
Return a bool indicating whether the compression level for this call has been set (either implicitly ...
Definition: server_context.h:243
grpc_compression_level compression_level() const
Return the compression algorithm to be used by the server call.
Definition: server_context.h:228
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:538
Synchronous (blocking) server-side API for doing client-streaming RPCs, where the incoming message st...
Definition: sync_stream.h:580
Synchronous (blocking) server-side API for a bidirectional streaming call, where the incoming message...
Definition: sync_stream.h:779
A class to represent a flow-controlled server-side streaming call.
Definition: sync_stream.h:887
A class to represent a flow-controlled unary call.
Definition: sync_stream.h:821
Synchronous (blocking) server-side API for doing server-streaming RPCs,...
Definition: sync_stream.h:633
Did it work? If it didn't, why?
Definition: status.h:31
bool ok() const
Is the status OK?
Definition: status.h:118
Definition: method_handler.h:308
BidiStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, ServerReaderWriter< ResponseType, RequestType > *)> func, ServiceType *service)
Definition: method_handler.h:310
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
::grpc::CompletionQueue * cq() const
Definition: call.h:70
Definition: call_op_set.h:212
Definition: call_op_set.h:282
Definition: call_op_set.h:654
void ServerSendStatus(std::multimap< std::string, std::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:658
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:867
A wrapper class of an application provided client streaming handler.
Definition: method_handler.h:141
ClientStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, ServerReader< RequestType > *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:143
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:150
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: method_handler.h:358
static void FillOps(::grpc::ServerContextBase *context, const std::string &message, T *ops)
Definition: method_handler.h:363
ErrorMethodHandler(const std::string &message)
Definition: method_handler.h:360
void * Deserialize(grpc_call *, grpc_byte_buffer *req, ::grpc::Status *, void **) final
Definition: method_handler.h:386
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:377
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
A wrapper class of an application provided rpc method handler.
Definition: method_handler.h:98
RpcMethodHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, const RequestType *, ResponseType *)> func, ServiceType *service)
Definition: method_handler.h:100
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: method_handler.h:121
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:107
A wrapper class of an application provided server streaming handler.
Definition: method_handler.h:189
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:198
ServerStreamingHandler(std::function<::grpc::Status(ServiceType *, ::grpc::ServerContext *, const RequestType *, ServerWriter< ResponseType > *)> func, ServiceType *service)
Definition: method_handler.h:191
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: method_handler.h:230
Definition: method_handler.h:343
SplitServerStreamingHandler(std::function< ::grpc::Status(::grpc::ServerContext *, ServerSplitStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:345
Definition: method_handler.h:328
StreamedUnaryHandler(std::function< ::grpc::Status(::grpc::ServerContext *, ServerUnaryStreamer< RequestType, ResponseType > *)> func)
Definition: method_handler.h:330
A wrapper class of an application provided bidi-streaming handler.
Definition: method_handler.h:262
void RunHandler(const HandlerParameter &param) final
Definition: method_handler.h:268
TemplatedBidiStreamingHandler(std::function<::grpc::Status(::grpc::ServerContext *, Streamer *)> func)
Definition: method_handler.h:264
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED > ResourceExhaustedHandler
Definition: method_handler.h:402
ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED > UnknownMethodHandler
Definition: method_handler.h:400
::grpc::Status CatchingFunctionHandler(Callable &&handler)
Definition: method_handler.h:39
void UnaryRunHandlerHelper(const ::grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, ::grpc::Status &)
void * UnaryDeserializeHelper(grpc_byte_buffer *, ::grpc::Status *, RequestType *)
A helper function with reduced templating to do deserializing.
Definition: method_handler.h:80
::google::protobuf::util::Status Status
Definition: config_protobuf.h:91
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96
@ INTERNAL
Internal errors.
Definition: status_code_enum.h:119
@ UNKNOWN
Unknown error.
Definition: status_code_enum.h:35
Definition: async_unary_call.h:398
Definition: rpc_service_method.h:41
::grpc::ServerContextBase *const server_context
Definition: rpc_service_method.h:63
Call *const call
Definition: rpc_service_method.h:62
Definition: grpc_types.h:40