GRPC C++  1.39.1
client_callback.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 #include <atomic>
21 #include <functional>
22 
31 
32 namespace grpc {
33 class Channel;
34 class ClientContext;
35 
36 namespace internal {
37 class RpcMethod;
38 
45 template <class InputMessage, class OutputMessage,
46  class BaseInputMessage = InputMessage,
47  class BaseOutputMessage = OutputMessage>
49  const ::grpc::internal::RpcMethod& method,
50  ::grpc::ClientContext* context,
51  const InputMessage* request, OutputMessage* result,
52  std::function<void(::grpc::Status)> on_completion) {
53  static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
54  "Invalid input message specification");
55  static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
56  "Invalid output message specification");
58  channel, method, context, request, result, on_completion);
59 }
60 
61 template <class InputMessage, class OutputMessage>
63  public:
65  const ::grpc::internal::RpcMethod& method,
66  ::grpc::ClientContext* context,
67  const InputMessage* request, OutputMessage* result,
68  std::function<void(::grpc::Status)> on_completion) {
69  ::grpc::CompletionQueue* cq = channel->CallbackCQ();
70  GPR_CODEGEN_ASSERT(cq != nullptr);
71  grpc::internal::Call call(channel->CreateCall(method, context, cq));
72 
73  using FullCallOpSet = grpc::internal::CallOpSet<
80 
81  struct OpSetAndTag {
82  FullCallOpSet opset;
84  };
85  const size_t alloc_sz = sizeof(OpSetAndTag);
86  auto* const alloced = static_cast<OpSetAndTag*>(
88  alloc_sz));
89  auto* ops = new (&alloced->opset) FullCallOpSet;
90  auto* tag = new (&alloced->tag)
91  grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
92 
93  // TODO(vjpai): Unify code with sync API as much as possible
94  ::grpc::Status s = ops->SendMessagePtr(request);
95  if (!s.ok()) {
96  tag->force_run(s);
97  return;
98  }
99  ops->SendInitialMetadata(&context->send_initial_metadata_,
100  context->initial_metadata_flags());
101  ops->RecvInitialMetadata(context);
102  ops->RecvMessage(result);
103  ops->AllowNoMessage();
104  ops->ClientSendClose();
105  ops->ClientRecvStatus(context, tag->status_ptr());
106  ops->set_core_cq_tag(tag);
107  call.PerformOps(ops);
108  }
109 };
110 
111 // Base class for public API classes.
113  public:
114  virtual ~ClientReactor() = default;
115 
123  virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
124 
133 
141  virtual bool InternalTrailersOnly(const grpc_call* call) const;
142 };
143 
144 } // namespace internal
145 
146 // Forward declarations
147 template <class Request, class Response>
148 class ClientBidiReactor;
149 template <class Response>
150 class ClientReadReactor;
151 template <class Request>
152 class ClientWriteReactor;
153 class ClientUnaryReactor;
154 
155 // NOTE: The streaming objects are not actually implemented in the public API.
156 // These interfaces are provided for mocking only. Typical applications
157 // will interact exclusively with the reactors that they define.
158 template <class Request, class Response>
160  public:
162  virtual void StartCall() = 0;
163  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
164  virtual void WritesDone() = 0;
165  virtual void Read(Response* resp) = 0;
166  virtual void AddHold(int holds) = 0;
167  virtual void RemoveHold() = 0;
168 
169  protected:
171  reactor->BindStream(this);
172  }
173 };
174 
175 template <class Response>
177  public:
179  virtual void StartCall() = 0;
180  virtual void Read(Response* resp) = 0;
181  virtual void AddHold(int holds) = 0;
182  virtual void RemoveHold() = 0;
183 
184  protected:
186  reactor->BindReader(this);
187  }
188 };
189 
190 template <class Request>
192  public:
194  virtual void StartCall() = 0;
195  void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
196  virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
197  void WriteLast(const Request* req, ::grpc::WriteOptions options) {
198  Write(req, options.set_last_message());
199  }
200  virtual void WritesDone() = 0;
201 
202  virtual void AddHold(int holds) = 0;
203  virtual void RemoveHold() = 0;
204 
205  protected:
207  reactor->BindWriter(this);
208  }
209 };
210 
212  public:
213  virtual ~ClientCallbackUnary() {}
214  virtual void StartCall() = 0;
215 
216  protected:
217  void BindReactor(ClientUnaryReactor* reactor);
218 };
219 
220 // The following classes are the reactor interfaces that are to be implemented
221 // by the user. They are passed in to the library as an argument to a call on a
222 // stub (either a codegen-ed call or a generic call). The streaming RPC is
223 // activated by calling StartCall, possibly after initiating StartRead,
224 // StartWrite, or AddHold operations on the streaming object. Note that none of
225 // the classes are pure; all reactions have a default empty reaction so that the
226 // user class only needs to override those reactions that it cares about.
227 // The reactor must be passed to the stub invocation before any of the below
228 // operations can be called and its reactions will be invoked by the library in
229 // response to the completion of various operations. Reactions must not include
230 // blocking operations (such as blocking I/O, starting synchronous RPCs, or
231 // waiting on condition variables). Reactions may be invoked concurrently,
232 // except that OnDone is called after all others (assuming proper API usage).
233 // The reactor may not be deleted until OnDone is called.
234 
236 template <class Request, class Response>
238  public:
243  void StartCall() { stream_->StartCall(); }
244 
250  void StartRead(Response* resp) { stream_->Read(resp); }
251 
258  void StartWrite(const Request* req) {
259  StartWrite(req, ::grpc::WriteOptions());
260  }
261 
268  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
269  stream_->Write(req, options);
270  }
271 
281  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
282  StartWrite(req, options.set_last_message());
283  }
284 
290  void StartWritesDone() { stream_->WritesDone(); }
291 
314  void AddHold() { AddMultipleHolds(1); }
315  void AddMultipleHolds(int holds) {
316  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
317  stream_->AddHold(holds);
318  }
319  void RemoveHold() { stream_->RemoveHold(); }
320 
328  void OnDone(const ::grpc::Status& /*s*/) override {}
329 
338  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
339 
344  virtual void OnReadDone(bool /*ok*/) {}
345 
351  virtual void OnWriteDone(bool /*ok*/) {}
352 
360  virtual void OnWritesDoneDone(bool /*ok*/) {}
361 
362  private:
363  friend class ClientCallbackReaderWriter<Request, Response>;
364  void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
365  stream_ = stream;
366  }
368 };
369 
372 template <class Response>
374  public:
375  void StartCall() { reader_->StartCall(); }
376  void StartRead(Response* resp) { reader_->Read(resp); }
377 
378  void AddHold() { AddMultipleHolds(1); }
379  void AddMultipleHolds(int holds) {
380  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
381  reader_->AddHold(holds);
382  }
383  void RemoveHold() { reader_->RemoveHold(); }
384 
385  void OnDone(const ::grpc::Status& /*s*/) override {}
386  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
387  virtual void OnReadDone(bool /*ok*/) {}
388 
389  private:
390  friend class ClientCallbackReader<Response>;
391  void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
393 };
394 
397 template <class Request>
399  public:
400  void StartCall() { writer_->StartCall(); }
401  void StartWrite(const Request* req) {
402  StartWrite(req, ::grpc::WriteOptions());
403  }
404  void StartWrite(const Request* req, ::grpc::WriteOptions options) {
405  writer_->Write(req, options);
406  }
407  void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
408  StartWrite(req, options.set_last_message());
409  }
410  void StartWritesDone() { writer_->WritesDone(); }
411 
412  void AddHold() { AddMultipleHolds(1); }
413  void AddMultipleHolds(int holds) {
414  GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
415  writer_->AddHold(holds);
416  }
417  void RemoveHold() { writer_->RemoveHold(); }
418 
419  void OnDone(const ::grpc::Status& /*s*/) override {}
420  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
421  virtual void OnWriteDone(bool /*ok*/) {}
422  virtual void OnWritesDoneDone(bool /*ok*/) {}
423 
424  private:
425  friend class ClientCallbackWriter<Request>;
426  void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
427 
429 };
430 
443  public:
444  void StartCall() { call_->StartCall(); }
445  void OnDone(const ::grpc::Status& /*s*/) override {}
446  virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
447 
448  private:
449  friend class ClientCallbackUnary;
450  void BindCall(ClientCallbackUnary* call) { call_ = call; }
451  ClientCallbackUnary* call_;
452 };
453 
454 // Define function out-of-line from class to avoid forward declaration issue
456  reactor->BindCall(this);
457 }
458 
459 namespace internal {
460 
461 // Forward declare factory classes for friendship
462 template <class Request, class Response>
463 class ClientCallbackReaderWriterFactory;
464 template <class Response>
465 class ClientCallbackReaderFactory;
466 template <class Request>
467 class ClientCallbackWriterFactory;
468 
469 template <class Request, class Response>
471  : public ClientCallbackReaderWriter<Request, Response> {
472  public:
473  // always allocated against a call arena, no memory free required
474  static void operator delete(void* /*ptr*/, std::size_t size) {
476  }
477 
478  // This operator should never be called as the memory should be freed as part
479  // of the arena destruction. It only exists to provide a matching operator
480  // delete to the operator new so that some compilers will not complain (see
481  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
482  // there are no tests catching the compiler warning.
483  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
484 
485  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
486  // This call initiates two batches, plus any backlog, each with a callback
487  // 1. Send initial metadata (unless corked) + recv initial metadata
488  // 2. Any read backlog
489  // 3. Any write backlog
490  // 4. Recv trailing metadata (unless corked)
491  if (!start_corked_) {
492  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
493  context_->initial_metadata_flags());
494  }
495 
496  call_.PerformOps(&start_ops_);
497 
498  {
499  grpc::internal::MutexLock lock(&start_mu_);
500 
501  if (backlog_.read_ops) {
502  call_.PerformOps(&read_ops_);
503  }
504  if (backlog_.write_ops) {
505  call_.PerformOps(&write_ops_);
506  }
507  if (backlog_.writes_done_ops) {
508  call_.PerformOps(&writes_done_ops_);
509  }
510  call_.PerformOps(&finish_ops_);
511  // The last thing in this critical section is to set started_ so that it
512  // can be used lock-free as well.
513  started_.store(true, std::memory_order_release);
514  }
515  // MaybeFinish outside the lock to make sure that destruction of this object
516  // doesn't take place while holding the lock (which would cause the lock to
517  // be released after destruction)
518  this->MaybeFinish(/*from_reaction=*/false);
519  }
520 
521  void Read(Response* msg) override {
522  read_ops_.RecvMessage(msg);
523  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
524  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
525  grpc::internal::MutexLock lock(&start_mu_);
526  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
527  backlog_.read_ops = true;
528  return;
529  }
530  }
531  call_.PerformOps(&read_ops_);
532  }
533 
534  void Write(const Request* msg, ::grpc::WriteOptions options)
535  ABSL_LOCKS_EXCLUDED(start_mu_) override {
536  if (options.is_last_message()) {
537  options.set_buffer_hint();
538  write_ops_.ClientSendClose();
539  }
540  // TODO(vjpai): don't assert
541  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
542  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
543  if (GPR_UNLIKELY(corked_write_needed_)) {
544  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
545  context_->initial_metadata_flags());
546  corked_write_needed_ = false;
547  }
548 
549  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
550  grpc::internal::MutexLock lock(&start_mu_);
551  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
552  backlog_.write_ops = true;
553  return;
554  }
555  }
556  call_.PerformOps(&write_ops_);
557  }
558  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
559  writes_done_ops_.ClientSendClose();
560  writes_done_tag_.Set(
561  call_.call(),
562  [this](bool ok) {
563  reactor_->OnWritesDoneDone(ok);
564  MaybeFinish(/*from_reaction=*/true);
565  },
566  &writes_done_ops_, /*can_inline=*/false);
567  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
568  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
569  if (GPR_UNLIKELY(corked_write_needed_)) {
570  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
571  context_->initial_metadata_flags());
572  corked_write_needed_ = false;
573  }
574  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
575  grpc::internal::MutexLock lock(&start_mu_);
576  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
577  backlog_.writes_done_ops = true;
578  return;
579  }
580  }
581  call_.PerformOps(&writes_done_ops_);
582  }
583 
584  void AddHold(int holds) override {
585  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
586  }
587  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
588 
589  private:
590  friend class ClientCallbackReaderWriterFactory<Request, Response>;
591 
593  ::grpc::ClientContext* context,
595  : context_(context),
596  call_(call),
597  reactor_(reactor),
598  start_corked_(context_->initial_metadata_corked_),
599  corked_write_needed_(start_corked_) {
600  this->BindReactor(reactor);
601 
602  // Set up the unchanging parts of the start, read, and write tags and ops.
603  start_tag_.Set(
604  call_.call(),
605  [this](bool ok) {
606  reactor_->OnReadInitialMetadataDone(
607  ok && !reactor_->InternalTrailersOnly(call_.call()));
608  MaybeFinish(/*from_reaction=*/true);
609  },
610  &start_ops_, /*can_inline=*/false);
611  start_ops_.RecvInitialMetadata(context_);
612  start_ops_.set_core_cq_tag(&start_tag_);
613 
614  write_tag_.Set(
615  call_.call(),
616  [this](bool ok) {
617  reactor_->OnWriteDone(ok);
618  MaybeFinish(/*from_reaction=*/true);
619  },
620  &write_ops_, /*can_inline=*/false);
621  write_ops_.set_core_cq_tag(&write_tag_);
622 
623  read_tag_.Set(
624  call_.call(),
625  [this](bool ok) {
626  reactor_->OnReadDone(ok);
627  MaybeFinish(/*from_reaction=*/true);
628  },
629  &read_ops_, /*can_inline=*/false);
630  read_ops_.set_core_cq_tag(&read_tag_);
631 
632  // Also set up the Finish tag and op set.
633  finish_tag_.Set(
634  call_.call(),
635  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
636  &finish_ops_,
637  /*can_inline=*/false);
638  finish_ops_.ClientRecvStatus(context_, &finish_status_);
639  finish_ops_.set_core_cq_tag(&finish_tag_);
640  }
641 
642  // MaybeFinish can be called from reactions or from user-initiated operations
643  // like StartCall or RemoveHold. If this is the last operation or hold on this
644  // object, it will invoke the OnDone reaction. If MaybeFinish was called from
645  // a reaction, it can call OnDone directly. If not, it would need to schedule
646  // OnDone onto an executor thread to avoid the possibility of deadlocking with
647  // any locks in the user code that invoked it.
648  void MaybeFinish(bool from_reaction) {
649  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
650  1, std::memory_order_acq_rel) == 1)) {
651  ::grpc::Status s = std::move(finish_status_);
652  auto* reactor = reactor_;
653  auto* call = call_.call();
654  this->~ClientCallbackReaderWriterImpl();
656  if (GPR_LIKELY(from_reaction)) {
657  reactor->OnDone(s);
658  } else {
659  reactor->InternalScheduleOnDone(std::move(s));
660  }
661  }
662  }
663 
664  ::grpc::ClientContext* const context_;
665  grpc::internal::Call call_;
666  ClientBidiReactor<Request, Response>* const reactor_;
667 
670  start_ops_;
672  const bool start_corked_;
673  bool corked_write_needed_; // no lock needed since only accessed in
674  // Write/WritesDone which cannot be concurrent
675 
678  ::grpc::Status finish_status_;
679 
683  write_ops_;
685 
688  writes_done_ops_;
690 
692  read_ops_;
694 
695  struct StartCallBacklog {
696  bool write_ops = false;
697  bool writes_done_ops = false;
698  bool read_ops = false;
699  };
700  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
701 
702  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
703  std::atomic<intptr_t> callbacks_outstanding_{3};
704  std::atomic_bool started_{false};
705  grpc::internal::Mutex start_mu_;
706 };
707 
708 template <class Request, class Response>
710  public:
711  static void Create(::grpc::ChannelInterface* channel,
712  const ::grpc::internal::RpcMethod& method,
713  ::grpc::ClientContext* context,
715  grpc::internal::Call call =
716  channel->CreateCall(method, context, channel->CallbackCQ());
717 
722  reactor);
723  }
724 };
725 
726 template <class Response>
728  public:
729  // always allocated against a call arena, no memory free required
730  static void operator delete(void* /*ptr*/, std::size_t size) {
732  }
733 
734  // This operator should never be called as the memory should be freed as part
735  // of the arena destruction. It only exists to provide a matching operator
736  // delete to the operator new so that some compilers will not complain (see
737  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
738  // there are no tests catching the compiler warning.
739  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
740 
741  void StartCall() override {
742  // This call initiates two batches, plus any backlog, each with a callback
743  // 1. Send initial metadata (unless corked) + recv initial metadata
744  // 2. Any backlog
745  // 3. Recv trailing metadata
746 
747  start_tag_.Set(
748  call_.call(),
749  [this](bool ok) {
750  reactor_->OnReadInitialMetadataDone(
751  ok && !reactor_->InternalTrailersOnly(call_.call()));
752  MaybeFinish(/*from_reaction=*/true);
753  },
754  &start_ops_, /*can_inline=*/false);
755  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
756  context_->initial_metadata_flags());
757  start_ops_.RecvInitialMetadata(context_);
758  start_ops_.set_core_cq_tag(&start_tag_);
759  call_.PerformOps(&start_ops_);
760 
761  // Also set up the read tag so it doesn't have to be set up each time
762  read_tag_.Set(
763  call_.call(),
764  [this](bool ok) {
765  reactor_->OnReadDone(ok);
766  MaybeFinish(/*from_reaction=*/true);
767  },
768  &read_ops_, /*can_inline=*/false);
769  read_ops_.set_core_cq_tag(&read_tag_);
770 
771  {
772  grpc::internal::MutexLock lock(&start_mu_);
773  if (backlog_.read_ops) {
774  call_.PerformOps(&read_ops_);
775  }
776  started_.store(true, std::memory_order_release);
777  }
778 
779  finish_tag_.Set(
780  call_.call(),
781  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
782  &finish_ops_, /*can_inline=*/false);
783  finish_ops_.ClientRecvStatus(context_, &finish_status_);
784  finish_ops_.set_core_cq_tag(&finish_tag_);
785  call_.PerformOps(&finish_ops_);
786  }
787 
788  void Read(Response* msg) override {
789  read_ops_.RecvMessage(msg);
790  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
791  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
792  grpc::internal::MutexLock lock(&start_mu_);
793  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
794  backlog_.read_ops = true;
795  return;
796  }
797  }
798  call_.PerformOps(&read_ops_);
799  }
800 
801  void AddHold(int holds) override {
802  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
803  }
804  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
805 
806  private:
807  friend class ClientCallbackReaderFactory<Response>;
808 
809  template <class Request>
811  ::grpc::ClientContext* context, Request* request,
813  : context_(context), call_(call), reactor_(reactor) {
814  this->BindReactor(reactor);
815  // TODO(vjpai): don't assert
816  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
817  start_ops_.ClientSendClose();
818  }
819 
820  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
821  void MaybeFinish(bool from_reaction) {
822  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
823  1, std::memory_order_acq_rel) == 1)) {
824  ::grpc::Status s = std::move(finish_status_);
825  auto* reactor = reactor_;
826  auto* call = call_.call();
827  this->~ClientCallbackReaderImpl();
829  if (GPR_LIKELY(from_reaction)) {
830  reactor->OnDone(s);
831  } else {
832  reactor->InternalScheduleOnDone(std::move(s));
833  }
834  }
835  }
836 
837  ::grpc::ClientContext* const context_;
838  grpc::internal::Call call_;
839  ClientReadReactor<Response>* const reactor_;
840 
845  start_ops_;
847 
850  ::grpc::Status finish_status_;
851 
853  read_ops_;
855 
856  struct StartCallBacklog {
857  bool read_ops = false;
858  };
859  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
860 
861  // Minimum of 2 callbacks to pre-register for start and finish
862  std::atomic<intptr_t> callbacks_outstanding_{2};
863  std::atomic_bool started_{false};
864  grpc::internal::Mutex start_mu_;
865 };
866 
867 template <class Response>
869  public:
870  template <class Request>
871  static void Create(::grpc::ChannelInterface* channel,
872  const ::grpc::internal::RpcMethod& method,
873  ::grpc::ClientContext* context, const Request* request,
874  ClientReadReactor<Response>* reactor) {
875  grpc::internal::Call call =
876  channel->CreateCall(method, context, channel->CallbackCQ());
877 
880  call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
881  ClientCallbackReaderImpl<Response>(call, context, request, reactor);
882  }
883 };
884 
885 template <class Request>
887  public:
888  // always allocated against a call arena, no memory free required
889  static void operator delete(void* /*ptr*/, std::size_t size) {
891  }
892 
893  // This operator should never be called as the memory should be freed as part
894  // of the arena destruction. It only exists to provide a matching operator
895  // delete to the operator new so that some compilers will not complain (see
896  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
897  // there are no tests catching the compiler warning.
898  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
899 
900  void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override {
901  // This call initiates two batches, plus any backlog, each with a callback
902  // 1. Send initial metadata (unless corked) + recv initial metadata
903  // 2. Any backlog
904  // 3. Recv trailing metadata
905 
906  if (!start_corked_) {
907  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
908  context_->initial_metadata_flags());
909  }
910  call_.PerformOps(&start_ops_);
911 
912  {
913  grpc::internal::MutexLock lock(&start_mu_);
914 
915  if (backlog_.write_ops) {
916  call_.PerformOps(&write_ops_);
917  }
918  if (backlog_.writes_done_ops) {
919  call_.PerformOps(&writes_done_ops_);
920  }
921  call_.PerformOps(&finish_ops_);
922  // The last thing in this critical section is to set started_ so that it
923  // can be used lock-free as well.
924  started_.store(true, std::memory_order_release);
925  }
926  // MaybeFinish outside the lock to make sure that destruction of this object
927  // doesn't take place while holding the lock (which would cause the lock to
928  // be released after destruction)
929  this->MaybeFinish(/*from_reaction=*/false);
930  }
931 
932  void Write(const Request* msg, ::grpc::WriteOptions options)
933  ABSL_LOCKS_EXCLUDED(start_mu_) override {
934  if (GPR_UNLIKELY(options.is_last_message())) {
935  options.set_buffer_hint();
936  write_ops_.ClientSendClose();
937  }
938  // TODO(vjpai): don't assert
939  GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
940  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
941 
942  if (GPR_UNLIKELY(corked_write_needed_)) {
943  write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
944  context_->initial_metadata_flags());
945  corked_write_needed_ = false;
946  }
947 
948  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
949  grpc::internal::MutexLock lock(&start_mu_);
950  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
951  backlog_.write_ops = true;
952  return;
953  }
954  }
955  call_.PerformOps(&write_ops_);
956  }
957 
958  void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override {
959  writes_done_ops_.ClientSendClose();
960  writes_done_tag_.Set(
961  call_.call(),
962  [this](bool ok) {
963  reactor_->OnWritesDoneDone(ok);
964  MaybeFinish(/*from_reaction=*/true);
965  },
966  &writes_done_ops_, /*can_inline=*/false);
967  writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
968  callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
969 
970  if (GPR_UNLIKELY(corked_write_needed_)) {
971  writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
972  context_->initial_metadata_flags());
973  corked_write_needed_ = false;
974  }
975 
976  if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
977  grpc::internal::MutexLock lock(&start_mu_);
978  if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
979  backlog_.writes_done_ops = true;
980  return;
981  }
982  }
983  call_.PerformOps(&writes_done_ops_);
984  }
985 
986  void AddHold(int holds) override {
987  callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
988  }
989  void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
990 
991  private:
992  friend class ClientCallbackWriterFactory<Request>;
993 
994  template <class Response>
996  ::grpc::ClientContext* context, Response* response,
998  : context_(context),
999  call_(call),
1000  reactor_(reactor),
1001  start_corked_(context_->initial_metadata_corked_),
1002  corked_write_needed_(start_corked_) {
1003  this->BindReactor(reactor);
1004 
1005  // Set up the unchanging parts of the start and write tags and ops.
1006  start_tag_.Set(
1007  call_.call(),
1008  [this](bool ok) {
1009  reactor_->OnReadInitialMetadataDone(
1010  ok && !reactor_->InternalTrailersOnly(call_.call()));
1011  MaybeFinish(/*from_reaction=*/true);
1012  },
1013  &start_ops_, /*can_inline=*/false);
1014  start_ops_.RecvInitialMetadata(context_);
1015  start_ops_.set_core_cq_tag(&start_tag_);
1016 
1017  write_tag_.Set(
1018  call_.call(),
1019  [this](bool ok) {
1020  reactor_->OnWriteDone(ok);
1021  MaybeFinish(/*from_reaction=*/true);
1022  },
1023  &write_ops_, /*can_inline=*/false);
1024  write_ops_.set_core_cq_tag(&write_tag_);
1025 
1026  // Also set up the Finish tag and op set.
1027  finish_ops_.RecvMessage(response);
1028  finish_ops_.AllowNoMessage();
1029  finish_tag_.Set(
1030  call_.call(),
1031  [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1032  &finish_ops_,
1033  /*can_inline=*/false);
1034  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1035  finish_ops_.set_core_cq_tag(&finish_tag_);
1036  }
1037 
1038  // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
1039  void MaybeFinish(bool from_reaction) {
1040  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1041  1, std::memory_order_acq_rel) == 1)) {
1042  ::grpc::Status s = std::move(finish_status_);
1043  auto* reactor = reactor_;
1044  auto* call = call_.call();
1045  this->~ClientCallbackWriterImpl();
1047  if (GPR_LIKELY(from_reaction)) {
1048  reactor->OnDone(s);
1049  } else {
1050  reactor->InternalScheduleOnDone(std::move(s));
1051  }
1052  }
1053  }
1054 
1055  ::grpc::ClientContext* const context_;
1056  grpc::internal::Call call_;
1057  ClientWriteReactor<Request>* const reactor_;
1058 
1061  start_ops_;
1063  const bool start_corked_;
1064  bool corked_write_needed_; // no lock needed since only accessed in
1065  // Write/WritesDone which cannot be concurrent
1066 
1069  finish_ops_;
1071  ::grpc::Status finish_status_;
1072 
1076  write_ops_;
1078 
1081  writes_done_ops_;
1082  grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1083 
1084  struct StartCallBacklog {
1085  bool write_ops = false;
1086  bool writes_done_ops = false;
1087  };
1088  StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1089 
1090  // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1091  std::atomic<intptr_t> callbacks_outstanding_{3};
1092  std::atomic_bool started_{false};
1093  grpc::internal::Mutex start_mu_;
1094 };
1095 
1096 template <class Request>
1098  public:
1099  template <class Response>
1100  static void Create(::grpc::ChannelInterface* channel,
1101  const ::grpc::internal::RpcMethod& method,
1102  ::grpc::ClientContext* context, Response* response,
1103  ClientWriteReactor<Request>* reactor) {
1104  grpc::internal::Call call =
1105  channel->CreateCall(method, context, channel->CallbackCQ());
1106 
1109  call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1110  ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1111  }
1112 };
1113 
1115  public:
1116  // always allocated against a call arena, no memory free required
1117  static void operator delete(void* /*ptr*/, std::size_t size) {
1119  }
1120 
1121  // This operator should never be called as the memory should be freed as part
1122  // of the arena destruction. It only exists to provide a matching operator
1123  // delete to the operator new so that some compilers will not complain (see
1124  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1125  // there are no tests catching the compiler warning.
1126  static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1127 
1128  void StartCall() override {
1129  // This call initiates two batches, each with a callback
1130  // 1. Send initial metadata + write + writes done + recv initial metadata
1131  // 2. Read message, recv trailing metadata
1132 
1133  start_tag_.Set(
1134  call_.call(),
1135  [this](bool ok) {
1136  reactor_->OnReadInitialMetadataDone(
1137  ok && !reactor_->InternalTrailersOnly(call_.call()));
1138  MaybeFinish();
1139  },
1140  &start_ops_, /*can_inline=*/false);
1141  start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1142  context_->initial_metadata_flags());
1143  start_ops_.RecvInitialMetadata(context_);
1144  start_ops_.set_core_cq_tag(&start_tag_);
1145  call_.PerformOps(&start_ops_);
1146 
1147  finish_tag_.Set(
1148  call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1149  /*can_inline=*/false);
1150  finish_ops_.ClientRecvStatus(context_, &finish_status_);
1151  finish_ops_.set_core_cq_tag(&finish_tag_);
1152  call_.PerformOps(&finish_ops_);
1153  }
1154 
1155  private:
1157 
1158  template <class Request, class Response>
1160  ::grpc::ClientContext* context, Request* request,
1161  Response* response, ClientUnaryReactor* reactor)
1162  : context_(context), call_(call), reactor_(reactor) {
1163  this->BindReactor(reactor);
1164  // TODO(vjpai): don't assert
1165  GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1166  start_ops_.ClientSendClose();
1167  finish_ops_.RecvMessage(response);
1168  finish_ops_.AllowNoMessage();
1169  }
1170 
1171  // In the unary case, MaybeFinish is only ever invoked from a
1172  // library-initiated reaction, so it will just directly call OnDone if this is
1173  // the last reaction for this RPC.
1174  void MaybeFinish() {
1175  if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1176  1, std::memory_order_acq_rel) == 1)) {
1177  ::grpc::Status s = std::move(finish_status_);
1178  auto* reactor = reactor_;
1179  auto* call = call_.call();
1180  this->~ClientCallbackUnaryImpl();
1182  reactor->OnDone(s);
1183  }
1184  }
1185 
1186  ::grpc::ClientContext* const context_;
1187  grpc::internal::Call call_;
1188  ClientUnaryReactor* const reactor_;
1189 
1194  start_ops_;
1196 
1199  finish_ops_;
1201  ::grpc::Status finish_status_;
1202 
1203  // This call will have 2 callbacks: start and finish
1204  std::atomic<intptr_t> callbacks_outstanding_{2};
1205 };
1206 
1208  public:
1209  template <class Request, class Response, class BaseRequest = Request,
1210  class BaseResponse = Response>
1211  static void Create(::grpc::ChannelInterface* channel,
1212  const ::grpc::internal::RpcMethod& method,
1213  ::grpc::ClientContext* context, const Request* request,
1214  Response* response, ClientUnaryReactor* reactor) {
1215  grpc::internal::Call call =
1216  channel->CreateCall(method, context, channel->CallbackCQ());
1217 
1219 
1221  call.call(), sizeof(ClientCallbackUnaryImpl)))
1222  ClientCallbackUnaryImpl(call, context,
1223  static_cast<const BaseRequest*>(request),
1224  static_cast<BaseResponse*>(response), reactor);
1225  }
1226 };
1227 
1228 } // namespace internal
1229 
1230 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
1231 namespace experimental {
1232 
1233 template <class Response>
1235 
1236 template <class Request>
1238 
1239 template <class Request, class Response>
1242 
1243 template <class Response>
1245 
1246 template <class Request>
1248 
1249 template <class Request, class Response>
1251 
1253 
1254 } // namespace experimental
1255 
1256 } // namespace grpc
1257 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:237
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:268
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:258
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:290
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:250
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:243
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write (like StartWrite and StartWritesDone, merged).
Definition: client_callback.h:281
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall but from outside one of the reactions (OnReadDone, etc).
Definition: client_callback.h:314
void AddMultipleHolds(int holds)
Definition: client_callback.h:315
friend class ClientCallbackReaderWriter< Request, Response >
Definition: client_callback.h:363
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:338
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:351
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:344
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:360
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:328
void RemoveHold()
Definition: client_callback.h:319
Definition: client_callback.h:176
virtual ~ClientCallbackReader()
Definition: client_callback.h:178
virtual void RemoveHold()=0
virtual void Read(Response *resp)=0
virtual void StartCall()=0
virtual void AddHold(int holds)=0
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:185
Definition: client_callback.h:159
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
virtual void Read(Response *resp)=0
virtual void AddHold(int holds)=0
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:161
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:170
Definition: client_callback.h:211
virtual ~ClientCallbackUnary()
Definition: client_callback.h:213
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:455
virtual void StartCall()=0
Definition: client_callback.h:191
virtual void WritesDone()=0
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
void Write(const Request *req)
Definition: client_callback.h:195
virtual ~ClientCallbackWriter()
Definition: client_callback.h:193
virtual void RemoveHold()=0
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:206
virtual void AddHold(int holds)=0
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:197
virtual void StartCall()=0
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:193
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:373
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:386
void StartCall()
Definition: client_callback.h:375
void AddMultipleHolds(int holds)
Definition: client_callback.h:379
void AddHold()
Definition: client_callback.h:378
virtual void OnReadDone(bool)
Definition: client_callback.h:387
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:385
void StartRead(Response *resp)
Definition: client_callback.h:376
friend class ClientCallbackReader< Response >
Definition: client_callback.h:390
void RemoveHold()
Definition: client_callback.h:383
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:442
void StartCall()
Definition: client_callback.h:444
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:446
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:445
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:398
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:419
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:420
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:422
void StartWrite(const Request *req)
Definition: client_callback.h:401
void AddMultipleHolds(int holds)
Definition: client_callback.h:413
void RemoveHold()
Definition: client_callback.h:417
virtual void OnWriteDone(bool)
Definition: client_callback.h:421
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:404
void StartWritesDone()
Definition: client_callback.h:410
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:407
friend class ClientCallbackWriter< Request >
Definition: client_callback.h:425
void StartCall()
Definition: client_callback.h:400
void AddHold()
Definition: client_callback.h:412
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:102
virtual void grpc_call_unref(grpc_call *call)=0
virtual void grpc_call_ref(grpc_call *call)=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
Did it work? If it didn't, why?
Definition: status.h:31
bool ok() const
Is the status OK?
Definition: status.h:118
Per-message write options.
Definition: call_op_set.h:79
WriteOptions & set_last_message()
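last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equivalent of performing Write, WritesDone in a single step. Server-side: hold the Write until the service handler returns (sync api) or until Finish is called (async api).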
Definition: call_op_set.h:156
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc_call * call() const
Definition: call.h:69
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
Definition: call_op_set.h:769
void ClientRecvStatus(::grpc::ClientContext *context, Status *status)
Definition: call_op_set.h:774
Definition: call_op_set.h:619
void ClientSendClose()
Definition: call_op_set.h:623
Definition: call_op_set.h:526
void RecvMessage(R *message)
Definition: call_op_set.h:529
void AllowNoMessage()
Definition: call_op_set.h:538
Definition: call_op_set.h:721
void RecvInitialMetadata(::grpc::ClientContext *context)
Definition: call_op_set.h:725
Definition: call_op_set.h:424
void RecvMessage(R *message)
Definition: call_op_set.h:426
Definition: call_op_set.h:212
void SendInitialMetadata(std::multimap< std::string, std::string > *metadata, uint32_t flags)
Definition: call_op_set.h:218
Definition: call_op_set.h:282
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:397
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:867
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:944
Definition: client_callback.h:62
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback.h:64
Definition: callback_common.h:69
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:135
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:162
Definition: client_callback.h:868
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:871
Definition: client_callback.h:727
void Read(Response *msg) override
Definition: client_callback.h:788
void StartCall() override
Definition: client_callback.h:741
void AddHold(int holds) override
Definition: client_callback.h:801
void RemoveHold() override
Definition: client_callback.h:804
Definition: client_callback.h:709
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:711
Definition: client_callback.h:471
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:558
void Write(const Request *msg, ::grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:534
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:485
void Read(Response *msg) override
Definition: client_callback.h:521
void AddHold(int holds) override
Definition: client_callback.h:584
void RemoveHold() override
Definition: client_callback.h:587
Definition: client_callback.h:1207
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1211
Definition: client_callback.h:1114
void StartCall() override
Definition: client_callback.h:1128
Definition: client_callback.h:1097
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1100
Definition: client_callback.h:886
void AddHold(int holds) override
Definition: client_callback.h:986
void Write(const Request *msg, ::grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:932
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:958
void RemoveHold() override
Definition: client_callback.h:989
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:900
Definition: client_callback.h:112
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
virtual bool InternalTrailersOnly(const grpc_call *call) const
InternalTrailersOnly is not part of the API and is not meant to be overridden.
virtual ~ClientReactor()=default
Definition: sync.h:58
Definition: sync.h:85
struct grpc_call grpc_call
A Call represents an RPC.
Definition: grpc_types.h:70
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155
#define GPR_LIKELY(x)
Definition: port_platform.h:659
#define GPR_UNLIKELY(x)
Definition: port_platform.h:660
::grpc::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:1252
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call.
Definition: client_callback.h:48
::google::protobuf::util::Status Status
Definition: config_protobuf.h:91
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue.h:96