GRPC Core  18.0.0
transport.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
20 #define GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H
21 
23 
24 #include <stddef.h>
25 
38 
39 /* Minimum and maximum protocol accepted versions. */
40 #define GRPC_PROTOCOL_VERSION_MAX_MAJOR 2
41 #define GRPC_PROTOCOL_VERSION_MAX_MINOR 1
42 #define GRPC_PROTOCOL_VERSION_MIN_MAJOR 2
43 #define GRPC_PROTOCOL_VERSION_MIN_MINOR 1
44 
45 /* forward declarations */
46 
47 typedef struct grpc_transport grpc_transport;
48 
49 /* grpc_stream doesn't actually exist. It's used as a typesafe
50  opaque pointer for whatever data the transport wants to track
51  for a stream. */
52 typedef struct grpc_stream grpc_stream;
53 
55 
56 typedef struct grpc_stream_refcount {
59 #ifndef NDEBUG
60  const char* object_type;
61 #endif
64 
65 #ifndef NDEBUG
66 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
67  grpc_iomgr_cb_func cb, void* cb_arg,
68  const char* object_type);
69 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
70  grpc_stream_ref_init(rc, ir, cb, cb_arg, objtype)
71 #else
72 void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
73  grpc_iomgr_cb_func cb, void* cb_arg);
74 #define GRPC_STREAM_REF_INIT(rc, ir, cb, cb_arg, objtype) \
75  do { \
76  grpc_stream_ref_init(rc, ir, cb, cb_arg); \
77  (void)(objtype); \
78  } while (0)
79 #endif
80 
81 #ifndef NDEBUG
82 inline void grpc_stream_ref(grpc_stream_refcount* refcount,
83  const char* reason) {
85  gpr_log(GPR_DEBUG, "%s %p:%p REF %s", refcount->object_type, refcount,
86  refcount->destroy.cb_arg, reason);
87  }
88  refcount->refs.RefNonZero(DEBUG_LOCATION, reason);
89 }
90 #else
91 inline void grpc_stream_ref(grpc_stream_refcount* refcount) {
92  refcount->refs.RefNonZero();
93 }
94 #endif
95 
97 
98 #ifndef NDEBUG
100  const char* reason) {
102  gpr_log(GPR_DEBUG, "%s %p:%p UNREF %s", refcount->object_type, refcount,
103  refcount->destroy.cb_arg, reason);
104  }
105  if (GPR_UNLIKELY(refcount->refs.Unref(DEBUG_LOCATION, reason))) {
106  grpc_stream_destroy(refcount);
107  }
108 }
109 #else
110 inline void grpc_stream_unref(grpc_stream_refcount* refcount) {
111  if (GPR_UNLIKELY(refcount->refs.Unref())) {
112  grpc_stream_destroy(refcount);
113  }
114 }
115 #endif
116 
117 /* Wrap a buffer that is owned by some stream object into a slice that shares
118  the same refcount */
120  void* buffer, size_t length);
121 
123  uint64_t framing_bytes = 0;
124  uint64_t data_bytes = 0;
125  uint64_t header_bytes = 0;
126 };
127 
131 };
132 
135 
138 
139 // This struct (which is present in both grpc_transport_stream_op_batch
140 // and grpc_transport_op_batch) is a convenience to allow filters or
141 // transports to schedule a closure related to a particular batch without
142 // having to allocate memory. The general pattern is to initialize the
143 // closure with the callback arg set to the batch and extra_arg set to
144 // whatever state is associated with the handler (e.g., the call element
145 // or the transport stream object).
146 //
147 // Note that this can only be used by the current handler of a given
148 // batch on the way down the stack (i.e., whichever filter or transport is
149 // currently handling the batch). Once a filter or transport passes control
150 // of the batch to the next handler, it cannot depend on the contents of
151 // this struct anymore, because the next handler may reuse it.
153  void* extra_arg = nullptr;
155  grpc_handler_private_op_data() { memset(&closure, 0, sizeof(closure)); }
156 };
157 
160 
161 /* Transport stream op: a set of operations to perform on a transport
162  against a single stream */
165  : send_initial_metadata(false),
166  send_trailing_metadata(false),
167  send_message(false),
168  recv_initial_metadata(false),
169  recv_message(false),
170  recv_trailing_metadata(false),
171  cancel_stream(false),
172  is_traced(false) {}
173 
184 
187 
190 
193 
195  bool send_message : 1;
196 
199 
201  bool recv_message : 1;
202 
206 
208  bool cancel_stream : 1;
209 
211  bool is_traced : 1;
212 
213  /***************************************************************************
214  * remaining fields are initialized and used at the discretion of the
215  * current handler of the op */
216 
218 };
219 
223  : context(context) {}
225  // We don't really own `send_message`, so release ownership and let the
226  // owner clean the data.
227  (void)send_message.send_message.release();
228  }
229 
230  struct {
235  // If non-NULL, will be set by the transport to the peer string (a char*).
236  // The transport retains ownership of the string.
237  // Note: This pointer may be used by the transport after the
238  // send_initial_metadata op is completed. It must remain valid
239  // until the call is destroyed.
240  gpr_atm* peer_string = nullptr;
242 
243  struct {
245  // Set by the transport to true if the stream successfully wrote the
246  // trailing metadata. If this is not set but there was a send trailing
247  // metadata op present, this can indicate that a server call can be marked
248  // as a cancellation (since the stream was write-closed before status could
249  // be delivered).
250  bool* sent = nullptr;
252 
253  struct {
254  // The transport (or a filter that decides to return a failure before
255  // the op gets down to the transport) takes ownership.
256  // The batch's on_complete will not be called until after the byte
257  // stream is orphaned.
259  // Set by the transport if the stream has been closed for writes. If this
260  // is set and send message op is present, we set the operation to be a
261  // failure without sending a cancel OP down the stack. This is so that the
262  // status of the call does not get overwritten by the Cancel OP, which would
263  // be especially problematic if we had received a valid status from the
264  // server.
265  // For send_initial_metadata, it is fine for the status to be overwritten
266  // because at that point, the client will not have received a status.
267  // For send_trailing_metadata, we might overwrite the status if we have
268  // non-zero metadata to send. This is fine because the API does not allow
269  // the client to send trailing metadata.
270  bool stream_write_closed = false;
272 
273  struct {
275  // Flags are used only on the server side. If non-null, will be set to
276  // a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to
277  // indicate if the call is idempotent).
278  uint32_t* recv_flags = nullptr;
281  // If not NULL, will be set to true if trailing metadata is
282  // immediately available. This may be a signal that we received a
283  // Trailers-Only response.
285  // If non-NULL, will be set by the transport to the peer string (a char*).
286  // The transport retains ownership of the string.
287  // Note: This pointer may be used by the transport after the
288  // recv_initial_metadata op is completed. It must remain valid
289  // until the call is destroyed.
290  gpr_atm* peer_string = nullptr;
292 
293  struct {
294  // Will be set by the transport to point to the byte stream
295  // containing a received message.
296  // Will be NULL if trailing metadata is received instead of a message.
298  // Was this recv_message failed for reasons other than a clean end-of-stream
303 
304  struct {
310 
321  struct {
322  // Error contract: the transport that gets this op must cause cancel_error
323  // to be unref'ed after processing it
326 
327  /* Indexes correspond to grpc_context_index enum values */
329 };
330 
332 typedef struct grpc_transport_op {
340  nullptr;
353  bool set_accept_stream = false;
354  void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
355  const void* server_data) = nullptr;
362  struct {
367  grpc_closure* on_ack = nullptr;
369  // If true, will reset the channel's connection backoff.
370  bool reset_connect_backoff = false;
371 
372  /***************************************************************************
373  * remaining fields are initialized and used at the discretion of the
374  * transport implementation */
375 
378 
379 /* Returns the amount of memory required to store a grpc_stream for this
380  transport */
381 size_t grpc_transport_stream_size(grpc_transport* transport);
382 
383 /* Initialize transport data for a stream.
384 
385  Returns 0 on success, any other (transport-defined) value for failure.
386  May assume that stream contains all-zeros.
387 
388  Arguments:
389  transport - the transport on which to create this stream
390  stream - a pointer to uninitialized memory to initialize
391  server_data - either NULL for a client initiated stream, or a pointer
392  supplied from the accept_stream callback function */
394  grpc_stream_refcount* refcount,
395  const void* server_data,
397 
398 void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
399  grpc_polling_entity* pollent);
400 
401 /* Destroy transport data for a stream.
402 
403  Requires: a recv_batch with final_state == GRPC_STREAM_CLOSED has been
404  received by the up-layer. Must not be called in the same call stack as
405  recv_frame.
406 
407  Arguments:
408  transport - the transport on which to create this stream
409  stream - the grpc_stream to destroy (memory is still owned by the
410  caller, but any child memory must be cleaned up) */
412  grpc_stream* stream,
413  grpc_closure* then_schedule_closure);
414 
418 
422 
423 /* Send a batch of operations on a transport
424 
425  Takes ownership of any objects contained in ops.
426 
427  Arguments:
428  transport - the transport on which to initiate the stream
429  stream - the stream on which to send the operations. This must be
430  non-NULL and previously initialized by the same transport.
431  op - a grpc_transport_stream_op_batch specifying the op to perform
432  */
434  grpc_stream* stream,
436 
438  grpc_transport_op* op);
439 
440 /* Send a ping on a transport
441 
442  Calls cb with user data when a response is received. */
444 
445 /* Advise peer of pending connection termination. */
447  grpc_slice debug_data);
448 
449 /* Destroy the transport */
450 void grpc_transport_destroy(grpc_transport* transport);
451 
452 /* Get the endpoint used by \a transport */
454 
455 /* Allocate a grpc_transport_op, and preconfigure the on_complete closure to
456  \a on_complete and then delete the returned transport op */
458 /* Allocate a grpc_transport_stream_op_batch, and preconfigure the on_complete
459  closure
460  to \a on_complete and then delete the returned transport op */
462  grpc_closure* on_complete);
463 
464 namespace grpc_core {
465 // This is the key to be used for loading/storing keepalive_throttling in the
466 // absl::Status object.
467 constexpr const char* kKeepaliveThrottlingKey =
468  "grpc.internal.keepalive_throttling";
469 } // namespace grpc_core
470 
471 #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */
Definition: arena.h:44
Definition: call_combiner.h:50
Definition: connectivity_state.h:51
Definition: ref_counted.h:47
void RefNonZero()
Definition: ref_counted.h:99
bool Unref()
Definition: ref_counted.h:155
Definition: trace.h:61
bool enabled()
Definition: trace.h:80
void(* grpc_iomgr_cb_func)(void *arg, grpc_error_handle error)
gRPC Callback definition.
Definition: closure.h:53
#define DEBUG_LOCATION
Definition: debug_location.h:41
#define GRPC_ERROR_NONE
The following "special" errors can be propagated without allocating memory.
Definition: error.h:228
intptr_t gpr_atm
Definition: atm_gcc_atomic.h:30
#define GPR_DEBUG
Macros to build log contexts at various severity levels.
Definition: log.h:53
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity, const char *format,...) GPR_PRINT_FORMAT_CHECK(4
Log a message.
#define GPR_UNLIKELY(x)
Definition: port_platform.h:660
grpc_status_code
Definition: status.h:26
grpc_connectivity_state
Connectivity state of a channel.
Definition: connectivity_state.h:27
@ GRPC_CHANNEL_IDLE
channel is idle
Definition: connectivity_state.h:29
CallCombiner * call_combiner
Definition: lame_client.cc:60
grpc_error_handle error
Definition: lame_client.cc:54
Round Robin Policy.
Definition: backend_metric.cc:26
std::unique_ptr< T, Deleter > OrphanablePtr
Definition: orphanable.h:67
constexpr const char * kKeepaliveThrottlingKey
Definition: transport.h:467
struct grpc_pollset_set grpc_pollset_set
Definition: pollset_set.h:31
grpc_transport_stream_op_batch * batch
Definition: retry_filter.cc:208
Definition: context.h:44
A closure over a grpc_iomgr_cb_func.
Definition: closure.h:56
void * cb_arg
Arguments to be passed to "cb".
Definition: closure.h:71
Definition: endpoint.h:106
Definition: error_internal.h:41
Definition: transport.h:152
grpc_closure closure
Definition: transport.h:154
grpc_handler_private_op_data()
Definition: transport.h:155
void * extra_arg
Definition: transport.h:153
Definition: metadata_batch.h:52
Definition: polling_entity.h:37
Definition: pollset_custom.cc:40
Definition: slice_internal.h:100
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1].
Definition: slice.h:60
Definition: transport.h:56
grpc_closure destroy
Definition: transport.h:58
grpc_slice_refcount slice_refcount
Definition: transport.h:62
grpc_core::RefCount refs
Definition: transport.h:57
const char * object_type
Definition: transport.h:60
Definition: transport.h:122
uint64_t header_bytes
Definition: transport.h:125
uint64_t data_bytes
Definition: transport.h:124
uint64_t framing_bytes
Definition: transport.h:123
Transport op: a set of operations to perform on a transport as a whole.
Definition: transport.h:332
grpc_error_handle disconnect_with_error
should the transport be disconnected Error contract: the transport that gets this op must cause disco...
Definition: transport.h:344
grpc_closure * on_initiate
Ping may be delayed by the transport, on_initiate callback will be called when the ping is actually b...
Definition: transport.h:365
bool reset_connect_backoff
Definition: transport.h:370
grpc_closure * on_consumed
Called when processing of this op is done.
Definition: transport.h:334
grpc_connectivity_state start_connectivity_watch_state
Definition: transport.h:338
grpc_core::OrphanablePtr< grpc_core::ConnectivityStateWatcherInterface > start_connectivity_watch
connectivity monitoring - set connectivity_state to NULL to unsubscribe
Definition: transport.h:337
void * set_accept_stream_user_data
Definition: transport.h:356
grpc_pollset_set * bind_pollset_set
add this transport to a pollset_set
Definition: transport.h:360
grpc_closure * on_ack
Called when the ping ack is received.
Definition: transport.h:367
grpc_handler_private_op_data handler_private
Definition: transport.h:376
struct grpc_transport_op::@39 send_ping
send a ping, if either on_initiate or on_ack is not NULL
void(* set_accept_stream_fn)(void *user_data, grpc_transport *transport, const void *server_data)
Definition: transport.h:354
bool set_accept_stream
set the callback for accepting new streams; this is a permanent callback, unlike the other one-shot c...
Definition: transport.h:353
grpc_core::ConnectivityStateWatcherInterface * stop_connectivity_watch
Definition: transport.h:339
grpc_error_handle goaway_error
what should the goaway contain? Error contract: the transport that gets this op must cause goaway_err...
Definition: transport.h:348
grpc_pollset * bind_pollset
add this transport to a pollset
Definition: transport.h:358
Definition: transport.h:220
grpc_metadata_batch * recv_initial_metadata
Definition: transport.h:274
uint32_t * recv_flags
Definition: transport.h:278
bool * trailing_metadata_available
Definition: transport.h:284
~grpc_transport_stream_op_batch_payload()
Definition: transport.h:224
grpc_core::OrphanablePtr< grpc_core::ByteStream > send_message
Definition: transport.h:258
grpc_transport_stream_stats * collect_stats
Definition: transport.h:306
grpc_metadata_batch * send_trailing_metadata
Definition: transport.h:244
grpc_closure * recv_message_ready
Should be enqueued when one message is ready to be processed.
Definition: transport.h:301
grpc_transport_stream_op_batch_payload(grpc_call_context_element *context)
Definition: transport.h:221
grpc_closure * recv_initial_metadata_ready
Should be enqueued when initial metadata is ready to be processed.
Definition: transport.h:280
gpr_atm * peer_string
Definition: transport.h:240
grpc_core::OrphanablePtr< grpc_core::ByteStream > * recv_message
Definition: transport.h:297
bool * sent
Definition: transport.h:250
bool * call_failed_before_recv_message
Definition: transport.h:299
grpc_metadata_batch * recv_trailing_metadata
Definition: transport.h:305
struct grpc_transport_stream_op_batch_payload::@38 cancel_stream
Forcefully close this stream.
grpc_metadata_batch * send_initial_metadata
Definition: transport.h:231
grpc_closure * recv_trailing_metadata_ready
Should be enqueued when trailing metadata is ready to be processed.
Definition: transport.h:308
grpc_error_handle cancel_error
Definition: transport.h:324
uint32_t send_initial_metadata_flags
Iff send_initial_metadata != NULL, flags associated with send_initial_metadata: a bitfield of GRPC_IN...
Definition: transport.h:234
bool stream_write_closed
Definition: transport.h:270
grpc_call_context_element * context
Definition: transport.h:328
Definition: transport.h:163
grpc_transport_stream_op_batch()
Definition: transport.h:164
bool is_traced
Is this stream traced.
Definition: transport.h:211
grpc_transport_stream_op_batch_payload * payload
Values for the stream op (fields set are determined by flags above)
Definition: transport.h:186
bool recv_message
Receive message data from the stream, into provided byte stream.
Definition: transport.h:201
bool cancel_stream
Cancel this stream with the provided error.
Definition: transport.h:208
grpc_handler_private_op_data handler_private
Definition: transport.h:217
bool recv_trailing_metadata
Receive trailing metadata from the stream, into provided metadata batch.
Definition: transport.h:205
grpc_closure * on_complete
Should be scheduled when all of the non-recv operations in the batch are complete.
Definition: transport.h:183
bool send_trailing_metadata
Send trailing metadata to the peer, from the provided metadata batch.
Definition: transport.h:192
bool send_message
Send message data to the peer, from the provided byte stream.
Definition: transport.h:195
bool send_initial_metadata
Send initial metadata to the peer, from the provided metadata batch.
Definition: transport.h:189
bool recv_initial_metadata
Receive initial metadata from the stream, into provided metadata batch.
Definition: transport.h:198
Definition: transport.h:128
grpc_transport_one_way_stats outgoing
Definition: transport.h:130
grpc_transport_one_way_stats incoming
Definition: transport.h:129
Definition: transport_impl.h:66
grpc_transport_op * grpc_make_transport_op(grpc_closure *on_complete)
Definition: transport.cc:229
grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, void *buffer, size_t length)
Definition: transport.cc:68
void grpc_transport_perform_stream_op(grpc_transport *transport, grpc_stream *stream, grpc_transport_stream_op_batch *op)
Definition: transport.cc:136
void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, grpc_iomgr_cb_func cb, void *cb_arg, const char *object_type)
Definition: transport.cc:84
void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason)
Definition: transport.h:82
void grpc_transport_move_one_way_stats(grpc_transport_one_way_stats *from, grpc_transport_one_way_stats *to)
Definition: transport.cc:107
size_t grpc_transport_stream_size(grpc_transport *transport)
Definition: transport.cc:120
grpc_core::DebugOnlyTraceFlag grpc_trace_stream_refcount
void grpc_stream_unref(grpc_stream_refcount *refcount, const char *reason)
Definition: transport.h:99
std::string grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op)
Definition: transport_op_string.cc:66
struct grpc_stream grpc_stream
Definition: transport.h:52
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, grpc_slice debug_data)
void grpc_transport_move_stats(grpc_transport_stream_stats *from, grpc_transport_stream_stats *to)
Definition: transport.cc:114
void grpc_transport_set_pops(grpc_transport *transport, grpc_stream *stream, grpc_polling_entity *pollent)
Definition: transport.cc:147
void grpc_transport_ping(grpc_transport *transport, grpc_closure *cb)
std::string grpc_transport_op_string(grpc_transport_op *op)
Definition: transport_op_string.cc:118
struct grpc_stream_refcount grpc_stream_refcount
void grpc_stream_destroy(grpc_stream_refcount *refcount)
Definition: transport.cc:42
grpc_endpoint * grpc_transport_get_endpoint(grpc_transport *transport)
Definition: transport.cc:168
void grpc_transport_destroy_stream(grpc_transport *transport, grpc_stream *stream, grpc_closure *then_schedule_closure)
Definition: transport.cc:162
void grpc_transport_stream_op_batch_finish_with_failure(grpc_transport_stream_op_batch *batch, grpc_error_handle error, grpc_core::CallCombiner *call_combiner)
Definition: transport.cc:179
grpc_transport_stream_op_batch * grpc_make_transport_stream_op(grpc_closure *on_complete)
Definition: transport.cc:252
int grpc_transport_init_stream(grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, const void *server_data, grpc_core::Arena *arena)
Definition: transport.cc:128
void grpc_transport_destroy(grpc_transport *transport)
Definition: transport.cc:124
void grpc_transport_perform_op(grpc_transport *transport, grpc_transport_op *op)
Definition: transport.cc:142
struct grpc_transport_op grpc_transport_op
Transport op: a set of operations to perform on a transport as a whole.
upb_arena * arena
Definition: xds_api.cc:909