OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ovsdb_client_tcp.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <boost/bind.hpp>
6 
7 #include <oper/agent_sandesh.h>
8 #include <ovsdb_types.h>
9 #include <ovsdb_client_tcp.h>
11 
17 
18 OvsdbClientTcp::OvsdbClientTcp(Agent *agent, IpAddress tor_ip, int tor_port,
19  IpAddress tsn_ip, int keepalive_interval, int ha_stale_route_interval,
20  OvsPeerManager *manager) : TcpServer(agent->event_manager()),
21  OvsdbClient(manager, keepalive_interval, ha_stale_route_interval),
22  agent_(agent), session_(NULL), server_ep_(tor_ip, tor_port),
23  tsn_ip_(tsn_ip.to_v4()), shutdown_(false) {
24 }
25 
27 }
28 
33 }
34 
37  this, socket);
38  session->set_observer(boost::bind(&OvsdbClientTcp::OnSessionEvent,
39  this, _1, _2));
40  return session;
41 }
42 
44  TcpSession::Event event) {
45  OvsdbClientTcpSession *tcp = static_cast<OvsdbClientTcpSession *>(session);
46  tcp->EnqueueEvent(event);
47 }
48 
49 const std::string OvsdbClientTcp::protocol() {
50  return "TCP";
51 }
52 
53 const std::string OvsdbClientTcp::server() {
54  return server_ep_.address().to_string();
55 }
56 
58  return server_ep_.port();
59 }
60 
62  return tsn_ip_;
63 }
64 
66  if (shutdown_)
67  return;
68  shutdown_ = true;
70  static_cast<OvsdbClientTcpSession *>(session_);
71  tcp->TriggerClose();
72 }
73 
// Read-only accessor for server_ep_, the endpoint built from the
// tor_ip/tor_port constructor arguments. Returns a reference to the
// member, not a copy.
74 const boost::asio::ip::tcp::endpoint &OvsdbClientTcp::server_ep() const {
75  return server_ep_;
76 }
77 
79  // match both ip and port with available session
80  // if port is not provided match only ip
81  if (server_ep_.address().to_v4() == ip &&
82  (port == 0 || server_ep_.port() == port)) {
83  return static_cast<OvsdbClientSession *>(
84  static_cast<OvsdbClientTcpSession *>(session_));
85  }
86  return NULL;
87 }
88 
90  if (session_ == NULL) {
91  return NULL;
92  }
93  return static_cast<OvsdbClientSession *>(
94  static_cast<OvsdbClientTcpSession *>(session_));
95 }
96 
97 void OvsdbClientTcp::AddSessionInfo(SandeshOvsdbClient &client){
98  SandeshOvsdbClientSession session;
99  std::vector<SandeshOvsdbClientSession> session_list;
100  if (session_ != NULL) {
101  OvsdbClientTcpSession *tcp =
102  static_cast<OvsdbClientTcpSession *>(session_);
103  tcp->AddSessionInfo(session);
104  }
105  session_list.push_back(session);
106  client.set_sessions(session_list);
107 }
108 
110  OvsPeerManager *manager, TcpServer *server, Socket *sock,
111  bool async_ready) : OvsdbClientSession(agent, manager),
112  TcpSession(server, sock, async_ready), status_("Init"),
113  client_reconnect_timer_(TimerManager::CreateTimer(
114  *(agent->event_manager())->io_service(),
115  "OVSDB Client TCP reconnect Timer",
116  agent->task_scheduler()->GetTaskId("Agent::KSync"), 0)) {
117 
119  boost::bind(&OvsdbClientTcpSession::RecvMsg, this, _1, _2));
120 
121  // Process session events in KSync workqueue task context,
123  agent->task_scheduler()->GetTaskId("Agent::KSync"), 0,
124  boost::bind(&OvsdbClientTcpSession::ProcessSessionEvent, this, _1));
125  session_event_queue_->set_name("OVSDB TCP session event queue");
126 }
127 
129  delete reader_;
131  delete session_event_queue_;
133 }
134 
136  reader_->OnRead(buffer);
137 }
138 
139 void OvsdbClientTcpSession::SendMsg(u_int8_t *buf, std::size_t len) {
140  OVSDB_PKT_TRACE(Trace, "Sending: " + std::string((char *)buf, len));
141  Send(buf, len, NULL);
142 }
143 
144 bool OvsdbClientTcpSession::RecvMsg(const u_int8_t *buf, std::size_t len) {
145  OVSDB_PKT_TRACE(Trace, "Received: " + std::string((const char*)buf, len));
146  MessageProcess(buf, len);
147  return true;
148 }
149 
151  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
152  return ovs_server->keepalive_interval();
153 }
154 
155 const boost::system::error_code &
157  return close_reason();
158 }
159 
161  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
162  return ovs_server->connection_table();
163 }
164 
166  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
167  return ovs_server->ksync_obj_manager();
168 }
169 
171  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
172  return ovs_server->tsn_ip();
173 }
174 
176  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
177  ovs_server->DeleteSession(this);
178 }
179 
181  // Close the session and return
182  Close();
183 
184  // tcp session will not notify event for self closed session
185  // generate explicit event
186  OvsdbSessionEvent ovs_event;
187  ovs_event.event = TcpSession::CLOSE;
188  session_event_queue_->Enqueue(ovs_event);
189 }
190 
192  return remote_endpoint().address().to_v4();
193 }
194 
196  return remote_endpoint().port();
197 }
198 
200  OvsdbClientTcp *ovs_server =
201  static_cast<OvsdbClientTcp *>(server());
202  boost::system::error_code ec;
203  switch (ovs_event.event) {
205  assert(client_reconnect_timer_->fired() == false);
206  /* Failed to Connect, Try Again! */
208  boost::bind(&OvsdbClientTcpSession::ReconnectTimerCb, this)) == false ) {
209  assert(0);
210  }
211  set_status("Reconnecting");
212  break;
213  case TcpSession::CLOSE:
214  {
215  // Trigger close for the current session, to allocate
216  // and start a new one.
217  OnClose();
218  if (ovs_server->shutdown_ == false) {
219  ovs_server->session_ = ovs_server->CreateSession();
220  ovs_server->Connect(ovs_server->session_,
221  ovs_server->server_ep());
222  } else {
223  ovs_server->session_ = NULL;
224  }
225  }
226  break;
228  if (!ovs_server->pre_connect_complete_cb_.empty()) {
229  ovs_server->pre_connect_complete_cb_(this);
230  }
231  if (!IsClosed()) {
232  ec = SetSocketOptions();
233  assert(ec.value() == 0);
234  set_status("Established");
235  OnEstablish();
236  if (!ovs_server->connect_complete_cb_.empty()) {
237  ovs_server->connect_complete_cb_(this);
238  }
239  } else {
240  OVSDB_SESSION_TRACE(Trace, this, "Skipping connection complete on"
241  " closed session");
242  }
243  break;
244  default:
245  break;
246  }
247  return true;
248 }
249 
251  OvsdbSessionEvent ovs_event;
252  ovs_event.event = event;
253  session_event_queue_->Enqueue(ovs_event);
254 }
255 
257  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
258  ovs_server->Connect(this, ovs_server->server_ep());
259  return false;
260 }
261 
263  TcpSession *session, ReceiveCallback callback) :
264  TcpMessageReader(session, callback) {
265 }
266 
268 }
269 
271  size_t size = TcpSession::BufferSize(buffer);
272  int remain = size - offset;
273  if (remain < GetHeaderLenSize()) {
274  return -1;
275  }
276 
277  return remain;
278 }
279 
void Close()
Definition: tcp_session.cc:354
boost::asio::ip::tcp::endpoint server_ep_
boost::asio::const_buffer Buffer
Definition: tcp_session.h:64
virtual void DeleteSession(TcpSession *session)
Definition: tcp_server.cc:197
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:311
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
bool RecvMsg(const u_int8_t *buf, std::size_t len)
OvsdbClientSession * FindSession(Ip4Address ip, uint16_t port)
boost::asio::ip::tcp::socket Socket
Definition: tcp_server.h:31
virtual void Connect(TcpSession *session, Endpoint remote)
Definition: tcp_server.cc:474
boost::asio::ip::address IpAddress
Definition: address.h:13
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:116
virtual TcpSession * CreateSession()
Definition: tcp_server.cc:188
void RegisterConnectionTable(Agent *agent)
Definition: ovsdb_client.cc:36
const boost::system::error_code & close_reason() const
Definition: tcp_session.h:145
virtual uint16_t remote_port() const
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:428
OvsdbClientSession * NextSession(OvsdbClientSession *session)
int GetTaskId(const std::string &name)
Definition: task.cc:856
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:60
void set_observer(EventObserver observer)
Definition: tcp_session.cc:218
const boost::system::error_code & ovsdb_close_reason() const
void AddSessionInfo(SandeshOvsdbClient &client)
bool ProcessSessionEvent(OvsdbSessionEvent event)
TaskScheduler * task_scheduler() const
Definition: agent.h:1120
#define OVSDB_PKT_TRACE(obj,...)
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:671
bool fired() const
Definition: timer.h:91
Definition: agent.h:358
OvsdbClientTcpSessionReader * reader_
boost::asio::const_buffer Buffer
Definition: tcp_session.h:310
const std::string server()
int keepalive_interval() const
Definition: ovsdb_client.cc:48
void AddSessionInfo(SandeshOvsdbClientSession &session)
void set_status(std::string status)
Definition: trace.h:220
friend class OvsdbClientTcpSession
virtual TcpSession * AllocSession(Socket *socket)
const std::string protocol()
virtual void OnRead(Buffer buffer)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
#define OVSDB_SESSION_TRACE(obj, session,...)
virtual int MsgLength(Buffer buffer, int offset)
boost::asio::ip::address_v4 Ip4Address
Definition: address.h:14
bool IsClosed() const
Definition: tcp_session.h:125
static const uint32_t TcpReconnectWait
ConnectionStateTable * connection_table()
Definition: ovsdb_client.cc:40
virtual Ip4Address remote_ip() const
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
const boost::asio::ip::tcp::endpoint & server_ep() const
TcpServer * server()
Definition: tcp_session.h:88
Endpoint remote_endpoint() const
Definition: tcp_session.h:135
void EnqueueEvent(TcpSession::Event event)
void MessageProcess(const u_int8_t *buf, std::size_t len)
SessionEventCb connect_complete_cb_
Definition: ovsdb_client.h:66
WorkQueue< OvsdbSessionEvent > * session_event_queue_
KSyncObjectManager * ksync_obj_manager()
SessionEventCb pre_connect_complete_cb_
Definition: ovsdb_client.h:67
OvsPeerManager * peer_manager_
Definition: ovsdb_client.h:64
OvsdbClientTcpSession(Agent *agent, OvsPeerManager *manager, TcpServer *server, Socket *sock, bool async_ready=true)
KSyncObjectManager * ksync_obj_manager()
Definition: ovsdb_client.cc:44
void SendMsg(u_int8_t *buf, std::size_t len)
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
void set_name(const std::string &name)
Definition: queue_task.h:307
ConnectionStateTable * connection_table()
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:868
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
OvsdbClientTcpSessionReader(TcpSession *session, ReceiveCallback callback)