OpenSDN source code
sandesh_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_server.cc
7 //
8 // Sandesh server implementation
9 //
10 
11 #include <boost/bind/bind.hpp>
12 #include <boost/assign.hpp>
13 
14 #include <base/address_util.h>
15 #include <sandesh/protocol/TXMLProtocol.h>
16 #include <sandesh/sandesh_types.h>
17 #include <sandesh/sandesh.h>
18 #include <sandesh/sandesh_ctrl_types.h>
19 #include "sandesh_connection.h"
20 #include "sandesh_session.h"
21 #include "sandesh_server.h"
22 
23 using namespace std;
24 using namespace boost::asio;
25 using namespace boost::placeholders;
26 
27 const std::string SandeshServer::kStateMachineTask = "sandesh::SandeshStateMachine";
28 const std::string SandeshServer::kLifetimeMgrTask = "sandesh::LifetimeMgr";
29 const std::string SandeshServer::kSessionReaderTask = "io::ReaderTask";
30 
// Lifetime actor used by SandeshServer. The class header, the constructor
// signature and the data-member declaration are not present in this listing.
32 public:
 // Passes server->lifetime_manager() to the LifetimeActor base and keeps the
 // server pointer in server_.
34  LifetimeActor(server->lifetime_manager()), server_(server) { }
 // Always reports that deletion may proceed.
35  virtual bool MayDelete() const {
36  return true;
37  }
 // Forwards shutdown to the server's SessionShutdown().
38  virtual void Shutdown() {
39  server_->SessionShutdown();
40  }
 // No work is done in Destroy().
