OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ksync_tx_queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 // ksync_tx_queue.h
6 //
7 // Implementation of a shared transmit queue between agent and KSync.
8 //
9 // KSync i/o operations are done on KSync Netlink socket. Even if the socket is
10 // set to non-blocking mode, the KSync socket i/o call will block till VRouter
11 // completes message processing. So, netlink i/o is not done in agent context.
12 // All messages are enqueued to a transmit queue and the i/o is done through
13 // the transmit queue. This ensures agent doesn't block due to ksync processing.
14 //
15 // There are two implementations of transmit queue
16 //
17 // WorkQueue based
18 // ---------------
19 // WorkQueue based implementation is used only for UT cases. WorkQueue based
20 // implementation is not used in production environment due to performance
21 // reasons. When we have slow producer and fast consumer, WorkQueue will
22 // result in spawning of a "task" for every message. The task spawning happens
23 // in producer context. In case of flow processing, agent is slow producer and
24 // vrouter is fast consumer. As a result, using WorkQueue will result in many
25 // task spawns thereby introducing latencies.
26 //
27 // We retain WorkQueue based implementations since UT code depends on
28 // WaitForIdle() APIs to continue with validations. UT code running on the
29 // Event-FD based implementation cannot rely on WaitForIdle().
30 //
31 //
32 // Event-FD based
33 // --------------
34 // This implementation is based on event_fd. The producer will add the entry
35 // into a tbb::concurrent_queue and notify the consumer on an event_fd. The
36 // consumer will block with "read" on event_fd. On getting an event, it will
37 // drain the queue of all messages in a tight loop. Consumer blocks on "read"
38 // when there is no data in the queue. This is an efficient implementation of
39 // a queue between agent and ksync.
40 //
41 #ifndef controller_src_ksync_ksync_tx_queue_h
42 #define controller_src_ksync_ksync_tx_queue_h
43 
44 #include <sys/eventfd.h>
45 #include <pthread.h>
46 #include <algorithm>
47 #include <vector>
48 #include <set>
49 
50 #include <tbb/atomic.h>
51 #include <tbb/concurrent_queue.h>
52 class KSyncSock;
53 class IoContext;
54 
55 class KSyncTxQueue {
56 public:
57  typedef tbb::concurrent_queue<IoContext *> Queue;
58 
59  KSyncTxQueue(KSyncSock *sock);
60  ~KSyncTxQueue();
61 
62  void Init(bool use_work_queue, const std::string &cpu_pin_policy);
63  void Shutdown();
64  bool Run();
65 
66  size_t enqueues() const { return enqueues_; }
67  size_t dequeues() const { return dequeues_; }
68  uint32_t write_events() const { return write_events_; }
69  uint32_t read_events() const { return read_events_; }
70  size_t queue_len() const { return queue_len_; }
71  uint64_t busy_time() const { return busy_time_; }
72  uint32_t max_queue_len() const { return max_queue_len_; }
73  void set_measure_busy_time(bool val) const { measure_busy_time_ = val; }
74  void ClearStats() const {
75  max_queue_len_ = 0;
76  enqueues_ = 0;
77  dequeues_ = 0;
78  busy_time_ = 0;
79  read_events_ = 0;
80  }
81 
82  bool Enqueue(IoContext *io_context) {
83  return EnqueueInternal(io_context);
84  }
85 
86 private:
87  bool EnqueueInternal(IoContext *io_context);
88 
90  int event_fd_;
91  // CPU pinning policy for netlink task
92  std::string cpu_pin_policy_;
95  tbb::atomic<bool> shutdown_;
96  pthread_t event_thread_;
97  tbb::atomic<size_t> queue_len_;
98  mutable size_t max_queue_len_;
99 
100  mutable size_t enqueues_;
101  mutable size_t dequeues_;
102  mutable size_t write_events_;
103  mutable size_t read_events_;
104  mutable uint64_t busy_time_;
105  mutable bool measure_busy_time_;
106 
108 };
109 
110 #endif // controller_src_ksync_ksync_tx_queue_h
bool EnqueueInternal(IoContext *io_context)
DISALLOW_COPY_AND_ASSIGN(KSyncTxQueue)
std::string cpu_pin_policy_
KSyncSock * sock_
size_t max_queue_len_
void Init(bool use_work_queue, const std::string &cpu_pin_policy)
uint32_t read_events() const
bool measure_busy_time_
bool Enqueue(IoContext *io_context)
uint32_t max_queue_len() const
KSyncTxQueue(KSyncSock *sock)
size_t enqueues() const
uint32_t write_events() const
size_t read_events_
uint64_t busy_time() const
size_t dequeues() const
tbb::concurrent_queue< IoContext * > Queue
uint64_t busy_time_
tbb::atomic< bool > shutdown_
size_t queue_len() const
size_t write_events_
tbb::atomic< size_t > queue_len_
pthread_t event_thread_
void set_measure_busy_time(bool val) const
void ClearStats() const
WorkQueue< IoContext * > * work_queue_