5 #include <sandesh/request_pipeline.h>
8 #include <boost/foreach.hpp>
9 #include <boost/assign/list_of.hpp>
17 #include "ifmap/ifmap_server_show_types.h"
18 #include "ifmap/ifmap_log_types.h"
// Returns the fixed label "ChannelEventProcTask" as a std::string.
// NOTE(review): the enclosing class is not visible in this view; presumably
// this is the description hook of the channel-event task class — confirm.
73 std::string
Description()
const {
return "ChannelEventProcTask"; }
// Virtual method declaration: takes the message as a const std::string
// reference and returns a bool.
// NOTE(review): the definition is only partially visible here; presumably the
// bool reports whether the message was handed off successfully — confirm
// against the full definition before relying on it.
85 virtual bool SendUpdate(
const std::string &msg);
103 : parent_(parent), hostname_(parent_->channel_->
ToString()) {
107 bool sent = parent_->channel_->Send(
108 reinterpret_cast<const uint8_t *>(msg.data()), msg.size(), &msg,
114 incr_bytes_sent(msg.size());
116 set_send_is_blocked(
true);
149 bool *valid_message) {
151 size_t tstart = iqn.find(
"virtual-router:");
153 vr_name = std::string(iqn, (
sizeof(
"virtual-router:") - 1));
154 *valid_message =
true;
156 *valid_message =
false;
164 bool *valid_message) {
166 size_t tstart = iqn.find(
"virtual-machine:");
168 vm_uuid = std::string(iqn, (
sizeof(
"virtual-machine:") - 1));
169 *valid_message =
true;
172 *valid_message =
false;
201 bool add_client =
true;
219 bool subscribe =
true;
242 bool subscribe =
false;
257 info.
name = identifier;
265 const std::string &vm_uuid) {
283 const char*
const vr_string =
"virtual-router:";
284 const char*
const vm_string =
"virtual-machine:";
285 if ((iq->
iq_type.compare(
"set") == 0) &&
286 (iq->
action.compare(
"subscribe") == 0)) {
287 if (iq->
node.compare(0, strlen(vr_string), vr_string) == 0) {
288 bool valid_message =
false;
294 }
else if (iq->
node.compare(0, strlen(vm_string), vm_string) == 0) {
295 bool valid_message =
false;
299 bool subscribe =
true;
308 if ((iq->
iq_type.compare(
"set") == 0) &&
309 (iq->
action.compare(
"unsubscribe") == 0)) {
310 if (iq->
node.compare(0, strlen(vm_string), vm_string) == 0) {
311 bool valid_message =
false;
315 bool subscribe =
false;
338 : xmpp_server_(xmpp_server), ifmap_server_(ifmap_server),
339 config_task_work_queue_(
366 ChannelMap::iterator loc =
376 BOOST_FOREACH(ChannelMap::value_type &i,
channel_map_) {
377 if (i.second->ToString() == tostring)
386 Create<IFMapXmppChannel>(channel,
390 channel_map_.insert(std::make_pair(channel, ifmap_chnl));
402 if (ifmap_chnl == NULL) {
423 std::string fq_name = ifmap_chnl->
FQName();
425 bool add_client =
false;
464 "Unregistering config peer for channel",
478 std::vector<IFMapXmppChannelMapEntry> *out_map) {
483 IFMapXmppChannelMapEntry entry;
484 entry.set_client_name(ifmap_chnl->
FQName());
488 out_map->push_back(entry);
494 int stage,
int instNum,
496 const IFMapXmppShowReq *request =
497 static_cast<const IFMapXmppShowReq *
>(ps.
snhRequest_.get());
503 IFMapXmppShowResp *response =
new IFMapXmppShowResp();
504 response->set_context(request->context());
505 response->set_more(
false);
507 if (!ifmap_channel_manager) {
508 response->Response();
512 IFMapChannelManagerInfo channel_manager_info;
513 IFMapChannelManagerStats channel_manager_stats;
515 channel_manager_stats.set_unknown_subscribe_messages(
517 channel_manager_stats.set_unknown_unsubscribe_messages(
519 channel_manager_stats.set_duplicate_channel_ready_messages(
521 channel_manager_stats.set_invalid_channel_not_ready_messages(
523 channel_manager_stats.set_invalid_channel_state_messages(
525 channel_manager_stats.set_invalid_vm_subscribe_messages(
527 channel_manager_stats.set_vmsub_novrsub_messages(
529 channel_manager_stats.set_vmunsub_novrsub_messages(
531 channel_manager_stats.set_vmunsub_novmsub_messages(
533 channel_manager_stats.set_duplicate_vrsub_messages(
535 channel_manager_stats.set_duplicate_vmsub_messages(
538 IFMapXmppChannelMapList channel_map_list;
539 std::vector<IFMapXmppChannelMapEntry> channel_map;
541 channel_map_list.set_channel_list(channel_map);
542 channel_map_list.set_channel_count(channel_map.size());
544 channel_manager_info.set_channel_manager_stats(channel_manager_stats);
545 channel_manager_info.set_channel_manager_map(channel_map_list);
547 response->set_channel_manager_info(channel_manager_info);
548 response->Response();
554 void IFMapXmppShowReq::HandleRequest()
const {
564 ps.stages_= boost::assign::list_of(s0)
565 .convert_to_container<std::vector<RequestPipeline::StageSpec> >();
void incr_unknown_subscribe_messages()
virtual const XmppConnection * connection() const =0
tbb::atomic< uint64_t > vmunsub_novmsub_messages
tbb::atomic< uint64_t > unknown_unsubscribe_messages
std::vector< int > instances_
virtual ~IFMapChannelManager()
IFMapSender * ifmap_client_
void incr_unknown_unsubscribe_messages()
virtual void SetName(const std::string &name)
virtual const std::string & identifier() const
void DeleteIFMapXmppChannel(IFMapXmppChannel *ifmap_chnl)
IFMapServer * ifmap_server()
virtual void ReceiveUpdate(const XmppStanza::XmppMessage *)
tbb::atomic< uint64_t > invalid_channel_not_ready_messages
ConfigClientManager * get_config_manager()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
tbb::atomic< uint64_t > duplicate_vmsub_messages
IFMapServer * ifmap_server_
static bool IFMapXmppShowReqHandleRequest(const Sandesh *sr, const RequestPipeline::PipeSpec ps, int stage, int instNum, RequestPipeline::InstData *data)
virtual void RegisterReceive(xmps::PeerId, ReceiveCb)=0
void incr_vmunsub_novmsub_messages()
#define IFMAP_XMPP_WARN(obj,...)
IFMapServer * ifmap_server_
tbb::atomic< uint64_t > vmunsub_novrsub_messages
void EnqueueVmSubUnsub(bool subscribe, const std::string &vm_uuid)
uint64_t get_unknown_subscribe_messages()
IFMapXmppChannel(XmppChannel *, IFMapServer *, IFMapChannelManager *)
void ProcessChannelNotReady(XmppChannel *channel)
uint64_t get_duplicate_vmsub_messages()
void incr_vmsub_novrsub_messages()
ChannelEventProcTask(const ChannelEventInfo &ev, IFMapChannelManager *mgr)
#define IFMAP_XMPP_DEBUG(obj,...)
IFMapChannelManager * ifmap_channel_manager_
ChannelEventInfo event_info_
static const std::string NoFqnSet
uint64_t msgs_sent() const
virtual bool SendUpdate(const std::string &msg)
virtual void SendActive(int index)
IFMapXmppChannel * parent()
void incr_invalid_channel_state_messages()
std::string Description() const
void ProcessVmUnsubscribe(const std::string &vm_uuid)
uint64_t get_vmunsub_novrsub_messages()
const std::string & channel_name()
tbb::atomic< uint64_t > duplicate_vrsub_messages
int GetTaskId(const std::string &name)
bool GetEndOfRibComputed() const
tbb::atomic< uint64_t > duplicate_channel_ready_messages
static string ToString(PhysicalDevice::ManagementProtocol proto)
void ProcessChannelReady(XmppChannel *channel)
tbb::atomic< uint64_t > invalid_channel_state_messages
void WriteReadyCb(const boost::system::error_code &ec)
void IFMapXmppChannelEventCb(XmppChannel *, xmps::PeerState)
bool ProcessChannelUnregister(ConfigTaskQueueEntry entry)
IFMapXmppChannel * FindChannel(XmppChannel *)
bool HasAddedVm(const std::string &vm_uuid)
IFMapSender(IFMapXmppChannel *parent)
IFMapUpdateSender * sender()
tbb::atomic< uint64_t > vmsub_novrsub_messages
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met, else puts the task in waitq...
void FillChannelMap(std::vector< IFMapXmppChannelMapEntry > *out_map)
void EnqueueVrSubscribe(const std::string &identifier)
void incr_duplicate_vmsub_messages()
uint64_t get_duplicate_channel_ready_messages()
tbb::atomic< uint64_t > invalid_vm_subscribe_messages
const std::string & hostname() const
void STLDeleteElements(Container *container)
uint64_t get_unknown_unsubscribe_messages()
bool MustProcessChannelNotReady()
uint64_t get_invalid_channel_state_messages()
virtual const std::string & ToString() const =0
uint64_t get_vmsub_novrsub_messages()
void AddVm(const std::string &vm_uuid)
virtual void UnRegisterReceive(xmps::PeerId)=0
XmppServer * xmpp_server_
void DeleteVm(const std::string &vm_uuid)
uint64_t get_invalid_vm_subscribe_messages()
uint64_t get_duplicate_vrsub_messages()
virtual ~IFMapXmppChannel()
IFMapChannelManager * get_ifmap_channel_manager()
void ProcessVmSubscribe(const std::string &vm_uuid)
void EnqueueChannelEvent(XCEvent event, XmppChannel *channel)
ChannelEventProcTask(const ChannelEventInfo &ev, IFMapXmppChannel *chnl)
tbb::atomic< uint64_t > unknown_subscribe_messages
virtual bool Run()
Code to execute. Returns true if the task is completed. Returns false to reschedule the task...
void incr_invalid_channel_not_ready_messages()
std::string VrSubscribeGetVrName(const std::string &iqnode, bool *valid_message)
void ProcessVmSubscribe(std::string vr_name, std::string vm_uuid, bool subscribe, bool has_vms)
IFMapChannelManager(XmppServer *, IFMapServer *)
uint64_t get_invalid_channel_not_ready_messages()
virtual std::string ToString() const
void incr_duplicate_vrsub_messages()
boost::shared_ptr< const SandeshRequest > snhRequest_
void incr_invalid_vm_subscribe_messages()
tbb::mutex channel_map_mutex_
virtual void UnRegisterWriteReady(xmps::PeerId id)=0
IFMapChannelManager * ifmap_channel_manager_
virtual IFMapXmppChannel * CreateIFMapXmppChannel(XmppChannel *)
void set_send_is_blocked(bool is_blocked)
IFMapXmppChannel * parent_
const std::string & FQName() const
void EnqueueChannelUnregister(XmppChannel *channel)
void ProcessVrSubscribe(const std::string &identifier)
uint64_t msgs_sent() const
std::string VmSubscribeGetVmUuid(const std::string &iqnode, bool *valid_message)
void incr_vmunsub_novrsub_messages()
uint64_t get_vmunsub_novmsub_messages()
Task is a wrapper over tbb::task to support policies.
const std::string & ToUVEKey() const
void RegisterConnectionEvent(xmps::PeerId, ConnectionEventCb)
IFMapXmppChannel * ifmap_chnl_
WorkQueue< ConfigTaskQueueEntry > config_task_work_queue_
std::string channel_name_
bool ProcessClientWork(bool add, IFMapClient *client)
void SetIdentifier(const std::string &identifier)
void incr_duplicate_channel_ready_messages()