11 #include "ifmap/ifmap_log_types.h"
19 : server_(server), queue_(queue), message_(new
IFMapMessage()),
20 task_scheduled_(false), queue_active_(false) {
56 std::string
Description()
const {
return "IFMapUpdateSender::SendTask"; }
76 tbb::mutex::scoped_lock lock(
mutex_);
81 tbb::mutex::scoped_lock lock(
mutex_);
87 tbb::mutex::scoped_lock lock(
mutex_);
94 tbb::mutex::scoped_lock lock(
mutex_);
112 if (blocked_clients == marker->
mask) {
119 if (!blocked_clients.
empty()) {
132 if (curr->IsMarker()) {
151 base_send_set.
clear();
159 if (send_set.
empty()) {
163 if (base_send_set.
empty()) {
164 base_send_set = send_set;
175 if (!blocked_set.
empty()) {
177 if (blocked_set == marker->
mask) {
179 "blocked before", curr->ToString());
189 curr->ToString(),
"and split from", marker->
ToString());
191 send_set.
Reset(blocked_set);
195 base_send_set = send_set;
212 if (marker != last) {
222 "before", last->
ToString(),
"with blocked_set",
230 const BitSet &base_send_set) {
295 if (blocked_set.
empty() || ready_set.
empty()) {
299 assert(next_marker->
mask == total_set);
309 assert(next_marker->
mask == total_set);
314 if (ready_set.
empty()) {
327 const BitSet &base_send_set) {
328 size_t total = base_send_set.
count();
333 size_t client_id = base_send_set.
find_first();
347 }
else if (update->
IsLink()) {
357 client_id = base_send_set.
find_next(client_id);
void ProcessUpdate(IFMapUpdate *update, const BitSet &base_send_set)
virtual const std::string & identifier() const =0
void incr_delete_nodes_sent()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
bool test(size_t pos) const
#define IFMAP_UPD_SENDER_TRACE(obj,...)
IFMapUpdateQueue * queue_
void MoveMarkerBefore(IFMapMarker *marker, IFMapListEntry *current)
virtual void QueueActive()
BitSet & reset(size_t pos)
void incr_delete_links_sent()
void MoveMarkerAfter(IFMapMarker *marker, IFMapListEntry *current)
void Reset(const BitSet &rhs)
IFMapMarker * tail_marker()
IFMapListEntry * GetLast()
IFMapUpdateSender * sender_
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
IFMapExporter * exporter()
const BitSet & advertise() const
virtual void SendActive(int index)
const std::string & get_string() const
void AdvertiseReset(const BitSet &set)
void SendUpdate(BitSet send_set, BitSet *blocked_set)
#define IFMAP_DEBUG_ONLY(obj,...)
void EncodeUpdate(const IFMapUpdate *update)
IFMapMarker * GetMarker(int bit)
void Dequeue(IFMapUpdate *update)
void LogAndCountSentUpdate(IFMapUpdate *update, const BitSet &base_send_set)
void incr_update_links_sent()
IFMapClient * GetClient(int index)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
std::string TypeToString()
IFMapMarker * ProcessMarker(IFMapMarker *marker, IFMapMarker *next_marker, bool *done)
IFMapMarker * MarkerSplitBefore(IFMapMarker *marker, IFMapListEntry *current, const BitSet &msplit)
void StateUpdateOnDequeue(IFMapUpdate *update, const BitSet &dequeue_set, bool is_delete)
void MarkerMerge(IFMapMarker *dst, IFMapMarker *src, const BitSet &mmove)
size_t find_first() const
void BuildComplement(const BitSet &lhs, const BitSet &rhs)
size_t find_next(size_t pos) const
virtual std::string ToString()
virtual const std::string & name() const
SendTask(IFMapUpdateSender *sender)
IFMapUpdateSender(IFMapServer *server, IFMapUpdateQueue *queue)
virtual std::string ToString()
void incr_update_nodes_sent()
std::string ToNumberedString() const
Task is a wrapper over tbb::task to support policies.
void SetReceiverInMsg(const std::string &cli_identifier)
std::string Description() const
void CleanupClient(int index)
void Send(IFMapMarker *imarker)
void GetSendScheduled(BitSet *current)
virtual ~IFMapUpdateSender()
IFMapListEntry * Next(IFMapListEntry *current)