GRPC C++  1.39.1
subchannel_list.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
21 
23 
24 #include <string.h>
25 
26 #include <grpc/support/alloc.h>
27 
28 #include "absl/container/inlined_vector.h"
29 
32 // TODO(roth): Should not need the include of subchannel.h here, since
33 // that implementation should be hidden from the LB policy API.
44 
45 // Code for maintaining a list of subchannels within an LB policy.
46 //
47 // To use this, callers must create their own subclasses, like so:
48 /*
49 
50 class MySubchannelList; // Forward declaration.
51 
52 class MySubchannelData
53  : public SubchannelData<MySubchannelList, MySubchannelData> {
54  public:
55  void ProcessConnectivityChangeLocked(
56  grpc_connectivity_state connectivity_state) override {
57  // ...code to handle connectivity changes...
58  }
59 };
60 
61 class MySubchannelList
62  : public SubchannelList<MySubchannelList, MySubchannelData> {
63 };
64 
65 */
66 // All methods will be called from within the client_channel work serializer.
67 
68 namespace grpc_core {
69 
70 // Forward declaration.
71 template <typename SubchannelListType, typename SubchannelDataType>
72 class SubchannelList;
73 
74 // Stores data for a particular subchannel in a subchannel list.
75 // Callers must create a subclass that implements the
76 // ProcessConnectivityChangeLocked() method.
77 template <typename SubchannelListType, typename SubchannelDataType>
79  public:
80  // Returns a pointer to the subchannel list containing this object.
81  SubchannelListType* subchannel_list() const {
82  return static_cast<SubchannelListType*>(subchannel_list_);
83  }
84 
85  // Returns the index into the subchannel list of this object.
86  size_t Index() const {
87  return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
88  subchannel_list_->subchannel(0));
89  }
90 
91  // Returns a pointer to the subchannel.
92  SubchannelInterface* subchannel() const { return subchannel_.get(); }
93 
94  // Synchronously checks the subchannel's connectivity state.
95  // Must not be called while there is a connectivity notification
96  // pending (i.e., between calling StartConnectivityWatchLocked() and
97  // calling CancelConnectivityWatchLocked()).
99  GPR_ASSERT(pending_watcher_ == nullptr);
100  connectivity_state_ = subchannel_->CheckConnectivityState();
101  return connectivity_state_;
102  }
103 
104  // Resets the connection backoff.
105  // TODO(roth): This method should go away when we move the backoff
106  // code out of the subchannel and into the LB policies.
107  void ResetBackoffLocked();
108 
109  // Starts watching the connectivity state of the subchannel.
110  // ProcessConnectivityChangeLocked() will be called whenever the
111  // connectivity state changes.
113 
114  // Cancels watching the connectivity state of the subchannel.
115  void CancelConnectivityWatchLocked(const char* reason);
116 
117  // Cancels any pending connectivity watch and unrefs the subchannel.
118  void ShutdownLocked();
119 
120  protected:
123  const ServerAddress& address,
125 
126  virtual ~SubchannelData();
127 
128  // After StartConnectivityWatchLocked() is called, this method will be
129  // invoked whenever the subchannel's connectivity state changes.
130  // To stop watching, use CancelConnectivityWatchLocked().
133 
134  private:
135  // Watcher for subchannel connectivity state.
136  class Watcher
138  public:
139  Watcher(
142  : subchannel_data_(subchannel_data),
143  subchannel_list_(std::move(subchannel_list)) {}
144 
145  ~Watcher() override {
146  subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
147  }
148 
149  void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
150 
151  grpc_pollset_set* interested_parties() override {
152  return subchannel_list_->policy()->interested_parties();
153  }
154 
155  private:
157  RefCountedPtr<SubchannelListType> subchannel_list_;
158  };
159 
160  // Unrefs the subchannel.
161  void UnrefSubchannelLocked(const char* reason);
162 
163  // Backpointer to owning subchannel list. Not owned.
165  // The subchannel.
167  // Will be non-null when the subchannel's state is being watched.
169  nullptr;
170  // Data updated by the watcher.
171  grpc_connectivity_state connectivity_state_;
172 };
173 
174 // A list of subchannels.
175 template <typename SubchannelListType, typename SubchannelDataType>
176 class SubchannelList : public InternallyRefCounted<SubchannelListType> {
177  public:
178  typedef absl::InlinedVector<SubchannelDataType, 10> SubchannelVector;
179 
180  // The number of subchannels in the list.
181  size_t num_subchannels() const { return subchannels_.size(); }
182 
183  // The data for the subchannel at a particular index.
184  SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
185 
186  // Returns true if the subchannel list is shutting down.
187  bool shutting_down() const { return shutting_down_; }
188 
189  // Accessors.
190  LoadBalancingPolicy* policy() const { return policy_; }
191  TraceFlag* tracer() const { return tracer_; }
192 
193  // Resets connection backoff of all subchannels.
194  // TODO(roth): We will probably need to rethink this as part of moving
195  // the backoff code out of subchannels and into LB policies.
196  void ResetBackoffLocked();
197 
198  void Orphan() override {
199  ShutdownLocked();
201  }
202 
203  protected:
205  ServerAddressList addresses,
207  const grpc_channel_args& args);
208 
209  virtual ~SubchannelList();
210 
211  private:
212  // For accessing Ref() and Unref().
213  friend class SubchannelData<SubchannelListType, SubchannelDataType>;
214 
215  void ShutdownLocked();
216 
217  // Backpointer to owning policy.
218  LoadBalancingPolicy* policy_;
219 
220  TraceFlag* tracer_;
221 
222  // The list of subchannels.
223  SubchannelVector subchannels_;
224 
225  // Is this list shutting down? This may be true due to the shutdown of the
226  // policy itself or because a newer update has arrived while this one hadn't
227  // finished processing.
228  bool shutting_down_ = false;
229 };
230 
231 //
232 // implementation -- no user-servicable parts below
233 //
234 
235 //
236 // SubchannelData::Watcher
237 //
238 
239 template <typename SubchannelListType, typename SubchannelDataType>
242  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
244  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
245  " (subchannel %p): connectivity changed: state=%s, "
246  "shutting_down=%d, pending_watcher=%p",
247  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
248  subchannel_list_.get(), subchannel_data_->Index(),
249  subchannel_list_->num_subchannels(),
250  subchannel_data_->subchannel_.get(),
251  ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
252  subchannel_data_->pending_watcher_);
253  }
254  if (!subchannel_list_->shutting_down() &&
255  subchannel_data_->pending_watcher_ != nullptr) {
256  subchannel_data_->connectivity_state_ = new_state;
257  // Call the subclass's ProcessConnectivityChangeLocked() method.
258  subchannel_data_->ProcessConnectivityChangeLocked(new_state);
259  }
260 }
261 
262 //
263 // SubchannelData
264 //
265 
266 template <typename SubchannelListType, typename SubchannelDataType>
269  const ServerAddress& /*address*/,
271  : subchannel_list_(subchannel_list),
272  subchannel_(std::move(subchannel)),
273  // We assume that the current state is IDLE. If not, we'll get a
274  // callback telling us that.
275  connectivity_state_(GRPC_CHANNEL_IDLE) {}
276 
277 template <typename SubchannelListType, typename SubchannelDataType>
279  GPR_ASSERT(subchannel_ == nullptr);
280 }
281 
282 template <typename SubchannelListType, typename SubchannelDataType>
284  UnrefSubchannelLocked(const char* reason) {
285  if (subchannel_ != nullptr) {
286  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
288  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
289  " (subchannel %p): unreffing subchannel (%s)",
290  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
291  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
292  subchannel_.get(), reason);
293  }
294  subchannel_.reset();
295  }
296 }
297 
298 template <typename SubchannelListType, typename SubchannelDataType>
299 void SubchannelData<SubchannelListType,
300  SubchannelDataType>::ResetBackoffLocked() {
301  if (subchannel_ != nullptr) {
302  subchannel_->ResetBackoff();
303  }
304 }
305 
306 template <typename SubchannelListType, typename SubchannelDataType>
307 void SubchannelData<SubchannelListType,
308  SubchannelDataType>::StartConnectivityWatchLocked() {
309  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
311  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
312  " (subchannel %p): starting watch (from %s)",
313  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
314  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
315  subchannel_.get(), ConnectivityStateName(connectivity_state_));
316  }
317  GPR_ASSERT(pending_watcher_ == nullptr);
318  pending_watcher_ =
319  new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
320  subchannel_->WatchConnectivityState(
321  connectivity_state_,
322  std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
323  pending_watcher_));
324 }
325 
326 template <typename SubchannelListType, typename SubchannelDataType>
328  CancelConnectivityWatchLocked(const char* reason) {
329  if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
331  "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
332  " (subchannel %p): canceling connectivity watch (%s)",
333  subchannel_list_->tracer()->name(), subchannel_list_->policy(),
334  subchannel_list_, Index(), subchannel_list_->num_subchannels(),
335  subchannel_.get(), reason);
336  }
337  if (pending_watcher_ != nullptr) {
338  subchannel_->CancelConnectivityStateWatch(pending_watcher_);
339  pending_watcher_ = nullptr;
340  }
341 }
342 
343 template <typename SubchannelListType, typename SubchannelDataType>
345  if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
346  UnrefSubchannelLocked("shutdown");
347 }
348 
349 //
350 // SubchannelList
351 //
352 
353 template <typename SubchannelListType, typename SubchannelDataType>
357  const grpc_channel_args& args)
358  : InternallyRefCounted<SubchannelListType>(
359  GRPC_TRACE_FLAG_ENABLED(*tracer) ? "SubchannelList" : nullptr),
360  policy_(policy),
361  tracer_(tracer) {
362  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
364  "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
365  tracer_->name(), policy, this, addresses.size());
366  }
367  subchannels_.reserve(addresses.size());
368  // Create a subchannel for each address.
369  for (ServerAddress address : addresses) {
371  helper->CreateSubchannel(address, args);
372  if (subchannel == nullptr) {
373  // Subchannel could not be created.
374  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
376  "[%s %p] could not create subchannel for address %s, "
377  "ignoring",
378  tracer_->name(), policy_, address.ToString().c_str());
379  }
380  continue;
381  }
382  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
384  "[%s %p] subchannel list %p index %" PRIuPTR
385  ": Created subchannel %p for address %s",
386  tracer_->name(), policy_, this, subchannels_.size(),
387  subchannel.get(), address.ToString().c_str());
388  }
389  subchannels_.emplace_back(this, std::move(address), std::move(subchannel));
390  }
391 }
392 
393 template <typename SubchannelListType, typename SubchannelDataType>
395  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
396  gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
397  policy_, this);
398  }
399 }
400 
401 template <typename SubchannelListType, typename SubchannelDataType>
403  if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
404  gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
405  tracer_->name(), policy_, this);
406  }
407  GPR_ASSERT(!shutting_down_);
408  shutting_down_ = true;
409  for (size_t i = 0; i < subchannels_.size(); i++) {
410  SubchannelDataType* sd = &subchannels_[i];
411  sd->ShutdownLocked();
412  }
413 }
414 
415 template <typename SubchannelListType, typename SubchannelDataType>
416 void SubchannelList<SubchannelListType,
417  SubchannelDataType>::ResetBackoffLocked() {
418  for (size_t i = 0; i < subchannels_.size(); i++) {
419  SubchannelDataType* sd = &subchannels_[i];
420  sd->ResetBackoffLocked();
421  }
422 }
423 
424 } // namespace grpc_core
425 
426 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
Definition: orphanable.h:76
void Unref()
Definition: orphanable.h:103
A proxy object implemented by the client channel and used by the LB policy to communicate with the ch...
Definition: lb_policy.h:274
virtual RefCountedPtr< SubchannelInterface > CreateSubchannel(ServerAddress address, const grpc_channel_args &args)=0
Creates a new subchannel with the specified channel args.
Interface for load balancing policies.
Definition: lb_policy.h:82
Definition: ref_counted_ptr.h:35
Definition: server_address.h:43
Definition: subchannel_list.h:78
SubchannelInterface * subchannel() const
Definition: subchannel_list.h:92
virtual ~SubchannelData()
Definition: subchannel_list.h:278
virtual void ProcessConnectivityChangeLocked(grpc_connectivity_state connectivity_state)=0
void ShutdownLocked()
Definition: subchannel_list.h:344
SubchannelListType * subchannel_list() const
Definition: subchannel_list.h:81
size_t Index() const
Definition: subchannel_list.h:86
SubchannelData(SubchannelList< SubchannelListType, SubchannelDataType > *subchannel_list, const ServerAddress &address, RefCountedPtr< SubchannelInterface > subchannel)
Definition: subchannel_list.h:267
grpc_connectivity_state CheckConnectivityStateLocked()
Definition: subchannel_list.h:98
void ResetBackoffLocked()
Definition: subchannel_list.h:300
void CancelConnectivityWatchLocked(const char *reason)
Definition: subchannel_list.h:328
void StartConnectivityWatchLocked()
Definition: subchannel_list.h:308
Definition: subchannel_interface.h:33
Definition: subchannel_list.h:176
bool shutting_down() const
Definition: subchannel_list.h:187
absl::InlinedVector< SubchannelDataType, 10 > SubchannelVector
Definition: subchannel_list.h:178
void ResetBackoffLocked()
Definition: subchannel_list.h:417
SubchannelDataType * subchannel(size_t index)
Definition: subchannel_list.h:184
TraceFlag * tracer() const
Definition: subchannel_list.h:191
LoadBalancingPolicy * policy() const
Definition: subchannel_list.h:190
virtual ~SubchannelList()
Definition: subchannel_list.h:394
size_t num_subchannels() const
Definition: subchannel_list.h:181
SubchannelList(LoadBalancingPolicy *policy, TraceFlag *tracer, ServerAddressList addresses, LoadBalancingPolicy::ChannelControlHelper *helper, const grpc_channel_args &args)
Definition: subchannel_list.h:354
void Orphan() override
Definition: subchannel_list.h:198
Definition: trace.h:61
const char * name() const
Definition: trace.h:68
#define DEBUG_LOCATION
Definition: debug_location.h:41
#define GPR_ASSERT(x)
abort() the process if x is zero, having written a line to the log.
Definition: log.h:92
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
Log a message.
#define GPR_INFO
Definition: log.h:54
grpc_connectivity_state
Connectivity state of a channel.
Definition: connectivity_state.h:27
@ GRPC_CHANNEL_IDLE
channel is idle
Definition: connectivity_state.h:29
Round Robin Policy.
Definition: backend_metric.cc:26
absl::InlinedVector< ServerAddress, 1 > ServerAddressList
Definition: server_address.h:111
const char * ConnectivityStateName(grpc_connectivity_state state)
Definition: connectivity_state.cc:36
Definition: async_unary_call.h:398
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
RefCountedPtr< SubchannelInterface > subchannel
Definition: ring_hash.cc:213
grpc_connectivity_state connectivity_state
Definition: ring_hash.cc:214
An array of arguments that can be passed around.
Definition: grpc_types.h:132
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: trace.h:112
TraceFlag * tracer
Definition: xds_api.cc:907