18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
30 template <
class RequestType,
class ResponseType>
35 const RequestType*, ResponseType*)>
37 : get_reactor_(
std::move(get_reactor)) {}
41 allocator_ = allocator;
47 auto* allocator_state =
52 param.call->call(),
sizeof(ServerCallbackUnaryImpl)))
53 ServerCallbackUnaryImpl(
55 param.call, allocator_state, param.call_requester);
56 param.server_context->BeginCompletionOp(
57 param.call, [call](
bool) { call->MaybeDone(); }, call);
60 if (param.status.ok()) {
61 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
67 if (reactor ==
nullptr) {
76 call->SetupReactor(reactor);
83 RequestType* request =
nullptr;
85 if (allocator_ !=
nullptr) {
93 *handler_data = allocator_state;
94 request = allocator_state->
request();
108 const RequestType*, ResponseType*)>
125 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
128 finish_ops_.set_core_cq_tag(&finish_tag_);
130 if (!ctx_->sent_initial_metadata_) {
132 ctx_->initial_metadata_flags());
133 if (ctx_->compression_level_set()) {
134 finish_ops_.set_compression_level(ctx_->compression_level());
136 ctx_->sent_initial_metadata_ =
true;
140 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
141 finish_ops_.SendMessagePtr(response()));
143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
145 finish_ops_.set_core_cq_tag(&finish_tag_);
146 call_.PerformOps(&finish_ops_);
149 void SendInitialMetadata()
override {
160 ServerUnaryReactor* reactor =
161 reactor_.load(std::memory_order_relaxed);
162 reactor->OnSendInitialMetadataDone(ok);
163 this->MaybeDone(true);
166 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
167 ctx_->initial_metadata_flags());
168 if (ctx_->compression_level_set()) {
169 meta_ops_.set_compression_level(ctx_->compression_level());
171 ctx_->sent_initial_metadata_ =
true;
172 meta_ops_.set_core_cq_tag(&meta_tag_);
173 call_.PerformOps(&meta_ops_);
179 ServerCallbackUnaryImpl(
181 MessageHolder<RequestType, ResponseType>* allocator_state,
182 std::function<
void()> call_requester)
185 allocator_state_(allocator_state),
186 call_requester_(
std::move(call_requester)) {
187 ctx_->set_message_allocator_state(allocator_state);
195 reactor_.store(reactor, std::memory_order_relaxed);
198 this->
MaybeDone(reactor->InternalInlineable());
201 const RequestType* request() {
return allocator_state_->request(); }
202 ResponseType* response() {
return allocator_state_->response(); }
204 void CallOnDone()
override {
205 reactor_.load(std::memory_order_relaxed)->OnDone();
207 auto call_requester = std::move(call_requester_);
208 allocator_state_->Release();
209 if (ctx_->context_allocator() !=
nullptr) {
210 ctx_->context_allocator()->Release(ctx_);
212 this->~ServerCallbackUnaryImpl();
217 ServerReactor* reactor()
override {
218 return reactor_.load(std::memory_order_relaxed);
232 MessageHolder<RequestType, ResponseType>*
const allocator_state_;
233 std::function<void()> call_requester_;
244 std::atomic<ServerUnaryReactor*> reactor_;
246 std::atomic<intptr_t> callbacks_outstanding_{
251 template <
class RequestType,
class ResponseType>
258 : get_reactor_(
std::move(get_reactor)) {}
264 param.call->call(),
sizeof(ServerCallbackReaderImpl)))
265 ServerCallbackReaderImpl(
267 param.call, param.call_requester);
271 param.server_context->BeginCompletionOp(
273 [reader](
bool) { reader->MaybeDone(
false); },
277 if (param.status.ok()) {
285 if (reactor ==
nullptr) {
293 reader->SetupReactor(reactor);
313 this->MaybeDone(false);
316 if (!ctx_->sent_initial_metadata_) {
318 ctx_->initial_metadata_flags());
319 if (ctx_->compression_level_set()) {
320 finish_ops_.set_compression_level(ctx_->compression_level());
322 ctx_->sent_initial_metadata_ =
true;
326 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
327 finish_ops_.SendMessagePtr(&resp_));
329 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
331 finish_ops_.set_core_cq_tag(&finish_tag_);
332 call_.PerformOps(&finish_ops_);
335 void SendInitialMetadata()
override {
344 ServerReadReactor<RequestType>* reactor =
345 reactor_.load(std::memory_order_relaxed);
346 reactor->OnSendInitialMetadataDone(ok);
347 this->MaybeDone(true);
350 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
351 ctx_->initial_metadata_flags());
352 if (ctx_->compression_level_set()) {
353 meta_ops_.set_compression_level(ctx_->compression_level());
355 ctx_->sent_initial_metadata_ =
true;
356 meta_ops_.set_core_cq_tag(&meta_tag_);
357 call_.PerformOps(&meta_ops_);
360 void Read(RequestType* req)
override {
362 read_ops_.RecvMessage(req);
363 call_.PerformOps(&read_ops_);
371 std::function<
void()> call_requester)
372 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
374 void SetupReactor(ServerReadReactor<RequestType>* reactor) {
375 reactor_.store(reactor, std::memory_order_relaxed);
381 [
this, reactor](
bool ok) {
382 if (GPR_UNLIKELY(!ok)) {
383 ctx_->MaybeMarkCancelledOnRead();
385 reactor->OnReadDone(ok);
386 this->MaybeDone(
true);
389 read_ops_.set_core_cq_tag(&read_tag_);
398 ~ServerCallbackReaderImpl() {}
400 ResponseType* response() {
return &resp_; }
402 void CallOnDone()
override {
403 reactor_.load(std::memory_order_relaxed)->OnDone();
405 auto call_requester = std::move(call_requester_);
406 if (ctx_->context_allocator() !=
nullptr) {
407 ctx_->context_allocator()->Release(ctx_);
409 this->~ServerCallbackReaderImpl();
414 ServerReactor* reactor()
override {
415 return reactor_.load(std::memory_order_relaxed);
434 std::function<void()> call_requester_;
436 std::atomic<ServerReadReactor<RequestType>*> reactor_;
438 std::atomic<intptr_t> callbacks_outstanding_{
443 template <
class RequestType,
class ResponseType>
450 : get_reactor_(
std::move(get_reactor)) {}
456 param.call->call(),
sizeof(ServerCallbackWriterImpl)))
457 ServerCallbackWriterImpl(
459 param.call,
static_cast<RequestType*
>(param.request),
460 param.call_requester);
464 param.server_context->BeginCompletionOp(
466 [writer](
bool) { writer->MaybeDone(
false); },
470 if (param.status.ok()) {
477 if (reactor ==
nullptr) {
485 writer->SetupReactor(reactor);
494 call,
sizeof(RequestType))) RequestType();
501 request->~RequestType();
506 std::function<ServerWriteReactor<ResponseType>*(
522 this->MaybeDone(false);
525 finish_ops_.set_core_cq_tag(&finish_tag_);
527 if (!ctx_->sent_initial_metadata_) {
529 ctx_->initial_metadata_flags());
530 if (ctx_->compression_level_set()) {
531 finish_ops_.set_compression_level(ctx_->compression_level());
533 ctx_->sent_initial_metadata_ =
true;
535 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
536 call_.PerformOps(&finish_ops_);
539 void SendInitialMetadata()
override {
548 ServerWriteReactor<ResponseType>* reactor =
549 reactor_.load(std::memory_order_relaxed);
550 reactor->OnSendInitialMetadataDone(ok);
551 this->MaybeDone(true);
554 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
555 ctx_->initial_metadata_flags());
556 if (ctx_->compression_level_set()) {
557 meta_ops_.set_compression_level(ctx_->compression_level());
559 ctx_->sent_initial_metadata_ =
true;
560 meta_ops_.set_core_cq_tag(&meta_tag_);
561 call_.PerformOps(&meta_ops_);
564 void Write(
const ResponseType* resp,
570 if (!ctx_->sent_initial_metadata_) {
571 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
572 ctx_->initial_metadata_flags());
573 if (ctx_->compression_level_set()) {
574 write_ops_.set_compression_level(ctx_->compression_level());
576 ctx_->sent_initial_metadata_ =
true;
580 call_.PerformOps(&write_ops_);
588 Finish(std::move(s));
592 friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
596 const RequestType* req,
597 std::function<
void()> call_requester)
601 call_requester_(
std::move(call_requester)) {}
603 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
604 reactor_.store(reactor, std::memory_order_relaxed);
610 [
this, reactor](
bool ok) {
611 reactor->OnWriteDone(ok);
612 this->MaybeDone(true);
615 write_ops_.set_core_cq_tag(&write_tag_);
616 this->BindReactor(reactor);
617 this->MaybeCallOnCancel(reactor);
621 this->MaybeDone(
false);
623 ~ServerCallbackWriterImpl() {
624 if (req_ !=
nullptr) {
625 req_->~RequestType();
629 const RequestType* request() {
return req_; }
631 void CallOnDone()
override {
632 reactor_.load(std::memory_order_relaxed)->OnDone();
634 auto call_requester = std::move(call_requester_);
635 if (ctx_->context_allocator() !=
nullptr) {
636 ctx_->context_allocator()->Release(ctx_);
638 this->~ServerCallbackWriterImpl();
643 ServerReactor* reactor()
override {
644 return reactor_.load(std::memory_order_relaxed);
662 const RequestType* req_;
663 std::function<void()> call_requester_;
665 std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
667 std::atomic<intptr_t> callbacks_outstanding_{
672 template <
class RequestType,
class ResponseType>
679 : get_reactor_(
std::move(get_reactor)) {}
684 param.call->call(),
sizeof(ServerCallbackReaderWriterImpl)))
685 ServerCallbackReaderWriterImpl(
687 param.call, param.call_requester);
691 param.server_context->BeginCompletionOp(
693 [stream](
bool) { stream->MaybeDone(
false); },
697 if (param.status.ok()) {
704 if (reactor ==
nullptr) {
713 stream->SetupReactor(reactor);
717 std::function<ServerBidiReactor<RequestType, ResponseType>*(
721 class ServerCallbackReaderWriterImpl
734 this->MaybeDone(false);
737 finish_ops_.set_core_cq_tag(&finish_tag_);
739 if (!ctx_->sent_initial_metadata_) {
741 ctx_->initial_metadata_flags());
742 if (ctx_->compression_level_set()) {
743 finish_ops_.set_compression_level(ctx_->compression_level());
745 ctx_->sent_initial_metadata_ =
true;
747 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
748 call_.PerformOps(&finish_ops_);
751 void SendInitialMetadata()
override {
760 ServerBidiReactor<RequestType, ResponseType>* reactor =
761 reactor_.load(std::memory_order_relaxed);
762 reactor->OnSendInitialMetadataDone(ok);
763 this->MaybeDone(true);
766 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
767 ctx_->initial_metadata_flags());
768 if (ctx_->compression_level_set()) {
769 meta_ops_.set_compression_level(ctx_->compression_level());
771 ctx_->sent_initial_metadata_ =
true;
772 meta_ops_.set_core_cq_tag(&meta_tag_);
773 call_.PerformOps(&meta_ops_);
776 void Write(
const ResponseType* resp,
782 if (!ctx_->sent_initial_metadata_) {
783 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
784 ctx_->initial_metadata_flags());
785 if (ctx_->compression_level_set()) {
786 write_ops_.set_compression_level(ctx_->compression_level());
788 ctx_->sent_initial_metadata_ =
true;
792 call_.PerformOps(&write_ops_);
799 Finish(std::move(s));
802 void Read(RequestType* req)
override {
804 read_ops_.RecvMessage(req);
805 call_.PerformOps(&read_ops_);
809 friend class CallbackBidiHandler<RequestType, ResponseType>;
813 std::function<
void()> call_requester)
814 : ctx_(ctx), call_(*call), call_requester_(
std::move(call_requester)) {}
816 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
817 reactor_.store(reactor, std::memory_order_relaxed);
823 [
this, reactor](
bool ok) {
824 reactor->OnWriteDone(ok);
825 this->MaybeDone(true);
828 write_ops_.set_core_cq_tag(&write_tag_);
831 [
this, reactor](
bool ok) {
832 if (GPR_UNLIKELY(!ok)) {
833 ctx_->MaybeMarkCancelledOnRead();
835 reactor->OnReadDone(ok);
836 this->MaybeDone(
true);
839 read_ops_.set_core_cq_tag(&read_tag_);
840 this->BindReactor(reactor);
841 this->MaybeCallOnCancel(reactor);
845 this->MaybeDone(
false);
848 void CallOnDone()
override {
849 reactor_.load(std::memory_order_relaxed)->OnDone();
851 auto call_requester = std::move(call_requester_);
852 if (ctx_->context_allocator() !=
nullptr) {
853 ctx_->context_allocator()->Release(ctx_);
855 this->~ServerCallbackReaderWriterImpl();
860 ServerReactor* reactor()
override {
861 return reactor_.load(std::memory_order_relaxed);
883 std::function<void()> call_requester_;
885 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
887 std::atomic<intptr_t> callbacks_outstanding_{
A sequence of bytes.
Definition: byte_buffer.h:60
void Release()
Forget the underlying byte buffer without destroying it. Use this only for un-owned byte buffers.
Definition: byte_buffer.h:146
Definition: server_context.h:578
virtual void grpc_call_unref(grpc_call *call)=0
virtual void grpc_call_ref(grpc_call *call)=0
virtual void * grpc_call_arena_alloc(grpc_call *call, size_t length)=0
virtual MessageHolder< RequestT, ResponseT > * AllocateMessages()=0
RequestT * request()
Definition: message_allocator.h:45
ResponseT * response()
Definition: message_allocator.h:46
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
ServerBidiReactor is the interface for a bidirectional streaming RPC.
Definition: server_callback.h:268
Definition: server_callback.h:207
virtual void SendInitialMetadata()=0
void BindReactor(ServerReadReactor< RequestType > *reactor)
Definition: server_callback.h:215
Definition: server_callback.h:238
virtual void SendInitialMetadata()=0
Definition: server_callback.h:191
virtual void SendInitialMetadata()=0
void BindReactor(Reactor *reactor)
Definition: server_callback.h:201
Definition: server_callback.h:221
virtual void SendInitialMetadata()=0
ServerReadReactor is the interface for a client-streaming RPC.
Definition: server_callback.h:490
Definition: server_callback.h:697
ServerWriteReactor is the interface for a server-streaming RPC.
Definition: server_callback.h:578
Did it work? If it didn't, why?
Definition: status.h:31
bool ok() const
Is the status OK?
Definition: status.h:118
Per-message write options.
Definition: call_op_set.h:79
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailing metadata.
Definition: call_op_set.h:181
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately.
Definition: call_op_set.h:117
Straightforward wrapping of the C call object.
Definition: call.h:35
grpc_call * call() const
Definition: call.h:69
Definition: call_op_set.h:424
Definition: call_op_set.h:282
Definition: call_op_set.h:654
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:867
Definition: server_callback_handlers.h:673
CallbackBidiHandler(std::function< ServerBidiReactor< RequestType, ResponseType > *(::grpc::CallbackServerContext *)> get_reactor)
Definition: server_callback_handlers.h:675
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:680
Definition: server_callback_handlers.h:252
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:259
CallbackClientStreamingHandler(std::function< ServerReadReactor< RequestType > *(::grpc::CallbackServerContext *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:254
Definition: server_callback_handlers.h:444
CallbackServerStreamingHandler(std::function< ServerWriteReactor< ResponseType > *(::grpc::CallbackServerContext *, const RequestType *)> get_reactor)
Definition: server_callback_handlers.h:446
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **) final
Definition: server_callback_handlers.h:488
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:451
Definition: server_callback_handlers.h:31
void RunHandler(const HandlerParameter &param) final
Definition: server_callback_handlers.h:44
void SetMessageAllocator(MessageAllocator< RequestType, ResponseType > *allocator)
Definition: server_callback_handlers.h:39
void * Deserialize(grpc_call *call, grpc_byte_buffer *req, ::grpc::Status *status, void **handler_data) final
Definition: server_callback_handlers.h:79
CallbackUnaryHandler(std::function< ServerUnaryReactor *(::grpc::CallbackServerContext *, const RequestType *, ResponseType *)> get_reactor)
Definition: server_callback_handlers.h:33
CallbackWithSuccessTag can be reused multiple times, and will be used in this fashion for streaming operations.
Definition: callback_common.h:135
Definition: server_callback.h:161
Definition: server_callback.h:768
Base class for running an RPC handler.
Definition: rpc_service_method.h:38
void MaybeCallOnCancel(ServerReactor *reactor)
Definition: server_callback.h:107
void MaybeDone()
Definition: server_callback.h:93
void Ref()
Increases the reference count.
Definition: server_callback.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:146
::grpc::ServerUnaryReactor ServerUnaryReactor
Definition: server_callback.h:798
Reactor * CatchingReactorGetter(Func &&func, Args &&... args)
Definition: callback_common.h:52
FinishOnlyReactor< ServerUnaryReactor > UnimplementedUnaryReactor
Definition: server_callback.h:774
An Alarm posts the user-provided tag to its associated completion queue or invokes the user-provided function on expiry or cancellation.
Definition: alarm.h:33
CoreCodegenInterface * g_core_codegen_interface
Null-initializes the global gRPC variables for the codegen library.
Definition: completion_queue.h:96
@ UNIMPLEMENTED
Operation is not implemented or not supported/enabled in this service.
Definition: status_code_enum.h:115
Definition: async_unary_call.h:398
Definition: rpc_service_method.h:41
Definition: grpc_types.h:40