OpenSDN source code
ovsdb_client_tcp.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <boost/bind/bind.hpp>
6 
7 #include <oper/agent_sandesh.h>
8 #include <ovsdb_types.h>
9 #include <ovsdb_client_tcp.h>
11 
12 using namespace boost::placeholders;
13 
19 
20 OvsdbClientTcp::OvsdbClientTcp(Agent *agent, IpAddress tor_ip, int tor_port,
21  IpAddress tsn_ip, int keepalive_interval, int ha_stale_route_interval,
22  OvsPeerManager *manager) : TcpServer(agent->event_manager()),
23  OvsdbClient(manager, keepalive_interval, ha_stale_route_interval),
24  agent_(agent), session_(NULL), server_ep_(tor_ip, tor_port),
25  tsn_ip_(tsn_ip.to_v4()), shutdown_(false) {
26 }
27 
29 }
30 
35 }
36 
39  this, socket);
40  session->set_observer(boost::bind(&OvsdbClientTcp::OnSessionEvent,
41  this, _1, _2));
42  return session;
43 }
44 
46  TcpSession::Event event) {
47  OvsdbClientTcpSession *tcp = static_cast<OvsdbClientTcpSession *>(session);
48  tcp->EnqueueEvent(event);
49 }
50 
51 const std::string OvsdbClientTcp::protocol() {
52  return "TCP";
53 }
54 
55 const std::string OvsdbClientTcp::server() {
56  return server_ep_.address().to_string();
57 }
58 
60  return server_ep_.port();
61 }
62 
64  return tsn_ip_;
65 }
66 
68  if (shutdown_)
69  return;
70  shutdown_ = true;
72  static_cast<OvsdbClientTcpSession *>(session_);
73  tcp->TriggerClose();
74 }
75 
76 const boost::asio::ip::tcp::endpoint &OvsdbClientTcp::server_ep() const {
77  return server_ep_;
78 }
79 
81  // match both ip and port with available session
82  // if port is not provided match only ip
83  if (server_ep_.address().to_v4() == ip &&
84  (port == 0 || server_ep_.port() == port)) {
85  return static_cast<OvsdbClientSession *>(
86  static_cast<OvsdbClientTcpSession *>(session_));
87  }
88  return NULL;
89 }
90 
92  if (session_ == NULL) {
93  return NULL;
94  }
95  return static_cast<OvsdbClientSession *>(
96  static_cast<OvsdbClientTcpSession *>(session_));
97 }
98 
99 void OvsdbClientTcp::AddSessionInfo(SandeshOvsdbClient &client){
100  SandeshOvsdbClientSession session;
101  std::vector<SandeshOvsdbClientSession> session_list;
102  if (session_ != NULL) {
103  OvsdbClientTcpSession *tcp =
104  static_cast<OvsdbClientTcpSession *>(session_);
105  tcp->AddSessionInfo(session);
106  }
107  session_list.push_back(session);
108  client.set_sessions(session_list);
109 }
110 
112  OvsPeerManager *manager, TcpServer *server, Socket *sock,
113  bool async_ready) : OvsdbClientSession(agent, manager),
114  TcpSession(server, sock, async_ready), status_("Init"),
115  client_reconnect_timer_(TimerManager::CreateTimer(
116  *(agent->event_manager())->io_service(),
117  "OVSDB Client TCP reconnect Timer",
118  agent->task_scheduler()->GetTaskId("Agent::KSync"), 0)) {
119 
121  boost::bind(&OvsdbClientTcpSession::RecvMsg, this, _1, _2));
122 
123  // Process session events in KSync workqueue task context,
125  agent->task_scheduler()->GetTaskId("Agent::KSync"), 0,
126  boost::bind(&OvsdbClientTcpSession::ProcessSessionEvent, this, _1));
127  session_event_queue_->set_name("OVSDB TCP session event queue");
128 }
129 
131  delete reader_;
133  delete session_event_queue_;
135 }
136 
138  reader_->OnRead(buffer);
139 }
140 
141 void OvsdbClientTcpSession::SendMsg(u_int8_t *buf, std::size_t len) {
142  OVSDB_PKT_TRACE(Trace, "Sending: " + std::string((char *)buf, len));
143  Send(buf, len, NULL);
144 }
145 
146 bool OvsdbClientTcpSession::RecvMsg(const u_int8_t *buf, std::size_t len) {
147  OVSDB_PKT_TRACE(Trace, "Received: " + std::string((const char*)buf, len));
148  MessageProcess(buf, len);
149  return true;
150 }
151 
153  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
154  return ovs_server->keepalive_interval();
155 }
156 
157 const boost::system::error_code &
159  return close_reason();
160 }
161 
163  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
164  return ovs_server->connection_table();
165 }
166 
168  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
169  return ovs_server->ksync_obj_manager();
170 }
171 
173  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
174  return ovs_server->tsn_ip();
175 }
176 
178  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
179  ovs_server->DeleteSession(this);
180 }
181 
183  // Close the session and return
184  Close();
185 
186  // tcp session will not notify event for self closed session
187  // generate explicit event
188  OvsdbSessionEvent ovs_event;
189  ovs_event.event = TcpSession::CLOSE;
190  session_event_queue_->Enqueue(ovs_event);
191 }
192 
194  return remote_endpoint().address().to_v4();
195 }
196 
198  return remote_endpoint().port();
199 }
200 
202  OvsdbClientTcp *ovs_server =
203  static_cast<OvsdbClientTcp *>(server());
204  boost::system::error_code ec;
205  switch (ovs_event.event) {
207  assert(client_reconnect_timer_->fired() == false);
208  /* Failed to Connect, Try Again! */
210  boost::bind(&OvsdbClientTcpSession::ReconnectTimerCb, this)) == false ) {
211  assert(0);
212  }
213  set_status("Reconnecting");
214  break;
215  case TcpSession::CLOSE:
216  {
217  // Trigger close for the current session, to allocate
218  // and start a new one.
219  OnClose();
220  if (ovs_server->shutdown_ == false) {
221  ovs_server->session_ = ovs_server->CreateSession();
222  ovs_server->Connect(ovs_server->session_,
223  ovs_server->server_ep());
224  } else {
225  ovs_server->session_ = NULL;
226  }
227  }
228  break;
230  if (!ovs_server->pre_connect_complete_cb_.empty()) {
231  ovs_server->pre_connect_complete_cb_(this);
232  }
233  if (!IsClosed()) {
234  ec = SetSocketOptions();
235  assert(ec.value() == 0);
236  set_status("Established");
237  OnEstablish();
238  if (!ovs_server->connect_complete_cb_.empty()) {
239  ovs_server->connect_complete_cb_(this);
240  }
241  } else {
242  OVSDB_SESSION_TRACE(Trace, this, "Skipping connection complete on"
243  " closed session");
244  }
245  break;
246  default:
247  break;
248  }
249  return true;
250 }
251 
253  OvsdbSessionEvent ovs_event;
254  ovs_event.event = event;
255  session_event_queue_->Enqueue(ovs_event);
256 }
257 
259  OvsdbClientTcp *ovs_server = static_cast<OvsdbClientTcp *>(server());
260  ovs_server->Connect(this, ovs_server->server_ep());
261  return false;
262 }
263 
265  TcpSession *session, ReceiveCallback callback) :
266  TcpMessageReader(session, callback) {
267 }
268 
270 }
271 
273  size_t size = TcpSession::BufferSize(buffer);
274  int remain = size - offset;
275  if (remain < GetHeaderLenSize()) {
276  return -1;
277  }
278 
279  return remain;
280 }
281 
boost::asio::ip::address IpAddress
Definition: address.h:13
boost::asio::ip::address_v4 Ip4Address
Definition: address.h:14
Definition: agent.h:360
TaskScheduler * task_scheduler() const
Definition: agent.h:1122
void AddSessionInfo(SandeshOvsdbClientSession &session)
void MessageProcess(const u_int8_t *buf, std::size_t len)
OvsdbClientTcpSessionReader(TcpSession *session, ReceiveCallback callback)
virtual int MsgLength(Buffer buffer, int offset)
virtual uint16_t remote_port() const
WorkQueue< OvsdbSessionEvent > * session_event_queue_
bool RecvMsg(const u_int8_t *buf, std::size_t len)
void EnqueueEvent(TcpSession::Event event)
virtual void OnRead(Buffer buffer)
static const uint32_t TcpReconnectWait
ConnectionStateTable * connection_table()
virtual Ip4Address remote_ip() const
bool ProcessSessionEvent(OvsdbSessionEvent event)
void SendMsg(u_int8_t *buf, std::size_t len)
OvsdbClientTcpSession(Agent *agent, OvsPeerManager *manager, TcpServer *server, Socket *sock, bool async_ready=true)
void set_status(std::string status)
OvsdbClientTcpSessionReader * reader_
KSyncObjectManager * ksync_obj_manager()
const boost::system::error_code & ovsdb_close_reason() const
boost::asio::ip::tcp::endpoint server_ep_
void AddSessionInfo(SandeshOvsdbClient &client)
const boost::asio::ip::tcp::endpoint & server_ep() const
friend class OvsdbClientTcpSession
const std::string server()
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
const std::string protocol()
virtual TcpSession * AllocSession(Socket *socket)
OvsdbClientSession * NextSession(OvsdbClientSession *session)
OvsdbClientSession * FindSession(Ip4Address ip, uint16_t port)
OvsPeerManager * peer_manager_
Definition: ovsdb_client.h:64
SessionEventCb pre_connect_complete_cb_
Definition: ovsdb_client.h:67
SessionEventCb connect_complete_cb_
Definition: ovsdb_client.h:66
ConnectionStateTable * connection_table()
Definition: ovsdb_client.cc:40
void RegisterConnectionTable(Agent *agent)
Definition: ovsdb_client.cc:36
int keepalive_interval() const
Definition: ovsdb_client.cc:48
KSyncObjectManager * ksync_obj_manager()
Definition: ovsdb_client.cc:44
int GetTaskId(const std::string &name)
Definition: task.cc:861
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:672
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:307
boost::asio::const_buffer Buffer
Definition: tcp_session.h:306
virtual void Connect(TcpSession *session, Endpoint remote)
Definition: tcp_server.cc:476
virtual TcpSession * CreateSession()
Definition: tcp_server.cc:190
virtual void DeleteSession(TcpSession *session)
Definition: tcp_server.cc:199
boost::asio::ip::tcp::socket Socket
Definition: tcp_server.h:31
@ CONNECT_COMPLETE
Definition: tcp_session.h:46
@ CONNECT_FAILED
Definition: tcp_session.h:47
TcpServer * server()
Definition: tcp_session.h:84
const boost::system::error_code & close_reason() const
Definition: tcp_session.h:141
bool IsClosed() const
Definition: tcp_session.h:121
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:112
void set_observer(EventObserver observer)
Definition: tcp_session.cc:219
Endpoint remote_endpoint() const
Definition: tcp_session.h:131
boost::asio::const_buffer Buffer
Definition: tcp_session.h:60
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:869
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:56
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:429
void Close()
Definition: tcp_session.cc:355
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:221
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:107
bool fired() const
Definition: timer.h:94
Definition: trace.h:288
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
void set_name(const std::string &name)
Definition: queue_task.h:307
#define OVSDB_PKT_TRACE(obj,...)
#define OVSDB_SESSION_TRACE(obj, session,...)