GRPC Core  18.0.0
flow_control.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21 
23 
24 #include <stdint.h>
25 
31 
33 struct grpc_chttp2_stream;
34 
36 
37 namespace grpc {
38 namespace testing {
39 class TrickledCHTTP2; // to make this a friend
40 } // namespace testing
41 } // namespace grpc
42 
43 namespace grpc_core {
44 namespace chttp2 {
45 
// Initial value of the window members declared later in this header
// (remote_window_, target_initial_window_size_, announced_window_).
// NOTE(review): 65535 looks like the HTTP/2 default initial window size --
// confirm against the HTTP/2 specification.
46 static constexpr uint32_t kDefaultWindow = 65535;
// 2^31 - 1, widened to int64_t. Presumably the upper bound for any
// flow-control window -- confirm against users of this constant.
47 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
48 // TODO(ncteisen): Tune this
// 1024 * 1024 = 1048576.
49 static constexpr uint32_t kFrameSize = 1024 * 1024;
50 
51 class TransportFlowControl;
52 class StreamFlowControl;
53 
54 // Encapsulates a collection of actions the transport needs to take with
55 // regard to flow control. Each action comes with urgencies that tell the
56 // transport how quickly the action must take place.
58  public:
59  enum class Urgency : uint8_t {
60  // Nothing to be done.
61  NO_ACTION_NEEDED = 0,
62  // Initiate a write to update the initial window immediately.
64  // Push the flow control update into a send buffer, to be sent
65  // out the next time a write is initiated.
67  };
68 
69  Urgency send_stream_update() const { return send_stream_update_; }
70  Urgency send_transport_update() const { return send_transport_update_; }
72  return send_initial_window_update_;
73  }
75  return send_max_frame_size_update_;
76  }
77  uint32_t initial_window_size() const { return initial_window_size_; }
78  uint32_t max_frame_size() const { return max_frame_size_; }
79 
81  send_stream_update_ = u;
82  return *this;
83  }
85  send_transport_update_ = u;
86  return *this;
87  }
89  uint32_t update) {
90  send_initial_window_update_ = u;
91  initial_window_size_ = update;
92  return *this;
93  }
95  uint32_t update) {
96  send_max_frame_size_update_ = u;
97  max_frame_size_ = update;
98  return *this;
99  }
100 
101  static const char* UrgencyString(Urgency u);
102  void Trace(grpc_chttp2_transport* t) const;
103 
104  private:
105  Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
106  Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
107  Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
108  Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
109  uint32_t initial_window_size_ = 0;
110  uint32_t max_frame_size_ = 0;
111 };
112 
114  public:
// Constructor. When enabled_ is true (it is initialised from
// GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace)), forwards reason, tfc and sfc
// to Init(); otherwise does nothing.
115  FlowControlTrace(const char* reason, TransportFlowControl* tfc,
116  StreamFlowControl* sfc) {
117  if (enabled_) Init(reason, tfc, sfc);
118  }
119 
121  if (enabled_) Finish();
122  }
123 
124  private:
125  void Init(const char* reason, TransportFlowControl* tfc,
126  StreamFlowControl* sfc);
127  void Finish();
128 
129  const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace);
130 
131  TransportFlowControl* tfc_;
132  StreamFlowControl* sfc_;
133  const char* reason_;
134  int64_t remote_window_;
135  int64_t target_window_;
136  int64_t announced_window_;
137  int64_t remote_window_delta_;
138  int64_t local_window_delta_;
139  int64_t announced_window_delta_;
140 };
141 
142 // Fat interface with all methods a flow control implementation needs to
143 // support.
145  public:
148 
149  // Is flow control enabled? This is needed in other codepaths like the checks
150  // in parsing and in writing.
151  virtual bool flow_control_enabled() const = 0;
152 
153  // Called to check if the transport needs to send a WINDOW_UPDATE frame
154  virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) = 0;
155 
156  // Using the protected members, returns an Action to be taken by the
157  // transport.
159 
160  // Using the protected members, returns an Action to be taken by the
161  // transport. Also checks for updates to our BDP estimate and acts
162  // accordingly.
164 
165  // Called to do bookkeeping when a stream owned by this transport sends
166  // data on the wire
167  virtual void StreamSentData(int64_t /* size */) = 0;
168 
169  // Called to do bookkeeping when a stream owned by this transport receives
170  // data from the wire. Also does error checking for frame size.
171  virtual grpc_error_handle RecvData(int64_t /* incoming_frame_size */) = 0;
172 
173  // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
174  virtual void RecvUpdate(uint32_t /* size */) = 0;
175 
176  // Returns the BdpEstimator held by this object. Caller is responsible for
177  // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
178  // bdp estimator actions inside TransportFlowControl
179  virtual BdpEstimator* bdp_estimator() { return nullptr; }
180 
181  // Getters
182  int64_t remote_window() const { return remote_window_; }
183  virtual int64_t target_window() const { return target_initial_window_size_; }
184  int64_t announced_window() const { return announced_window_; }
185 
186  // Used in certain benchmarks in which we don't want FlowControl to be a
187  // factor
188  virtual void TestOnlyForceHugeWindow() {}
189 
190  protected:
191  friend class ::grpc::testing::TrickledCHTTP2;
192  int64_t remote_window_ = kDefaultWindow;
193  int64_t target_initial_window_size_ = kDefaultWindow;
194  int64_t announced_window_ = kDefaultWindow;
195 };
196 
197 // Implementation of flow control that does NOTHING. Always returns maximum
198 // values, never initiates writes, and assumes that the remote peer is doing
199 // the same. To be used to narrow down on flow control as the cause of negative
200 // performance.
202  public:
203  // Maxes out all values
205 
206  bool flow_control_enabled() const override { return false; }
207 
208  // Never do anything.
209  uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; }
212  void StreamSentData(int64_t /* size */) override {}
213  grpc_error_handle RecvData(int64_t /* incoming_frame_size */) override {
214  return GRPC_ERROR_NONE;
215  }
216  void RecvUpdate(uint32_t /* size */) override {}
217 };
218 
219 // Implementation of flow control that abides by the HTTP/2 spec and attempts
220 // to be as performant as possible.
222  public:
223  TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
224  ~TransportFlowControl() override {}
225 
226  bool flow_control_enabled() const override { return true; }
227 
228  bool bdp_probe() const { return enable_bdp_probe_; }
229 
230  // returns an announce if we should send a transport update to our peer,
231  // else returns zero; writing_anyway indicates if a write would happen
232  // regardless of the send - if it is false and this function returns non-zero,
233  // this announce will cause a write to occur
234  uint32_t MaybeSendUpdate(bool writing_anyway) override;
235 
236  // Reads the flow control data and returns an actionable struct that will
237  // tell chttp2 exactly what it needs to do
239  return UpdateAction(FlowControlAction());
240  }
241 
242  // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
243  // to perform more complex flow control calculations and return an action
244  // to let chttp2 change its parameters
246 
247  void StreamSentData(int64_t size) override { remote_window_ -= size; }
248 
249  grpc_error_handle ValidateRecvData(int64_t incoming_frame_size);
// Records received data by subtracting incoming_frame_size from
// announced_window_. Performs no checks of its own; RecvData() calls
// ValidateRecvData() before calling this.
250  void CommitRecvData(int64_t incoming_frame_size) {
251  announced_window_ -= incoming_frame_size;
252  }
253 
// Transport-level handling of received data: validate, then commit.
// Returns the error from ValidateRecvData() unchanged if validation fails
// (announced_window_ is then left untouched); otherwise applies
// CommitRecvData() and returns GRPC_ERROR_NONE.
254  grpc_error_handle RecvData(int64_t incoming_frame_size) override {
// Scoped trace object tagged " data recv"; no stream is associated (nullptr).
255  FlowControlTrace trace(" data recv", this, nullptr);
256  grpc_error_handle error = ValidateRecvData(incoming_frame_size);
257  if (error != GRPC_ERROR_NONE) return error;
258  CommitRecvData(incoming_frame_size);
259  return GRPC_ERROR_NONE;
260  }
261 
262  // We have received a WINDOW_UPDATE frame for the transport: add size to
// remote_window_. A scoped FlowControlTrace tagged "t updt recv" is alive
// for the duration of the update.
263  void RecvUpdate(uint32_t size) override {
264  FlowControlTrace trace("t updt recv", this, nullptr);
265  remote_window_ += size;
266  }
267 
268  // See comment above announced_stream_total_over_incoming_window_ for the
269  // logic behind this decision.
270  int64_t target_window() const override {
271  return static_cast<uint32_t> GPR_MIN(
272  (int64_t)((1u << 31) - 1),
273  announced_stream_total_over_incoming_window_ +
275  }
276 
277  const grpc_chttp2_transport* transport() const { return t_; }
278 
280  if (delta > 0) {
281  announced_stream_total_over_incoming_window_ -= delta;
282  }
283  }
284 
286  if (delta > 0) {
287  announced_stream_total_over_incoming_window_ += delta;
288  }
289  }
290 
291  BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
292 
// Test-only override: sets both announced_window_ and remote_window_ to
// 1024 * 1024 * 1024 (2^30).
293  void TestOnlyForceHugeWindow() override {
294  announced_window_ = 1024 * 1024 * 1024;
295  remote_window_ = 1024 * 1024 * 1024;
296  }
297 
298  private:
299  double TargetLogBdp();
300  double SmoothLogBdp(double value);
301  FlowControlAction::Urgency DeltaUrgency(int64_t value,
302  grpc_chttp2_setting_id setting_id);
303 
304  FlowControlAction UpdateAction(FlowControlAction action) {
305  if (announced_window_ < target_window() / 2) {
308  }
309  return action;
310  }
311 
312  const grpc_chttp2_transport* const t_;
313 
322  int64_t announced_stream_total_over_incoming_window_ = 0;
323 
325  const bool enable_bdp_probe_;
326 
327  /* bdp estimation */
328  grpc_core::BdpEstimator bdp_estimator_;
329 
330  /* pid controller */
331  grpc_core::PidController pid_controller_;
332  grpc_millis last_pid_update_ = 0;
333 };
334 
335 // Fat interface with all methods a stream flow control implementation needs
336 // to support.
338  public:
341 
342  // Updates an action using the protected members.
344  abort();
345  }
346 
347  // Using the protected members, returns an Action for this stream to be
348  // taken by the transport.
350 
351  // Bookkeeping for when data is sent on this stream.
352  virtual void SentData(int64_t /* outgoing_frame_size */) = 0;
353 
354  // Bookkeeping and error checking for when data is received by this stream.
355  virtual grpc_error_handle RecvData(int64_t /* incoming_frame_size */) = 0;
356 
357  // Called to check if this stream needs to send a WINDOW_UPDATE frame.
358  virtual uint32_t MaybeSendUpdate() = 0;
359 
360  // Bookkeeping for receiving a WINDOW_UPDATE frame for this stream.
361  virtual void RecvUpdate(uint32_t /* size */) = 0;
362 
363  // Bookkeeping for when a call pulls bytes out of the transport. At this
364  // point we consider the data 'used' and can thus let our peer know we are
365  // ready for more data.
// This default implementation calls abort(), so a subclass must override it
// for the operation to be callable.
366  virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */,
367  size_t /* have_already */) {
368  abort();
369  }
370 
371  // Used in certain benchmarks in which we don't want FlowControl to be a
372  // factor
373  virtual void TestOnlyForceHugeWindow() {}
374 
375  // Getters
379 
380  protected:
381  friend class ::grpc::testing::TrickledCHTTP2;
382  int64_t remote_window_delta_ = 0;
383  int64_t local_window_delta_ = 0;
385 };
386 
387 // Implementation of flow control that does NOTHING. Always returns maximum
388 // values, never initiates writes, and assumes that the remote peer is doing
389 // the same. To be used to narrow down on flow control as the cause of negative
390 // performance.
392  public:
394  return action;
395  }
397  void SentData(int64_t /* outgoing_frame_size */) override {}
398  grpc_error_handle RecvData(int64_t /* incoming_frame_size */) override {
399  return GRPC_ERROR_NONE;
400  }
401  uint32_t MaybeSendUpdate() override { return 0; }
402  void RecvUpdate(uint32_t /* size */) override {}
403  void IncomingByteStreamUpdate(size_t /* max_size_hint */,
404  size_t /* have_already */) override {}
405 };
406 
407 // Implementation of flow control that abides by the HTTP/2 spec and attempts
408 // to be as performant as possible.
410  public:
412  ~StreamFlowControl() override {
414  }
415 
418  return UpdateAction(tfc_->MakeAction());
419  }
420 
421  // We have sent data on the wire; we must track this in our bookkeeping for
422  // the remote peer's flow control.
// Forwards outgoing_frame_size to tfc_->StreamSentData() and then subtracts
// it from this stream's remote_window_delta_. A scoped FlowControlTrace
// tagged " data sent" is alive for the duration.
423  void SentData(int64_t outgoing_frame_size) override {
424  FlowControlTrace tracer(" data sent", tfc_, this);
425  tfc_->StreamSentData(outgoing_frame_size);
426  remote_window_delta_ -= outgoing_frame_size;
427  }
428 
429  // we have received data from the wire
430  grpc_error_handle RecvData(int64_t incoming_frame_size) override;
431 
432  // returns an announce if we should send a stream update to our peer, else
433  // returns zero
434  uint32_t MaybeSendUpdate() override;
435 
436  // We have received a WINDOW_UPDATE frame for a stream: add size to this
// stream's remote_window_delta_. A scoped FlowControlTrace tagged
// "s updt recv" is alive for the duration of the update.
437  void RecvUpdate(uint32_t size) override {
438  FlowControlTrace trace("s updt recv", tfc_, this);
439  remote_window_delta_ += size;
440  }
441 
442  // the application is asking for a certain amount of bytes
443  void IncomingByteStreamUpdate(size_t max_size_hint,
444  size_t have_already) override;
445 
446  int64_t remote_window_delta() const { return remote_window_delta_; }
447  int64_t local_window_delta() const { return local_window_delta_; }
449 
450  const grpc_chttp2_stream* stream() const { return s_; }
451 
// Test-only override: sets announced_window_delta_, local_window_delta_ and
// remote_window_delta_ each to 1024 * 1024 * 1024 (2^30).
452  void TestOnlyForceHugeWindow() override {
453  announced_window_delta_ = 1024 * 1024 * 1024;
454  local_window_delta_ = 1024 * 1024 * 1024;
455  remote_window_delta_ = 1024 * 1024 * 1024;
456  }
457 
458  private:
459  TransportFlowControl* const tfc_;
460  const grpc_chttp2_stream* const s_;
461 
462  void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
464  announced_window_delta_ += change;
466  }
467 };
468 
470  public:
473  double current_target) = 0;
474 };
475 
478 
479 } // namespace chttp2
480 } // namespace grpc_core
481 
482 #endif
absl::optional< XdsApi::CdsUpdate > update
Definition: cds.cc:114
Definition: bdp_estimator.h:38
Definition: pid_controller.h:35
Definition: trace.h:61
Definition: flow_control.h:57
FlowControlAction & set_send_max_frame_size_update(Urgency u, uint32_t update)
Definition: flow_control.h:94
uint32_t max_frame_size() const
Definition: flow_control.h:78
FlowControlAction & set_send_initial_window_update(Urgency u, uint32_t update)
Definition: flow_control.h:88
uint32_t initial_window_size() const
Definition: flow_control.h:77
Urgency send_stream_update() const
Definition: flow_control.h:69
Urgency send_max_frame_size_update() const
Definition: flow_control.h:74
FlowControlAction & set_send_stream_update(Urgency u)
Definition: flow_control.h:80
static const char * UrgencyString(Urgency u)
Definition: flow_control.cc:128
FlowControlAction & set_send_transport_update(Urgency u)
Definition: flow_control.h:84
Urgency
Definition: flow_control.h:59
Urgency send_transport_update() const
Definition: flow_control.h:70
void Trace(grpc_chttp2_transport *t) const
Definition: flow_control.cc:142
Urgency send_initial_window_update() const
Definition: flow_control.h:71
Definition: flow_control.h:113
~FlowControlTrace()
Definition: flow_control.h:120
FlowControlTrace(const char *reason, TransportFlowControl *tfc, StreamFlowControl *sfc)
Definition: flow_control.h:115
Definition: flow_control.h:337
int64_t local_window_delta()
Definition: flow_control.h:377
StreamFlowControlBase()
Definition: flow_control.h:339
virtual FlowControlAction UpdateAction(FlowControlAction)
Definition: flow_control.h:343
virtual void IncomingByteStreamUpdate(size_t, size_t)
Definition: flow_control.h:366
virtual ~StreamFlowControlBase()
Definition: flow_control.h:340
int64_t announced_window_delta()
Definition: flow_control.h:378
virtual grpc_error_handle RecvData(int64_t)=0
int64_t remote_window_delta()
Definition: flow_control.h:376
virtual void RecvUpdate(uint32_t)=0
int64_t announced_window_delta_
Definition: flow_control.h:384
virtual void TestOnlyForceHugeWindow()
Definition: flow_control.h:373
int64_t local_window_delta_
Definition: flow_control.h:383
virtual FlowControlAction MakeAction()=0
int64_t remote_window_delta_
Definition: flow_control.h:382
Definition: flow_control.h:391
void SentData(int64_t) override
Definition: flow_control.h:397
FlowControlAction MakeAction() override
Definition: flow_control.h:396
FlowControlAction UpdateAction(FlowControlAction action) override
Definition: flow_control.h:393
void RecvUpdate(uint32_t) override
Definition: flow_control.h:402
void IncomingByteStreamUpdate(size_t, size_t) override
Definition: flow_control.h:403
grpc_error_handle RecvData(int64_t) override
Definition: flow_control.h:398
uint32_t MaybeSendUpdate() override
Definition: flow_control.h:401
Definition: flow_control.h:409
FlowControlAction MakeAction() override
Definition: flow_control.h:417
StreamFlowControl(TransportFlowControl *tfc, const grpc_chttp2_stream *s)
Definition: flow_control.cc:218
int64_t local_window_delta() const
Definition: flow_control.h:447
int64_t remote_window_delta() const
Definition: flow_control.h:446
~StreamFlowControl() override
Definition: flow_control.h:412
void SentData(int64_t outgoing_frame_size) override
Definition: flow_control.h:423
grpc_error_handle RecvData(int64_t incoming_frame_size) override
Definition: flow_control.cc:222
void RecvUpdate(uint32_t size) override
Definition: flow_control.h:437
uint32_t MaybeSendUpdate() override
Definition: flow_control.cc:265
void IncomingByteStreamUpdate(size_t max_size_hint, size_t have_already) override
Definition: flow_control.cc:276
int64_t announced_window_delta() const
Definition: flow_control.h:448
FlowControlAction UpdateAction(FlowControlAction action) override
Definition: flow_control.cc:392
const grpc_chttp2_stream * stream() const
Definition: flow_control.h:450
void TestOnlyForceHugeWindow() override
Definition: flow_control.h:452
virtual ~TestOnlyTransportTargetWindowEstimatesMocker()
Definition: flow_control.h:471
virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(double current_target)=0
Definition: flow_control.h:144
virtual uint32_t MaybeSendUpdate(bool)=0
int64_t announced_window() const
Definition: flow_control.h:184
virtual BdpEstimator * bdp_estimator()
Definition: flow_control.h:179
virtual int64_t target_window() const
Definition: flow_control.h:183
virtual bool flow_control_enabled() const =0
virtual ~TransportFlowControlBase()
Definition: flow_control.h:147
int64_t remote_window() const
Definition: flow_control.h:182
TransportFlowControlBase()
Definition: flow_control.h:146
virtual void TestOnlyForceHugeWindow()
Definition: flow_control.h:188
int64_t target_initial_window_size_
Definition: flow_control.h:193
int64_t announced_window_
Definition: flow_control.h:194
virtual FlowControlAction PeriodicUpdate()=0
virtual grpc_error_handle RecvData(int64_t)=0
int64_t remote_window_
Definition: flow_control.h:192
virtual FlowControlAction MakeAction()=0
bool flow_control_enabled() const override
Definition: flow_control.h:206
FlowControlAction PeriodicUpdate() override
Definition: flow_control.h:211
void RecvUpdate(uint32_t) override
Definition: flow_control.h:216
grpc_error_handle RecvData(int64_t) override
Definition: flow_control.h:213
void StreamSentData(int64_t) override
Definition: flow_control.h:212
TransportFlowControlDisabled(grpc_chttp2_transport *t)
Definition: flow_control.cc:158
uint32_t MaybeSendUpdate(bool) override
Definition: flow_control.h:209
FlowControlAction MakeAction() override
Definition: flow_control.h:210
Definition: flow_control.h:221
void RecvUpdate(uint32_t size) override
Definition: flow_control.h:263
void StreamSentData(int64_t size) override
Definition: flow_control.h:247
BdpEstimator * bdp_estimator() override
Definition: flow_control.h:291
void TestOnlyForceHugeWindow() override
Definition: flow_control.h:293
FlowControlAction MakeAction() override
Definition: flow_control.h:238
void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)
Definition: flow_control.h:279
int64_t target_window() const override
Definition: flow_control.h:270
void CommitRecvData(int64_t incoming_frame_size)
Definition: flow_control.h:250
grpc_error_handle ValidateRecvData(int64_t incoming_frame_size)
Definition: flow_control.cc:206
~TransportFlowControl() override
Definition: flow_control.h:224
const grpc_chttp2_transport * transport() const
Definition: flow_control.h:277
grpc_error_handle RecvData(int64_t incoming_frame_size) override
Definition: flow_control.h:254
FlowControlAction PeriodicUpdate() override
Definition: flow_control.cc:354
bool bdp_probe() const
Definition: flow_control.h:228
TransportFlowControl(const grpc_chttp2_transport *t, bool enable_bdp_probe)
Definition: flow_control.cc:177
uint32_t MaybeSendUpdate(bool writing_anyway) override
Definition: flow_control.cc:192
void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)
Definition: flow_control.h:285
bool flow_control_enabled() const override
Definition: flow_control.h:226
#define GRPC_ERROR_NONE
The following "special" errors can be propagated without allocating memory.
Definition: error.h:228
int64_t grpc_millis
Definition: exec_ctx.h:37
grpc_core::TraceFlag grpc_flowctl_trace
grpc_chttp2_setting_id
Definition: http2_settings.h:29
grpc_error_handle error
Definition: lame_client.cc:54
TestOnlyTransportTargetWindowEstimatesMocker * g_test_only_transport_target_window_estimates_mocker
Definition: flow_control.cc:44
Round Robin Policy.
Definition: backend_metric.cc:26
Definition: flow_control.h:37
Definition: internal.h:509
Definition: internal.h:288
Definition: error_internal.h:41
#define GRPC_TRACE_FLAG_ENABLED(f)
Definition: trace.h:112
#define GPR_MIN(a, b)
useful macros that don't belong anywhere else
Definition: useful.h:24
TraceFlag * tracer
Definition: xds_api.cc:907