41  virtual void Destroy() {
42  }
43 private:
45 };
46 
48 
// Constructor initializer list and body (the signature line is not present in
// this listing). Constructs the SslServer base with the tlsv12_server method
// and config.sandesh_ssl_enable, looks up the task ids for the state machine,
// session reader and lifetime manager tasks, and creates the LifetimeManager
// and the DeleteActor. When config.sandesh_ssl_enable is set, the SSL context
// is configured below; every failure there is logged and ends the process
// with exit(EINVAL).
50  : SslServer(evm, boost::asio::ssl::context::tlsv12_server,
51  config.sandesh_ssl_enable),
52  sm_task_id_(TaskScheduler::GetInstance()->GetTaskId(kStateMachineTask)),
53  session_reader_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSessionReaderTask)),
54  lifetime_mgr_task_id_(TaskScheduler::GetInstance()->GetTaskId(kLifetimeMgrTask)),
55  lifetime_manager_(new LifetimeManager(lifetime_mgr_task_id_)),
56  deleter_(new DeleteActor(this)) {
57  // Set task policy for exclusion between :
58  // 1. State machine and lifetime mgr since state machine delete happens
59  // in lifetime mgr task
 // task_policy_set_ guards this section so it runs only while the flag is
 // still false.
 // NOTE(review): the listing lines between the list_of statement and the flag
 // assignment are missing here.
60  if (!task_policy_set_) {
61  TaskPolicy lm_task_policy = boost::assign::list_of
65  task_policy_set_ = true;
66  }
67  if (config.sandesh_ssl_enable) {
68  boost::asio::ssl::context *ctx = context();
69  boost::system::error_code ec;
 // Enable the default workarounds and disable SSLv2, SSLv3, TLSv1 and
 // TLSv1.1 on the context.
70  ctx->set_options(boost::asio::ssl::context::default_workarounds |
71  boost::asio::ssl::context::no_tlsv1 |
72  boost::asio::ssl::context::no_sslv3 |
73  boost::asio::ssl::context::no_sslv2 |
74  boost::asio::ssl::context::no_tlsv1_1, ec);
75  if (ec.value() != 0) {
76  SANDESH_LOG(ERROR, "Error setting ssl options: " << ec.message());
77  exit(EINVAL);
78  }
79  // CA certificate
 // Peer verification is only turned on when a CA certificate is configured.
80  if (!config.ca_cert.empty()) {
81  // Verify that the peer certificate is signed by a trusted CA
82  ctx->set_verify_mode(boost::asio::ssl::verify_peer |
83  boost::asio::ssl::verify_fail_if_no_peer_cert,
84  ec);
85  if (ec.value() != 0) {
86  SANDESH_LOG(ERROR, "Error setting verification mode: " <<
87  ec.message());
88  exit(EINVAL);
89  }
90  ctx->load_verify_file(config.ca_cert, ec);
91  if (ec.value() != 0) {
92  SANDESH_LOG(ERROR, "Error loading CA certificate: " <<
93  ec.message());
94  exit(EINVAL);
95  }
96  }
97  // Server certificate
98  ctx->use_certificate_chain_file(config.server_certfile, ec);
99  if (ec.value() != 0) {
100  SANDESH_LOG(ERROR, "Error using server certificate: " <<
101  ec.message());
102  exit(EINVAL);
103  }
104  // Server private key
 // The key file is loaded in PEM format.
105  ctx->use_private_key_file(config.server_keyfile,
106  boost::asio::ssl::context::pem, ec);
107  if (ec.value() != 0) {
108  SANDESH_LOG(ERROR, "Error using server private key file: " <<
109  ec.message());
110  exit(EINVAL);
111  }
112  }
113 }
114 
117 }
118 
// Returns the task id stored in lifetime_mgr_task_id_ (the signature line is
// not present in this listing).
120  return lifetime_mgr_task_id_;
121 }
122 
125 }
126 
127 bool SandeshServer::Initialize(short port, const std::string &ip) {
128  int count = 0;
129 
130  boost::system::error_code ec;
131  boost::asio::ip::address ip_addr = AddressFromString(ip, &ec);
132  if (ec) {
133  SANDESH_LOG(ERROR, __func__ << ": Invalid server address: " <<
134  ip << " Error: " << ec);
135  return false;
136  }
137  while (count++ < kMaxInitRetries) {
138  if (TcpServer::Initialize(port, ip_addr))
139  break;
140  sleep(1);
141  }
142  if (!(count < kMaxInitRetries)) {
143  SANDESH_LOG(ERROR, "Process EXITING: TCP Server initialization failed for port " << port);
144  exit(1);
145  }
146  return true;
147 }
148 
// Hands out a connection index from conn_bmap_ under mutex_ (the signature
// line is not present in this listing). A set bit is treated as an available
// index: the lowest set bit is taken, or the bitmap is grown by one bit when
// none is set. The chosen bit is cleared before its position is returned.
150  std::scoped_lock lock(mutex_);
151  size_t bit = conn_bmap_.find_first();
 // npos means no bit is set: append one new bit (initialized to true).
152  if (bit == conn_bmap_.npos) {
153  bit = conn_bmap_.size();
154  conn_bmap_.resize(bit + 1, true);
155  }
 // Clear the bit so the same position is not returned again while in use.
156  conn_bmap_.reset(bit);
157  return bit;
158 }
159 
// Marks index `id` as available again by setting its bit in conn_bmap_, under
// mutex_ (the signature line is not present in this listing), then trims the
// run of set bits at the top of the bitmap.
161  std::scoped_lock lock(mutex_);
162  conn_bmap_.set(id);
163 
 // Walk down from the highest position to the highest cleared bit and shrink
 // the bitmap so that it ends right after that bit.
164  for (size_t i = conn_bmap_.size(); i != 0; i--) {
165  if (conn_bmap_[i-1] != true) {
166  if (i != conn_bmap_.size()) {
167  conn_bmap_.resize(i);
168  }
169  return;
170  }
171  }
 // Every bit is set (no cleared bit was found): drop the whole bitmap.
