8 #include <boost/asio/ip/address.hpp>
9 #include <boost/asio/ip/host_name.hpp>
10 #include <boost/program_options.hpp>
11 #include <boost/assign/list_of.hpp>
12 #include <boost/uuid/random_generator.hpp>
13 #include <boost/random/mersenne_twister.hpp>
14 #include <boost/random/uniform_int_distribution.hpp>
20 #include <sandesh/common/vns_constants.h>
21 #include <sandesh/common/vns_types.h>
22 #include <sandesh/common/flow_types.h>
23 #include <ksync/ksync_types.h>
25 namespace opt = boost::program_options;
36 std::string &node_type_name,
37 std::string &instance_id,
38 int http_server_port,
int start_vn,
int end_vn,
int other_vn,
39 int num_vns,
int vm_iterations,
40 std::vector<std::string> &collectors,
41 std::vector<uint32_t> &ip_vns,
43 int num_vrouter_error_messages_per_sec,
44 int num_sessions_per_vm,
45 int num_session_samples_per_sec,
46 int num_session_samples_in_message,
78 scheduler->
GetTaskId(
"mockgen::SendSessionTask"), -1));
83 scheduler->
GetTaskId(
"mockgen::SendMessageTask"), -1));
94 int task_id,
int task_instance) :
95 Task(task_id, task_instance),
96 mgen_(mock_generator) {
100 std::string str1(
"VRouter operation failed. Error <");
102 std::string str2(
":");
103 std::string error_msg(
"Entry not pressent");
104 std::string str3(
">. Object <");
105 std::string obj_str(
"Flow: 333333 with Source IP: "
106 "not present >. Object < Flow: 333333 with Source IP: "
107 "10.0.0.6 Source Port: 3333 Destination IP: 10.0.0.10 "
108 "Destination Port: 13333 Protocol 6");
109 std::string str4(
">. Operation <");
110 std::string state_str(
"Change");
111 std::string str5(
">. Message number :");
112 uint64_t msg_no(418940931);
113 V_ROUTER_ERROR_LOG(
"", SandeshLevel::SYS_DEBUG, str1, error,
114 str2, error_msg, str3, obj_str, str4, state_str, str5,
119 uint64_t diff_time(0);
125 if (diff_time >= 1000000) {
126 LOG(ERROR,
"Sent: " << i + 1 <<
" in " <<
127 diff_time/1000000 <<
" seconds, NOT sending at " <<
132 usleep(1000000 - diff_time);
137 return "SendMessageTask";
148 int task_id,
int task_instance) :
149 Task(task_id, task_instance),
150 mgen_(mock_generator) {
160 SessionEndpoint end_point;
168 end_point.set_deployment(
172 end_point.set_application(
176 end_point.set_remote_deployment(
178 end_point.set_remote_tier(
180 end_point.set_remote_application(
185 std::vector<std::string> labels;
186 std::vector<std::string> remote_labels;
189 for (
int i = 0; i < nlabels + 1; i++) {
193 for (
int i = 0; i < nremote_labels + 1; i++) {
194 remote_labels.push_back(
197 end_point.set_labels(
198 std::set<string>(labels.begin(),labels.end()));
199 end_point.set_remote_labels(
200 std::set<string>(remote_labels.begin(),remote_labels.end()));
203 for (
int i = 0; i < nsport; i++) {
209 SessionIpPortProtocol sess_ip_port_proto;
210 sess_ip_port_proto.set_local_ip(ipaddr);
211 sess_ip_port_proto.set_service_port(port);
212 sess_ip_port_proto.set_protocol(proto);
213 SessionAggInfo place_holder;
214 sess_agg_map[sess_ip_port_proto];
216 end_point.set_sess_agg_info(sess_agg_map);
224 int lsession_cnt = 0;
225 int last_lsession_cnt = 0;
226 uint64_t diff_time = 0;
227 std::vector<SessionEndpoint>::iterator begin(
mgen_->
sessions_.begin() +
229 for (std::vector<SessionEndpoint>::iterator it = begin;
231 bool sent_message(
false);
233 SessionEndpoint &end_point(*it);
235 for (SessionAggMap::const_iterator it2
236 = end_point.get_sess_agg_info().begin();
237 it2 != end_point.get_sess_agg_info().end(); ++it2) {
238 SessionAggInfo sess_agg_info;
239 std::map<SessionIpPort, SessionInfo> session_map;
241 for (
int i = 0; i < ncport; i++) {
244 for (
int j = 0; j < nips; j++) {
248 std::string::npos), other_vn);
251 SessionIpPort sess_ip_port;
252 sess_ip_port.set_port(cport);
253 sess_ip_port.set_ip(ipaddr);
254 std::map<SessionIpPort, SessionInfo>::iterator iter
255 = session_map.find(sess_ip_port);
256 if (iter != session_map.end()) {
259 SessionInfo session_val;
260 SessionFlowInfo forward_flow_info;
261 SessionFlowInfo reverse_flow_info;
268 forward_flow_info.set_sampled_pkts(forward_pkts);
269 forward_flow_info.set_sampled_bytes(forward_pkts *
271 reverse_flow_info.set_sampled_pkts(reverse_pkts);
272 reverse_flow_info.set_sampled_bytes(reverse_pkts *
274 sess_agg_info.set_sampled_forward_pkts(
275 sess_agg_info.get_sampled_forward_pkts() +
277 sess_agg_info.set_sampled_forward_bytes(
278 sess_agg_info.get_sampled_forward_bytes() +
279 forward_flow_info.get_sampled_bytes());
280 sess_agg_info.set_sampled_reverse_pkts(
281 sess_agg_info.get_sampled_reverse_pkts() +
283 sess_agg_info.set_sampled_reverse_bytes(
284 sess_agg_info.get_sampled_reverse_bytes() +
285 reverse_flow_info.get_sampled_bytes());
287 forward_flow_info.set_logged_pkts(forward_pkts);
288 forward_flow_info.set_logged_bytes(forward_pkts *
290 reverse_flow_info.set_logged_pkts(reverse_pkts);
291 reverse_flow_info.set_logged_bytes(reverse_pkts *
293 sess_agg_info.set_logged_forward_pkts(
294 sess_agg_info.get_logged_forward_pkts() +
296 sess_agg_info.set_logged_forward_bytes(
297 sess_agg_info.get_logged_forward_bytes() +
298 forward_flow_info.get_logged_bytes());
299 sess_agg_info.set_logged_reverse_pkts(
300 sess_agg_info.get_logged_reverse_pkts() +
302 sess_agg_info.set_logged_reverse_bytes(
303 sess_agg_info.get_logged_reverse_bytes() +
304 reverse_flow_info.get_logged_bytes());
306 session_val.set_forward_flow_info(forward_flow_info);
307 session_val.set_reverse_flow_info(reverse_flow_info);
308 session_map[sess_ip_port] = session_val;
311 sess_agg_info.set_sessionMap(session_map);
312 sess_agg_info_map[it2->first] = sess_agg_info;
314 end_point.set_sess_agg_info(sess_agg_info_map);
317 SESSION_ENDPOINT_OBJECT_LOG(
"", SandeshLevel::SYS_NOTICE,
318 std::vector<SessionEndpoint>(begin+last_lsession_cnt, it + 1));
320 last_lsession_cnt = lsession_cnt;
323 SESSION_ENDPOINT_OBJECT_LOG(
"", SandeshLevel::SYS_NOTICE,
324 std::vector<SessionEndpoint>(begin+last_lsession_cnt, it + 1));
327 usleep(1000000 - diff_time);
331 if (diff_time >= 1000000) {
333 LOG(ERROR,
"Sent: " << lsession_cnt <<
" in " <<
334 diff_time/1000000 <<
" seconds, NOT sending at " <<
346 return "SendSessionTask";
362 const static boost::random::uniform_int_distribution<>
364 const static boost::random::uniform_int_distribution<>
366 const static boost::random::uniform_int_distribution<>
368 const static boost::random::uniform_int_distribution<>
370 const static boost::random::uniform_int_distribution<>
372 const static boost::random::uniform_int_distribution<>
374 const static boost::random::uniform_int_distribution<>
376 const static boost::random::uniform_int_distribution<>
378 const static boost::random::uniform_int_distribution<>
381 const static boost::random::uniform_int_distribution<>
383 const static boost::random::uniform_int_distribution<>
387 const static std::vector<std::string>
kTier;
388 const static std::vector<std::string>
kSite;
419 const boost::random::uniform_int_distribution<>
421 const boost::random::uniform_int_distribution<>
423 const boost::random::uniform_int_distribution<>
425 const boost::random::uniform_int_distribution<>
427 const boost::random::uniform_int_distribution<>
429 const boost::random::uniform_int_distribution<>
433 const boost::random::uniform_int_distribution<>
438 (
"Dep1")(
"Dep2")(
"Dep3")(
"Dep4");
440 (
"Tier1")(
"Tier2")(
"Tier3")(
"Tier4");
442 (
"App1")(
"App2")(
"App3")(
"App4");
444 (
"Site1")(
"Site2")(
"Site3")(
"Site4");
446 (
"Label1")(
"Label2")(
"Label3")(
"Label4")(
"Label5");
447 const boost::random::uniform_int_distribution<>
449 const boost::random::uniform_int_distribution<>
451 const boost::random::uniform_int_distribution<>
453 const boost::random::uniform_int_distribution<>
461 int main(
int argc,
char *argv[]) {
462 bool log_local(
false), use_syslog(
false), log_flow(
false);
463 std::string log_category;
464 opt::options_description desc(
"Command line options");
466 (
"help",
"help message")
467 (
"collectors", opt::value<std::vector<std::string> >()->multitoken(
468 )->default_value(std::vector<std::string>(1,
"127.0.0.1:8086"),
470 "List of Collectors addresses in ip:port format")
471 (
"num_instances_per_generator", opt::value<int>()->default_value(10),
472 "Number of instances (virtual machines) per generator")
473 (
"num_networks", opt::value<int>()->default_value(100),
474 "Number of virtual networks")
475 (
"num_sessions_per_instance", opt::value<int>()->default_value(10),
476 "Number of sessions per instance")
478 opt::value<std::string>()->default_value(
"1.0.0.1"),
479 "Start IP address to be used for instances")
480 (
"http_server_port", opt::value<int>()->default_value(-1),
482 (
"generator_id", opt::value<int>()->default_value(0),
484 (
"num_generators", opt::value<int>()->default_value(1),
485 "Number of generators")
486 (
"num_vrouter_errors_per_second", opt::value<int>()->default_value(
488 "Number of VRouterErrror messages to send in one second")
489 (
"num_session_samples_per_second", opt::value<int>()->default_value(
491 "Number of session messages to send in one second")
492 (
"num_session_samples_in_message", opt::value<int>()->default_value(
494 "Number of session samples to send in one message")
495 (
"log_property_file", opt::value<std::string>()->default_value(
""),
496 "log4cplus property file name")
497 (
"log_files_count", opt::value<int>()->default_value(10),
498 "Maximum log file roll over index")
500 opt::value<long>()->default_value(10*1024*1024),
501 "Maximum size of the log file")
503 opt::value<std::string>()->default_value(log_category),
504 "Category filter for local logging of sandesh messages")
505 (
"log_file", opt::value<std::string>()->default_value(
"<stdout>"),
506 "Filename for the logs to be written to")
507 (
"log_level", opt::value<std::string>()->default_value(
"SYS_NOTICE"),
508 "Severity level for local logging of sandesh messages")
509 (
"log_local", opt::bool_switch(&log_local),
510 "Enable local logging of sandesh messages")
511 (
"use_syslog", opt::bool_switch(&use_syslog),
512 "Enable logging to syslog")
513 (
"syslog_facility", opt::value<std::string>()->default_value(
514 "LOG_LOCAL0"),
"Syslog facility to receive log lines")
515 (
"log_flow", opt::bool_switch(&log_flow),
516 "Enable local logging of flow sandesh messages")
517 (
"slo_destination", opt::value<std::vector<std::string> >()->multitoken(
518 )->default_value(std::vector<std::string>(1,
"collector"),
520 "List of destinations. valid values are collector, file, syslog")
521 (
"sampled_destination", opt::value<std::vector<std::string> >()->multitoken(
522 )->default_value(std::vector<std::string>(1,
"collector"),
524 "List of destinations. valid values are collector, file, syslog");
526 opt::variables_map var_map;
527 opt::store(opt::parse_command_line(argc, argv, desc), var_map);
528 opt::notify(var_map);
530 if (var_map.count(
"help")) {
531 std::cout << desc << std::endl;
536 std::string moduleid(g_vns_constants.ModuleNames.find(module)->second);
537 std::string log_property_file(
538 var_map[
"log_property_file"].as<std::string>());
539 if (log_property_file.size()) {
543 var_map[
"log_file_size"].as<long>(),
544 var_map[
"log_files_count"].as<int>(),
546 var_map[
"syslog_facility"].as<std::string>(),
549 var_map[
"log_level"].as<std::string>())));
552 var_map[
"log_category"].as<std::string>(),
553 var_map[
"log_level"].as<std::string>(),
false, log_flow);
555 std::vector<std::string> slo_destination(
556 var_map[
"slo_destination"].as<std::vector<std::string> >());
557 std::vector<std::string> sample_destination(
558 var_map[
"sampled_destination"].as<std::vector<std::string> >());
560 var_map[
"log_file_size"].as<long>(),
561 var_map[
"log_files_count"].as<int>(),
562 var_map[
"syslog_facility"].as<std::string>(),
567 var_map[
"log_file_size"].as<long>(),
568 var_map[
"log_files_count"].as<int>(),
569 var_map[
"syslog_facility"].as<std::string>(),
575 int gen_id(var_map[
"generator_id"].as<int>());
576 int ngens(var_map[
"num_generators"].as<int>());
578 int num_instances(var_map[
"num_instances_per_generator"].as<int>());
579 int num_networks(var_map[
"num_networks"].as<int>());
581 g_vns_constants.Module2NodeType.find(module)->second);
582 std::string node_type_name(
583 g_vns_constants.NodeTypeNames.find(node_type)->second);
584 int http_server_port(var_map[
"http_server_port"].as<int>());
585 std::vector<std::string> collectors(
586 var_map[
"collectors"].as<std::vector<std::string> >());
588 boost::system::error_code ec;
589 std::string hostname(boost::asio::ip::host_name(ec));
591 LOG(ERROR,
"Hostname FAILED: " << ec);
595 int gen_factor = num_networks / num_instances;
596 if (gen_factor == 0) {
597 LOG(ERROR,
"Number of virtual networks(" << num_networks <<
") should "
598 "be greater than number of instances per generator(" <<
599 num_instances <<
")");
602 int start_vn((gen_id % gen_factor) * num_instances);
603 int end_vn(((gen_id % gen_factor) + 1) * num_instances);
604 int other_vn_adj(num_networks / 2);
606 if (gen_id >= other_vn_adj) {
607 other_vn = gen_id - other_vn_adj;
609 other_vn = gen_id + other_vn_adj;
611 int instance_iterations((num_instances + num_networks - 1) / num_networks);
612 int num_ips_per_vn(((ngens * num_instances) + num_networks - 1) /
614 std::string start_ip(var_map[
"start_ip_address"].as<std::string>());
615 boost::asio::ip::address_v4 start_ip_address(
616 boost::asio::ip::address_v4::from_string(start_ip.c_str(), ec));
618 LOG(ERROR,
"IP Address (" << start_ip <<
") FAILED: " << ec);
621 std::vector<uint32_t> ip_vns;
622 for (
int num = 0;
num < num_networks;
num++) {
623 ip_vns.push_back(start_ip_address.to_ulong() +
624 num_ips_per_vn *
num);
626 int start_ip_index(gen_id * num_instances / num_networks);
629 int num_sessions_per_instance(var_map[
"num_sessions_per_instance"].as<int>());
630 int num_session_samples_per_sec(
631 var_map[
"num_session_samples_per_second"].as<int>());
632 int num_session_samples_in_message(
633 var_map[
"num_session_samples_in_message"].as<int>());
634 int num_vrouter_error_messages_per_sec(
635 var_map[
"num_vrouter_errors_per_second"].as<int>());
637 MockGenerator mock_generator(hostname, moduleid, node_type_name,
638 instance_id, http_server_port, start_vn, end_vn, other_vn,
639 num_networks, instance_iterations, collectors, ip_vns, start_ip_index,
640 num_vrouter_error_messages_per_sec,
641 num_sessions_per_instance, num_session_samples_per_sec,
642 num_session_samples_in_message, &
evm);
643 mock_generator.Run();
boost::asio::ip::address IpAddress
boost::asio::ip::address_v4 Ip4Address
std::string Description() const
SendMessageTask(MockGenerator *mock_generator, int task_id, int task_instance)
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
std::string Description() const
SendSessionTask(MockGenerator *mock_generator, int task_id, int task_instance)
static const boost::random::uniform_int_distribution dProtocols
static const boost::random::uniform_int_distribution dIps
static const boost::random::uniform_int_distribution dTagIdx
static const boost::random::uniform_int_distribution dClientSession
static const std::vector< int > kPorts
static const int kNumSessionSamplesPerSec
static const std::vector< std::string > kTier
static const std::string kVnPrefix
const int ip_start_index_
const int num_vrouter_error_messages_per_sec_
static const boost::random::uniform_int_distribution dPort
static const std::string kVmPrefix
boost::uuids::random_generator u_rgen_
static const boost::random::uniform_int_distribution dFlowPktsPerSec
const int num_session_samples_per_sec_
const int http_server_port_
static const int kBytesPerPacket
friend class SendMessageTask
static const int kFlowPktsPerSec
const std::string module_name_
static const std::vector< std::string > kDeployment
boost::random::mt19937 rgen_
static const int kMaxPorts
const int num_session_per_vm_
const std::string node_type_name_
const int num_session_samples_in_message_
static const std::vector< std::string > kApplication
static int session_counter_
static const int kFlowMsgIntvlInSec
static const boost::random::uniform_int_distribution dNPorts
static const int kOtherVnPktsPerSec
static const std::vector< string > kLabels
const std::vector< uint32_t > ip_vns_
static const int kNumSessionSamplesInMessage
static const boost::random::uniform_int_distribution dOtherVnPktsPerSec
MockGenerator(std::string &hostname, std::string &module_name, std::string &node_type_name, std::string &instance_id, int http_server_port, int start_vn, int end_vn, int other_vn, int num_vns, int vm_iterations, std::vector< std::string > &collectors, std::vector< uint32_t > &ip_vns, int ip_start_index, int num_vrouter_error_messages_per_sec, int num_sessions_per_vm, int num_session_samples_per_sec, int num_session_samples_in_message, EventManager *evm)
std::vector< SessionEndpoint > sessions_
const std::vector< std::string > collectors_
static const int kUveMsgIntvlInSec
static const std::vector< std::string > kSite
static const boost::random::uniform_int_distribution dDirection
static const std::vector< int > kProtocols
const std::string instance_id_
static const boost::random::uniform_int_distribution dLabels
const std::string hostname_
static const boost::random::uniform_int_distribution dBytesPerPacket
static const int kNumVRouterErrorMessagesPerSec
static void set_send_to_collector_flags(const std::vector< std::string > &sampled_destination, const std::vector< std::string > &slo_destination)
static bool InitGenerator(const std::string &module, const std::string &source, const std::string &node_type, const std::string &instance_id, EventManager *evm, unsigned short http_port, const std::vector< std::string > &collectors, SandeshContext *client_context=NULL, DerivedStats ds=DerivedStats(), const SandeshConfig &config=SandeshConfig())
static void set_logger_appender(const std::string &file_name, long max_file_size, int max_backup_index, const std::string &syslog_facility, const std::vector< std::string > &destn, const std::string &ident, bool is_sampled_logger)
static void SetLoggingParams(bool enable_local_log, std::string category, std::string level, bool enable_trace_print=false, bool enable_flow_log=false, bool enable_session_syslog=false)
static SandeshLevel::type StringToLevel(std::string level)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
int GetTaskId(const std::string &name)
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a wrapper over tbb::task to support policies.
#define LOG(_Level, _Msg)
std::map< SessionIpPortProtocol, SessionAggInfo > SessionAggMap
log4cplus::LogLevel SandeshLevelTolog4Level(SandeshLevel::type slevel)
bool stringToInteger(const std::string &str, NumberType &num)
static const std::string integerToString(const NumberType &num)
static uint64_t UTCTimestampUsec()