GRPC C++  1.39.1
completion_queue.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
32 #ifndef GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
33 #define GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
34 
35 #include <list>
36 
37 #include <grpc/impl/codegen/atm.h>
45 
47 
48 namespace grpc {
49 template <class R>
50 class ClientReader;
51 template <class W>
52 class ClientWriter;
53 template <class W, class R>
54 class ClientReaderWriter;
55 template <class R>
56 class ServerReader;
57 template <class W>
58 class ServerWriter;
59 namespace internal {
60 template <class W, class R>
61 class ServerReaderWriterBody;
62 
63 template <class ResponseType>
65  const ::grpc::internal::MethodHandler::HandlerParameter&, ResponseType*,
66  ::grpc::Status&);
67 template <class ServiceType, class RequestType, class ResponseType,
68  class BaseRequestType, class BaseResponseType>
69 class RpcMethodHandler;
70 template <class ServiceType, class RequestType, class ResponseType>
72 template <class ServiceType, class RequestType, class ResponseType>
74 template <class Streamer, bool WriteNeeded>
76 template <::grpc::StatusCode code>
77 class ErrorMethodHandler;
78 } // namespace internal
79 
80 class Channel;
81 class ChannelInterface;
82 class Server;
83 class ServerBuilder;
84 class ServerContextBase;
85 class ServerInterface;
86 
87 namespace internal {
88 class CompletionQueueTag;
89 class RpcMethod;
90 template <class InputMessage, class OutputMessage>
91 class BlockingUnaryCallImpl;
92 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
93 class CallOpSet;
94 } // namespace internal
95 
97 
103  public:
109  nullptr}) {}
110 
114  explicit CompletionQueue(grpc_completion_queue* take);
115 
117  ~CompletionQueue() override {
119  }
120 
122  enum NextStatus {
124  GOT_EVENT,
126  TIMEOUT
127  };
128 
177  bool Next(void** tag, bool* ok) {
178  return (AsyncNextInternal(tag, ok,
181  }
182 
194  template <typename T>
195  NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) {
196  ::grpc::TimePoint<T> deadline_tp(deadline);
197  return AsyncNextInternal(tag, ok, deadline_tp.raw_time());
198  }
199 
214  template <typename T, typename F>
215  NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) {
216  CompletionQueueTLSCache cache = CompletionQueueTLSCache(this);
217  f();
218  if (cache.Flush(tag, ok)) {
219  return GOT_EVENT;
220  } else {
221  return AsyncNext(tag, ok, deadline);
222  }
223  }
224 
235  void Shutdown();
236 
242  grpc_completion_queue* cq() { return cq_; }
243 
244  protected:
246  explicit CompletionQueue(const grpc_completion_queue_attributes& attributes) {
249  &attributes),
250  &attributes, nullptr);
251  InitialAvalanching(); // reserve this for the future shutdown
252  }
253 
254  private:
255  // Friends for access to server registration lists that enable checking and
256  // logging on shutdown
257  friend class ::grpc::ServerBuilder;
258  friend class ::grpc::Server;
259 
260  // Friend synchronous wrappers so that they can access Pluck(), which is
261  // a semi-private API geared towards the synchronous implementation.
262  template <class R>
263  friend class ::grpc::ClientReader;
264  template <class W>
265  friend class ::grpc::ClientWriter;
266  template <class W, class R>
267  friend class ::grpc::ClientReaderWriter;
268  template <class R>
269  friend class ::grpc::ServerReader;
270  template <class W>
271  friend class ::grpc::ServerWriter;
272  template <class W, class R>
273  friend class ::grpc::internal::ServerReaderWriterBody;
274  template <class ResponseType>
276  const ::grpc::internal::MethodHandler::HandlerParameter&, ResponseType*,
277  ::grpc::Status&);
278  template <class ServiceType, class RequestType, class ResponseType>
279  friend class ::grpc::internal::ClientStreamingHandler;
280  template <class ServiceType, class RequestType, class ResponseType>
281  friend class ::grpc::internal::ServerStreamingHandler;
282  template <class Streamer, bool WriteNeeded>
283  friend class ::grpc::internal::TemplatedBidiStreamingHandler;
284  template <::grpc::StatusCode code>
285  friend class ::grpc::internal::ErrorMethodHandler;
287  friend class ::grpc::ServerInterface;
288  template <class InputMessage, class OutputMessage>
289  friend class ::grpc::internal::BlockingUnaryCallImpl;
290 
291  // Friends that need access to constructor for callback CQ
292  friend class ::grpc::Channel;
293 
294  // For access to Register/CompleteAvalanching
295  template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
296  friend class ::grpc::internal::CallOpSet;
297 
302  class CompletionQueueTLSCache {
303  public:
304  explicit CompletionQueueTLSCache(CompletionQueue* cq);
305  ~CompletionQueueTLSCache();
306  bool Flush(void** tag, bool* ok);
307 
308  private:
309  CompletionQueue* cq_;
310  bool flushed_;
311  };
312 
313  NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
314 
317  bool Pluck(::grpc::internal::CompletionQueueTag* tag) {
318  auto deadline =
320  while (true) {
322  cq_, tag, deadline, nullptr);
323  bool ok = ev.success != 0;
324  void* ignored = tag;
325  if (tag->FinalizeResult(&ignored, &ok)) {
326  GPR_CODEGEN_ASSERT(ignored == tag);
327  return ok;
328  }
329  }
330  }
331 
340  void TryPluck(::grpc::internal::CompletionQueueTag* tag) {
341  auto deadline =
344  cq_, tag, deadline, nullptr);
345  if (ev.type == GRPC_QUEUE_TIMEOUT) return;
346  bool ok = ev.success != 0;
347  void* ignored = tag;
348  // the tag must be swallowed if using TryPluck
349  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
350  }
351 
357  void TryPluck(::grpc::internal::CompletionQueueTag* tag,
358  gpr_timespec deadline) {
360  cq_, tag, deadline, nullptr);
361  if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
362  return;
363  }
364 
365  bool ok = ev.success != 0;
366  void* ignored = tag;
367  GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok));
368  }
369 
376  void InitialAvalanching() {
377  gpr_atm_rel_store(&avalanches_in_flight_, static_cast<gpr_atm>(1));
378  }
379  void RegisterAvalanching() {
380  gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
381  static_cast<gpr_atm>(1));
382  }
383  void CompleteAvalanching() {
384  if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
385  static_cast<gpr_atm>(-1)) == 1) {
387  }
388  }
389 
390  void RegisterServer(const ::grpc::Server* server) {
391  (void)server;
392 #ifndef NDEBUG
393  grpc::internal::MutexLock l(&server_list_mutex_);
394  server_list_.push_back(server);
395 #endif
396  }
397  void UnregisterServer(const ::grpc::Server* server) {
398  (void)server;
399 #ifndef NDEBUG
400  grpc::internal::MutexLock l(&server_list_mutex_);
401  server_list_.remove(server);
402 #endif
403  }
404  bool ServerListEmpty() const {
405 #ifndef NDEBUG
406  grpc::internal::MutexLock l(&server_list_mutex_);
407  return server_list_.empty();
408 #endif
409  return true;
410  }
411 
412  static CompletionQueue* CallbackAlternativeCQ();
413  static void ReleaseCallbackAlternativeCQ(CompletionQueue* cq);
414 
415  grpc_completion_queue* cq_; // owned
416 
417  gpr_atm avalanches_in_flight_;
418 
419  // List of servers associated with this CQ. Even though this is only used with
420  // NDEBUG, instantiate it in all cases since otherwise the size will be
421  // inconsistent.
422  mutable grpc::internal::Mutex server_list_mutex_;
423  std::list<const ::grpc::Server*>
424  server_list_ /* GUARDED_BY(server_list_mutex_) */;
425 };
426 
430  public:
431  bool IsFrequentlyPolled() { return polling_type_ != GRPC_CQ_NON_LISTENING; }
432 
433  protected:
436 
437  private:
445  grpc_cq_polling_type polling_type,
446  grpc_completion_queue_functor* shutdown_cb)
448  GRPC_CQ_CURRENT_VERSION, completion_type, polling_type,
449  shutdown_cb}),
450  polling_type_(polling_type) {}
451 
452  grpc_cq_polling_type polling_type_;
453  friend class ::grpc::ServerBuilder;
454  friend class ::grpc::Server;
455 };
456 
457 } // namespace grpc
458 
459 #endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H
Channels represent a connection to an endpoint. Created by CreateChannel.
Definition: channel.h:57
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue....
Definition: completion_queue.h:102
NextStatus AsyncNext(void **tag, bool *ok, const T &deadline)
Read from the queue, blocking up to deadline (or the queue's shutdown).
Definition: completion_queue.h:195
void Shutdown()
Request the shutdown of the queue.
Definition: completion_queue_cc.cc:132
NextStatus
Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT.
Definition: completion_queue.h:122
@ SHUTDOWN
The completion queue has been shutdown and fully-drained.
Definition: completion_queue.h:123
@ GOT_EVENT
Got a new event; tag will be filled in with its associated value; ok indicating its success.
Definition: completion_queue.h:124
@ TIMEOUT
deadline was reached.
Definition: completion_queue.h:126
~CompletionQueue() override
Destructor. Destroys the owned wrapped completion queue / instance.
Definition: completion_queue.h:117
bool Next(void **tag, bool *ok)
Read from the queue, blocking until an event is available or the queue is shutting down.
Definition: completion_queue.h:177
grpc_completion_queue * cq()
Returns a raw pointer to the underlying grpc_completion_queue instance.
Definition: completion_queue.h:242
CompletionQueue(const grpc_completion_queue_attributes &attributes)
Private constructor of CompletionQueue only visible to friend classes.
Definition: completion_queue.h:246
CompletionQueue()
Default constructor.
Definition: completion_queue.h:106
NextStatus DoThenAsyncNext(F &&f, void **tag, bool *ok, const T &deadline)
EXPERIMENTAL First executes F, then reads from the queue, blocking up to deadline (or the queue's shu...
Definition: completion_queue.h:215
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:38
virtual void grpc_completion_queue_destroy(grpc_completion_queue *cq)=0
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline, void *reserved)=0
virtual void grpc_completion_queue_shutdown(grpc_completion_queue *cq)=0
virtual gpr_timespec gpr_inf_future(gpr_clock_type type)=0
virtual gpr_timespec gpr_time_0(gpr_clock_type type)=0
virtual grpc_completion_queue * grpc_completion_queue_create(const grpc_completion_queue_factory *factory, const grpc_completion_queue_attributes *attributes, void *reserved)=0
Classes that require gRPC to be initialized should inherit from this class.
Definition: grpc_library.h:38
A builder class for the creation and startup of grpc::Server instances.
Definition: server_builder.h:85
A specific type of completion queue used by the processing of notifications by servers.
Definition: completion_queue.h:429
ServerCompletionQueue()
Default constructor.
Definition: completion_queue.h:435
bool IsFrequentlyPolled()
Definition: completion_queue.h:431
Base class of ServerContext. Experimental until callback API is final.
Definition: server_context.h:126
Represents a gRPC server.
Definition: server.h:59
Definition: server_interface.h:59
Did it work? If it didn't, why?
Definition: status.h:31
If you are trying to use CompletionQueue::AsyncNext with a time class that isn't either gpr_timespec ...
Definition: time.h:40
gpr_timespec raw_time()=delete
A wrapper class of an application provided client streaming handler.
Definition: method_handler.h:141
An interface allowing implementors to process and filter event tags.
Definition: completion_queue_tag.h:26
virtual bool FinalizeResult(void **tag, bool *status)=0
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
General method handler class for errors that prevent real method use e.g., handle unknown method by r...
Definition: method_handler.h:358
Definition: sync.h:58
Definition: sync.h:85
A wrapper class of an application provided rpc method handler.
Definition: method_handler.h:98
A wrapper class of an application provided server streaming handler.
Definition: method_handler.h:189
A wrapper class of an application provided bidi-streaming handler.
Definition: method_handler.h:262
@ GPR_CLOCK_REALTIME
Realtime clock.
Definition: gpr_types.h:36
GPRAPI gpr_timespec gpr_inf_future(gpr_clock_type type)
The zero time interval.
Definition: time.cc:54
GRPCAPI const grpc_completion_queue_factory * grpc_completion_queue_factory_lookup(const grpc_completion_queue_attributes *attributes)
Returns the completion queue factory based on the attributes.
Definition: completion_queue_factory.cc:46
grpc_cq_completion_type
Specifies the type of APIs to use to pop events from the completion queue.
Definition: grpc_types.h:742
@ GRPC_CQ_NEXT
Events are popped out by calling grpc_completion_queue_next() API ONLY.
Definition: grpc_types.h:744
#define GRPC_CQ_CURRENT_VERSION
Definition: grpc_types.h:776
grpc_cq_polling_type
Completion queues internally MAY maintain a set of file descriptors in a structure called 'pollset'.
Definition: grpc_types.h:724
@ GRPC_CQ_NON_LISTENING
Similar to GRPC_CQ_DEFAULT_POLLING except that the completion queues will not contain any 'listening ...
Definition: grpc_types.h:732
@ GRPC_CQ_DEFAULT_POLLING
The completion queue will have an associated pollset and there is no restriction on the type of file ...
Definition: grpc_types.h:727
@ GRPC_QUEUE_TIMEOUT
No event before timeout.
Definition: grpc_types.h:540
@ GRPC_QUEUE_SHUTDOWN
Shutting down.
Definition: grpc_types.h:538
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:30
#define gpr_atm_rel_store(p, value)
Definition: atm_gcc_atomic.h:52
#define gpr_atm_no_barrier_fetch_add(p, delta)
Definition: atm_gcc_atomic.h:57
::grpc::ServerContextBase ServerContextBase
Definition: server_context.h:108
void UnaryRunHandlerHelper(const ::grpc::internal::MethodHandler::HandlerParameter &, ResponseType *, ::grpc::Status &)
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Null-initializes the global gRPC variables for the codegen library.
Definition: completion_queue.h:96
Analogous to struct timespec.
Definition: gpr_types.h:47
Definition: grpc_types.h:778
Specifies an interface class to be used as a tag for callback-based completion queues.
Definition: grpc_types.h:757
Definition: completion_queue.cc:339
int success
If the grpc_completion_type is GRPC_OP_COMPLETE, this field indicates whether the operation was succe...
Definition: grpc_types.h:556