OpenSDN source code
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ifmap_xmpp.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <sandesh/request_pipeline.h>
6 
7 #include "ifmap/ifmap_xmpp.h"
8 #include <boost/foreach.hpp>
9 #include <boost/assign/list_of.hpp>
10 
11 #include "base/util.h"
12 #include "base/logging.h"
13 
14 #include "ifmap/ifmap_factory.h"
15 #include "ifmap/ifmap_log.h"
17 #include "ifmap/ifmap_server_show_types.h"
18 #include "ifmap/ifmap_log_types.h"
20 
21 #include "xmpp/xmpp_connection.h"
22 #include "xmpp/xmpp_server.h"
23 
24 const std::string IFMapXmppChannel::NoFqnSet = "NoFqnSet";
25 
26 // There are 3 task interactions:
27 // "xmpp::StateMachine" gives all the channel triggers.
28 // "db::IFMapTable" does all the work related to those triggers - except Ready
29 // "bgp::Config" does the final channel-unregister
30 //
31 // "xmpp::StateMachine" task gives us 5 triggers:
32 // 1. Ready/NotReady indicating the channel create/delete
33 // 2. VR-subscribe indicating the existence of the agent
34 // 3. VM-sub/unsub indicating the create/delete of a virtual-machine
35 // Process all the triggers in the context of the db::IFMapTable task - except
36 // the 'ready' trigger that is processed right away.
37 // #1 must be processed via the IFMapChannelManager.
38 // #2/#3 must be processed via the IFMapXmppChannel since they are
39 // channel-specific
40 //
41 // "bgp::Config":
42 // The NotReady trigger will cleanup the channel related resources and then
43 // Unregister via ProcessChannelUnregister() in the context of bgp::Config.
44 class ChannelEventProcTask : public Task {
45 public:
46  // To be used for #1
49  : Task(TaskScheduler::GetInstance()->GetTaskId("db::IFMapTable"), 0),
51  }
52 
53  // To be used for #2/#3
55  IFMapXmppChannel *chnl)
56  : Task(TaskScheduler::GetInstance()->GetTaskId("db::IFMapTable"), 0),
57  event_info_(ev), ifmap_chnl_(chnl) {
58  }
59 
60  virtual bool Run() {
63  } else if (event_info_.event == XCE_VR_SUBSCRIBE) {
65  } else if (event_info_.event == XCE_VM_SUBSCRIBE) {
67  } else if (event_info_.event == XCE_VM_UNSUBSCRIBE) {
69  }
70 
71  return true;
72  }
73  std::string Description() const { return "ChannelEventProcTask"; }
74 
75 private:
79 };
80 
82 public:
84 
85  virtual bool SendUpdate(const std::string &msg);
86 
87  virtual std::string ToString() const { return identifier_; }
88  virtual const std::string &identifier() const { return identifier_; }
89  const std::string &hostname() const { return hostname_; }
91 
92  void SetIdentifier(const std::string &identifier) {
94  }
95 
96 private:
98  std::string hostname_; // hostname
99  std::string identifier_; // FQN
100 };
101 
103  : parent_(parent), hostname_(parent_->channel_->ToString()) {
104 }
105 
106 bool IFMapXmppChannel::IFMapSender::SendUpdate(const std::string &msg) {
107  bool sent = parent_->channel_->Send(
108  reinterpret_cast<const uint8_t *>(msg.data()), msg.size(), &msg,
109  xmps::CONFIG,
110  boost::bind(&IFMapXmppChannel::WriteReadyCb, parent_, _1));
111 
112  if (sent) {
113  incr_msgs_sent();
114  incr_bytes_sent(msg.size());
115  } else {
116  set_send_is_blocked(true);
117  incr_msgs_blocked();
118  }
119  return sent;
120 }
121 
122 void IFMapXmppChannel::WriteReadyCb(const boost::system::error_code &ec) {
125 }
126 
128  IFMapChannelManager *ifmap_channel_manager)
129  : peer_id_(xmps::CONFIG), channel_(channel), ifmap_server_(server),
130  ifmap_channel_manager_(ifmap_channel_manager),
131  ifmap_client_(new IFMapSender(this)), client_added_(false),
132  channel_name_(channel->connection()->ToUVEKey()) {
133 
134  ifmap_client_->SetName(channel->connection()->ToUVEKey());
136  boost::bind(&IFMapXmppChannel::ReceiveUpdate,
137  this, _1));
138 }
139 
141  delete ifmap_client_;
142  // Enqueue Unregister and process in the context of bgp::Config task
145 }
146 
147 // iqn should be [virtual-router:FQN-of-vr]
148 std::string IFMapXmppChannel::VrSubscribeGetVrName(const std::string &iqn,
149  bool *valid_message) {
150  std::string vr_name;
151  size_t tstart = iqn.find("virtual-router:");
152  if (tstart == 0) {
153  vr_name = std::string(iqn, (sizeof("virtual-router:") - 1));
154  *valid_message = true;
155  } else {
156  *valid_message = false;
157  }
158 
159  return vr_name;
160 }
161 
162 // iqn should be [virtual-machine:UUID-of-vm]
163 std::string IFMapXmppChannel::VmSubscribeGetVmUuid(const std::string &iqn,
164  bool *valid_message) {
165  std::string vm_uuid;
166  size_t tstart = iqn.find("virtual-machine:");
167  if (tstart == 0) {
168  vm_uuid = std::string(iqn, (sizeof("virtual-machine:") - 1));
169  *valid_message = true;
170  } else {
172  *valid_message = false;
173  }
174 
175  return vm_uuid;
176 }
177 
178 // If add-client was sent to ifmap_server, we must send delete-client too and
179 // vice-versa
181  return client_added_;
182 }
183 
184 const std::string& IFMapXmppChannel::FQName() const {
185  if (ifmap_client_) {
186  return ifmap_client_->identifier();
187  } else {
189  }
190 }
191 
192 void IFMapXmppChannel::ProcessVrSubscribe(const std::string &identifier) {
193  // If we have already received a vr-subscribe on this channel...
194  if (client_added_) {
196  IFMAP_XMPP_WARN(IFMapDuplicateVrSub, channel_name(), FQName());
197  return;
198  }
199 
200  ifmap_client_->SetIdentifier(identifier);
201  bool add_client = true;
203  client_added_ = true;
204  IFMAP_XMPP_DEBUG(IFMapXmppVrSubUnsub, "VrSubscribe", channel_name(),
205  FQName());
206 }
207 
208 void IFMapXmppChannel::ProcessVmSubscribe(const std::string &vm_uuid) {
209  if (!client_added_) {
210  // If we didnt receive the vr-subscribe for this vm...
212  IFMAP_XMPP_WARN(IFMapNoVrSub, "VmSubscribe", channel_name(), FQName(),
213  vm_uuid);
214  return;
215  }
216 
217  if (!ifmap_client_->HasAddedVm(vm_uuid)) {
218  ifmap_client_->AddVm(vm_uuid);
219  bool subscribe = true;
221  subscribe, ifmap_client_->HasVms());
222  IFMAP_XMPP_DEBUG(IFMapXmppVmSubUnsub, "VmSubscribe", channel_name(),
223  FQName(), vm_uuid);
224  } else {
225  // If we have already received a subscribe for this vm
227  IFMAP_XMPP_WARN(IFMapDuplicateVmSub, channel_name(), FQName(), vm_uuid);
228  }
229 }
230 
231 void IFMapXmppChannel::ProcessVmUnsubscribe(const std::string &vm_uuid) {
232  // If we didnt receive the vr-sub for this vm, ignore the request
233  if (!client_added_) {
235  IFMAP_XMPP_WARN(IFMapNoVrSub, "VmUnsubscribe", channel_name(), FQName(),
236  vm_uuid);
237  return;
238  }
239 
240  if (ifmap_client_->HasAddedVm(vm_uuid)) {
241  ifmap_client_->DeleteVm(vm_uuid);
242  bool subscribe = false;
244  subscribe, ifmap_client_->HasVms());
245  IFMAP_XMPP_DEBUG(IFMapXmppVmSubUnsub, "VmUnsubscribe", channel_name(),
246  FQName(), vm_uuid);
247  } else {
248  // If we didnt receive the subscribe for this vm, ignore the unsubscribe
250  IFMAP_XMPP_WARN(IFMapNoVmSub, channel_name(), FQName(), vm_uuid);
251  }
252 }
253 
254 void IFMapXmppChannel::EnqueueVrSubscribe(const std::string &identifier) {
255  ChannelEventInfo info;
256  info.event = XCE_VR_SUBSCRIBE;
257  info.name = identifier;
258 
259  ChannelEventProcTask *proc_task = new ChannelEventProcTask(info, this);
261  scheduler->Enqueue(proc_task);
262 }
263 
265  const std::string &vm_uuid) {
266  ChannelEventInfo info;
267  info.event = (subscribe == true) ? XCE_VM_SUBSCRIBE : XCE_VM_UNSUBSCRIBE;
268  info.name = vm_uuid;
269 
270  ChannelEventProcTask *proc_task = new ChannelEventProcTask(info, this);
272  scheduler->Enqueue(proc_task);
273 }
274 
275 // This runs in the context of the "xmpp::StateMachine" and queues all requests
276 // which are then processed in the context of "db::IFMapTable"
278 
279  if (msg->type == XmppStanza::IQ_STANZA) {
280  const XmppStanza::XmppMessageIq *iq =
281  static_cast<const XmppStanza::XmppMessageIq *>(msg);
282 
283  const char* const vr_string = "virtual-router:";
284  const char* const vm_string = "virtual-machine:";
285  if ((iq->iq_type.compare("set") == 0) &&
286  (iq->action.compare("subscribe") == 0)) {
287  if (iq->node.compare(0, strlen(vr_string), vr_string) == 0) {
288  bool valid_message = false;
289  std::string vr_name = VrSubscribeGetVrName(iq->node,
290  &valid_message);
291  if (valid_message) {
292  EnqueueVrSubscribe(vr_name);
293  }
294  } else if (iq->node.compare(0, strlen(vm_string), vm_string) == 0) {
295  bool valid_message = false;
296  std::string vm_uuid = VmSubscribeGetVmUuid(iq->node,
297  &valid_message);
298  if (valid_message) {
299  bool subscribe = true;
300  EnqueueVmSubUnsub(subscribe, vm_uuid);
301  }
302  } else {
304  IFMAP_XMPP_WARN(IFMapXmppUnknownMessage, iq->iq_type,
305  iq->action, iq->node, channel_name());
306  }
307  }
308  if ((iq->iq_type.compare("set") == 0) &&
309  (iq->action.compare("unsubscribe") == 0)) {
310  if (iq->node.compare(0, strlen(vm_string), vm_string) == 0) {
311  bool valid_message = false;
312  std::string vm_uuid = VmSubscribeGetVmUuid(iq->node,
313  &valid_message);
314  if (valid_message) {
315  bool subscribe = false;
316  EnqueueVmSubUnsub(subscribe, vm_uuid);
317  }
318  } else {
320  IFMAP_XMPP_WARN(IFMapXmppUnknownMessage, iq->iq_type,
321  iq->action, iq->node, channel_name());
322  }
323  }
324  }
325 }
326 
328  return ifmap_client_;
329 }
330 
331 uint64_t IFMapXmppChannel::msgs_sent() const {
332  return ifmap_client_->msgs_sent();
333 }
334 
335 // IFMapClient Manager routines
337  IFMapServer *ifmap_server)
338  : xmpp_server_(xmpp_server), ifmap_server_(ifmap_server),
339  config_task_work_queue_(
340  TaskScheduler::GetInstance()->GetTaskId("bgp::Config"),
341  0, boost::bind(&IFMapChannelManager::ProcessChannelUnregister,
342  this, _1)) {
355  boost::bind(&IFMapChannelManager::IFMapXmppChannelEventCb, this, _1,
356  _2));
357 }
358 
360  tbb::mutex::scoped_lock lock(channel_map_mutex_);
362 }
363 
365  tbb::mutex::scoped_lock lock(channel_map_mutex_);
366  ChannelMap::iterator loc =
367  channel_map_.find(const_cast<XmppChannel *>(channel));
368  if (loc != channel_map_.end()) {
369  return loc->second;
370  }
371  return NULL;
372 }
373 
375  tbb::mutex::scoped_lock lock(channel_map_mutex_);
376  BOOST_FOREACH(ChannelMap::value_type &i, channel_map_) {
377  if (i.second->ToString() == tostring)
378  return i.second;
379  }
380  return NULL;
381 }
382 
384  XmppChannel *channel) {
385  IFMapXmppChannel *ifmap_chnl = IfmapStaticObjectFactory::
386  Create<IFMapXmppChannel>(channel,
388  this);
389  tbb::mutex::scoped_lock lock(channel_map_mutex_);
390  channel_map_.insert(std::make_pair(channel, ifmap_chnl));
391  return ifmap_chnl;
392 }
393 
395  tbb::mutex::scoped_lock lock(channel_map_mutex_);
396  channel_map_.erase(ifmap_chnl->channel());
397  delete ifmap_chnl;
398 }
399 
401  IFMapXmppChannel *ifmap_chnl = FindChannel(channel);
402  if (ifmap_chnl == NULL) {
403  IFMAP_XMPP_DEBUG(IFMapXmppChannelEvent, "Create",
405  CreateIFMapXmppChannel(channel);
407  if (client_manager && !client_manager->GetEndOfRibComputed()) {
408  IFMAP_XMPP_DEBUG(IFMapXmppChannelEvent, "Close",
409  channel->connection()->ToUVEKey(),
411  channel->Close();
412  }
413  } else {
415  }
416 }
417 
419  IFMapXmppChannel *ifmap_chnl = FindChannel(channel);
420  if (ifmap_chnl) {
421  // If we have received subscriptions and ifmap_server knows about the
422  // client for this channel, ask ifmap_server to cleanup.
423  std::string fq_name = ifmap_chnl->FQName();
424  if (ifmap_chnl->MustProcessChannelNotReady()) {
425  bool add_client = false;
426  ifmap_server_->ProcessClientWork(add_client, ifmap_chnl->Sender());
427  }
428  IFMAP_XMPP_DEBUG(IFMapXmppChannelEvent, "Destroy",
429  channel->connection()->ToUVEKey(), fq_name);
430  DeleteIFMapXmppChannel(ifmap_chnl);
431  } else {
433  }
434 }
435 
437  XmppChannel *channel) {
438  ChannelEventInfo info;
439  info.event = event;
440  info.channel = channel;
441 
442  ChannelEventProcTask *proc_task = new ChannelEventProcTask(info, this);
444  scheduler->Enqueue(proc_task);
445 }
446 
447 // This runs in the context of the "xmpp::StateMachine"
449  xmps::PeerState state) {
450  if (state == xmps::READY) {
451  // Process the READY right away
452  ProcessChannelReady(channel);
453  } else if (state == xmps::NOT_READY) {
454  // Queue the NOT_READY to be processed via the "db::IFMapTable" task
456  } else {
458  }
459 }
460 
461 // This runs in the context of bgp::Config
463  IFMAP_XMPP_DEBUG(IFMapChannelUnregisterMessage,
464  "Unregistering config peer for channel",
465  entry.channel->connection()->ToUVEKey());
467  return true;
468 }
469 
471  // Let ProcessChannelUnregister() unregister in the context of bgp::Config
472  ConfigTaskQueueEntry entry;
473  entry.channel = channel;
474  config_task_work_queue_.Enqueue(entry);
475 }
476 
478  std::vector<IFMapXmppChannelMapEntry> *out_map) {
479  tbb::mutex::scoped_lock lock(channel_map_mutex_);
480  for (ChannelMap::iterator iter = channel_map_.begin();
481  iter != channel_map_.end(); ++iter) {
482  IFMapXmppChannel *ifmap_chnl = iter->second;
483  IFMapXmppChannelMapEntry entry;
484  entry.set_client_name(ifmap_chnl->FQName());
485  entry.set_host_name(ifmap_chnl->channel()->ToString());
486  entry.set_channel_name(ifmap_chnl->channel_name());
487  entry.set_client_added(ifmap_chnl->get_client_added());
488  out_map->push_back(entry);
489  }
490 }
491 
493  const RequestPipeline::PipeSpec ps,
494  int stage, int instNum,
496  const IFMapXmppShowReq *request =
497  static_cast<const IFMapXmppShowReq *>(ps.snhRequest_.get());
498  IFMapSandeshContext *sctx =
499  static_cast<IFMapSandeshContext *>(request->module_context("IFMap"));
500  IFMapChannelManager *ifmap_channel_manager =
502 
503  IFMapXmppShowResp *response = new IFMapXmppShowResp();
504  response->set_context(request->context());
505  response->set_more(false);
506 
507  if (!ifmap_channel_manager) {
508  response->Response();
509  return true;
510  }
511 
512  IFMapChannelManagerInfo channel_manager_info;
513  IFMapChannelManagerStats channel_manager_stats;
514 
515  channel_manager_stats.set_unknown_subscribe_messages(
516  ifmap_channel_manager->get_unknown_subscribe_messages());
517  channel_manager_stats.set_unknown_unsubscribe_messages(
518  ifmap_channel_manager->get_unknown_unsubscribe_messages());
519  channel_manager_stats.set_duplicate_channel_ready_messages(
520  ifmap_channel_manager->get_duplicate_channel_ready_messages());
521  channel_manager_stats.set_invalid_channel_not_ready_messages(
522  ifmap_channel_manager->get_invalid_channel_not_ready_messages());
523  channel_manager_stats.set_invalid_channel_state_messages(
524  ifmap_channel_manager->get_invalid_channel_state_messages());
525  channel_manager_stats.set_invalid_vm_subscribe_messages(
526  ifmap_channel_manager->get_invalid_vm_subscribe_messages());
527  channel_manager_stats.set_vmsub_novrsub_messages(
528  ifmap_channel_manager->get_vmsub_novrsub_messages());
529  channel_manager_stats.set_vmunsub_novrsub_messages(
530  ifmap_channel_manager->get_vmunsub_novrsub_messages());
531  channel_manager_stats.set_vmunsub_novmsub_messages(
532  ifmap_channel_manager->get_vmunsub_novmsub_messages());
533  channel_manager_stats.set_duplicate_vrsub_messages(
534  ifmap_channel_manager->get_duplicate_vrsub_messages());
535  channel_manager_stats.set_duplicate_vmsub_messages(
536  ifmap_channel_manager->get_duplicate_vmsub_messages());
537 
538  IFMapXmppChannelMapList channel_map_list;
539  std::vector<IFMapXmppChannelMapEntry> channel_map;
540  ifmap_channel_manager->FillChannelMap(&channel_map);
541  channel_map_list.set_channel_list(channel_map);
542  channel_map_list.set_channel_count(channel_map.size());
543 
544  channel_manager_info.set_channel_manager_stats(channel_manager_stats);
545  channel_manager_info.set_channel_manager_map(channel_map_list);
546 
547  response->set_channel_manager_info(channel_manager_info);
548  response->Response();
549 
550  // Return 'true' so that we are not called again
551  return true;
552 }
553 
554 void IFMapXmppShowReq::HandleRequest() const {
555 
558 
559  s0.taskId_ = scheduler->GetTaskId("db::IFMapTable");
561  s0.instances_.push_back(0);
562 
563  RequestPipeline::PipeSpec ps(this);
564  ps.stages_= boost::assign::list_of(s0)
565  .convert_to_container<std::vector<RequestPipeline::StageSpec> >();
566  RequestPipeline rp(ps);
567 }
568 
void incr_unknown_subscribe_messages()
Definition: ifmap_xmpp.h:96
virtual const XmppConnection * connection() const =0
tbb::atomic< uint64_t > vmunsub_novmsub_messages
Definition: ifmap_xmpp.h:188
tbb::atomic< uint64_t > unknown_unsubscribe_messages
Definition: ifmap_xmpp.h:181
std::vector< int > instances_
virtual ~IFMapChannelManager()
Definition: ifmap_xmpp.cc:359
IFMapSender * ifmap_client_
Definition: ifmap_xmpp.h:78
void incr_unknown_unsubscribe_messages()
Definition: ifmap_xmpp.h:97
virtual void SetName(const std::string &name)
Definition: ifmap_client.h:29
virtual const std::string & identifier() const
Definition: ifmap_xmpp.cc:88
XmppMessageType type
Definition: xmpp_proto.h:57
void DeleteIFMapXmppChannel(IFMapXmppChannel *ifmap_chnl)
Definition: ifmap_xmpp.cc:394
IFMapServer * ifmap_server()
virtual void ReceiveUpdate(const XmppStanza::XmppMessage *)
Definition: ifmap_xmpp.cc:277
tbb::atomic< uint64_t > invalid_channel_not_ready_messages
Definition: ifmap_xmpp.h:183
ConfigClientManager * get_config_manager()
Definition: ifmap_server.h:92
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
tbb::atomic< uint64_t > duplicate_vmsub_messages
Definition: ifmap_xmpp.h:190
xmps::PeerId peer_id_
Definition: ifmap_xmpp.h:74
IFMapServer * ifmap_server_
Definition: ifmap_xmpp.h:170
static bool IFMapXmppShowReqHandleRequest(const Sandesh *sr, const RequestPipeline::PipeSpec ps, int stage, int instNum, RequestPipeline::InstData *data)
Definition: ifmap_xmpp.cc:492
virtual void RegisterReceive(xmps::PeerId, ReceiveCb)=0
int index() const
Definition: ifmap_client.h:31
void incr_vmunsub_novmsub_messages()
Definition: ifmap_xmpp.h:116
#define IFMAP_XMPP_WARN(obj,...)
Definition: ifmap_log.h:78
IFMapServer * ifmap_server_
Definition: ifmap_xmpp.h:76
bool get_client_added()
Definition: ifmap_xmpp.h:66
tbb::atomic< uint64_t > vmunsub_novrsub_messages
Definition: ifmap_xmpp.h:187
void EnqueueVmSubUnsub(bool subscribe, const std::string &vm_uuid)
Definition: ifmap_xmpp.cc:264
uint64_t get_unknown_subscribe_messages()
Definition: ifmap_xmpp.h:126
IFMapXmppChannel(XmppChannel *, IFMapServer *, IFMapChannelManager *)
Definition: ifmap_xmpp.cc:127
void ProcessChannelNotReady(XmppChannel *channel)
Definition: ifmap_xmpp.cc:418
uint64_t get_duplicate_vmsub_messages()
Definition: ifmap_xmpp.h:156
void incr_vmsub_novrsub_messages()
Definition: ifmap_xmpp.h:110
ChannelEventProcTask(const ChannelEventInfo &ev, IFMapChannelManager *mgr)
Definition: ifmap_xmpp.cc:47
#define IFMAP_XMPP_DEBUG(obj,...)
Definition: ifmap_log.h:51
IFMapChannelManager * ifmap_channel_manager_
Definition: ifmap_xmpp.h:77
ChannelEventInfo event_info_
Definition: ifmap_xmpp.cc:76
static const std::string NoFqnSet
Definition: ifmap_xmpp.h:41
uint64_t msgs_sent() const
Definition: ifmap_client.h:32
virtual bool SendUpdate(const std::string &msg)
Definition: ifmap_xmpp.cc:106
IFMapClient * Sender()
Definition: ifmap_xmpp.cc:327
virtual void SendActive(int index)
IFMapXmppChannel * parent()
Definition: ifmap_xmpp.cc:90
void incr_invalid_channel_state_messages()
Definition: ifmap_xmpp.h:104
std::string Description() const
Definition: ifmap_xmpp.cc:73
virtual void Close()=0
void ProcessVmUnsubscribe(const std::string &vm_uuid)
Definition: ifmap_xmpp.cc:231
uint64_t get_vmunsub_novrsub_messages()
Definition: ifmap_xmpp.h:147
const std::string & channel_name()
Definition: ifmap_xmpp.h:67
tbb::atomic< uint64_t > duplicate_vrsub_messages
Definition: ifmap_xmpp.h:189
int GetTaskId(const std::string &name)
Definition: task.cc:856
tbb::atomic< uint64_t > duplicate_channel_ready_messages
Definition: ifmap_xmpp.h:182
XCEvent
Definition: ifmap_xmpp.h:26
static string ToString(PhysicalDevice::ManagementProtocol proto)
void ProcessChannelReady(XmppChannel *channel)
Definition: ifmap_xmpp.cc:400
tbb::atomic< uint64_t > invalid_channel_state_messages
Definition: ifmap_xmpp.h:184
void WriteReadyCb(const boost::system::error_code &ec)
Definition: ifmap_xmpp.cc:122
void IFMapXmppChannelEventCb(XmppChannel *, xmps::PeerState)
Definition: ifmap_xmpp.cc:448
bool ProcessChannelUnregister(ConfigTaskQueueEntry entry)
Definition: ifmap_xmpp.cc:462
IFMapXmppChannel * FindChannel(XmppChannel *)
Definition: ifmap_xmpp.cc:364
bool HasAddedVm(const std::string &vm_uuid)
Definition: ifmap_client.h:65
IFMapSender(IFMapXmppChannel *parent)
Definition: ifmap_xmpp.cc:102
IFMapUpdateSender * sender()
Definition: ifmap_server.h:85
tbb::atomic< uint64_t > vmsub_novrsub_messages
Definition: ifmap_xmpp.h:186
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
void FillChannelMap(std::vector< IFMapXmppChannelMapEntry > *out_map)
Definition: ifmap_xmpp.cc:477
void EnqueueVrSubscribe(const std::string &identifier)
Definition: ifmap_xmpp.cc:254
void incr_duplicate_vmsub_messages()
Definition: ifmap_xmpp.h:122
uint64_t get_duplicate_channel_ready_messages()
Definition: ifmap_xmpp.h:132
tbb::atomic< uint64_t > invalid_vm_subscribe_messages
Definition: ifmap_xmpp.h:185
const std::string & hostname() const
Definition: ifmap_xmpp.cc:89
void STLDeleteElements(Container *container)
Definition: util.h:114
uint64_t get_unknown_unsubscribe_messages()
Definition: ifmap_xmpp.h:129
bool MustProcessChannelNotReady()
Definition: ifmap_xmpp.cc:180
uint64_t get_invalid_channel_state_messages()
Definition: ifmap_xmpp.h:138
virtual const std::string & ToString() const =0
uint64_t get_vmsub_novrsub_messages()
Definition: ifmap_xmpp.h:144
std::string name
Definition: ifmap_xmpp.h:36
void AddVm(const std::string &vm_uuid)
Definition: ifmap_client.h:59
virtual void UnRegisterReceive(xmps::PeerId)=0
XmppServer * xmpp_server_
Definition: ifmap_xmpp.h:169
void DeleteVm(const std::string &vm_uuid)
Definition: ifmap_client.h:62
bool HasVms()
Definition: ifmap_client.h:68
uint64_t get_invalid_vm_subscribe_messages()
Definition: ifmap_xmpp.h:141
uint64_t get_duplicate_vrsub_messages()
Definition: ifmap_xmpp.h:153
virtual ~IFMapXmppChannel()
Definition: ifmap_xmpp.cc:140
IFMapChannelManager * get_ifmap_channel_manager()
Definition: ifmap_server.h:97
void ProcessVmSubscribe(const std::string &vm_uuid)
Definition: ifmap_xmpp.cc:208
void EnqueueChannelEvent(XCEvent event, XmppChannel *channel)
Definition: ifmap_xmpp.cc:436
ChannelEventProcTask(const ChannelEventInfo &ev, IFMapXmppChannel *chnl)
Definition: ifmap_xmpp.cc:54
tbb::atomic< uint64_t > unknown_subscribe_messages
Definition: ifmap_xmpp.h:180
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Definition: ifmap_xmpp.cc:60
void incr_invalid_channel_not_ready_messages()
Definition: ifmap_xmpp.h:101
std::string VrSubscribeGetVrName(const std::string &iqnode, bool *valid_message)
Definition: ifmap_xmpp.cc:148
int GetTaskId() const
Definition: task.h:118
void ProcessVmSubscribe(std::string vr_name, std::string vm_uuid, bool subscribe, bool has_vms)
IFMapChannelManager(XmppServer *, IFMapServer *)
Definition: ifmap_xmpp.cc:336
uint64_t get_invalid_channel_not_ready_messages()
Definition: ifmap_xmpp.h:135
virtual std::string ToString() const
Definition: ifmap_xmpp.cc:87
void incr_duplicate_vrsub_messages()
Definition: ifmap_xmpp.h:119
boost::shared_ptr< const SandeshRequest > snhRequest_
void incr_invalid_vm_subscribe_messages()
Definition: ifmap_xmpp.h:107
tbb::mutex channel_map_mutex_
Definition: ifmap_xmpp.h:172
virtual void UnRegisterWriteReady(xmps::PeerId id)=0
IFMapChannelManager * ifmap_channel_manager_
Definition: ifmap_xmpp.cc:77
ChannelMap channel_map_
Definition: ifmap_xmpp.h:171
virtual IFMapXmppChannel * CreateIFMapXmppChannel(XmppChannel *)
Definition: ifmap_xmpp.cc:383
void set_send_is_blocked(bool is_blocked)
Definition: ifmap_client.h:49
IFMapXmppChannel * parent_
Definition: ifmap_xmpp.cc:97
const std::string & FQName() const
Definition: ifmap_xmpp.cc:184
XmppChannel * channel
Definition: ifmap_xmpp.h:35
void EnqueueChannelUnregister(XmppChannel *channel)
Definition: ifmap_xmpp.cc:470
void ProcessVrSubscribe(const std::string &identifier)
Definition: ifmap_xmpp.cc:192
uint64_t msgs_sent() const
Definition: ifmap_xmpp.cc:331
std::string VmSubscribeGetVmUuid(const std::string &iqnode, bool *valid_message)
Definition: ifmap_xmpp.cc:163
void incr_vmunsub_novrsub_messages()
Definition: ifmap_xmpp.h:113
uint64_t get_vmunsub_novmsub_messages()
Definition: ifmap_xmpp.h:150
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
XmppChannel * channel()
Definition: ifmap_xmpp.h:50
XmppChannel * channel_
Definition: ifmap_xmpp.h:75
const std::string & ToUVEKey() const
void RegisterConnectionEvent(xmps::PeerId, ConnectionEventCb)
Definition: xmpp_server.cc:452
IFMapXmppChannel * ifmap_chnl_
Definition: ifmap_xmpp.cc:78
WorkQueue< ConfigTaskQueueEntry > config_task_work_queue_
Definition: ifmap_xmpp.h:173
std::string channel_name_
Definition: ifmap_xmpp.h:80
bool ProcessClientWork(bool add, IFMapClient *client)
void SetIdentifier(const std::string &identifier)
Definition: ifmap_xmpp.cc:92
void incr_duplicate_channel_ready_messages()
Definition: ifmap_xmpp.h:98