172  conn_bmap_.clear();
173 }
174 
177  SOL_SOCKET, SO_REUSEADDR> reuse_addr_t;
// Opens the session's socket for IPv4, sets the reuse-address option on it and
// binds it to LocalEndpoint(). The lines that declare the function and create
// `session` are not present in this listing.
// NOTE(review): error handling differs per step - a failed open() or bind()
// is only logged and execution continues, while a failed set_option() returns
// early. The session is returned in every case; confirm this is intended.
179  Socket *socket = session->socket();
180 
181  boost::system::error_code err;
182  socket->open(ip::tcp::v4(), err);
183  if (err) {
184  SANDESH_LOG(ERROR, __func__ << " Server Open Fail " << err.message());
185  }
186 
187  socket->set_option(reuse_addr_t(true), err);
188  if (err) {
189  SANDESH_LOG(ERROR, __func__ << " SetSockOpt Fail " << err.message());
190  return session;
191  }
192 
193  socket->bind(LocalEndpoint(), err);
194  if (err) {
195  SANDESH_LOG(ERROR, __func__ << " Server Bind Failure " << err.message());
196  }
197 
198  return session;
199 }
200 
// Asserts that deleter_ holds an object and then calls Delete() on it (the
// signature line is not present in this listing).
202  assert(deleter_.get());
203  deleter_->Delete();
204 }
205 
206 bool SandeshServer::Compare(const Endpoint &peer_addr,
207  const SandeshConnectionPair &p) const {
208  return (peer_addr == p.second->endpoint() ? false : true);
209 }
210 
// Searches connection_ under mutex_ (the signature line is not present in
// this listing). Returns the connection of the first map entry for which
// Compare(peer_addr, entry) is true, or NULL when no entry qualifies.
212  std::scoped_lock lock(mutex_);
213  SandeshConnectionMap::iterator loc = find_if(connection_.begin(),
214  connection_.end(), boost::bind(&SandeshServer::Compare, this,
215  boost::ref(peer_addr), _1));
216  if (loc != connection_.end()) {
217  return loc->second;
218  }
219  return NULL;
220 }
221 
// Allocates a new SandeshSession for `socket` with `new` and returns it as an
// SslSession pointer. The signature line and the remaining constructor
// arguments are not present in this listing.
223  // Use the state machine task to run the session send queue since
224  // they need to be exclusive as session delete happens from state
225  // machine
226  SslSession *session = new SandeshSession(this, socket,
229  return session;
230 }
231 
// Erases the connection_ entry keyed by connection->endpoint(), under mutex_
// (the signature line is not present in this listing).
233  std::scoped_lock lock(mutex_);
234  boost::asio::ip::tcp::endpoint endpoint = connection->endpoint();
235  connection_.erase(endpoint);
236 }
237 
// Attaches an accepted session to the connection for its remote endpoint,
// under mutex_ (the signature line is not present in this listing).
// Returns false when a connection for that endpoint already has a session;
// otherwise the session is passed to the connection and true is returned.
239  std::scoped_lock lock(mutex_);
240  SandeshConnection *connection;
 // The session is expected to be a SandeshSession; anything else trips the
 // assert below.
