GRPC Core  18.0.0
threadpool.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
21 
23 
24 #include <grpc/grpc.h>
25 
26 #include "src/core/lib/gprpp/thd.h"
28 
29 namespace grpc_core {
30 
31 // An abstract base class for threadpool.
32 // Threadpool is an executor that maintains a pool of threads sitting around
33 // and waiting for closures. A threadpool also maintains a queue of pending
34 // closures; when closures appear in the queue, the threads in the pool will
35 // pull them out and execute them.
37  public:
38  // Waits for all pending closures to complete, then shuts down the thread pool.
39  virtual ~ThreadPoolInterface() {}
40 
41  // Schedules a given closure for execution later.
42  // Depending on the specific subclass implementation, this routine might
43  // block the current thread (when the closure cannot be scheduled).
44  // The closure should contain a function pointer and the arguments it takes;
45  // for details of the closure struct see /grpc/include/grpc/impl/codegen/grpc_types.h
47 
48  // Returns the current number of pending closures.
49  virtual int num_pending_closures() const = 0;
50 
51  // Returns the capacity of the pool (number of worker threads in the pool).
52  virtual int pool_capacity() const = 0;
53 
54  // Accessor for the thread options (Thread::Options).
55  virtual const Thread::Options& thread_options() const = 0;
56 
57  // Returns the thread name for threads in this ThreadPool.
58  virtual const char* thread_name() const = 0;
59 };
60 
61 // Worker thread for threadpool. Executes closures from the queue until it
62 // gets a NULL closure.
64  public:
65  ThreadPoolWorker(const char* thd_name, MPMCQueueInterface* queue,
66  Thread::Options& options, int index)
67  : queue_(queue), thd_name_(thd_name), index_(index) {
68  thd_ = Thread(
69  thd_name, [](void* th) { static_cast<ThreadPoolWorker*>(th)->Run(); },  // Trampoline: casts the void* argument back to the worker and calls Run()
70  this, nullptr, options);
71  }
72 
74 
75  void Start() { thd_.Start(); }  // Forwards to Thread::Start() on the wrapped thread
76  void Join() { thd_.Join(); }  // Forwards to Thread::Join() on the wrapped thread
77 
78  private:
79  // Struct for tracking stats of the thread.
80  struct Stats {
81  gpr_timespec sleep_time;
82  Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); }
83  };
84 
85  void Run(); // Pulls closures from the queue and executes them
86 
87  MPMCQueueInterface* queue_; // Queue in thread pool to pull closures from
88  Thread thd_; // Wrapped thread; its body invokes Run() on this worker
89  Stats stats_; // Stats to be collected at run time
90  const char* thd_name_; // Name of thread
91  int index_; // Index in thread pool
92 };
93 
94 // A fixed-size thread pool implementation of the abstract thread pool
95 // interface. In this implementation, the number of threads in the pool is
96 // fixed, but the capacity of the closure queue is unlimited.
98  public:
99  // Creates a thread pool with size of "num_threads", with default thread name
100  // "ThreadPoolWorker" and all thread options set to default. If the given size
101  // is 0 or less, there will be 1 worker thread created inside pool.
102  explicit ThreadPool(int num_threads);
103 
104  // Same as ThreadPool(int num_threads) constructor, except
105  // that it also sets "thd_name" as the name of all threads in the thread pool.
106  ThreadPool(int num_threads, const char* thd_name);
107 
108  // Same as ThreadPool(int num_threads, const char* thd_name) constructor,
109  // except that it also sets thread_options for the threads.
110  // Notes for stack size:
111  // If the stack size field of the passed in Thread::Options is set to default
112  // value 0, default ThreadPool stack size will be used. The current default
113  // stack size of this implementation is 1952K for mobile platforms and 64K for
114  // all others.
115  ThreadPool(int num_threads, const char* thd_name,
117 
118  // Waits for all pending closures to complete, then shuts down thread pool.
119  ~ThreadPool() override;
120 
121  // Adds given closure into pending queue immediately. Since closure queue has
122  // infinite length, this routine will not block.
123  void Add(grpc_completion_queue_functor* closure) override;
124 
125  int num_pending_closures() const override;
126  int pool_capacity() const override;
127  const Thread::Options& thread_options() const override;
128  const char* thread_name() const override;
129 
130  private:
131  int num_threads_ = 0;
132  const char* thd_name_ = nullptr;
133  Thread::Options thread_options_;
134  ThreadPoolWorker** threads_ = nullptr; // Array of worker threads
135  MPMCQueueInterface* queue_ = nullptr; // Closure queue
136 
137  Atomic<bool> shut_down_{false}; // Destructor has been called if set to true
138 
139  void SharedThreadPoolConstructor();
140  // For ThreadPool, the default stack size is 1952K for mobile platforms and
141  // 64K for all other platforms.
142  size_t DefaultStackSize();
143  // Internal Use Only for debug checking.
144  void AssertHasNotBeenShutDown();
145 };
146 
147 } // namespace grpc_core
148 
149 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */
Definition: mpmcqueue.h:34
Definition: thd.h:48
Definition: thd.h:46
void Join()
Definition: thd.h:144
void Start()
Definition: thd.h:128
Definition: threadpool.h:97
int pool_capacity() const override
Definition: threadpool.cc:129
int num_pending_closures() const override
Definition: threadpool.cc:127
~ThreadPool() override
Definition: threadpool.cc:103
void Add(grpc_completion_queue_functor *closure) override
Definition: threadpool.cc:122
const Thread::Options & thread_options() const override
Definition: threadpool.cc:131
const char * thread_name() const override
Definition: threadpool.cc:135
ThreadPool(int num_threads)
Definition: threadpool.cc:78
Definition: threadpool.h:36
virtual void Add(grpc_completion_queue_functor *closure)=0
virtual ~ThreadPoolInterface()
Definition: threadpool.h:39
virtual int pool_capacity() const =0
virtual const Thread::Options & thread_options() const =0
virtual const char * thread_name() const =0
virtual int num_pending_closures() const =0
Definition: threadpool.h:63
void Start()
Definition: threadpool.h:75
void Join()
Definition: threadpool.h:76
ThreadPoolWorker(const char *thd_name, MPMCQueueInterface *queue, Thread::Options &options, int index)
Definition: threadpool.h:65
~ThreadPoolWorker()
Definition: threadpool.h:73
@ GPR_TIMESPAN
Unmeasurable clock type: no base, created by taking the difference between two times.
Definition: gpr_types.h:42
Round Robin Policy.
Definition: backend_metric.cc:26
grpc_closure closure
Definition: server.cc:460
Analogous to struct timespec.
Definition: gpr_types.h:47
Specifies an interface class to be used as a tag for callback-based completion queues.
Definition: grpc_types.h:757
GPRAPI gpr_timespec gpr_time_0(gpr_clock_type type)
Time constants.
Definition: time.cc:46