OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ovsdb_client_session.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <assert.h>
6 
7 extern "C" {
8 #include <ovsdb_wrapper.h>
9 };
10 
11 #include <cmn/agent.h>
12 #include <ovsdb_types.h>
14 #include <ovsdb_client_idl.h>
15 #include <ovsdb_client_session.h>
16 #include <ovsdb_route_peer.h>
17 
18 #include <cstddef>
19 
23 
24 int OvsdbClientSession::ovsdb_io_task_id_ = -1;
25 
26 OvsdbClientSession::OvsdbClientSession(Agent *agent, OvsPeerManager *manager) :
27  client_idl_(NULL), agent_(agent), manager_(manager), parser_(NULL),
28  connection_time_("-") {
29 
30  idl_inited_ = false;
31  // initialize ovsdb_io task id on first constructor.
32  if (ovsdb_io_task_id_ == -1) {
33  ovsdb_io_task_id_ = agent->task_scheduler()->GetTaskId("OVSDB::IO");
34  }
35 }
36 
38 }
39 
40 // Invoked from the OVSDB::IO task context. Keepalive (echo) messages are
41 // handled in the OVSDB::IO task context itself; OVSDB::IO should not have
42 // exclusion with any of the other tasks.
// Consumes `len` bytes starting at `buf`, feeding them to the JSON parser
// (parser_) in a loop until `used` equals `len`.
//
// Visible behaviour for the data fed so far:
//  - when message conversion reports an error, the error text is released,
//    TriggerClose() is called and the rest of the buffer is dropped;
//  - an echo request gets its reply sent inline through SendJsonRpc();
//  - when idl_inited_ is set and client_idl_ is not deleted, the message
//    (or NULL in the else branch) is handed to client_idl_->MessageProcess().
//
// NOTE(review): this listing skips its own lines 49, 59, 68, 83 and 95, so
// five statements of the function are not visible here. The notes below mark
// each gap; restore the statements from the upstream file before relying on
// this text.
43 void OvsdbClientSession::MessageProcess(const u_int8_t *buf, std::size_t len) {
44  std::size_t used = 0;
45  // Multiple JSON messages may be clubbed together, so keep reading
46  // the buffer until the whole of it is consumed.
47  while (used != len) {
48  if (parser_ == NULL) {
    // NOTE(review): listing line 49 is missing; presumably parser_ is
    // created here (ovsdb_wrapper_json_parser_create) -- confirm.
50  }
    // Feed only the part of the buffer that has not been consumed yet.
51  const u_int8_t *pkt = buf + used;
52  std::size_t pkt_len = len - used;
53  std::size_t read;
54  read = ovsdb_wrapper_json_parser_feed(parser_, (const char *)pkt,
55  pkt_len);
56  used +=read;
57 
58  /* If we have complete JSON, attempt to parse it as JSON-RPC. */
    // NOTE(review): listing line 59 is missing; presumably the opening
    // `if` that tests whether the parser is done (it pairs with the
    // `} else {` further down) -- confirm.
60  struct json *json = ovsdb_wrapper_json_parser_finish(parser_);
    // parser_ is reset so that the next loop iteration takes the
    // `parser_ == NULL` branch again.
61  parser_ = NULL;
62  struct jsonrpc_msg *msg = NULL;
63  char *error = ovsdb_wrapper_jsonrpc_msg_from_json(json, &msg);
64 
65  if (error) {
66  assert(msg == NULL);
67 
    // NOTE(review): listing line 68 is missing; the two lines below are
    // the tail of a call that takes the error text, presumably an
    // OVSDB_SESSION_TRACE invocation -- confirm.
69  "Error parsing incoming message: " +
70  std::string(error));
71  free(error);
72  // trigger close due to message parse failure.
73  TriggerClose();
74 
75  // bail out, skip processing further.
76  return;
77  }
78 
79  if (ovsdb_wrapper_msg_echo_req(msg)) {
80  // Echo request from ovsdb-server, reply inline so that
81  // ovsdb-server knows that connection is still active
82  struct jsonrpc_msg *reply;
    // NOTE(review): listing line 83 is missing; `reply` is assigned
    // there, presumably via ovsdb_wrapper_jsonrpc_create_reply(msg)
    // -- confirm. As shown, `reply` would be used uninitialized.
84  SendJsonRpc(reply);
85  }
86 
87  // If idl is inited and active, hand over msg to IDL for processing.
88  // We even enqueue the processed echo req message to the workqueue, to
89  // track session activity in IDL.
90  if (idl_inited_ == true && !client_idl_->deleted()) {
91  client_idl_->MessageProcess(msg);
92  continue;
93  }
94 
    // NOTE(review): listing line 95 is missing; presumably msg is
    // destroyed here because the IDL did not take it -- confirm.
96  } else {
97  // enqueue a NULL message to idl (to track activity), in case
98  // we keep on reading partial data for a very long time,
99  // particularly during the initial response for a monitor request
100  // with scaled config/routes in OVSDB-server
101  if (idl_inited_ == true && !client_idl_->deleted()) {
102  client_idl_->MessageProcess(NULL);
103  }
104 
105  }
106  }
107 }
108 
109 void OvsdbClientSession::SendJsonRpc(struct jsonrpc_msg *msg) {
110  struct json *json_msg = ovsdb_wrapper_jsonrpc_msg_to_json(msg);
111  char *s = ovsdb_wrapper_json_to_string(json_msg, 0);
112  ovsdb_wrapper_json_destroy(json_msg);
113 
114  SendMsg((u_int8_t *)s, strlen(s));
115  // release the memory allocated by ovsdb_wrapper_json_to_string
116  free(s);
117 }
118 
121  OVSDB_SESSION_TRACE(Trace, this, "Connection to client established");
123  idl_inited_ = true;
124  client_idl_->OnEstablish();
125 }
126 
128  if (ovsdb_close_reason().value() == 0) {
129  OVSDB_SESSION_TRACE(Trace, this, "Connection to client Closed");
130  } else {
131  OVSDB_SESSION_TRACE(Trace, this, "Connection to client Closed due to "
132  + ovsdb_close_reason().message());
133  }
134  if (!idl_inited_) {
135  return;
136  }
137  client_idl_->TriggerDeletion();
138 }
139 
141  return client_idl_.get();
142 }
143 
144 void OvsdbClientSession::AddSessionInfo(SandeshOvsdbClientSession &session) {
145  session.set_status(status());
146  session.set_remote_ip(remote_ip().to_string());
147  session.set_remote_port(remote_port());
148  SandeshOvsdbTxnStats sandesh_stats;
149  if (client_idl_.get() != NULL) {
150  const OvsdbClientIdl::TxnStats &stats = client_idl_->stats();
151  sandesh_stats.set_txn_initiated(stats.txn_initiated);
152  sandesh_stats.set_txn_succeeded(stats.txn_succeeded);
153  sandesh_stats.set_txn_failed(stats.txn_failed);
154  sandesh_stats.set_txn_pending(client_idl_->pending_txn_count());
155  sandesh_stats.set_pending_send_msg(
156  client_idl_->pending_send_msg_count());
157  } else {
158  sandesh_stats.set_txn_initiated(0);
159  sandesh_stats.set_txn_succeeded(0);
160  sandesh_stats.set_txn_failed(0);
161  sandesh_stats.set_txn_pending(0);
162  sandesh_stats.set_pending_send_msg(0);
163  }
164  session.set_connection_time(connection_time_);
165  session.set_txn_stats(sandesh_stats);
166 }
167 
bool ovsdb_wrapper_json_parser_is_done(const struct json_parser *)
struct json * ovsdb_wrapper_json_parser_finish(struct json_parser *)
virtual std::string status()=0
void SendJsonRpc(struct jsonrpc_msg *msg)
virtual uint16_t remote_port() const =0
char * ovsdb_wrapper_jsonrpc_msg_from_json(struct json *, struct jsonrpc_msg **)
virtual Ip4Address remote_ip() const =0
struct jsonrpc_msg * ovsdb_wrapper_jsonrpc_create_reply(struct jsonrpc_msg *msg)
int GetTaskId(const std::string &name)
Definition: task.cc:856
bool ovsdb_wrapper_msg_echo_req(struct jsonrpc_msg *msg)
virtual void TriggerClose()=0
struct json * ovsdb_wrapper_jsonrpc_msg_to_json(struct jsonrpc_msg *)
tbb::atomic< bool > idl_inited_
virtual void SendMsg(u_int8_t *buf, std::size_t len)=0
TaskScheduler * task_scheduler() const
Definition: agent.h:1120
Definition: agent.h:358
void AddSessionInfo(SandeshOvsdbClientSession &session)
Definition: trace.h:220
virtual const boost::system::error_code & ovsdb_close_reason() const =0
void ovsdb_wrapper_jsonrpc_msg_destroy(struct jsonrpc_msg *msg)
char * ovsdb_wrapper_json_to_string(const struct json *, int)
#define OVSDB_SESSION_TRACE(obj, session,...)
struct json_parser * parser_
struct json_parser * ovsdb_wrapper_json_parser_create(int)
void ovsdb_wrapper_json_destroy(struct json *)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
void MessageProcess(const u_int8_t *buf, std::size_t len)
size_t ovsdb_wrapper_json_parser_feed(struct json_parser *, const char *, size_t)
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54