GRPC C++  1.39.1
server_interface.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
21 
23 
33 
34 namespace grpc {
35 
36 class AsyncGenericService;
37 class Channel;
38 class CompletionQueue;
39 class GenericServerContext;
40 class ServerCompletionQueue;
41 class ServerCredentials;
42 class Service;
43 
44 extern CoreCodegenInterface* g_core_codegen_interface;
45 
49 namespace internal {
50 class ServerAsyncStreamingInterface;
51 } // namespace internal
52 
53 class CallbackGenericService;
54 
55 namespace experimental {
56 class ServerInterceptorFactoryInterface;
57 } // namespace experimental
58 
60  public:
61  ~ServerInterface() override {}  // Empty body; overrides the base class's virtual destructor.
62 
95  template <class T>
96  void Shutdown(const T& deadline) {
    // Wrap the caller-supplied deadline (any type T usable with TimePoint<T>)
    // and pass its raw time value to the pure-virtual ShutdownInternal(),
    // which the concrete server class implements.
97  ShutdownInternal(TimePoint<T>(deadline).raw_time());
98  }
99 
105  void Shutdown() {
108  }
109 
114  virtual void Wait() = 0;  // Block waiting for all work to complete.
115 
116  protected:
117  friend class ::grpc::Service;
118 
121  virtual bool RegisterService(const std::string* host, Service* service) = 0;  // Register a service.
122 
126 
130 
132  /*service*/) {}
133 
145  virtual int AddListeningPort(const std::string& addr,
146  ServerCredentials* creds) = 0;  // Tries to bind the server to the given addr.
147 
154  virtual void Start(::grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0;  // Start the server.
155 
156  virtual void ShutdownInternal(gpr_timespec deadline) = 0;
157 
158  virtual int max_receive_message_size() const = 0;
159 
160  virtual grpc_server* server() = 0;
161 
163  internal::Call* call) override = 0;
164 
166  public:
169  ::grpc::CompletionQueue* call_cq,
170  ::grpc::ServerCompletionQueue* notification_cq, void* tag,
171  bool delete_on_finalize);
172  ~BaseAsyncRequest() override;
173 
174  bool FinalizeResult(void** tag, bool* status) override;
175 
176  private:
177  void ContinueFinalizeResultAfterInterception();
178 
179  protected:
185  void* const tag_;
191  };
192 
195  public:
197  ::grpc::ServerContext* context,
199  ::grpc::CompletionQueue* call_cq,
200  ::grpc::ServerCompletionQueue* notification_cq,
201  void* tag, const char* name,
203 
204  bool FinalizeResult(void** tag, bool* status) override {
205  /* If we are done intercepting, then there is nothing more for us to do */
206  if (done_intercepting_) {
207  return BaseAsyncRequest::FinalizeResult(tag, status);
208  }
211  context_->set_server_rpc_info(name_, type_,
212  *server_->interceptor_creators()));
213  return BaseAsyncRequest::FinalizeResult(tag, status);
214  }
215 
216  protected:
217  void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
218  ::grpc::ServerCompletionQueue* notification_cq);
219  const char* name_;
221  };
222 
224  public:
227  ::grpc::ServerContext* context,
229  ::grpc::CompletionQueue* call_cq,
230  ::grpc::ServerCompletionQueue* notification_cq,
231  void* tag)
233  server, context, stream, call_cq, notification_cq, tag,
234  registered_method->name(), registered_method->method_type()) {
235  IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
236  }
237 
238  // uses RegisteredAsyncRequest::FinalizeResult
239  };
240 
241  template <class Message>
243  public:
247  ::grpc::CompletionQueue* call_cq,
248  ::grpc::ServerCompletionQueue* notification_cq,
249  void* tag, Message* request)
251  server, context, stream, call_cq, notification_cq, tag,
252  registered_method->name(), registered_method->method_type()),
253  registered_method_(registered_method),
254  request_(request) {
255  IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
256  notification_cq);
257  }
258 
259  ~PayloadAsyncRequest() override {
     // ByteBuffer::Release() makes payload_ forget its underlying byte buffer
     // without destroying it; it is intended only for un-owned buffers, which
     // is the case here.
260  payload_.Release(); // We do not own the payload_
261  }
262 
263  bool FinalizeResult(void** tag, bool* status) override {
264  /* If we are done intercepting, then there is nothing more for us to do */
265  if (done_intercepting_) {
266  return RegisteredAsyncRequest::FinalizeResult(tag, status);
267  }
268  if (*status) {
270  payload_.bbuf_ptr(), request_)
271  .ok()) {
272  // If deserialization fails, we cancel the call and instantiate
273  // a new instance of ourselves to request another call. We then
274  // return false, which prevents the call from being returned to
275  // the application.
277  call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr);
279  new PayloadAsyncRequest(registered_method_, server_, context_,
281  request_);
282  delete this;
283  return false;
284  }
285  }
286  /* Set interception point for recv message */
289  interceptor_methods_.SetRecvMessage(request_, nullptr);
290  return RegisteredAsyncRequest::FinalizeResult(tag, status);
291  }
292 
293  private:
294  internal::RpcServiceMethod* const registered_method_;
295  Message* const request_;
296  ByteBuffer payload_;
297  };
298 
300  public:
303  ::grpc::CompletionQueue* call_cq,
304  ::grpc::ServerCompletionQueue* notification_cq,
305  void* tag, bool delete_on_finalize);
306 
307  bool FinalizeResult(void** tag, bool* status) override;
308 
309  private:
310  grpc_call_details call_details_;
311  };
312 
313  template <class Message>
315  ::grpc::ServerContext* context,
317  ::grpc::CompletionQueue* call_cq,
318  ::grpc::ServerCompletionQueue* notification_cq,
319  void* tag, Message* message) {
320  GPR_CODEGEN_ASSERT(method);
321  new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
322  notification_cq, tag, message);
323  }
324 
326  ::grpc::ServerContext* context,
328  ::grpc::CompletionQueue* call_cq,
329  ::grpc::ServerCompletionQueue* notification_cq,
330  void* tag) {
331  GPR_CODEGEN_ASSERT(method);
332  new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
333  notification_cq, tag);
334  }
335 
338  ::grpc::CompletionQueue* call_cq,
339  ::grpc::ServerCompletionQueue* notification_cq,
340  void* tag) {
341  new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
342  tag, true);
343  }
344 
345  private:
346  // EXPERIMENTAL
347  // Getter method for the vector of interceptor factory objects.
348  // Returns a nullptr (rather than being pure) since this is a post-1.0 method
349  // and adding a new pure method to an interface would be a breaking change
350  // (even though this is private and non-API)
351  virtual std::vector<
352  std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
353  interceptor_creators() {
     // NOTE(review): RegisteredAsyncRequest::FinalizeResult dereferences this
     // return value (`*server_->interceptor_creators()`) without a null check,
     // so a server used with registered async requests presumably must
     // override this to return a non-null vector -- confirm.
354  return nullptr;
355  }
356 
357  // EXPERIMENTAL
358  // A method to get the callbackable completion queue associated with this
359  // server. If the return value is nullptr, this server doesn't support
360  // callback operations.
361  // TODO(vjpai): Consider a better default like using a global CQ
362  // Returns nullptr (rather than being pure) since this is a post-1.0 method
363  // and adding a new pure method to an interface would be a breaking change
364  // (even though this is private and non-API)
365  virtual ::grpc::CompletionQueue* CallbackCQ() { return nullptr; }  // Default: no callback CQ.
366 };
367 
368 } // namespace grpc
369 
370 #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
Definition: async_generic_service.h:68
A sequence of bytes.
Definition: byte_buffer.h:60
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
bool Valid() const
Is this ByteBuffer valid?
Definition: byte_buffer.h:163
CallbackGenericService is the base class for generic services implemented using the callback API and ...
Definition: async_generic_service.h:102
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:102
virtual void grpc_call_unref(grpc_call *call)=0
virtual gpr_timespec gpr_inf_future(gpr_clock_type type)=0
virtual grpc_call_error grpc_call_cancel_with_status(grpc_call *call, grpc_status_code status, const char *description, void *reserved)=0
Definition: async_generic_service.h:39
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue.h:429
A ServerContext or CallbackServerContext allows the code implementing a service handler to:
Definition: server_context.h:538
Wrapper around grpc_server_credentials, a way to authenticate a server.
Definition: server_credentials.h:70
Definition: server_interface.h:165
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
BaseAsyncRequest(ServerInterface *server, ::grpc::ServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag, bool delete_on_finalize)
const bool delete_on_finalize_
Definition: server_interface.h:186
grpc_call * call_
Definition: server_interface.h:187
internal::Call call_wrapper_
Definition: server_interface.h:188
bool done_intercepting_
Definition: server_interface.h:190
ServerInterface *const server_
Definition: server_interface.h:180
::grpc::ServerContext *const context_
Definition: server_interface.h:181
::grpc::CompletionQueue *const call_cq_
Definition: server_interface.h:183
void *const tag_
Definition: server_interface.h:185
internal::InterceptorBatchMethodsImpl interceptor_methods_
Definition: server_interface.h:189
internal::ServerAsyncStreamingInterface *const stream_
Definition: server_interface.h:182
::grpc::ServerCompletionQueue *const notification_cq_
Definition: server_interface.h:184
Definition: server_interface.h:299
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
GenericAsyncRequest(ServerInterface *server, GenericServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag, bool delete_on_finalize)
Definition: server_interface.h:223
NoPayloadAsyncRequest(internal::RpcServiceMethod *registered_method, ServerInterface *server, ::grpc::ServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag)
Definition: server_interface.h:225
Definition: server_interface.h:242
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: server_interface.h:263
PayloadAsyncRequest(internal::RpcServiceMethod *registered_method, ServerInterface *server, ::grpc::ServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag, Message *request)
Definition: server_interface.h:244
~PayloadAsyncRequest() override
Definition: server_interface.h:259
RegisteredAsyncRequest is not part of the C++ API.
Definition: server_interface.h:194
const char * name_
Definition: server_interface.h:219
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: server_interface.h:204
void IssueRequest(void *registered_method, grpc_byte_buffer **payload, ::grpc::ServerCompletionQueue *notification_cq)
RegisteredAsyncRequest(ServerInterface *server, ::grpc::ServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag, const char *name, internal::RpcMethod::RpcType type)
const internal::RpcMethod::RpcType type_
Definition: server_interface.h:220
Definition: server_interface.h:59
void RequestAsyncGenericCall(GenericServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag)
Definition: server_interface.h:336
virtual bool RegisterService(const std::string *host, Service *service)=0
Register a service.
virtual int AddListeningPort(const std::string &addr, ServerCredentials *creds)=0
Tries to bind server to the given addr.
~ServerInterface() override
Definition: server_interface.h:61
virtual void Start(::grpc::ServerCompletionQueue **cqs, size_t num_cqs)=0
Start the server.
void Shutdown(const T &deadline)
Shutdown does the following things:
Definition: server_interface.h:96
void Shutdown()
Shutdown the server without a deadline and forced cancellation.
Definition: server_interface.h:105
virtual void ShutdownInternal(gpr_timespec deadline)=0
virtual void RegisterCallbackGenericService(CallbackGenericService *)
Register a callback generic service.
Definition: server_interface.h:131
virtual int max_receive_message_size() const =0
virtual grpc_server * server()=0
void PerformOpsOnCall(internal::CallOpSetInterface *ops, internal::Call *call) override=0
virtual void Wait()=0
Block waiting for all work to complete.
void RequestAsyncCall(internal::RpcServiceMethod *method, ::grpc::ServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag)
Definition: server_interface.h:325
void RequestAsyncCall(internal::RpcServiceMethod *method, ::grpc::ServerContext *context, internal::ServerAsyncStreamingInterface *stream, ::grpc::CompletionQueue *call_cq, ::grpc::ServerCompletionQueue *notification_cq, void *tag, Message *message)
Definition: server_interface.h:314
virtual void RegisterAsyncGenericService(AsyncGenericService *service)=0
Register a generic service.
Descriptor of an RPC service and its various RPC methods.
Definition: service_type.h:56
If you are trying to use CompletionQueue::AsyncNext with a time class that isn't either gpr_timespec ...
Definition: time.h:40
This is an interface that Channel and Server implement to allow them to hook performing ops.
Definition: call_hook.h:30
Straightforward wrapping of the C call object.
Definition: call.h:35
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
An interface allowing implementors to process and filter event tags.
Definition: completion_queue_tag.h:26
Definition: interceptor_common.h:37
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetRecvMessage(void *message, bool *hijacked_recv_message_failed)
Definition: interceptor_common.h:168
RpcType
Definition: rpc_method.h:31
Server side rpc method class.
Definition: rpc_service_method.h:84
void * server_tag() const
Definition: rpc_service_method.h:103
@ GPR_CLOCK_MONOTONIC
Monotonic clock.
Definition: gpr_types.h:33
@ GRPC_STATUS_INTERNAL
Internal errors.
Definition: status.h:127
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
struct grpc_server grpc_server
A server listens to some port and responds to request calls.
Definition: grpc_types.h:65
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
::google::protobuf::Message Message
Definition: config_protobuf.h:76
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96
Analogous to struct timespec.
Definition: gpr_types.h:47
Definition: grpc_types.h:40
Definition: grpc_types.h:569