17 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
18 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
27 #include "absl/status/status.h"
28 #include "absl/types/optional.h"
// Macro-defined string keys. The GRPC_ARG_ prefix suggests these name channel
// args -- TODO confirm against the code that reads them.
61 #define GRPC_ARG_SERVER_URI "grpc.server_uri"
// The "grpc.internal." prefix on the next two keys suggests they are meant for
// internal use only -- TODO confirm.
64 #define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel"
67 #define GRPC_ARG_SERVICE_CONFIG_OBJ "grpc.internal.service_config_obj"
// Fixed limit of 6. Presumably bounds how many batches may be pending on one
// call -- TODO confirm at the point of use (not visible in this excerpt).
77 #define MAX_PENDING_BATCHES 6
// Body fragment: heap-allocates an ExternalConnectivityWatcher, passing this
// channel plus pollent, state and on_complete. The resulting pointer does not
// appear to be stored in the visible lines, so ownership is presumably taken
// inside the constructor -- TODO confirm.
106 new ExternalConnectivityWatcher(
this, pollent, state, on_complete,
// Body fragment: delegates to the static helper on ExternalConnectivityWatcher,
// passing this channel and on_complete. The meaning of the trailing `true`
// flag is not visible in this excerpt.
113 ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
114 this, on_complete,
true);
// Body fragment: returns the number of entries in external_watchers_, cast
// from the container's size type to int.
119 return static_cast<int>(external_watchers_.size());
// Forward declarations only; the definitions of these classes are not part of
// this excerpt.
141 class ResolverResultHandler;
142 class SubchannelWrapper;
143 class ClientChannelControlHelper;
144 class ConnectivityWatcherAdder;
145 class ConnectivityWatcherRemover;
// Destructor marked `override`, so it overrides a virtual destructor of a base
// class (the base is not visible in this excerpt).
157 ~ExternalConnectivityWatcher()
override;
// Static helper that takes the channel as its first parameter; the remaining
// parameters are not visible in this excerpt.
160 static void RemoveWatcherFromExternalWatchersMap(
ClientChannel* chand,
// Tail of an overriding method whose absl::Status parameter is left unnamed;
// the method's name and other parameters are not visible here.
165 const absl::Status& )
override;
// Both *Locked methods are annotated as requiring the channel's
// work_serializer_ to be held exclusively by the caller.
172 void AddWatcherLocked()
173 ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);
174 void RemoveWatcherLocked()
175 ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);
// Atomic boolean flag, initialised to false.
183 Atomic<
bool> done_{
false};
// Linked-list node type: `next` points at another ResolverQueuedCall and
// defaults to nullptr. Any other fields, and the closing brace, are not
// visible in this excerpt.
186 struct ResolverQueuedCall {
188 ResolverQueuedCall* next =
nullptr;
// Linked-list node type holding a raw pointer to a LoadBalancedCall plus a
// `next` link that defaults to nullptr. Whether lb_call is owning is not
// visible here.
190 struct LbQueuedCall {
191 LoadBalancedCall* lb_call;
192 LbQueuedCall* next =
nullptr;
// Every declaration in this group carries
// ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_): thread-safety analysis
// expects the caller to hold work_serializer_ exclusively. Some declarations
// are only partially visible in this excerpt (an annotation line with no
// signature above it).
// Takes the resolver result by value.
215 void OnResolverResultChangedLocked(Resolver::Result result)
216 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
218 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
// Takes a ref-counted LB policy config and the resolver result, both by value.
220 void CreateOrUpdateLbPolicyLocked(
221 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
222 Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
// Returns an OrphanablePtr<LoadBalancingPolicy>; its parameters are not
// visible in this excerpt.
223 OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
225 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
// The picker arrives as a std::unique_ptr by value; earlier parameters are not
// visible in this excerpt.
227 void UpdateStateAndPickerLocked(
230 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)
231 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
// Takes a ref-counted service config and config selector, a pointer to the
// parsed global client-channel config, and the LB policy name as a C string.
233 void UpdateServiceConfigInControlPlaneLocked(
234 RefCountedPtr<ServiceConfig> service_config,
235 RefCountedPtr<ConfigSelector> config_selector,
236 const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
237 const char* lb_policy_name)
238 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
// Parameterless helpers; their behaviour is defined outside this excerpt.
240 void UpdateServiceConfigInDataPlaneLocked()
241 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
243 void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
244 void DestroyResolverAndLbPolicyLocked()
245 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
248 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
251 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
253 void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
// These two declarations are annotated as requiring resolution_mu_ to be held
// exclusively. Each takes a ResolverQueuedCall pointer; the second parameter
// of each is not visible in this excerpt.
256 void AddResolverQueuedCall(ResolverQueuedCall* call,
258 ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
259 void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
261 ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
// The remaining declarations are annotated as requiring data_plane_mu_ to be
// held exclusively; two of them have lost their signature lines in this
// excerpt.
265 ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
267 ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
// Returns a ref-counted ConnectedSubchannel; parameters are not visible here.
268 RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
270 ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
// Fields in this group carry no ABSL_GUARDED_BY annotation.
// The two bools are const, so they cannot be reassigned after construction.
275 const
bool deadline_checking_enabled_;
276 const
bool enable_retries_;
// Raw pointer; whether this class owns the factory is not visible here.
278 ClientChannelFactory* client_channel_factory_;
280 RefCountedPtr<ServiceConfig> default_service_config_;
281 std::
string server_name_;
// Raw pointer; ownership/lifetime of the channelz node is not visible here.
283 channelz::ChannelNode* channelz_node_;
// Mutex declared `mutable`, so it can be locked from const member functions.
// Every field below in this group is annotated ABSL_GUARDED_BY(resolution_mu_).
289 mutable Mutex resolution_mu_;
// Pointer to a ResolverQueuedCall, presumably the head of the queued-call
// list -- TODO confirm. Its initializer is cut off in this excerpt.
291 ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
// Starts out false.
296 bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false;
297 RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_);
298 RefCountedPtr<ConfigSelector> config_selector_
299 ABSL_GUARDED_BY(resolution_mu_);
300 RefCountedPtr<DynamicFilters> dynamic_filters_
301 ABSL_GUARDED_BY(resolution_mu_);
// Mutex declared `mutable`, so it can be locked from const member functions.
// The fields below are annotated ABSL_GUARDED_BY(data_plane_mu_).
306 mutable Mutex data_plane_mu_;
// The picker is held through a std::unique_ptr.
307 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
308 ABSL_GUARDED_BY(data_plane_mu_);
// Pointer to an LbQueuedCall, initially nullptr; presumably the head of the
// list of calls queued for an LB pick -- TODO confirm.
310 LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) =
nullptr;
// The WorkSerializer is held through a std::shared_ptr. Every other field in
// this group is annotated ABSL_GUARDED_BY(work_serializer_).
315 std::shared_ptr<WorkSerializer> work_serializer_;
316 ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(work_serializer_);
317 OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(work_serializer_);
// Starts out false.
318 bool previous_resolution_contained_addresses_
319 ABSL_GUARDED_BY(work_serializer_) = false;
320 RefCountedPtr<ServiceConfig> saved_service_config_
321 ABSL_GUARDED_BY(work_serializer_);
322 RefCountedPtr<ConfigSelector> saved_config_selector_
323 ABSL_GUARDED_BY(work_serializer_);
// Optional string: may hold no value at all.
324 absl::optional<std::
string> health_check_service_name_
325 ABSL_GUARDED_BY(work_serializer_);
// Annotation for a field whose declaration line is missing from this excerpt.
327 ABSL_GUARDED_BY(work_serializer_);
328 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_
329 ABSL_GUARDED_BY(work_serializer_);
// Maps a raw Subchannel pointer to an int; by its name, a per-subchannel
// reference count -- TODO confirm.
331 std::map<Subchannel*,
int> subchannel_refcount_map_
332 ABSL_GUARDED_BY(work_serializer_);
// Set of raw (non-ref-counted) SubchannelWrapper pointers.
336 std::set<SubchannelWrapper*> subchannel_wrappers_
337 ABSL_GUARDED_BY(work_serializer_);
// Maps a ref-counted SubchannelWrapper to a ref-counted ConnectedSubchannel.
341 std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
342 pending_subchannel_updates_ ABSL_GUARDED_BY(work_serializer_);
// Defaults to -1; presumably a sentinel meaning "not set" -- TODO confirm.
343 int keepalive_time_ ABSL_GUARDED_BY(work_serializer_) = -1;
// Two char buffers held through UniquePtr, annotated ABSL_GUARDED_BY(info_mu_).
// The declaration of info_mu_ itself is not visible in this excerpt.
356 UniquePtr<
char> info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_);
357 UniquePtr<
char> info_service_config_json_ ABSL_GUARDED_BY(info_mu_);
// Mutex declared `mutable`, so it can be locked from const member functions.
363 mutable Mutex external_watchers_mu_;
// Maps a grpc_closure pointer to a ref-counted ExternalConnectivityWatcher;
// annotated ABSL_GUARDED_BY(external_watchers_mu_).
364 std::map<
grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
365 external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_);
// NOTE(review): the enclosing class header is not visible in this excerpt.
// Lock annotations from here on name the mutex in pointer-to-member form,
// &ClientChannel::data_plane_mu_, rather than as a plain member name.
// Annotation for a declaration whose signature line is missing here.
398 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Accessor body fragment: returns the subchannel_call_ member.
404 return subchannel_call_;
// Forward declaration; the definition is not part of this excerpt.
408 class LbQueuedCallCanceller;
// Static function whose first parameter is an untyped `void* arg`; remaining
// parameters are not visible here.
415 static void FailPendingBatchInCallCombiner(
void* arg,
// Function-pointer type for a predicate returning bool; its parameter list is
// not visible in this excerpt.
418 typedef bool (*YieldCallCombinerPredicate)(
// Predicate: true exactly when the given closure list is non-empty. The
// closing brace is missing from this excerpt.
426 static bool YieldCallCombinerIfPendingBatchesFound(
427 const CallCombinerClosureList& closures) {
428 return closures.size() > 0;
// Takes a YieldCallCombinerPredicate as its last parameter; earlier parameters
// are not visible in this excerpt.
433 void PendingBatchesFail(
435 YieldCallCombinerPredicate yield_call_combiner_predicate);
// Static function whose first parameter is an untyped `void* arg`.
436 static void ResumePendingBatchInCallCombiner(
void* arg,
439 void PendingBatchesResume();
// Parameter lists of the next two declarations are not visible here.
441 static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
443 void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
446 void CreateSubchannelCall();
// Both methods are annotated as requiring ClientChannel::data_plane_mu_ to be
// held exclusively by the caller.
450 void MaybeRemoveCallFromLbQueuedCallsLocked()
451 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
453 void MaybeAddCallToLbQueuedCallsLocked()
454 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
// Raw pointer to the ClientChannel; ownership/lifetime is not visible here.
456 ClientChannel* chand_;
462 gpr_cycle_counter call_start_time_;
// Raw pointer to a CallCombiner; ownership is not visible here.
466 CallCombiner* call_combiner_;
// The next three fields are annotated
// ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_).
// Embedded ClientChannel::LbQueuedCall node, stored by value.
480 ClientChannel::LbQueuedCall queued_call_
481 ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
// Initializer is cut off in this excerpt.
482 bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) =
// Raw pointer, initially nullptr.
484 LbQueuedCallCanceller* lb_call_canceller_
485 ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) =
nullptr;
// The remaining fields carry no guard annotation in the visible lines.
487 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// Pointer to const backend metric data, initially nullptr.
488 const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ =
nullptr;
// Tail of a callable-typed field whose last parameter is a
// LoadBalancingPolicy::CallState pointer; the start of the type is not
// visible in this excerpt.
490 LoadBalancingPolicy::CallState*)>
491 lb_recv_trailing_metadata_ready_;
493 RefCountedPtr<SubchannelCall> subchannel_call_;
// Closure pointer, initially nullptr; by its name, the recv_trailing_metadata
// callback that was originally supplied -- TODO confirm.
498 grpc_closure* original_recv_trailing_metadata_ready_ =
nullptr;
ClusterWatcher * watcher
Definition: cds.cc:112
Definition: connectivity_state.h:66
Definition: call_combiner.h:144
Definition: client_channel.h:377
RefCountedPtr< SubchannelCall > subchannel_call() const
Definition: client_channel.h:403
bool PickSubchannelLocked(grpc_error_handle *error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_)
void AsyncPickDone(grpc_error_handle error)
Definition: client_channel.h:401
Definition: client_channel.h:81
static ClientChannel * GetFromChannel(grpc_channel *channel)
Definition: client_channel.cc:1040
void AddExternalConnectivityWatcher(grpc_polling_entity pollent, grpc_connectivity_state *state, grpc_closure *on_complete, grpc_closure *watcher_timer_init)
Definition: client_channel.h:102
RefCountedPtr< LoadBalancedCall > CreateLoadBalancedCall(const grpc_call_element_args &args, grpc_polling_entity *pollent, grpc_closure *on_call_destruction_complete)
Definition: client_channel.cc:1173
int NumExternalConnectivityWatchers() const
Definition: client_channel.h:117
void RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface *watcher)
Definition: client_channel.cc:1855
void AddConnectivityWatcher(grpc_connectivity_state initial_state, OrphanablePtr< AsyncConnectivityStateWatcherInterface > watcher)
Definition: client_channel.cc:1849
void CancelExternalConnectivityWatcher(grpc_closure *on_complete)
Definition: client_channel.h:112
static const grpc_channel_filter kFilterVtable
Definition: client_channel.h:83
grpc_connectivity_state CheckConnectivityState(bool try_to_connect)
Definition: client_channel.cc:1833
Definition: connectivity_state.h:51
Definition: ref_counted.h:205
Definition: ref_counted.h:282
Definition: ref_counted_ptr.h:35
#define MAX_PENDING_BATCHES
Definition: client_channel.h:77
#define GRPC_ERROR_NONE
The following "special" errors can be propagated without allocating memory.
Definition: error.h:228
int64_t grpc_millis
Definition: exec_ctx.h:37
grpc_connectivity_state
Connectivity state of a channel.
Definition: connectivity_state.h:27
grpc_error_handle error
Definition: lame_client.cc:54
Round Robin Policy.
Definition: backend_metric.cc:26
std::unique_ptr< T, DefaultDeleteChar > UniquePtr
Definition: memory.h:47
@ kUnrefCallDtor
Definition: ref_counted.h:231
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:67
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:208
RefCountedPtr< SubchannelInterface > subchannel
Definition: ring_hash.cc:213
Definition: channel_stack.h:76
Definition: channel_stack.h:174
Definition: channel_stack.h:192
An array of arguments that can be passed around.
Definition: grpc_types.h:132
Definition: channel_stack.h:68
Definition: channel_stack.h:166
Definition: channel_stack.h:107
Information requested from the channel.
Definition: grpc_types.h:704
Definition: channel_stack.h:182
Definition: channel.h:105
A closure over a grpc_iomgr_cb_func.
Definition: closure.h:56
Definition: error_internal.h:41
Definition: polling_entity.h:37
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice.h:60
Transport op: a set of operations to perform on a transport as a whole.
Definition: transport.h:332
Definition: transport.h:163