GRPC C++  1.39.1
server_callback_handlers.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
26 
27 namespace grpc {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
32  public:
35  const RequestType*, ResponseType*)>
36  get_reactor)
37  : get_reactor_(std::move(get_reactor)) {}
38 
41  allocator_ = allocator;
42  }
43 
44  void RunHandler(const HandlerParameter& param) final {
45  // Arena allocate a controller structure (that includes request/response)
47  auto* allocator_state =
49  param.internal_data);
50 
52  param.call->call(), sizeof(ServerCallbackUnaryImpl)))
53  ServerCallbackUnaryImpl(
54  static_cast<::grpc::CallbackServerContext*>(param.server_context),
55  param.call, allocator_state, param.call_requester);
56  param.server_context->BeginCompletionOp(
57  param.call, [call](bool) { call->MaybeDone(); }, call);
58 
59  ServerUnaryReactor* reactor = nullptr;
60  if (param.status.ok()) {
61  reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
62  get_reactor_,
63  static_cast<::grpc::CallbackServerContext*>(param.server_context),
64  call->request(), call->response());
65  }
66 
67  if (reactor == nullptr) {
68  // if deserialization or reactor creator failed, we need to fail the call
70  param.call->call(), sizeof(UnimplementedUnaryReactor)))
73  }
74 
76  call->SetupReactor(reactor);
77  }
78 
80  ::grpc::Status* status, void** handler_data) final {
82  buf.set_buffer(req);
83  RequestType* request = nullptr;
84  MessageHolder<RequestType, ResponseType>* allocator_state = nullptr;
85  if (allocator_ != nullptr) {
86  allocator_state = allocator_->AllocateMessages();
87  } else {
88  allocator_state =
92  }
93  *handler_data = allocator_state;
94  request = allocator_state->request();
95  *status =
97  buf.Release();
98  if (status->ok()) {
99  return request;
100  }
101  // Clean up on deserialization failure.
102  allocator_state->Release();
103  return nullptr;
104  }
105 
106  private:
108  const RequestType*, ResponseType*)>
109  get_reactor_;
110  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
111 
112  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
113  public:
114  void Finish(::grpc::Status s) override {
115  // A callback that only contains a call to MaybeDone can be run as an
116  // inline callback regardless of whether or not OnDone is inlineable
117  // because if the actual OnDone callback needs to be scheduled, MaybeDone
118  // is responsible for dispatching to an executor thread if needed. Thus,
119  // when setting up the finish_tag_, we can set its own callback to
120  // inlineable.
121  finish_tag_.Set(
122  call_.call(),
123  [this](bool) {
124  this->MaybeDone(
125  reactor_.load(std::memory_order_relaxed)->InternalInlineable());
126  },
127  &finish_ops_, /*can_inline=*/true);
128  finish_ops_.set_core_cq_tag(&finish_tag_);
129 
130  if (!ctx_->sent_initial_metadata_) {
131  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
132  ctx_->initial_metadata_flags());
133  if (ctx_->compression_level_set()) {
134  finish_ops_.set_compression_level(ctx_->compression_level());
135  }
136  ctx_->sent_initial_metadata_ = true;
137  }
138  // The response is dropped if the status is not OK.
139  if (s.ok()) {
140  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
141  finish_ops_.SendMessagePtr(response()));
142  } else {
143  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
144  }
145  finish_ops_.set_core_cq_tag(&finish_tag_);
146  call_.PerformOps(&finish_ops_);
147  }
148 
149  void SendInitialMetadata() override {
150  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
151  this->Ref();
152  // The callback for this function should not be marked inline because it
153  // is directly invoking a user-controlled reaction
154  // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
155  // thread. However, any OnDone needed after that can be inlined because it
156  // is already running on an executor thread.
157  meta_tag_.Set(
158  call_.call(),
159  [this](bool ok) {
160  ServerUnaryReactor* reactor =
161  reactor_.load(std::memory_order_relaxed);
162  reactor->OnSendInitialMetadataDone(ok);
163  this->MaybeDone(/*inlineable_ondone=*/true);
164  },
165  &meta_ops_, /*can_inline=*/false);
166  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
167  ctx_->initial_metadata_flags());
168  if (ctx_->compression_level_set()) {
169  meta_ops_.set_compression_level(ctx_->compression_level());
170  }
171  ctx_->sent_initial_metadata_ = true;
172  meta_ops_.set_core_cq_tag(&meta_tag_);
173  call_.PerformOps(&meta_ops_);
174  }
175 
176  private:
177  friend class CallbackUnaryHandler<RequestType, ResponseType>;
178 
179  ServerCallbackUnaryImpl(
181  MessageHolder<RequestType, ResponseType>* allocator_state,
182  std::function<void()> call_requester)
183  : ctx_(ctx),
184  call_(*call),
185  allocator_state_(allocator_state),
186  call_requester_(std::move(call_requester)) {
187  ctx_->set_message_allocator_state(allocator_state);
188  }
189 
194  void SetupReactor(ServerUnaryReactor* reactor) {
195  reactor_.store(reactor, std::memory_order_relaxed);
196  this->BindReactor(reactor);
197  this->MaybeCallOnCancel(reactor);
198  this->MaybeDone(reactor->InternalInlineable());
199  }
200 
201  const RequestType* request() { return allocator_state_->request(); }
202  ResponseType* response() { return allocator_state_->response(); }
203 
204  void CallOnDone() override {
205  reactor_.load(std::memory_order_relaxed)->OnDone();
206  grpc_call* call = call_.call();
207  auto call_requester = std::move(call_requester_);
208  allocator_state_->Release();
209  if (ctx_->context_allocator() != nullptr) {
210  ctx_->context_allocator()->Release(ctx_);
211  }
212  this->~ServerCallbackUnaryImpl(); // explicitly call destructor
214  call_requester();
215  }
216 
217  ServerReactor* reactor() override {
218  return reactor_.load(std::memory_order_relaxed);
219  }
220 
222  meta_ops_;
227  finish_ops_;
229 
230  ::grpc::CallbackServerContext* const ctx_;
232  MessageHolder<RequestType, ResponseType>* const allocator_state_;
233  std::function<void()> call_requester_;
234  // reactor_ can always be loaded/stored with relaxed memory ordering because
235  // its value is only set once, independently of other data in the object,
236  // and the loads that use it will always actually come provably later even
237  // though they are from different threads since they are triggered by
238  // actions initiated only by the setting up of the reactor_ variable. In
239  // a sense, it's a delayed "const": it gets its value from the SetupReactor
240  // method (not the constructor, so it's not a true const), but it doesn't
241  // change after that and it only gets used by actions caused, directly or
242  // indirectly, by that setup. This comment also applies to the reactor_
243  // variables of the other streaming objects in this file.
244  std::atomic<ServerUnaryReactor*> reactor_;
245  // callbacks_outstanding_ follows a refcount pattern
246  std::atomic<intptr_t> callbacks_outstanding_{
247  3}; // reserve for start, Finish, and CompletionOp
248  };
249 };
250 
251 template <class RequestType, class ResponseType>
253  public:
255  std::function<ServerReadReactor<RequestType>*(
256  ::grpc::CallbackServerContext*, ResponseType*)>
257  get_reactor)
258  : get_reactor_(std::move(get_reactor)) {}
259  void RunHandler(const HandlerParameter& param) final {
260  // Arena allocate a reader structure (that includes response)
261  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
262 
264  param.call->call(), sizeof(ServerCallbackReaderImpl)))
265  ServerCallbackReaderImpl(
266  static_cast<::grpc::CallbackServerContext*>(param.server_context),
267  param.call, param.call_requester);
268  // Inlineable OnDone can be false in the CompletionOp callback because there
269  // is no read reactor that has an inlineable OnDone; this only applies to
270  // the DefaultReactor (which is unary).
271  param.server_context->BeginCompletionOp(
272  param.call,
273  [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
274  reader);
275 
276  ServerReadReactor<RequestType>* reactor = nullptr;
277  if (param.status.ok()) {
280  get_reactor_,
281  static_cast<::grpc::CallbackServerContext*>(param.server_context),
282  reader->response());
283  }
284 
285  if (reactor == nullptr) {
286  // if deserialization or reactor creator failed, we need to fail the call
288  param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
291  }
292 
293  reader->SetupReactor(reactor);
294  }
295 
296  private:
297  std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
298  ResponseType*)>
299  get_reactor_;
300 
301  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
302  public:
303  void Finish(::grpc::Status s) override {
304  // A finish tag with only MaybeDone can have its callback inlined
305  // regardless even if OnDone is not inlineable because this callback just
306  // checks a ref and then decides whether or not to dispatch OnDone.
307  finish_tag_.Set(
308  call_.call(),
309  [this](bool) {
310  // Inlineable OnDone can be false here because there is
311  // no read reactor that has an inlineable OnDone; this
312  // only applies to the DefaultReactor (which is unary).
313  this->MaybeDone(/*inlineable_ondone=*/false);
314  },
315  &finish_ops_, /*can_inline=*/true);
316  if (!ctx_->sent_initial_metadata_) {
317  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
318  ctx_->initial_metadata_flags());
319  if (ctx_->compression_level_set()) {
320  finish_ops_.set_compression_level(ctx_->compression_level());
321  }
322  ctx_->sent_initial_metadata_ = true;
323  }
324  // The response is dropped if the status is not OK.
325  if (s.ok()) {
326  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
327  finish_ops_.SendMessagePtr(&resp_));
328  } else {
329  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
330  }
331  finish_ops_.set_core_cq_tag(&finish_tag_);
332  call_.PerformOps(&finish_ops_);
333  }
334 
335  void SendInitialMetadata() override {
336  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
337  this->Ref();
338  // The callback for this function should not be inlined because it invokes
339  // a user-controlled reaction, but any resulting OnDone can be inlined in
340  // the executor to which this callback is dispatched.
341  meta_tag_.Set(
342  call_.call(),
343  [this](bool ok) {
344  ServerReadReactor<RequestType>* reactor =
345  reactor_.load(std::memory_order_relaxed);
346  reactor->OnSendInitialMetadataDone(ok);
347  this->MaybeDone(/*inlineable_ondone=*/true);
348  },
349  &meta_ops_, /*can_inline=*/false);
350  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
351  ctx_->initial_metadata_flags());
352  if (ctx_->compression_level_set()) {
353  meta_ops_.set_compression_level(ctx_->compression_level());
354  }
355  ctx_->sent_initial_metadata_ = true;
356  meta_ops_.set_core_cq_tag(&meta_tag_);
357  call_.PerformOps(&meta_ops_);
358  }
359 
360  void Read(RequestType* req) override {
361  this->Ref();
362  read_ops_.RecvMessage(req);
363  call_.PerformOps(&read_ops_);
364  }
365 
366  private:
367  friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
368 
369  ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
370  ::grpc::internal::Call* call,
371  std::function<void()> call_requester)
372  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
373 
374  void SetupReactor(ServerReadReactor<RequestType>* reactor) {
375  reactor_.store(reactor, std::memory_order_relaxed);
376  // The callback for this function should not be inlined because it invokes
377  // a user-controlled reaction, but any resulting OnDone can be inlined in
378  // the executor to which this callback is dispatched.
379  read_tag_.Set(
380  call_.call(),
381  [this, reactor](bool ok) {
382  if (GPR_UNLIKELY(!ok)) {
383  ctx_->MaybeMarkCancelledOnRead();
384  }
385  reactor->OnReadDone(ok);
386  this->MaybeDone(/*inlineable_ondone=*/true);
387  },
388  &read_ops_, /*can_inline=*/false);
389  read_ops_.set_core_cq_tag(&read_tag_);
390  this->BindReactor(reactor);
391  this->MaybeCallOnCancel(reactor);
392  // Inlineable OnDone can be false here because there is no read
393  // reactor that has an inlineable OnDone; this only applies to the
394  // DefaultReactor (which is unary).
395  this->MaybeDone(/*inlineable_ondone=*/false);
396  }
397 
398  ~ServerCallbackReaderImpl() {}
399 
400  ResponseType* response() { return &resp_; }
401 
402  void CallOnDone() override {
403  reactor_.load(std::memory_order_relaxed)->OnDone();
404  grpc_call* call = call_.call();
405  auto call_requester = std::move(call_requester_);
406  if (ctx_->context_allocator() != nullptr) {
407  ctx_->context_allocator()->Release(ctx_);
408  }
409  this->~ServerCallbackReaderImpl(); // explicitly call destructor
411  call_requester();
412  }
413 
414  ServerReactor* reactor() override {
415  return reactor_.load(std::memory_order_relaxed);
416  }
417 
419  meta_ops_;
424  finish_ops_;
428  read_ops_;
430 
431  ::grpc::CallbackServerContext* const ctx_;
433  ResponseType resp_;
434  std::function<void()> call_requester_;
435  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
436  std::atomic<ServerReadReactor<RequestType>*> reactor_;
437  // callbacks_outstanding_ follows a refcount pattern
438  std::atomic<intptr_t> callbacks_outstanding_{
439  3}; // reserve for OnStarted, Finish, and CompletionOp
440  };
441 };
442 
443 template <class RequestType, class ResponseType>
445  public:
447  std::function<ServerWriteReactor<ResponseType>*(
448  ::grpc::CallbackServerContext*, const RequestType*)>
449  get_reactor)
450  : get_reactor_(std::move(get_reactor)) {}
451  void RunHandler(const HandlerParameter& param) final {
452  // Arena allocate a writer structure
453  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
454 
456  param.call->call(), sizeof(ServerCallbackWriterImpl)))
457  ServerCallbackWriterImpl(
458  static_cast<::grpc::CallbackServerContext*>(param.server_context),
459  param.call, static_cast<RequestType*>(param.request),
460  param.call_requester);
461  // Inlineable OnDone can be false in the CompletionOp callback because there
462  // is no write reactor that has an inlineable OnDone; this only applies to
463  // the DefaultReactor (which is unary).
464  param.server_context->BeginCompletionOp(
465  param.call,
466  [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
467  writer);
468 
469  ServerWriteReactor<ResponseType>* reactor = nullptr;
470  if (param.status.ok()) {
473  get_reactor_,
474  static_cast<::grpc::CallbackServerContext*>(param.server_context),
475  writer->request());
476  }
477  if (reactor == nullptr) {
478  // if deserialization or reactor creator failed, we need to fail the call
480  param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
483  }
484 
485  writer->SetupReactor(reactor);
486  }
487 
489  ::grpc::Status* status, void** /*handler_data*/) final {
490  ::grpc::ByteBuffer buf;
491  buf.set_buffer(req);
492  auto* request =
494  call, sizeof(RequestType))) RequestType();
495  *status =
497  buf.Release();
498  if (status->ok()) {
499  return request;
500  }
501  request->~RequestType();
502  return nullptr;
503  }
504 
505  private:
506  std::function<ServerWriteReactor<ResponseType>*(
507  ::grpc::CallbackServerContext*, const RequestType*)>
508  get_reactor_;
509 
510  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
511  public:
512  void Finish(::grpc::Status s) override {
513  // A finish tag with only MaybeDone can have its callback inlined
514  // regardless even if OnDone is not inlineable because this callback just
515  // checks a ref and then decides whether or not to dispatch OnDone.
516  finish_tag_.Set(
517  call_.call(),
518  [this](bool) {
519  // Inlineable OnDone can be false here because there is
520  // no write reactor that has an inlineable OnDone; this
521  // only applies to the DefaultReactor (which is unary).
522  this->MaybeDone(/*inlineable_ondone=*/false);
523  },
524  &finish_ops_, /*can_inline=*/true);
525  finish_ops_.set_core_cq_tag(&finish_tag_);
526 
527  if (!ctx_->sent_initial_metadata_) {
528  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
529  ctx_->initial_metadata_flags());
530  if (ctx_->compression_level_set()) {
531  finish_ops_.set_compression_level(ctx_->compression_level());
532  }
533  ctx_->sent_initial_metadata_ = true;
534  }
535  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
536  call_.PerformOps(&finish_ops_);
537  }
538 
539  void SendInitialMetadata() override {
540  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
541  this->Ref();
542  // The callback for this function should not be inlined because it invokes
543  // a user-controlled reaction, but any resulting OnDone can be inlined in
544  // the executor to which this callback is dispatched.
545  meta_tag_.Set(
546  call_.call(),
547  [this](bool ok) {
548  ServerWriteReactor<ResponseType>* reactor =
549  reactor_.load(std::memory_order_relaxed);
550  reactor->OnSendInitialMetadataDone(ok);
551  this->MaybeDone(/*inlineable_ondone=*/true);
552  },
553  &meta_ops_, /*can_inline=*/false);
554  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
555  ctx_->initial_metadata_flags());
556  if (ctx_->compression_level_set()) {
557  meta_ops_.set_compression_level(ctx_->compression_level());
558  }
559  ctx_->sent_initial_metadata_ = true;
560  meta_ops_.set_core_cq_tag(&meta_tag_);
561  call_.PerformOps(&meta_ops_);
562  }
563 
564  void Write(const ResponseType* resp,
565  ::grpc::WriteOptions options) override {
566  this->Ref();
567  if (options.is_last_message()) {
568  options.set_buffer_hint();
569  }
570  if (!ctx_->sent_initial_metadata_) {
571  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
572  ctx_->initial_metadata_flags());
573  if (ctx_->compression_level_set()) {
574  write_ops_.set_compression_level(ctx_->compression_level());
575  }
576  ctx_->sent_initial_metadata_ = true;
577  }
578  // TODO(vjpai): don't assert
579  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
580  call_.PerformOps(&write_ops_);
581  }
582 
583  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
584  ::grpc::Status s) override {
585  // This combines the write into the finish callback
586  // TODO(vjpai): don't assert
587  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
588  Finish(std::move(s));
589  }
590 
591  private:
592  friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
593 
594  ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
595  ::grpc::internal::Call* call,
596  const RequestType* req,
597  std::function<void()> call_requester)
598  : ctx_(ctx),
599  call_(*call),
600  req_(req),
601  call_requester_(std::move(call_requester)) {}
602 
603  void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
604  reactor_.store(reactor, std::memory_order_relaxed);
605  // The callback for this function should not be inlined because it invokes
606  // a user-controlled reaction, but any resulting OnDone can be inlined in
607  // the executor to which this callback is dispatched.
608  write_tag_.Set(
609  call_.call(),
610  [this, reactor](bool ok) {
611  reactor->OnWriteDone(ok);
612  this->MaybeDone(/*inlineable_ondone=*/true);
613  },
614  &write_ops_, /*can_inline=*/false);
615  write_ops_.set_core_cq_tag(&write_tag_);
616  this->BindReactor(reactor);
617  this->MaybeCallOnCancel(reactor);
618  // Inlineable OnDone can be false here because there is no write
619  // reactor that has an inlineable OnDone; this only applies to the
620  // DefaultReactor (which is unary).
621  this->MaybeDone(/*inlineable_ondone=*/false);
622  }
623  ~ServerCallbackWriterImpl() {
624  if (req_ != nullptr) {
625  req_->~RequestType();
626  }
627  }
628 
629  const RequestType* request() { return req_; }
630 
631  void CallOnDone() override {
632  reactor_.load(std::memory_order_relaxed)->OnDone();
633  grpc_call* call = call_.call();
634  auto call_requester = std::move(call_requester_);
635  if (ctx_->context_allocator() != nullptr) {
636  ctx_->context_allocator()->Release(ctx_);
637  }
638  this->~ServerCallbackWriterImpl(); // explicitly call destructor
640  call_requester();
641  }
642 
643  ServerReactor* reactor() override {
644  return reactor_.load(std::memory_order_relaxed);
645  }
646 
648  meta_ops_;
653  finish_ops_;
657  write_ops_;
659 
660  ::grpc::CallbackServerContext* const ctx_;
662  const RequestType* req_;
663  std::function<void()> call_requester_;
664  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
665  std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
666  // callbacks_outstanding_ follows a refcount pattern
667  std::atomic<intptr_t> callbacks_outstanding_{
668  3}; // reserve for OnStarted, Finish, and CompletionOp
669  };
670 };
671 
672 template <class RequestType, class ResponseType>
674  public:
678  get_reactor)
679  : get_reactor_(std::move(get_reactor)) {}
680  void RunHandler(const HandlerParameter& param) final {
681  ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
682 
684  param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
685  ServerCallbackReaderWriterImpl(
686  static_cast<::grpc::CallbackServerContext*>(param.server_context),
687  param.call, param.call_requester);
688  // Inlineable OnDone can be false in the CompletionOp callback because there
689  // is no bidi reactor that has an inlineable OnDone; this only applies to
690  // the DefaultReactor (which is unary).
691  param.server_context->BeginCompletionOp(
692  param.call,
693  [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
694  stream);
695 
697  if (param.status.ok()) {
700  get_reactor_,
701  static_cast<::grpc::CallbackServerContext*>(param.server_context));
702  }
703 
704  if (reactor == nullptr) {
705  // if deserialization or reactor creator failed, we need to fail the call
707  param.call->call(),
711  }
712 
713  stream->SetupReactor(reactor);
714  }
715 
716  private:
717  std::function<ServerBidiReactor<RequestType, ResponseType>*(
719  get_reactor_;
720 
721  class ServerCallbackReaderWriterImpl
722  : public ServerCallbackReaderWriter<RequestType, ResponseType> {
723  public:
724  void Finish(::grpc::Status s) override {
725  // A finish tag with only MaybeDone can have its callback inlined
726  // regardless even if OnDone is not inlineable because this callback just
727  // checks a ref and then decides whether or not to dispatch OnDone.
728  finish_tag_.Set(
729  call_.call(),
730  [this](bool) {
731  // Inlineable OnDone can be false here because there is
732  // no bidi reactor that has an inlineable OnDone; this
733  // only applies to the DefaultReactor (which is unary).
734  this->MaybeDone(/*inlineable_ondone=*/false);
735  },
736  &finish_ops_, /*can_inline=*/true);
737  finish_ops_.set_core_cq_tag(&finish_tag_);
738 
739  if (!ctx_->sent_initial_metadata_) {
740  finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
741  ctx_->initial_metadata_flags());
742  if (ctx_->compression_level_set()) {
743  finish_ops_.set_compression_level(ctx_->compression_level());
744  }
745  ctx_->sent_initial_metadata_ = true;
746  }
747  finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
748  call_.PerformOps(&finish_ops_);
749  }
750 
751  void SendInitialMetadata() override {
752  GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
753  this->Ref();
754  // The callback for this function should not be inlined because it invokes
755  // a user-controlled reaction, but any resulting OnDone can be inlined in
756  // the executor to which this callback is dispatched.
757  meta_tag_.Set(
758  call_.call(),
759  [this](bool ok) {
760  ServerBidiReactor<RequestType, ResponseType>* reactor =
761  reactor_.load(std::memory_order_relaxed);
762  reactor->OnSendInitialMetadataDone(ok);
763  this->MaybeDone(/*inlineable_ondone=*/true);
764  },
765  &meta_ops_, /*can_inline=*/false);
766  meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
767  ctx_->initial_metadata_flags());
768  if (ctx_->compression_level_set()) {
769  meta_ops_.set_compression_level(ctx_->compression_level());
770  }
771  ctx_->sent_initial_metadata_ = true;
772  meta_ops_.set_core_cq_tag(&meta_tag_);
773  call_.PerformOps(&meta_ops_);
774  }
775 
776  void Write(const ResponseType* resp,
777  ::grpc::WriteOptions options) override {
778  this->Ref();
779  if (options.is_last_message()) {
780  options.set_buffer_hint();
781  }
782  if (!ctx_->sent_initial_metadata_) {
783  write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
784  ctx_->initial_metadata_flags());
785  if (ctx_->compression_level_set()) {
786  write_ops_.set_compression_level(ctx_->compression_level());
787  }
788  ctx_->sent_initial_metadata_ = true;
789  }
790  // TODO(vjpai): don't assert
791  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
792  call_.PerformOps(&write_ops_);
793  }
794 
795  void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
796  ::grpc::Status s) override {
797  // TODO(vjpai): don't assert
798  GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
799  Finish(std::move(s));
800  }
801 
802  void Read(RequestType* req) override {
803  this->Ref();
804  read_ops_.RecvMessage(req);
805  call_.PerformOps(&read_ops_);
806  }
807 
808  private:
809  friend class CallbackBidiHandler<RequestType, ResponseType>;
810 
811  ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
812  ::grpc::internal::Call* call,
813  std::function<void()> call_requester)
814  : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
815 
816  void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
817  reactor_.store(reactor, std::memory_order_relaxed);
818  // The callbacks for these functions should not be inlined because they
819  // invoke user-controlled reactions, but any resulting OnDones can be
820  // inlined in the executor to which a callback is dispatched.
821  write_tag_.Set(
822  call_.call(),
823  [this, reactor](bool ok) {
824  reactor->OnWriteDone(ok);
825  this->MaybeDone(/*inlineable_ondone=*/true);
826  },
827  &write_ops_, /*can_inline=*/false);
828  write_ops_.set_core_cq_tag(&write_tag_);
829  read_tag_.Set(
830  call_.call(),
831  [this, reactor](bool ok) {
832  if (GPR_UNLIKELY(!ok)) {
833  ctx_->MaybeMarkCancelledOnRead();
834  }
835  reactor->OnReadDone(ok);
836  this->MaybeDone(/*inlineable_ondone=*/true);
837  },
838  &read_ops_, /*can_inline=*/false);
839  read_ops_.set_core_cq_tag(&read_tag_);
840  this->BindReactor(reactor);
841  this->MaybeCallOnCancel(reactor);
842  // Inlineable OnDone can be false here because there is no bidi
843  // reactor that has an inlineable OnDone; this only applies to the
844  // DefaultReactor (which is unary).
845  this->MaybeDone(/*inlineable_ondone=*/false);
846  }
847 
848  void CallOnDone() override {
849  reactor_.load(std::memory_order_relaxed)->OnDone();
850  grpc_call* call = call_.call();
851  auto call_requester = std::move(call_requester_);
852  if (ctx_->context_allocator() != nullptr) {
853  ctx_->context_allocator()->Release(ctx_);
854  }
855  this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor
857  call_requester();
858  }
859 
860  ServerReactor* reactor() override {
861  return reactor_.load(std::memory_order_relaxed);
862  }
863 
865  meta_ops_;
870  finish_ops_;
874  write_ops_;
878  read_ops_;
880 
881  ::grpc::CallbackServerContext* const ctx_;
883  std::function<void()> call_requester_;
884  // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
885  std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
886  // callbacks_outstanding_ follows a refcount pattern
887  std::atomic<intptr_t> callbacks_outstanding_{
888  3}; // reserve for OnStarted, Finish, and CompletionOp
889  };
890 };
891 
892 } // namespace internal
893 } // namespace grpc
894 
895 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
A sequence of bytes.
Definition: byte_buffer.h:60
void Release()
Forget underlying byte buffer without destroying Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
Definition: server_context.h:578
virtual void grpc_call_unref(grpc_call *call)=0
virtual void grpc_call_ref(grpc_call *call)=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
virtual void Release()=0
RequestT * request()
Definition: message_allocator.h:45
ResponseT * response()
Definition: message_allocator.h:46
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:268
Definition: server_callback.h:207
virtual void SendInitialMetadata()=0
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:215
Definition: server_callback.h:238
virtual void SendInitialMetadata()=0
Definition: server_callback.h:191
virtual void SendInitialMetadata()=0
void BindReactor(Reactor *reactor)
Definition: server_callback.h:201
Definition: server_callback.h:221
virtual void SendInitialMetadata()=0
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:490
Definition: server_callback.h:697
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:578
Did it work? If it didn't, why?
Definition: status.h:31
bool ok() const
Is the status OK?
Definition: status.h:118
Per-message write options.
Definition: call_op_set.h:79
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:181
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc_call * call() const
Definition: call.h:69
Definition: call_op_set.h:424
Definition: call_op_set.h:212
Definition: call_op_set.h:282
Definition: call_op_set.h:654
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:867
Definition: server_callback_handlers.h:673
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:675
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:680
Definition: server_callback_handlers.h:252
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:259
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:254
Definition: server_callback_handlers.h:444
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:446
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:488
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:451
Definition: server_callback_handlers.h:31
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:44
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:79
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming o...
Definition: callback_common.h:135
Definition: server_callback.h:161
Definition: server_callback.h:768
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:107
void MaybeDone()
Definition: server_callback.h:93
void Ref()
Increases the reference count.
Definition: server_callback.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
::grpc::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:798
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:52
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:774
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided ...
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Null-initializes the global gRPC variables for the codegen library.
Definition: completion_queue.h:96
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
Definition: async_unary_call.h:398
Definition: rpc_service_method.h:41
Definition: grpc_types.h:40
Definition: call.cc:139