GRPC C++  1.39.1
server.h
Go to the documentation of this file.
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_CORE_LIB_SURFACE_SERVER_H
18 #define GRPC_CORE_LIB_SURFACE_SERVER_H
19 
21 
22 #include <list>
23 #include <vector>
24 
25 #include "absl/status/statusor.h"
26 #include "absl/types/optional.h"
27 
28 #include <grpc/grpc.h>
29 
38 
39 namespace grpc_core {
40 
42 
43 class Server : public InternallyRefCounted<Server> {
44  public:
45  // Filter vtable.
47 
48  // Opaque type used for registered methods.
49  struct RegisteredMethod;
50 
51  // An object to represent the most relevant characteristics of a
52  // newly-allocated call object when using an AllocatingRequestMatcherBatch.
54  void* tag;
59  };
60 
61  // An object to represent the most relevant characteristics of a
62  // newly-allocated call object when using an
63  // AllocatingRequestMatcherRegistered.
65  void* tag;
71  };
72 
76  class ListenerInterface : public Orphanable {
77  public:
78  ~ListenerInterface() override = default;
79 
82  virtual void Start(Server* server,
83  const std::vector<grpc_pollset*>* pollsets) = 0;
84 
88 
91  virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
92  };
93 
94  explicit Server(const grpc_channel_args* args);
95  ~Server() override;
96 
97  void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override;
98 
99  const grpc_channel_args* channel_args() const { return channel_args_; }
101  return default_resource_user_;
102  }
103  channelz::ServerNode* channelz_node() const { return channelz_node_.get(); }
104 
105  // Do not call this before Start(). Returns the pollsets. The
106  // vector itself is immutable, but the pollsets inside are mutable. The
107  // result is valid for the lifetime of the server.
108  const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
109 
111  return config_fetcher_.get();
112  }
113 
115  std::unique_ptr<grpc_server_config_fetcher> config_fetcher) {
116  config_fetcher_ = std::move(config_fetcher);
117  }
118 
119  bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_);
120 
121  // Adds a listener to the server. When the server starts, it will call
122  // the listener's Start() method, and when it shuts down, it will orphan
123  // the listener.
124  void AddListener(OrphanablePtr<ListenerInterface> listener);
125 
126  // Starts listening for connections.
127  void Start() ABSL_LOCKS_EXCLUDED(mu_global_);
128 
129  // Sets up a transport. Creates a channel stack and binds the transport to
130  // the server. Called from the listener when a new connection is accepted.
132  grpc_transport* transport, grpc_pollset* accepting_pollset,
133  const grpc_channel_args* args,
134  const RefCountedPtr<channelz::SocketNode>& socket_node,
135  grpc_resource_user* resource_user = nullptr);
136 
138 
139  // Functions to specify that a specific registered method or the unregistered
140  // collection should use a specific allocator for request matching.
142  grpc_completion_queue* cq, void* method_tag,
143  std::function<RegisteredCallAllocation()> allocator);
145  std::function<BatchCallAllocation()> allocator);
146 
147  RegisteredMethod* RegisterMethod(
148  const char* method, const char* host,
150  uint32_t flags);
151 
153  grpc_metadata_array* request_metadata,
154  grpc_completion_queue* cq_bound_to_call,
155  grpc_completion_queue* cq_for_notification,
156  void* tag);
157 
159  RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
160  grpc_metadata_array* request_metadata,
161  grpc_byte_buffer** optional_payload,
162  grpc_completion_queue* cq_bound_to_call,
163  grpc_completion_queue* cq_for_notification, void* tag_new);
164 
165  void ShutdownAndNotify(grpc_completion_queue* cq, void* tag)
166  ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
167 
168  void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_);
169 
170  private:
171  struct RequestedCall;
172 
173  struct ChannelRegisteredMethod {
174  RegisteredMethod* server_registered_method = nullptr;
175  uint32_t flags;
176  bool has_host;
177  ExternallyManagedSlice method;
179  };
180 
181  class RequestMatcherInterface;
182  class RealRequestMatcher;
183  class AllocatingRequestMatcherBase;
184  class AllocatingRequestMatcherBatch;
185  class AllocatingRequestMatcherRegistered;
186 
187  class ChannelData {
188  public:
189  ChannelData() = default;
190  ~ChannelData();
191 
192  void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel,
193  size_t cq_idx, grpc_transport* transport,
194  intptr_t channelz_socket_uuid);
195 
196  RefCountedPtr<Server> server() const { return server_; }
197  grpc_channel* channel() const { return channel_; }
198  size_t cq_idx() const { return cq_idx_; }
199 
200  ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
201  const grpc_slice& path,
202  bool is_idempotent);
203 
204  // Filter vtable functions.
205  static grpc_error_handle InitChannelElement(
207  static void DestroyChannelElement(grpc_channel_element* elem);
208 
209  private:
210  class ConnectivityWatcher;
211 
212  static void AcceptStream(void* arg, grpc_transport* /*transport*/,
213  const void* transport_server_data);
214 
215  void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);
216 
217  static void FinishDestroy(void* arg, grpc_error_handle error);
218 
219  RefCountedPtr<Server> server_;
220  grpc_channel* channel_;
221  // The index into Server::cqs_ of the CQ used as a starting point for
222  // where to publish new incoming calls.
223  size_t cq_idx_;
224  absl::optional<std::list<ChannelData*>::iterator> list_position_;
225  // A hash-table of the methods and hosts of the registered methods.
226  // TODO(vjpai): Convert this to an STL map type as opposed to a direct
227  // bucket implementation. (Consider performance impact, hash function to
228  // use, etc.)
229  std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_;
230  uint32_t registered_method_max_probes_;
231  grpc_closure finish_destroy_channel_closure_;
232  intptr_t channelz_socket_uuid_;
233  };
234 
235  class CallData {
236  public:
237  enum class CallState {
238  NOT_STARTED, // Waiting for metadata.
239  PENDING, // Initial metadata read, not flow controlled in yet.
240  ACTIVATED, // Flow controlled in, on completion queue.
241  ZOMBIED, // Cancelled before being queued.
242  };
243 
244  CallData(grpc_call_element* elem, const grpc_call_element_args& args,
245  RefCountedPtr<Server> server);
246  ~CallData();
247 
248  // Starts the recv_initial_metadata batch on the call.
249  // Invoked from ChannelData::AcceptStream().
250  void Start(grpc_call_element* elem);
251 
252  void SetState(CallState state);
253 
254  // Attempts to move from PENDING to ACTIVATED state. Returns true
255  // on success.
256  bool MaybeActivate();
257 
258  // Publishes an incoming call to the application after it has been
259  // matched.
260  void Publish(size_t cq_idx, RequestedCall* rc);
261 
262  void KillZombie();
263 
264  void FailCallCreation();
265 
266  // Filter vtable functions.
267  static grpc_error_handle InitCallElement(
268  grpc_call_element* elem, const grpc_call_element_args* args);
269  static void DestroyCallElement(grpc_call_element* elem,
270  const grpc_call_final_info* /*final_info*/,
271  grpc_closure* /*ignored*/);
272  static void StartTransportStreamOpBatch(
274 
275  private:
276  // Helper functions for handling calls at the top of the call stack.
277  static void RecvInitialMetadataBatchComplete(void* arg,
279  void StartNewRpc(grpc_call_element* elem);
280  static void PublishNewRpc(void* arg, grpc_error_handle error);
281 
282  // Functions used inside the call stack.
283  void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
285  static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
286  static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
287 
288  RefCountedPtr<Server> server_;
289 
290  grpc_call* call_;
291 
292  Atomic<CallState> state_{CallState::NOT_STARTED};
293 
294  absl::optional<grpc_slice> path_;
295  absl::optional<grpc_slice> host_;
297 
298  grpc_completion_queue* cq_new_ = nullptr;
299 
300  RequestMatcherInterface* matcher_ = nullptr;
301  grpc_byte_buffer* payload_ = nullptr;
302 
303  grpc_closure kill_zombie_closure_;
304 
305  grpc_metadata_array initial_metadata_ =
306  grpc_metadata_array(); // Zero-initialize the C struct.
307  grpc_closure recv_initial_metadata_batch_complete_;
308 
309  grpc_metadata_batch* recv_initial_metadata_ = nullptr;
310  uint32_t recv_initial_metadata_flags_ = 0;
311  grpc_closure recv_initial_metadata_ready_;
312  grpc_closure* original_recv_initial_metadata_ready_;
313  grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;
314 
315  bool seen_recv_trailing_metadata_ready_ = false;
316  grpc_closure recv_trailing_metadata_ready_;
317  grpc_closure* original_recv_trailing_metadata_ready_;
318  grpc_error_handle recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
319 
320  grpc_closure publish_;
321 
322  CallCombiner* call_combiner_;
323  };
324 
// Per-listener bookkeeping entry: owns one ListenerInterface and carries a
// closure object alongside it.
325 struct Listener {
// Takes ownership of the listener passed in.
326 explicit Listener(OrphanablePtr<ListenerInterface> l)
327 : listener(std::move(l)) {}
328 OrphanablePtr<ListenerInterface> listener;
// NOTE(review): presumably the closure handed to
// ListenerInterface::SetOnDestroyDone() and run via ListenerDestroyDone() --
// confirm in server.cc.
329 grpc_closure destroy_done;
330 };
331 
// Records one (tag, completion queue) pair together with the completion
// storage that goes with it. Both tag and cq are fixed at construction.
332 struct ShutdownTag {
333 ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
334 : tag(tag_arg), cq(cq_arg) {}
335 void* const tag;
336 grpc_completion_queue* const cq;
// NOTE(review): presumably the storage used when posting the shutdown
// notification for this tag on cq -- confirm in server.cc.
337 grpc_cq_completion completion;
338 };
339 
340  static void ListenerDestroyDone(void* arg, grpc_error_handle error);
341 
// Casts |server| back to Server* and releases one ref on it. The completion
// argument is unused.
// NOTE(review): presumably installed as the "done" callback for shutdown
// completions -- confirm in server.cc.
342 static void DoneShutdownEvent(void* server,
343 grpc_cq_completion* /*completion*/) {
344 static_cast<Server*>(server)->Unref();
345 }
346 
347  static void DoneRequestEvent(void* req, grpc_cq_completion* completion);
348 
349  void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error_handle error);
350  grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);
351 
352  void MaybeFinishShutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_)
353  ABSL_LOCKS_EXCLUDED(mu_call_);
354 
355  void KillPendingWorkLocked(grpc_error_handle error)
356  ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_call_);
357 
358  static grpc_call_error ValidateServerRequest(
359  grpc_completion_queue* cq_for_notification, void* tag,
360  grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
361  grpc_call_error ValidateServerRequestAndCq(
362  size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
363  grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
364 
365  std::vector<grpc_channel*> GetChannelsLocked() const;
366 
367 // Take a shutdown ref for a request (increment by 2). Returns true if
368 // shutdown had NOT yet been called when the ref was taken.
369 bool ShutdownRefOnRequest() {
// The low bit of shutdown_refs_ stays set until shutdown is called (see
// ShutdownCalled()), so a set low bit in old_value means "not shut down".
// NOTE(review): assumes FetchAdd returns the pre-increment value -- confirm.
370 int old_value = shutdown_refs_.FetchAdd(2, MemoryOrder::ACQ_REL);
371 return (old_value & 1) != 0;
372 }
373 
374 // Decrement the shutdown ref counter by either 1 (for shutdown call) or 2
375 // (for in-flight request) and possibly call MaybeFinishShutdown if
376 // appropriate.
// Request variant: subtracts the 2 units that ShutdownRefOnRequest() added.
// Must be called without mu_global_ held, because it acquires that mutex
// itself before calling MaybeFinishShutdown().
377 void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) {
// NOTE(review): FetchSub presumably returns the pre-decrement value, so == 2
// would mean this call brought the counter to 0 -- confirm.
378 if (shutdown_refs_.FetchSub(2, MemoryOrder::ACQ_REL) == 2) {
379 MutexLock lock(&mu_global_);
380 MaybeFinishShutdown();
381 }
382 }
// Shutdown-call variant: subtracts 1. The caller must already hold
// mu_global_, so MaybeFinishShutdown() is invoked directly without locking.
383 void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
384 if (shutdown_refs_.FetchSub(1, MemoryOrder::ACQ_REL) == 1) {
385 MaybeFinishShutdown();
386 }
387 }
388 
// Returns true once shutdown has been called, i.e. when the low bit of
// shutdown_refs_ is 0.
389 bool ShutdownCalled() const {
390 return (shutdown_refs_.Load(MemoryOrder::ACQUIRE) & 1) == 0;
391 }
392 
393 // Returns whether there are no more shutdown refs, which means that shutdown
394 // has been called and all accepted requests have been published if using an
395 // AllocatingRequestMatcher.
// Concretely: true exactly when shutdown_refs_ reads as 0.
396 bool ShutdownReady() const {
397 return shutdown_refs_.Load(MemoryOrder::ACQUIRE) == 0;
398 }
399 
400  grpc_channel_args* const channel_args_;
401  grpc_resource_user* default_resource_user_ = nullptr;
402  RefCountedPtr<channelz::ServerNode> channelz_node_;
403  std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;
404 
405  std::vector<grpc_completion_queue*> cqs_;
406  std::vector<grpc_pollset*> pollsets_;
407  bool started_ = false;
408 
409  // The two following mutexes control access to server-state.
410  // mu_global_ controls access to non-call-related state (e.g., channel state).
411  // mu_call_ controls access to call-related state (e.g., the call lists).
412  //
413  // If they are ever required to be nested, you must lock mu_global_
414  // before mu_call_. This is currently used in shutdown processing
415  // (ShutdownAndNotify() and MaybeFinishShutdown()).
416  Mutex mu_global_; // mutex for server and channel state
417  Mutex mu_call_; // mutex for call-specific state
418 
419  // startup synchronization: flag is protected by mu_global_, signals whether
420  // we are doing the listener start routine or not.
421  bool starting_ = false;
422  CondVar starting_cv_;
423 
424  std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;
425 
426  // Request matcher for unregistered methods.
427  std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
428 
429  // The shutdown refs counter tracks whether or not shutdown has been called
430  // and whether there are any AllocatingRequestMatcher requests that have been
431  // accepted but not yet started (+2 on each one). If shutdown has been called,
432  // the lowest bit will be 0 (defaults to 1) and the counter will be even. The
433  // server should not notify on shutdown until the counter is 0 (shutdown is
434  // called and there are no requests that are accepted but not started).
435  Atomic<int> shutdown_refs_{1};
436  bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
437  std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);
438 
439  std::list<ChannelData*> channels_;
440 
441  std::list<Listener> listeners_;
442  size_t listeners_destroyed_ = 0;
443 
444  // The last time we printed a shutdown progress message.
445  gpr_timespec last_shutdown_message_time_;
446 };
447 
448 } // namespace grpc_core
449 
450 struct grpc_server {
452 };
453 
454 // TODO(roth): Eventually, will need a way to modify configuration even after
455 // a connection is established (e.g., to change things like L7 rate
456 // limiting, RBAC, and fault injection configs). One possible option
457 // would be to do something like ServiceConfig and ConfigSelector, but
458 // that might add unnecessary per-call overhead. Need to consider other
459 // approaches here.
461  public:
462  class ConnectionManager : public grpc_core::RefCounted<ConnectionManager> {
463  public:
464 // Ownership of \a args is transferred.
465  virtual absl::StatusOr<grpc_channel_args*> UpdateChannelArgsForConnection(
466  grpc_channel_args* args, grpc_endpoint* tcp) = 0;
467  };
468 
470  public:
471  virtual ~WatcherInterface() = default;
472  // UpdateConnectionManager() is invoked by the config fetcher when a new
473  // config is available. Implementations should update the connection manager
474  // and start serving if not already serving.
477 // Implementations should stop serving when this is called. Serving should
478 // only resume when UpdateConnectionManager() is invoked.
479  virtual void StopServing() = 0;
480  };
481 
482  virtual ~grpc_server_config_fetcher() = default;
483 
484  // Ownership of \a args is transferred.
485  virtual void StartWatch(std::string listening_address,
486  grpc_channel_args* args,
487  std::unique_ptr<WatcherInterface> watcher) = 0;
490 };
491 
492 #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */
ClusterWatcher * watcher
Definition: cds.cc:112
Definition: orphanable.h:76
void Unref()
Definition: orphanable.h:103
friend class RefCountedPtr
Definition: orphanable.h:85
Definition: orphanable.h:42
Definition: ref_counted.h:282
Definition: ref_counted_ptr.h:35
Interface for listeners.
Definition: server.h:76
virtual void Start(Server *server, const std::vector< grpc_pollset * > *pollsets)=0
Starts listening.
virtual void SetOnDestroyDone(grpc_closure *on_destroy_done)=0
Sets a closure to be invoked by the listener when its destruction is complete.
virtual channelz::ListenSocketNode * channelz_listen_socket_node() const =0
Returns the channelz node for the listen socket, or null if not supported.
Definition: server.h:43
bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_)
Definition: server.cc:647
void SetBatchMethodAllocator(grpc_completion_queue *cq, std::function< BatchCallAllocation()> allocator)
Definition: server.cc:660
void RegisterCompletionQueue(grpc_completion_queue *cq)
Definition: server.cc:668
static const grpc_channel_filter kServerTopFilter
Definition: server.h:46
void AddListener(OrphanablePtr< ListenerInterface > listener)
Definition: server.cc:566
void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override
Definition: server.cc:861
void set_config_fetcher(std::unique_ptr< grpc_server_config_fetcher > config_fetcher)
Definition: server.h:114
void Start() ABSL_LOCKS_EXCLUDED(mu_global_)
Definition: server.cc:575
grpc_resource_user * default_resource_user() const
Definition: server.h:100
const grpc_channel_args * channel_args() const
Definition: server.h:99
void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_)
Definition: server.cc:850
channelz::ServerNode * channelz_node() const
Definition: server.h:103
grpc_call_error RequestCall(grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag)
Definition: server.cc:929
Server(const grpc_channel_args *args)
Definition: server.cc:546
void ShutdownAndNotify(grpc_completion_queue *cq, void *tag) ABSL_LOCKS_EXCLUDED(mu_global_
Definition: server.cc:807
grpc_server_config_fetcher * config_fetcher() const
Definition: server.h:110
RegisteredMethod * RegisterMethod(const char *method, const char *host, grpc_server_register_method_payload_handling payload_handling, uint32_t flags)
Definition: server.cc:685
void SetRegisteredMethodAllocator(grpc_completion_queue *cq, void *method_tag, std::function< RegisteredCallAllocation()> allocator)
Definition: server.cc:652
const std::vector< grpc_pollset * > & pollsets() const
Definition: server.h:108
~Server() override
Definition: server.cc:551
grpc_error_handle SetupTransport(grpc_transport *transport, grpc_pollset *accepting_pollset, const grpc_channel_args *args, const RefCountedPtr< channelz::SocketNode > &socket_node, grpc_resource_user *resource_user=nullptr)
Definition: server.cc:612
void mu_call_
Definition: server.h:166
grpc_call_error RequestRegisteredCall(RegisteredMethod *rm, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new)
Definition: server.cc:946
Definition: trace.h:61
Definition: channelz.h:343
Definition: channelz.h:231
virtual absl::StatusOr< grpc_channel_args * > UpdateChannelArgsForConnection(grpc_channel_args *args, grpc_endpoint *tcp)=0
virtual void UpdateConnectionManager(grpc_core::RefCountedPtr< ConnectionManager > manager)=0
#define GRPC_ERROR_NONE
The following "special" errors can be propagated without allocating memory.
Definition: error.h:228
int64_t grpc_millis
Definition: exec_ctx.h:37
#define GRPC_MILLIS_INF_FUTURE
Definition: exec_ctx.h:39
grpc_server_register_method_payload_handling
How to handle payloads for a registered method.
Definition: grpc.h:366
grpc_call_error
Result of a grpc call.
Definition: grpc_types.h:441
grpc_error_handle error
Definition: lame_client.cc:54
Round Robin Policy.
Definition: backend_metric.cc:26
TraceFlag grpc_server_channel_trace(false, "server_channel")
Definition: server.h:41
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:67
Definition: async_unary_call.h:398
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:208
Analogous to struct timespec.
Definition: gpr_types.h:47
Definition: grpc_types.h:40
Definition: grpc_types.h:569
Definition: channel_stack.h:76
Definition: channel_stack.h:174
Information about the call upon completion.
Definition: channel_stack.h:91
Definition: call.cc:139
An array of arguments that can be passed around.
Definition: grpc_types.h:132
Definition: channel_stack.h:68
Definition: channel_stack.h:166
Definition: channel_stack.h:107
Definition: channel.h:105
A closure over a grpc_iomgr_cb_func.
Definition: closure.h:56
Definition: completion_queue.cc:339
Definition: slice_utils.h:147
grpc_completion_queue * cq
Definition: server.h:58
grpc_call ** call
Definition: server.h:55
grpc_metadata_array * initial_metadata
Definition: server.h:56
void * tag
Definition: server.h:54
grpc_call_details * details
Definition: server.h:57
gpr_timespec * deadline
Definition: server.h:68
grpc_completion_queue * cq
Definition: server.h:70
grpc_metadata_array * initial_metadata
Definition: server.h:67
void * tag
Definition: server.h:65
grpc_call ** call
Definition: server.h:66
grpc_byte_buffer ** optional_payload
Definition: server.h:69
Definition: server.cc:118
Definition: server.cc:66
Definition: completion_queue.h:39
Definition: endpoint.h:106
Definition: error_internal.h:41
Definition: grpc_types.h:563
Definition: metadata_batch.h:52
Definition: pollset_custom.cc:40
Definition: resource_quota.cc:65
Definition: server.h:460
virtual ~grpc_server_config_fetcher()=default
virtual void StartWatch(std::string listening_address, grpc_channel_args *args, std::unique_ptr< WatcherInterface > watcher)=0
virtual grpc_pollset_set * interested_parties()=0
virtual void CancelWatch(WatcherInterface *watcher)=0
Definition: server.h:450
grpc_core::OrphanablePtr< grpc_core::Server > core_server
Definition: server.h:451
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice.h:60
Definition: transport.h:163
Definition: transport_impl.h:66
std::string listening_address
Definition: xds_server_config_fetcher.cc:509