241  SandeshSession *ssession = dynamic_cast<SandeshSession *>(session);
242  assert(ssession);
243  ip::tcp::endpoint remote = session->remote_endpoint();
244  SandeshConnectionMap::iterator loc = connection_.find(remote);
245 
246  if (loc == connection_.end()) {
 // No connection for this endpoint yet: create one, initialize it and
 // store it in connection_ keyed by the remote endpoint.
247  SANDESH_LOG(INFO, "Server: " << __func__ << " " << "Create Connection");
248  //create a connection_
249  connection = new SandeshServerConnection(this, remote,
250  ssession->GetSessionInstance(),
251  sm_task_id_);
252  connection->Initialize();
253  connection_.insert(remote, connection);
254  } else {
 // Reuse the existing connection only when it has no session attached.
255  connection = loc->second;
256  if (connection->session() != NULL) {
257  return false;
258  }
259  }
260  connection->AcceptSession(ssession);
261  return true;
262 }
263 
265  SandeshSession *session, const Sandesh *sandesh) {
 // Handles a control message received on `session` (the first line of the
 // signature is not present in this listing). Returns false when `sandesh`
 // is not a SandeshCtrlClientToServer; otherwise sends a
 // SandeshCtrlServerToClient request with an empty UVE type list on the
 // session's connection and returns true.
 // NOTE(review): if `sandesh` were null the dynamic_cast would yield null
 // and the log statement below would dereference it - confirm callers never
 // pass null.
266  const SandeshCtrlClientToServer *snh =
267  dynamic_cast<const SandeshCtrlClientToServer *>(sandesh);
268  if (!snh) {
269  SANDESH_LOG(DEBUG, "Received Ctrl Message with wrong type " << sandesh->Name());
270  return false;
271  }
272  SANDESH_LOG(DEBUG, "Received Ctrl Message from " << snh->get_module_name());
273  std::vector<UVETypeInfo> vu;
274  SandeshCtrlServerToClient::Request(vu, true, "ctrl", session->connection());
275  return true;
276 }
277 
// Returns the raw pointer held by deleter_ (the signature line is not present
// in this listing).
279  return deleter_.get();
280 }
281 
// Returns the raw pointer held by lifetime_manager_ (the signature line is
// not present in this listing).
283  return lifetime_manager_.get();
284 }
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
void AcceptSession(SandeshSession *session)
SandeshSession * session() const
DeleteActor(SandeshServer *server)
virtual bool MayDelete() const
static const std::string kLifetimeMgrTask
int AllocConnectionIndex()
virtual TcpSession * CreateSession()
int session_writer_task_id() const
virtual void SessionShutdown()
boost::scoped_ptr< LifetimeManager > lifetime_manager_
SandeshServer(EventManager *evm, const SandeshConfig &config)
int lifetime_mgr_task_id()
virtual SslSession * AllocSession(SslSocket *socket)
virtual bool Initialize(short port, const std::string &ip="0.0.0.0")
SandeshConnectionMap connection_
static const std::string kStateMachineTask
LifetimeActor * deleter()
bool Compare(const Endpoint &peer_addr, const SandeshConnectionPair &) const
static bool task_policy_set_
SandeshConnection * FindConnection(const Endpoint &peer_addr)
virtual ~SandeshServer()
boost::dynamic_bitset conn_bmap_
void FreeConnectionIndex(int)
virtual bool ReceiveSandeshCtrlMsg(SandeshStateMachine *state_machine, SandeshSession *session, const Sandesh *sandesh)
LifetimeManager * lifetime_manager()
virtual bool AcceptSession(TcpSession *session)
boost::scoped_ptr< DeleteActor > deleter_
int session_reader_task_id() const
void RemoveConnection(SandeshConnection *connection)
int session_reader_task_id_
static const std::string kSessionReaderTask
boost::ptr_container_detail::ref_pair< boost::asio::ip::basic_endpoint< boost::asio::ip::tcp >, SandeshConnection *const > SandeshConnectionPair
std::mutex mutex_
int lifetime_mgr_task_id_
static const int kMaxInitRetries
SandeshConnection * connection()
virtual int GetSessionInstance() const
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
Definition: ssl_server.h:16
boost::asio::ssl::context * context()
Definition: ssl_server.cc:43
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
Definition: task.cc:617
static TaskScheduler * GetInstance()
Definition: task.cc:554
void Shutdown()
Definition: tcp_server.cc:144
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_server.h:30
Endpoint LocalEndpoint() const
Definition: tcp_server.cc:308
virtual bool Initialize(unsigned short port)
Definition: tcp_server.cc:60
virtual TcpSession * CreateSession()
Definition: tcp_server.cc:190
void ClearSessions()
Definition: tcp_server.cc:160
boost::asio::ip::tcp::socket Socket
Definition: tcp_server.h:31
virtual Socket * socket() const
Definition: tcp_session.h:82
Endpoint remote_endpoint() const
Definition: tcp_session.h:131
static EventManager evm
#define SANDESH_LOG(_Level, _Msg)
Definition: cpp/sandesh.h:476
unsigned int boolean
Definition: mcast_common.h:11
std::string server_keyfile
std::string server_certfile
std::string ca_cert
The class is used to specify a Task label for formulating a task exclusion list (an execution policy)...
Definition: task.h:246
std::vector< TaskExclusion > TaskPolicy
Defines a type to store an execution policy (a list of task exclusions).
Definition: task.h:270