OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sandesh_connection.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_connection.cc
7 //
8 // Sandesh connection management implementation
9 //
10 
11 #include <io/tcp_session.h>
12 #include <io/tcp_server.h>
13 #include <base/logging.h>
14 #include <sandesh/protocol/TXMLProtocol.h>
15 
16 #include <sandesh/sandesh_types.h>
17 #include <sandesh/sandesh.h>
18 #include <sandesh/sandesh_ctrl_types.h>
19 #include <sandesh/common/vns_types.h>
20 #include <sandesh/common/vns_constants.h>
21 #include "sandesh_uve.h"
22 #include "sandesh_session.h"
23 #include "sandesh_client.h"
24 #include "sandesh_server.h"
25 #include "sandesh_connection.h"
26 
27 using namespace std;
28 using boost::system::error_code;
29 
// Connection-scoped logging helper.
//
// Emits _Msg at log4cplus level _Level through Sandesh::logger(). Nothing is
// logged when LoggingDisabled() is true or when the logger is not enabled for
// that level. The text is prefixed with:
//   - the state machine's generator key followed by "(<task instance>)",
//     but only when the generator key is non-empty;
//   - session()->ToString(), but only when session() is non-null.
//
// The expansion calls state_machine(), session() and GetTaskInstance()
// unqualified, so it must be used where those names resolve.
#define CONNECTION_LOG(_Level, _Msg)                                        \
    do {                                                                    \
        if (!LoggingDisabled()) {                                           \
            log4cplus::Logger _Xlogger = Sandesh::logger();                 \
            if (_Xlogger.isEnabledFor(log4cplus::_Level##_LOG_LEVEL)) {     \
                log4cplus::tostringstream _Xbuf;                            \
                if (!state_machine()->generator_key().empty()) {            \
                    _Xbuf << state_machine()->generator_key()               \
                          << " (" << GetTaskInstance() << ") ";             \
                }                                                           \
                if (session()) {                                            \
                    _Xbuf << session()->ToString() << " ";                  \
                }                                                           \
                _Xbuf << _Msg;                                              \
                _Xlogger.forcedLog(log4cplus::_Level##_LOG_LEVEL,           \
                                   _Xbuf.str());                            \
            }                                                               \
        }                                                                   \
    } while (false)
48 
49 SandeshConnection::SandeshConnection(const char *prefix, TcpServer *server,
50  Endpoint endpoint, int task_instance, int task_id)
51  : is_deleted_(false),
52  server_(server),
53  endpoint_(endpoint),
54  admin_down_(false),
55  session_(NULL),
56  task_instance_(task_instance),
57  task_id_(task_id),
58  state_machine_(new SandeshStateMachine(prefix, this)) {
59 }
60 
62 }
63 
65  session_ = session;
66 }
67 
69  return session_;
70 }
71 
72 bool SandeshConnection::ReceiveMsg(const std::string &msg, SandeshSession *session) {
73  return state_machine_->OnSandeshMessage(session, msg);
74 }
75 
// NOTE(review): the definition header is missing from this listing;
// presumably this is the body of SandeshConnection::SendSandesh(Sandesh *snh)
// -- confirm against the header file.
//
// Queues snh on the current session's send queue and returns true. On both
// early-exit paths below snh->Release() is called and false is returned.
// Without a session there is nowhere to queue the message.
77  if (!session_) {
78  snh->Release();
79  return false;
80  }
81  ssm::SsmState state = state_machine_->get_state();
// Messages are only queued while the state machine is in SERVER_INIT or
// ESTABLISHED; in any other state the message is released instead.
82  if (state != ssm::SERVER_INIT && state != ssm::ESTABLISHED) {
83  snh->Release();
84  return false;
85  }
86  // XXX No bounded work queue
87  SandeshElement element(snh);
// The value returned by Enqueue() is not checked; true is returned regardless.
88  session_->send_queue()->Enqueue(element);
89  return true;
90 }
91 
// NOTE(review): the definition header is missing from this listing;
// presumably SandeshConnection::AcceptSession(SandeshSession *session) --
// confirm.
//
// Installs ReceiveMsg (bound to this connection) as the session's
// receive-message callback, points the session back at this connection, and
// hands the session to the state machine via PassiveOpen().
93  session->SetReceiveMsgCb(boost::bind(&SandeshConnection::ReceiveMsg, this, _1, _2));
94  session->SetConnection(this);
95  state_machine_->PassiveOpen(session);
96 }
97 
99  return state_machine_.get();
100 }
101 
// NOTE(review): the definition header is missing from this listing;
// presumably SandeshConnection::SetAdminState(bool down) -- confirm.
//
// Records the new admin-down flag and forwards it to the state machine, but
// only when it differs from the stored value; a repeated call with the same
// value does nothing.
103  if (admin_down_ != down) {
104  admin_down_ = down;
105  state_machine_->SetAdminState(down);
106  }
107 }
108 
110  is_deleted_ = true;
111  deleter()->Delete();
112 }
113 
115  // XXX Do we have any dependencies?
116  return true;
117 }
118 
// NOTE(review): the class header, the constructor signature, listing line 132
// and the private member declarations are missing from this listing. The
// comments below describe only the statements that are visible.
//
// Deletion actor for a server connection: each callback works through
// parent_ (the connection) and server_.
120 public:
// Constructor initializers: the LifetimeActor base is built from the
// server's lifetime manager, and the server and parent pointers are stored.
122  : LifetimeActor(server->lifetime_manager()),
123  server_(server), parent_(parent) {
124  }
// Deletion is allowed whenever the parent connection allows it.
125  virtual bool MayDelete() const {
126  return parent_->MayDelete();
127  }
// Fetches the session from the parent's state machine (when the parent has
// one), asks the server to delete that session, and marks the parent as
// deleted.
128  virtual void Shutdown() {
129  SandeshSession *session = NULL;
130  if (parent_->state_machine()) {
131  session = parent_->state_machine()->session();
// NOTE(review): listing line 132 is absent here; a statement may be missing.
133  }
134  if (session) {
135  server_->DeleteSession(session);
136  }
137  parent_->is_deleted_ = true;
138  }
// Final teardown is forwarded to the parent connection.
139  virtual void Destroy() {
140  parent_->Destroy();
141  }
142 
143 private:
146 };
147 
149  TcpServer *server, Endpoint endpoint, int task_instance, int task_id)
// The first line of this constructor's signature is missing from this listing.
// The base connection is created with the fixed prefix "SandeshServer: ".
150  : SandeshConnection("SandeshServer: ", server, endpoint, task_instance, task_id),
// NOTE(review): both dynamic_casts assume `server` really is a SandeshServer.
// The second result is dereferenced with no null check, so a TcpServer of any
// other type would be a null dereference here -- confirm callers guarantee it.
151  deleter_(new DeleteActor(dynamic_cast<SandeshServer *>(server), this)),
152  server_delete_ref_(this, dynamic_cast<SandeshServer *>(server)->deleter()) {
153 }
154 
156 }
157 
159  deleter_->Delete();
160 }
161 
163  const SandeshHeader &header, const std::string sandesh_name,
164  const uint32_t header_offset) {
// The first line of this definition is missing from this listing.
//
// Decodes a control sandesh from msg and passes it to the server together
// with this connection's state machine and session. Returns the server's
// result, or false (after logging) when server() is not a SandeshServer. The
// decoded object is released before returning.
165  SandeshServer *sserver = dynamic_cast<SandeshServer *>(server());
166  if (!sserver) {
167  CONNECTION_LOG(ERROR, __func__ << " No Server");
168  return false;
169  }
170  Sandesh *ctrl_snh = SandeshSession::DecodeCtrlSandesh(msg, header, sandesh_name,
171  header_offset);
// NOTE(review): ctrl_snh is handed to the server and then dereferenced by
// Release() with no null check. Confirm DecodeCtrlSandesh() can never return
// NULL; otherwise a decode failure crashes here.
172  bool ret = sserver->ReceiveSandeshCtrlMsg(state_machine(), session(), ctrl_snh);
173  ctrl_snh->Release();
174  return ret;
175 }
176 
// NOTE(review): the definition header is missing from this listing;
// presumably ProcessResourceUpdate(bool rsc) -- confirm.
//
// Forwards the resource flag, together with the current session, to the
// server and returns the server's result. Logs an error and returns false
// when server() is not a SandeshServer.
178  SandeshServer *sserver = dynamic_cast<SandeshServer *>(server());
179  if (!sserver) {
180  CONNECTION_LOG(ERROR, __func__ << " No Server");
181  return false;
182  }
183  return sserver->ReceiveResourceUpdate(session(), rsc);
184 }
185 
187  const SandeshMessage *msg, bool resource) {
// The first line of this definition is missing from this listing.
//
// Hands msg and the resource flag to the server along with the current
// session. Logs an error and returns false when server() is not a
// SandeshServer.
188  SandeshServer *sserver = dynamic_cast<SandeshServer *>(server());
189  if (!sserver) {
190  CONNECTION_LOG(ERROR, __func__ << " No Server");
191  return false;
192  }
// The value returned by ReceiveSandeshMsg() is discarded; once a server has
// been found this always returns true.
193  sserver->ReceiveSandeshMsg(session(), msg, resource);
194  return true;
195 }
196 
// NOTE(review): the definition header is missing from this listing;
// presumably ProcessDisconnect(SandeshSession *sess) -- confirm.
//
// Asks the server to disconnect sess. Logs an error and does nothing else
// when server() is not a SandeshServer.
198  SandeshServer *sserver = dynamic_cast<SandeshServer *>(server());
199  if (!sserver) {
200  CONNECTION_LOG(ERROR, __func__ << " No Server");
201  return;
202  }
203  sserver->DisconnectSession(sess);
204 }
205 
// NOTE(review): the definition header is missing from this listing;
// presumably SandeshServerConnection::lifetime_manager() -- confirm.
//
// Returns the server's lifetime manager, or NULL (after logging an error)
// when server() is not a SandeshServer.
207  SandeshServer *sserver = dynamic_cast<SandeshServer *>(server());
208  if (!sserver) {
209  CONNECTION_LOG(ERROR, __func__ << " No Server");
210  return NULL;
211  }
212  return sserver->lifetime_manager();
213 }
214 
// NOTE(review): the definition header is missing from this listing;
// presumably SandeshServerConnection::Destroy() -- confirm.
//
// Returns this connection's task-instance index to the server (unless it is
// -1) and then removes the connection from the server. Logs an error and
// does nothing else when server() is not a SandeshServer.
216  SandeshServer *sserver = dynamic_cast<SandeshServer *>(server());
217  if (!sserver) {
218  CONNECTION_LOG(ERROR, __func__ << " No Server");
219  return;
220  }
221  int index = GetTaskInstance();
// An index of -1 is never passed to FreeConnectionIndex().
222  if (index != -1) {
223  sserver->FreeConnectionIndex(index);
224  }
225  // Deletes self - should always be the last
226  sserver->RemoveConnection(this);
// NOTE(review): the ';' after the closing brace below is a stray empty
// declaration; it is harmless but can be dropped.
227 };
228 
230  return deleter_.get();
231 }
virtual LifetimeManager * lifetime_manager()
boost::scoped_ptr< DeleteActor > deleter_
#define CONNECTION_LOG(_Level, _Msg)
void RemoveConnection(SandeshConnection *connection)
virtual void DeleteSession(TcpSession *session)
Definition: tcp_server.cc:197
TcpServer * server()
SandeshConnection(const char *prefix, TcpServer *server, Endpoint endpoint, int task_instance, int task_id)
SandeshSession * session() const
void AcceptSession(SandeshSession *session)
virtual LifetimeActor * deleter()=0
virtual bool ReceiveSandeshCtrlMsg(SandeshStateMachine *state_machine, SandeshSession *session, const Sandesh *sandesh)
SandeshServerConnection(TcpServer *server, Endpoint endpoint, int task_instance, int task_id)
boost::asio::ip::tcp::endpoint Endpoint
virtual void DisconnectSession(SandeshSession *session)
SandeshSession * session()
void SetAdminState(bool down)
void set_session(SandeshSession *session)
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
SandeshSession * session_
virtual bool ReceiveSandeshMsg(SandeshSession *session, const SandeshMessage *msg, bool resource)=0
virtual bool ReceiveResourceUpdate(SandeshSession *session, bool rsc)
DeleteActor(SandeshServer *server, SandeshServerConnection *parent)
void SetConnection(SandeshConnection *connection)
virtual LifetimeActor * deleter()
Sandesh::SandeshQueue * send_queue()
virtual void Release()
Definition: p/sandesh.h:266
virtual bool ProcessResourceUpdate(bool res)
int GetTaskInstance() const
virtual void ProcessDisconnect(SandeshSession *session)
virtual void Delete()
Definition: lifetime.cc:38
virtual bool ProcessSandeshMessage(const SandeshMessage *msg, bool resource)
SandeshStateMachine * state_machine() const
virtual bool ReceiveMsg(const std::string &msg, SandeshSession *session)
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
virtual bool ProcessSandeshCtrlMessage(const std::string &msg, const SandeshHeader &header, const std::string sandesh_name, const uint32_t header_offset)
LifetimeManager * lifetime_manager()
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
bool SendSandesh(Sandesh *snh)
void FreeConnectionIndex(int)
boost::scoped_ptr< SandeshStateMachine > state_machine_