18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
45 template <
class InputMessage,
class OutputMessage,
46 class BaseInputMessage = InputMessage,
47 class BaseOutputMessage = OutputMessage>
49 const ::grpc::internal::RpcMethod& method,
51 const InputMessage* request, OutputMessage* result,
53 static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
54 "Invalid input message specification");
55 static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
56 "Invalid output message specification");
58 channel, method, context, request, result, on_completion);
61 template <
class InputMessage,
class OutputMessage>
65 const ::grpc::internal::RpcMethod& method,
67 const InputMessage* request, OutputMessage* result,
85 const size_t alloc_sz =
sizeof(OpSetAndTag);
86 auto*
const alloced =
static_cast<OpSetAndTag*
>(
89 auto* ops =
new (&alloced->opset) FullCallOpSet;
90 auto* tag =
new (&alloced->tag)
99 ops->SendInitialMetadata(&context->send_initial_metadata_,
100 context->initial_metadata_flags());
101 ops->RecvInitialMetadata(context);
102 ops->RecvMessage(result);
103 ops->AllowNoMessage();
104 ops->ClientSendClose();
105 ops->ClientRecvStatus(context, tag->status_ptr());
106 ops->set_core_cq_tag(tag);
147 template <
class Request,
class Response>
149 template <
class Response>
151 template <
class Request>
158 template <
class Request,
class Response>
165 virtual void Read(Response* resp) = 0;
171 reactor->BindStream(
this);
175 template <
class Response>
180 virtual void Read(Response* resp) = 0;
186 reactor->BindReader(
this);
190 template <
class Request>
207 reactor->BindWriter(
this);
236 template <
class Request,
class Response>
269 stream_->Write(req, options);
317 stream_->AddHold(holds);
372 template <
class Response>
381 reader_->AddHold(holds);
397 template <
class Request>
405 writer_->Write(req, options);
415 writer_->AddHold(holds);
456 reactor->BindCall(
this);
462 template <
class Request,
class Response>
463 class ClientCallbackReaderWriterFactory;
464 template <
class Response>
465 class ClientCallbackReaderFactory;
466 template <
class Request>
467 class ClientCallbackWriterFactory;
469 template <
class Request,
class Response>
474 static void operator delete(
void* , std::size_t size) {
485 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_)
override {
491 if (!start_corked_) {
493 context_->initial_metadata_flags());
501 if (backlog_.read_ops) {
504 if (backlog_.write_ops) {
507 if (backlog_.writes_done_ops) {
513 started_.store(
true, std::memory_order_release);
518 this->MaybeFinish(
false);
521 void Read(Response* msg)
override {
523 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
524 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
526 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
527 backlog_.read_ops =
true;
535 ABSL_LOCKS_EXCLUDED(start_mu_)
override {
536 if (options.is_last_message()) {
537 options.set_buffer_hint();
542 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
545 context_->initial_metadata_flags());
546 corked_write_needed_ =
false;
549 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
551 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
552 backlog_.write_ops =
true;
560 writes_done_tag_.
Set(
563 reactor_->OnWritesDoneDone(ok);
566 &writes_done_ops_,
false);
568 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
571 context_->initial_metadata_flags());
572 corked_write_needed_ =
false;
574 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
576 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
577 backlog_.writes_done_ops =
true;
585 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
598 start_corked_(context_->initial_metadata_corked_),
599 corked_write_needed_(start_corked_) {
606 reactor_->OnReadInitialMetadataDone(
607 ok && !reactor_->InternalTrailersOnly(call_.call()));
617 reactor_->OnWriteDone(ok);
626 reactor_->OnReadDone(ok);
635 [
this](
bool ) { MaybeFinish(true); },
648 void MaybeFinish(
bool from_reaction) {
650 1, std::memory_order_acq_rel) == 1)) {
652 auto* reactor = reactor_;
653 auto* call = call_.
call();
654 this->~ClientCallbackReaderWriterImpl();
666 ClientBidiReactor<Request, Response>*
const reactor_;
672 const bool start_corked_;
673 bool corked_write_needed_;
695 struct StartCallBacklog {
696 bool write_ops =
false;
697 bool writes_done_ops =
false;
698 bool read_ops =
false;
700 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
703 std::atomic<intptr_t> callbacks_outstanding_{3};
704 std::atomic_bool started_{
false};
708 template <
class Request,
class Response>
712 const ::grpc::internal::RpcMethod& method,
716 channel->CreateCall(method, context, channel->CallbackCQ());
726 template <
class Response>
730 static void operator delete(
void* , std::size_t size) {
750 reactor_->OnReadInitialMetadataDone(
751 ok && !reactor_->InternalTrailersOnly(call_.call()));
756 context_->initial_metadata_flags());
765 reactor_->OnReadDone(ok);
773 if (backlog_.read_ops) {
776 started_.store(
true, std::memory_order_release);
781 [
this](
bool ) { MaybeFinish(true); },
782 &finish_ops_,
false);
788 void Read(Response* msg)
override {
790 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
791 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
793 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
794 backlog_.read_ops =
true;
802 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
809 template <
class Request>
813 : context_(context), call_(call), reactor_(reactor) {
821 void MaybeFinish(
bool from_reaction) {
823 1, std::memory_order_acq_rel) == 1)) {
825 auto* reactor = reactor_;
826 auto* call = call_.
call();
827 this->~ClientCallbackReaderImpl();
839 ClientReadReactor<Response>*
const reactor_;
856 struct StartCallBacklog {
857 bool read_ops =
false;
859 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
862 std::atomic<intptr_t> callbacks_outstanding_{2};
863 std::atomic_bool started_{
false};
867 template <
class Response>
870 template <
class Request>
872 const ::grpc::internal::RpcMethod& method,
876 channel->CreateCall(method, context, channel->CallbackCQ());
885 template <
class Request>
889 static void operator delete(
void* , std::size_t size) {
900 void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_)
override {
906 if (!start_corked_) {
908 context_->initial_metadata_flags());
915 if (backlog_.write_ops) {
918 if (backlog_.writes_done_ops) {
924 started_.store(
true, std::memory_order_release);
929 this->MaybeFinish(
false);
933 ABSL_LOCKS_EXCLUDED(start_mu_)
override {
935 options.set_buffer_hint();
940 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
944 context_->initial_metadata_flags());
945 corked_write_needed_ =
false;
948 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
950 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
951 backlog_.write_ops =
true;
960 writes_done_tag_.
Set(
963 reactor_->OnWritesDoneDone(ok);
966 &writes_done_ops_,
false);
968 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
972 context_->initial_metadata_flags());
973 corked_write_needed_ =
false;
976 if (
GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
978 if (
GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
979 backlog_.writes_done_ops =
true;
987 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
994 template <
class Response>
1001 start_corked_(context_->initial_metadata_corked_),
1002 corked_write_needed_(start_corked_) {
1009 reactor_->OnReadInitialMetadataDone(
1010 ok && !reactor_->InternalTrailersOnly(call_.call()));
1013 &start_ops_,
false);
1020 reactor_->OnWriteDone(ok);
1023 &write_ops_,
false);
1031 [
this](
bool ) { MaybeFinish(true); },
1039 void MaybeFinish(
bool from_reaction) {
1041 1, std::memory_order_acq_rel) == 1)) {
1043 auto* reactor = reactor_;
1044 auto* call = call_.
call();
1045 this->~ClientCallbackWriterImpl();
1057 ClientWriteReactor<Request>*
const reactor_;
1063 const bool start_corked_;
1064 bool corked_write_needed_;
1084 struct StartCallBacklog {
1085 bool write_ops =
false;
1086 bool writes_done_ops =
false;
1088 StartCallBacklog backlog_ ABSL_GUARDED_BY(start_mu_);
1091 std::atomic<intptr_t> callbacks_outstanding_{3};
1092 std::atomic_bool started_{
false};
1096 template <
class Request>
1099 template <
class Response>
1101 const ::grpc::internal::RpcMethod& method,
1105 channel->CreateCall(method, context, channel->CallbackCQ());
1117 static void operator delete(
void* , std::size_t size) {
1136 reactor_->OnReadInitialMetadataDone(
1137 ok && !reactor_->InternalTrailersOnly(call_.call()));
1140 &start_ops_,
false);
1142 context_->initial_metadata_flags());
1148 call_.
call(), [
this](
bool ) { MaybeFinish(); }, &finish_ops_,
1158 template <
class Request,
class Response>
1162 : context_(context), call_(call), reactor_(reactor) {
1174 void MaybeFinish() {
1176 1, std::memory_order_acq_rel) == 1)) {
1178 auto* reactor = reactor_;
1179 auto* call = call_.
call();
1204 std::atomic<intptr_t> callbacks_outstanding_{2};
1209 template <
class Request,
class Response,
class BaseRequest = Request,
1210 class BaseResponse = Response>
1212 const ::grpc::internal::RpcMethod& method,
1216 channel->CreateCall(method, context, channel->CallbackCQ());
1223 static_cast<const BaseRequest*
>(request),
1224 static_cast<BaseResponse*
>(response), reactor);
1231 namespace experimental {
1233 template <
class Response>
1236 template <
class Request>
1239 template <
class Request,
class Response>
1243 template <
class Response>
1246 template <
class Request>
1249 template <
class Request,
class Response>
Codegen interface for grpc::Channel.
Definition: channel_interface.h:71
ClientBidiReactor is the interface for a bidirectional streaming RPC.
Definition: client_callback.h:237
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options.
Definition: client_callback.h:268
void StartWrite(const Request *req)
Initiate a write operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:258
void StartWritesDone()
Indicate that the RPC will have no more write operations.
Definition: client_callback.h:290
void StartRead(Response *resp)
Initiate a read operation (or post it for later initiation if StartCall has not yet been invoked).
Definition: client_callback.h:250
void StartCall()
Activate the RPC and initiate any reads or writes that have been Start'ed before this call.
Definition: client_callback.h:243
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Initiate/post a write operation with specified options and an indication that this is the last write (like StartWrite and StartWritesDone, merged).
Definition: client_callback.h:281
void AddHold()
Holds are needed if (and only if) this stream has operations that take place on it after StartCall but from outside one of the reactions (OnReadDone, etc.).
Definition: client_callback.h:314
void AddMultipleHolds(int holds)
Definition: client_callback.h:315
friend class ClientCallbackReaderWriter< Request, Response >
Definition: client_callback.h:363
virtual void OnReadInitialMetadataDone(bool)
Notifies the application that a read of initial metadata from the server is done.
Definition: client_callback.h:338
virtual void OnWriteDone(bool)
Notifies the application that a StartWrite or StartWriteLast operation completed.
Definition: client_callback.h:351
virtual void OnReadDone(bool)
Notifies the application that a StartRead operation completed.
Definition: client_callback.h:344
virtual void OnWritesDoneDone(bool)
Notifies the application that a StartWritesDone operation completed.
Definition: client_callback.h:360
void OnDone(const ::grpc::Status &) override
Notifies the application that all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:328
void RemoveHold()
Definition: client_callback.h:319
Definition: client_callback.h:176
virtual ~ClientCallbackReader()
Definition: client_callback.h:178
virtual void RemoveHold()=0
virtual void Read(Response *resp)=0
virtual void StartCall()=0
virtual void AddHold(int holds)=0
void BindReactor(ClientReadReactor< Response > *reactor)
Definition: client_callback.h:185
Definition: client_callback.h:159
virtual void RemoveHold()=0
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
virtual void Read(Response *resp)=0
virtual void StartCall()=0
virtual void AddHold(int holds)=0
virtual void WritesDone()=0
virtual ~ClientCallbackReaderWriter()
Definition: client_callback.h:161
void BindReactor(ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:170
Definition: client_callback.h:211
virtual ~ClientCallbackUnary()
Definition: client_callback.h:213
void BindReactor(ClientUnaryReactor *reactor)
Definition: client_callback.h:455
virtual void StartCall()=0
Definition: client_callback.h:191
virtual void WritesDone()=0
virtual void Write(const Request *req, ::grpc::WriteOptions options)=0
void Write(const Request *req)
Definition: client_callback.h:195
virtual ~ClientCallbackWriter()
Definition: client_callback.h:193
virtual void RemoveHold()=0
void BindReactor(ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:206
virtual void AddHold(int holds)=0
void WriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:197
virtual void StartCall()=0
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:193
ClientReadReactor is the interface for a server-streaming RPC.
Definition: client_callback.h:373
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:386
void StartCall()
Definition: client_callback.h:375
void AddMultipleHolds(int holds)
Definition: client_callback.h:379
void AddHold()
Definition: client_callback.h:378
virtual void OnReadDone(bool)
Definition: client_callback.h:387
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:385
void StartRead(Response *resp)
Definition: client_callback.h:376
friend class ClientCallbackReader< Response >
Definition: client_callback.h:390
void RemoveHold()
Definition: client_callback.h:383
ClientUnaryReactor is a reactor-style interface for a unary RPC.
Definition: client_callback.h:442
void StartCall()
Definition: client_callback.h:444
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:446
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:445
ClientWriteReactor is the interface for a client-streaming RPC.
Definition: client_callback.h:398
void OnDone(const ::grpc::Status &) override
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
Definition: client_callback.h:419
virtual void OnReadInitialMetadataDone(bool)
Definition: client_callback.h:420
virtual void OnWritesDoneDone(bool)
Definition: client_callback.h:422
void StartWrite(const Request *req)
Definition: client_callback.h:401
void AddMultipleHolds(int holds)
Definition: client_callback.h:413
void RemoveHold()
Definition: client_callback.h:417
virtual void OnWriteDone(bool)
Definition: client_callback.h:421
void StartWrite(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:404
void StartWritesDone()
Definition: client_callback.h:410
void StartWriteLast(const Request *req, ::grpc::WriteOptions options)
Definition: client_callback.h:407
friend class ClientCallbackWriter< Request >
Definition: client_callback.h:425
void StartCall()
Definition: client_callback.h:400
void AddHold()
Definition: client_callback.h:412
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:102
virtual void grpc_call_unref(grpc_call *call)=0
virtual void grpc_call_ref(grpc_call *call)=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
Did it work? If it didn't, why?
Definition: status.h:31
bool ok() const
Is the status OK?
Definition: status.h:118
Per-message write options.
Definition: call_op_set.h:79
WriteOptions & set_last_message()
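last-message bit: indicates this is the last message in a stream. Client-side: makes Write the equivalent of performing Write, WritesDone in a single step. Server-side: hold the Write until the service handler returns (sync API) or until Finish is called (async API).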
Definition: call_op_set.h:156
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc_call * call() const
Definition: call.h:69
void PerformOps(CallOpSetInterface *ops)
Definition: call.h:65
Definition: call_op_set.h:769
void ClientRecvStatus(::grpc::ClientContext *context, Status *status)
Definition: call_op_set.h:774
Definition: call_op_set.h:619
void ClientSendClose()
Definition: call_op_set.h:623
Definition: call_op_set.h:526
void RecvMessage(R *message)
Definition: call_op_set.h:529
void AllowNoMessage()
Definition: call_op_set.h:538
Definition: call_op_set.h:424
void RecvMessage(R *message)
Definition: call_op_set.h:426
Definition: call_op_set.h:282
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:397
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:867
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:944
Definition: client_callback.h:62
CallbackUnaryCallImpl(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Definition: client_callback.h:64
Definition: callback_common.h:69
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:135
void Set(grpc_call *call, std::function< void(bool)> f, CompletionQueueTag *ops, bool can_inline)
Definition: callback_common.h:162
Definition: client_callback.h:868
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, ClientReadReactor< Response > *reactor)
Definition: client_callback.h:871
Definition: client_callback.h:727
void Read(Response *msg) override
Definition: client_callback.h:788
void StartCall() override
Definition: client_callback.h:741
void AddHold(int holds) override
Definition: client_callback.h:801
void RemoveHold() override
Definition: client_callback.h:804
Definition: client_callback.h:709
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, ClientBidiReactor< Request, Response > *reactor)
Definition: client_callback.h:711
Definition: client_callback.h:471
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:558
void Write(const Request *msg, ::grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:534
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:485
void Read(Response *msg) override
Definition: client_callback.h:521
void AddHold(int holds) override
Definition: client_callback.h:584
void RemoveHold() override
Definition: client_callback.h:587
Definition: client_callback.h:1207
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const Request *request, Response *response, ClientUnaryReactor *reactor)
Definition: client_callback.h:1211
Definition: client_callback.h:1114
void StartCall() override
Definition: client_callback.h:1128
Definition: client_callback.h:1097
static void Create(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, Response *response, ClientWriteReactor< Request > *reactor)
Definition: client_callback.h:1100
Definition: client_callback.h:886
void AddHold(int holds) override
Definition: client_callback.h:986
void Write(const Request *msg, ::grpc::WriteOptions options) ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:932
void WritesDone() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:958
void RemoveHold() override
Definition: client_callback.h:989
void StartCall() ABSL_LOCKS_EXCLUDED(start_mu_) override
Definition: client_callback.h:900
Definition: client_callback.h:112
virtual void OnDone(const ::grpc::Status &)=0
Called by the library when all operations associated with this RPC have completed and all Holds have been removed.
virtual bool InternalTrailersOnly(const grpc_call *call) const
InternalTrailersOnly is not part of the API and is not meant to be overridden.
Definition: client_callback.cc:52
virtual ~ClientReactor()=default
virtual void InternalScheduleOnDone(::grpc::Status s)
InternalScheduleOnDone is not part of the API and is not meant to be overridden.
Definition: client_callback.cc:28
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
#define GPR_CODEGEN_DEBUG_ASSERT(x)
Codegen specific version of GPR_DEBUG_ASSERT.
Definition: core_codegen_interface.h:155
::grpc::ClientUnaryReactor ClientUnaryReactor
Definition: client_callback.h:1252
void CallbackUnaryCall(::grpc::ChannelInterface *channel, const ::grpc::internal::RpcMethod &method, ::grpc::ClientContext *context, const InputMessage *request, OutputMessage *result, std::function< void(::grpc::Status)> on_completion)
Perform a callback-based unary call.
Definition: client_callback.h:48
::google::protobuf::util::Status Status
Definition: config_protobuf.h:91
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Null-initializes the global gRPC variables for the codegen library.
Definition: completion_queue.h:96