GRPC C++  1.39.1
subchannel.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21 
23 
24 #include <deque>
25 
41 
42 // Channel arg containing a URI indicating the address to connect to.
43 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
44 
45 namespace grpc_core {
46 
47 class SubchannelCall;
48 
49 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
50  public:
54  ~ConnectedSubchannel() override;
55 
56  void StartWatch(grpc_pollset_set* interested_parties,
58 
59  void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
60 
61  grpc_channel_stack* channel_stack() const { return channel_stack_; }
62  const grpc_channel_args* args() const { return args_; }
64  return channelz_subchannel_.get();
65  }
66 
67  size_t GetInitialCallSizeEstimate() const;
68 
69  private:
70  grpc_channel_stack* channel_stack_;
71  grpc_channel_args* args_;
72  // ref counted pointer to the channelz node in this connected subchannel's
73  // owning subchannel.
74  RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
75 };
76 
77 // Implements the interface of RefCounted<>.
79  public:
80  struct Args {
84  gpr_cycle_counter start_time;
89  };
92 
93  // Continues processing a transport stream op batch.
95 
96  // Returns the call stack of the subchannel call.
98 
99  // Sets the 'then_schedule_closure' argument for call stack destruction.
100  // Must be called once per call.
102 
103  // Interface of RefCounted<>.
106  const char* reason) GRPC_MUST_USE_RESULT;
107  // When refcount drops to 0, destroys itself and the associated call stack,
108  // but does NOT free the memory because it's in the call arena.
109  void Unref();
110  void Unref(const DebugLocation& location, const char* reason);
111 
112  private:
113  // Allow RefCountedPtr<> to access IncrementRefCount().
114  template <typename T>
115  friend class RefCountedPtr;
116 
118 
119  // If channelz is enabled, intercepts recv_trailing so that we may check the
120  // status and associate it to a subchannel.
121  void MaybeInterceptRecvTrailingMetadata(
123 
124  static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
125 
126  // Interface of RefCounted<>.
127  void IncrementRefCount();
128  void IncrementRefCount(const DebugLocation& location, const char* reason);
129 
130  static void Destroy(void* arg, grpc_error_handle error);
131 
132  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
133  grpc_closure* after_call_stack_destroy_ = nullptr;
134  // State needed to support channelz interception of recv trailing metadata.
135  grpc_closure recv_trailing_metadata_ready_;
136  grpc_closure* original_recv_trailing_metadata_ = nullptr;
137  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
138  grpc_millis deadline_;
139 };
140 
141 // A subchannel that knows how to connect to exactly one target address. It
142 // provides a target for load balancing.
143 //
144 // Note that this is the "real" subchannel implementation, whose API is
145 // different from the SubchannelInterface that is exposed to LB policy
146 // implementations. The client channel provides an adaptor class
147 // (SubchannelWrapper) that "converts" between the two.
149  public:
151  : public RefCounted<ConnectivityStateWatcherInterface> {
152  public:
157  };
158 
160 
161  // Will be invoked whenever the subchannel's connectivity state
162  // changes. There will be only one invocation of this method on a
163  // given watcher instance at any given time.
164  // Implementations should call PopConnectivityStateChange to get the next
165  // connectivity state change.
166  virtual void OnConnectivityStateChange() = 0;
167 
169 
170  // Enqueues connectivity state change notifications.
171  // When the state changes to READY, connected_subchannel will
172  // contain a ref to the connected subchannel. When it changes from
173  // READY to some other state, the implementation must release its
174  // ref to the connected subchannel.
175  // TODO(yashkt): This is currently needed to send the state updates in the
176  // right order when asynchronously notifying. This will no longer be
177  // necessary when we have access to EventManager.
178  void PushConnectivityStateChange(ConnectivityStateChange state_change);
179 
180  // Dequeues connectivity state change notifications.
181  ConnectivityStateChange PopConnectivityStateChange();
182 
183  private:
184  Mutex mu_; // protects the queue
185  // Keeps track of the updates that the watcher instance must be notified of.
186  // TODO(yashkt): This is currently needed to send the state updates in the
187  // right order when asynchronously notifying. This will no longer be
188  // necessary when we have access to EventManager.
189  std::deque<ConnectivityStateChange> connectivity_state_queue_
190  ABSL_GUARDED_BY(&mu_);
191  };
192 
193  // Creates a subchannel given \a connector and \a args.
196  const grpc_channel_args* args);
197 
198  // The ctor and dtor are not intended to use directly.
200  const grpc_channel_args* args);
201  ~Subchannel() override;
202 
203  // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
204  // is larger than the subchannel's current keepalive time. The updated value
205  // will have an affect when the subchannel creates a new ConnectedSubchannel.
206  void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_);
207 
208  // Gets the string representing the subchannel address.
209  // Caller doesn't take ownership.
210  const char* GetTargetAddress();
211 
212  const grpc_channel_args* channel_args() const { return args_; }
213 
214  channelz::SubchannelNode* channelz_node();
215 
216  // Returns the current connectivity state of the subchannel.
217  // If health_check_service_name is non-null, the returned connectivity
218  // state will be based on the state reported by the backend for that
219  // service name.
220  // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
221  grpc_connectivity_state CheckConnectivityState(
222  const absl::optional<std::string>& health_check_service_name,
223  RefCountedPtr<ConnectedSubchannel>* connected_subchannel)
224  ABSL_LOCKS_EXCLUDED(mu_);
225 
226  // Starts watching the subchannel's connectivity state.
227  // The first callback to the watcher will be delivered when the
228  // subchannel's connectivity state becomes a value other than
229  // initial_state, which may happen immediately.
230  // Subsequent callbacks will be delivered as the subchannel's state
231  // changes.
232  // The watcher will be destroyed either when the subchannel is
233  // destroyed or when CancelConnectivityStateWatch() is called.
234  void WatchConnectivityState(
235  grpc_connectivity_state initial_state,
236  const absl::optional<std::string>& health_check_service_name,
238  ABSL_LOCKS_EXCLUDED(mu_);
239 
240  // Cancels a connectivity state watch.
241  // If the watcher has already been destroyed, this is a no-op.
242  void CancelConnectivityStateWatch(
243  const absl::optional<std::string>& health_check_service_name,
244  ConnectivityStateWatcherInterface* watcher) ABSL_LOCKS_EXCLUDED(mu_);
245 
246  // Attempt to connect to the backend. Has no effect if already connected.
247  void AttemptToConnect() ABSL_LOCKS_EXCLUDED(mu_);
248 
249  // Resets the connection backoff of the subchannel.
250  // TODO(roth): Move connection backoff out of subchannels and up into LB
251  // policy code (probably by adding a SubchannelGroup between
252  // SubchannelList and SubchannelData), at which point this method can
253  // go away.
254  void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_);
255 
256  // Tears down any existing connection, and arranges for destruction
257  void Orphan() override ABSL_LOCKS_EXCLUDED(mu_);
258 
259  // Returns a new channel arg encoding the subchannel address as a URI
260  // string. Caller is responsible for freeing the string.
261  static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
262 
263  // Returns the URI string from the subchannel address arg in \a args.
264  static const char* GetUriFromSubchannelAddressArg(
265  const grpc_channel_args* args);
266 
267  // Sets \a addr from the subchannel address arg in \a args.
268  static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
269  grpc_resolved_address* addr);
270 
271  private:
272  // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
273  // the subchannel's state.
274  class ConnectivityStateWatcherList {
275  public:
276  ~ConnectivityStateWatcherList() { Clear(); }
277 
278  void AddWatcherLocked(
279  RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
280  void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
281 
282  // Notifies all watchers in the list about a change to state.
283  void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
284  const absl::Status& status);
285 
286  void Clear() { watchers_.clear(); }
287 
288  bool empty() const { return watchers_.empty(); }
289 
290  private:
291  // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
292  // be a set instead of a map.
293  std::map<ConnectivityStateWatcherInterface*,
294  RefCountedPtr<ConnectivityStateWatcherInterface>>
295  watchers_;
296  };
297 
298  // A map that tracks ConnectivityStateWatcherInterfaces using a particular
299  // health check service name.
300  //
301  // There is one entry in the map for each health check service name.
302  // Entries exist only as long as there are watchers using the
303  // corresponding service name.
304  //
305  // A health check client is maintained only while the subchannel is in
306  // state READY.
307  class HealthWatcherMap {
308  public:
309  void AddWatcherLocked(
310  WeakRefCountedPtr<Subchannel> subchannel,
311  grpc_connectivity_state initial_state,
312  const std::string& health_check_service_name,
313  RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
314  void RemoveWatcherLocked(const std::string& health_check_service_name,
315  ConnectivityStateWatcherInterface* watcher);
316 
317  // Notifies the watcher when the subchannel's state changes.
318  void NotifyLocked(grpc_connectivity_state state, const absl::Status& status)
319  ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_);
320 
321  grpc_connectivity_state CheckConnectivityStateLocked(
322  Subchannel* subchannel, const std::string& health_check_service_name)
323  ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_);
324 
325  void ShutdownLocked();
326 
327  private:
328  class HealthWatcher;
329 
330  std::map<std::string, OrphanablePtr<HealthWatcher>> map_;
331  };
332 
333  class ConnectedSubchannelStateWatcher;
334 
335  class AsyncWatcherNotifierLocked;
336 
337  // Sets the subchannel's connectivity state to \a state.
338  void SetConnectivityStateLocked(grpc_connectivity_state state,
339  const absl::Status& status)
340  ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
341 
342  // Methods for connection.
343  void MaybeStartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
344  static void OnRetryAlarm(void* arg, grpc_error_handle error)
345  ABSL_LOCKS_EXCLUDED(mu_);
346  void ContinueConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
347  static void OnConnectingFinished(void* arg, grpc_error_handle error)
348  ABSL_LOCKS_EXCLUDED(mu_);
349  bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
350 
351  // The subchannel pool this subchannel is in.
352  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
353  // TODO(juanlishen): Consider using args_ as key_ directly.
354  // Subchannel key that identifies this subchannel in the subchannel pool.
355  const SubchannelKey key_;
356  // Channel args.
357  grpc_channel_args* args_;
358  // pollset_set tracking who's interested in a connection being setup.
359  grpc_pollset_set* pollset_set_;
360  // Channelz tracking.
361  RefCountedPtr<channelz::SubchannelNode> channelz_node_;
362 
363  // Connection state.
364  OrphanablePtr<SubchannelConnector> connector_;
365  SubchannelConnector::Result connecting_result_;
366  grpc_closure on_connecting_finished_;
367 
368  // Protects the other members.
369  Mutex mu_;
370 
371  // Active connection, or null.
372  RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_);
373  bool connecting_ ABSL_GUARDED_BY(mu_) = false;
374  bool disconnected_ ABSL_GUARDED_BY(mu_) = false;
375 
376  // Connectivity state tracking.
377  grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE;
378  absl::Status status_ ABSL_GUARDED_BY(mu_);
379  // The list of watchers without a health check service name.
380  ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_);
381  // The map of watchers with health check service names.
382  HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_);
383 
384  // Backoff state.
385  BackOff backoff_ ABSL_GUARDED_BY(mu_);
386  grpc_millis next_attempt_deadline_ ABSL_GUARDED_BY(mu_);
387  grpc_millis min_connect_timeout_ms_ ABSL_GUARDED_BY(mu_);
388  bool backoff_begun_ ABSL_GUARDED_BY(mu_) = false;
389 
390  // Retry alarm.
391  grpc_timer retry_alarm_ ABSL_GUARDED_BY(mu_);
392  grpc_closure on_retry_alarm_ ABSL_GUARDED_BY(mu_);
393  bool have_retry_alarm_ ABSL_GUARDED_BY(mu_) = false;
394  // reset_backoff() was called while alarm was pending.
395  bool retry_immediately_ ABSL_GUARDED_BY(mu_) = false;
396  // Keepalive time period (-1 for unset)
397  int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;
398 };
399 
400 } // namespace grpc_core
401 
402 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */
ClusterWatcher * watcher
Definition: cds.cc:112
Definition: arena.h:44
Definition: call_combiner.h:50
Definition: subchannel.h:49
size_t GetInitialCallSizeEstimate() const
Definition: subchannel.cc:124
void Ping(grpc_closure *on_initiate, grpc_closure *on_ack)
Definition: subchannel.cc:114
channelz::SubchannelNode * channelz_subchannel() const
Definition: subchannel.h:63
void StartWatch(grpc_pollset_set *interested_parties, OrphanablePtr< ConnectivityStateWatcherInterface > watcher)
Definition: subchannel.cc:103
ConnectedSubchannel(grpc_channel_stack *channel_stack, const grpc_channel_args *args, RefCountedPtr< channelz::SubchannelNode > channelz_subchannel)
Definition: subchannel.cc:87
~ConnectedSubchannel() override
Definition: subchannel.cc:98
const grpc_channel_args * args() const
Definition: subchannel.h:62
grpc_channel_stack * channel_stack() const
Definition: subchannel.h:61
Definition: connectivity_state.h:51
Definition: debug_location.h:31
Definition: dual_ref_counted.h:52
Definition: sync.h:59
Definition: ref_counted.h:282
Definition: ref_counted_ptr.h:35
Definition: subchannel.h:78
grpc_call_stack * GetCallStack()
Definition: subchannel.cc:179
void SetAfterCallStackDestroy(grpc_closure *closure)
Definition: subchannel.cc:183
void Unref()
Definition: subchannel.cc:200
static RefCountedPtr< SubchannelCall > Create(Args args, grpc_error_handle *error)
Definition: subchannel.cc:133
friend class RefCountedPtr
Definition: subchannel.h:115
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch *batch)
Definition: subchannel.cc:169
RefCountedPtr< SubchannelCall > Ref() GRPC_MUST_USE_RESULT
Definition: subchannel.cc:189
Definition: subchannel.h:148
const grpc_channel_args * channel_args() const
Definition: subchannel.h:212
Definition: subchannel_pool_interface.h:36
Definition: client_channel_channelz.h:34
int64_t grpc_millis
Definition: exec_ctx.h:37
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:524
grpc_connectivity_state
Connectivity state of a channel.
Definition: connectivity_state.h:27
@ GRPC_CHANNEL_IDLE
channel is idle
Definition: connectivity_state.h:29
grpc_error_handle error
Definition: lame_client.cc:54
::google::protobuf::util::Status Status
Definition: config_protobuf.h:91
Round Robin Policy.
Definition: backend_metric.cc:26
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:67
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:208
RefCountedPtr< SubchannelInterface > subchannel
Definition: ring_hash.cc:213
grpc_closure closure
Definition: server.cc:460
A single argument...
Definition: grpc_types.h:103
Definition: context.h:44
Definition: channel_stack.h:192
An array of arguments that can be passed around.
Definition: grpc_types.h:132
Definition: channel_stack.h:182
A closure over a grpc_iomgr_cb_func.
Definition: closure.h:56
RefCountedPtr< ConnectedSubchannel > connected_subchannel
Definition: subchannel.h:156
Definition: subchannel.h:80
grpc_millis deadline
Definition: subchannel.h:85
grpc_slice path
Definition: subchannel.h:83
grpc_polling_entity * pollent
Definition: subchannel.h:82
RefCountedPtr< ConnectedSubchannel > connected_subchannel
Definition: subchannel.h:81
Arena * arena
Definition: subchannel.h:86
gpr_cycle_counter start_time
Definition: subchannel.h:84
grpc_call_context_element * context
Definition: subchannel.h:87
CallCombiner * call_combiner
Definition: subchannel.h:88
Definition: error_internal.h:41
Definition: metadata_batch.h:52
Definition: polling_entity.h:37
Definition: resolve_address.h:44
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice.h:60
Definition: timer.h:32
Definition: transport.h:163