8 #include <boost/asio/ip/address.hpp>
9 #include <boost/asio/ip/host_name.hpp>
10 #include <boost/program_options.hpp>
11 #include <boost/assign/list_of.hpp>
12 #include <boost/uuid/random_generator.hpp>
13 #include <boost/random/mersenne_twister.hpp>
14 #include <boost/random/uniform_int_distribution.hpp>
20 #include <sandesh/common/vns_constants.h>
21 #include <sandesh/common/vns_types.h>
22 #include <sandesh/common/flow_types.h>
23 #include <ksync/ksync_types.h>
25 namespace opt = boost::program_options;
36 std::string &node_type_name,
37 std::string &instance_id,
38 int http_server_port,
int start_vn,
int end_vn,
int other_vn,
39 int num_vns,
int vm_iterations,
40 std::vector<std::string> &collectors,
41 std::vector<uint32_t> &ip_vns,
43 int num_vrouter_error_messages_per_sec,
44 int num_sessions_per_vm,
45 int num_session_samples_per_sec,
46 int num_session_samples_in_message,
78 scheduler->
GetTaskId(
"mockgen::SendSessionTask"), -1));
83 scheduler->
GetTaskId(
"mockgen::SendMessageTask"), -1));
94 int task_id,
int task_instance) :
95 Task(task_id, task_instance),
96 mgen_(mock_generator) {
100 std::string str1(
"VRouter operation failed. Error <");
102 std::string str2(
":");
103 std::string error_msg(
"Entry not pressent");
104 std::string str3(
">. Object <");
105 std::string obj_str(
"Flow: 333333 with Source IP: "
106 "not present >. Object < Flow: 333333 with Source IP: "
107 "10.0.0.6 Source Port: 3333 Destination IP: 10.0.0.10 "
108 "Destination Port: 13333 Protocol 6");
109 std::string str4(
">. Operation <");
110 std::string state_str(
"Change");
111 std::string str5(
">. Message number :");
112 uint64_t msg_no(418940931);
113 V_ROUTER_ERROR_LOG(
"", SandeshLevel::SYS_DEBUG, str1, error,
114 str2, error_msg, str3, obj_str, str4, state_str, str5,
119 uint64_t diff_time(0);
125 if (diff_time >= 1000000) {
126 LOG(ERROR,
"Sent: " << i + 1 <<
" in " <<
127 diff_time/1000000 <<
" seconds, NOT sending at " <<
132 usleep(1000000 - diff_time);
137 return "SendMessageTask";
148 int task_id,
int task_instance) :
149 Task(task_id, task_instance),
150 mgen_(mock_generator) {
160 SessionEndpoint end_point;
168 end_point.set_deployment(
172 end_point.set_application(
176 end_point.set_remote_deployment(
178 end_point.set_remote_tier(
180 end_point.set_remote_application(
185 std::vector<std::string> labels;
186 std::vector<std::string> remote_labels;
189 for (
int i = 0; i < nlabels + 1; i++) {
193 for (
int i = 0; i < nremote_labels + 1; i++) {
194 remote_labels.push_back(
197 end_point.set_labels(
198 std::set<string>(labels.begin(),labels.end()));
199 end_point.set_remote_labels(
200 std::set<string>(remote_labels.begin(),remote_labels.end()));
203 for (
int i = 0; i < nsport; i++) {
209 SessionIpPortProtocol sess_ip_port_proto;
210 sess_ip_port_proto.set_local_ip(ipaddr);
211 sess_ip_port_proto.set_service_port(port);
212 sess_ip_port_proto.set_protocol(proto);
213 SessionAggInfo place_holder;
214 sess_agg_map[sess_ip_port_proto];
216 end_point.set_sess_agg_info(sess_agg_map);
224 int lsession_cnt = 0;
225 int last_lsession_cnt = 0;
226 uint64_t diff_time = 0;
227 std::vector<SessionEndpoint>::iterator begin(
mgen_->
sessions_.begin() +
229 for (std::vector<SessionEndpoint>::iterator it = begin;
231 bool sent_message(
false);
233 SessionEndpoint &end_point(*it);
235 for (SessionAggMap::const_iterator it2
236 = end_point.get_sess_agg_info().begin();
237 it2 != end_point.get_sess_agg_info().end(); ++it2) {
238 SessionAggInfo sess_agg_info;
239 std::map<SessionIpPort, SessionInfo> session_map;
241 for (
int i = 0; i < ncport; i++) {
244 for (
int j = 0; j < nips; j++) {
248 std::string::npos), other_vn);
251 SessionIpPort sess_ip_port;
252 sess_ip_port.set_port(cport);
253 sess_ip_port.set_ip(ipaddr);
254 std::map<SessionIpPort, SessionInfo>::iterator iter
255 = session_map.find(sess_ip_port);
256 if (iter != session_map.end()) {
259 SessionInfo session_val;
260 SessionFlowInfo forward_flow_info;
261 SessionFlowInfo reverse_flow_info;
268 forward_flow_info.set_sampled_pkts(forward_pkts);
269 forward_flow_info.set_sampled_bytes(forward_pkts *
271 reverse_flow_info.set_sampled_pkts(reverse_pkts);
272 reverse_flow_info.set_sampled_bytes(reverse_pkts *
274 sess_agg_info.set_sampled_forward_pkts(
275 sess_agg_info.get_sampled_forward_pkts() +
277 sess_agg_info.set_sampled_forward_bytes(
278 sess_agg_info.get_sampled_forward_bytes() +
279 forward_flow_info.get_sampled_bytes());
280 sess_agg_info.set_sampled_reverse_pkts(
281 sess_agg_info.get_sampled_reverse_pkts() +
283 sess_agg_info.set_sampled_reverse_bytes(
284 sess_agg_info.get_sampled_reverse_bytes() +
285 reverse_flow_info.get_sampled_bytes());
287 forward_flow_info.set_logged_pkts(forward_pkts);
288 forward_flow_info.set_logged_bytes(forward_pkts *
290 reverse_flow_info.set_logged_pkts(reverse_pkts);
291 reverse_flow_info.set_logged_bytes(reverse_pkts *
293 sess_agg_info.set_logged_forward_pkts(
294 sess_agg_info.get_logged_forward_pkts() +
296 sess_agg_info.set_logged_forward_bytes(
297 sess_agg_info.get_logged_forward_bytes() +
298 forward_flow_info.get_logged_bytes());
299 sess_agg_info.set_logged_reverse_pkts(
300 sess_agg_info.get_logged_reverse_pkts() +
302 sess_agg_info.set_logged_reverse_bytes(
303 sess_agg_info.get_logged_reverse_bytes() +
304 reverse_flow_info.get_logged_bytes());
306 session_val.set_forward_flow_info(forward_flow_info);
307 session_val.set_reverse_flow_info(reverse_flow_info);
308 session_map[sess_ip_port] = session_val;
311 sess_agg_info.set_sessionMap(session_map);
312 sess_agg_info_map[it2->first] = sess_agg_info;
314 end_point.set_sess_agg_info(sess_agg_info_map);
317 SESSION_ENDPOINT_OBJECT_LOG(
"", SandeshLevel::SYS_NOTICE,
318 std::vector<SessionEndpoint>(begin+last_lsession_cnt, it + 1));
320 last_lsession_cnt = lsession_cnt;
323 SESSION_ENDPOINT_OBJECT_LOG(
"", SandeshLevel::SYS_NOTICE,
324 std::vector<SessionEndpoint>(begin+last_lsession_cnt, it + 1));
327 usleep(1000000 - diff_time);
331 if (diff_time >= 1000000) {
333 LOG(ERROR,
"Sent: " << lsession_cnt <<
" in " <<
334 diff_time/1000000 <<
" seconds, NOT sending at " <<
346 return "SendSessionTask";
362 const static boost::random::uniform_int_distribution<>
364 const static boost::random::uniform_int_distribution<>
366 const static boost::random::uniform_int_distribution<>
368 const static boost::random::uniform_int_distribution<>
370 const static boost::random::uniform_int_distribution<>
372 const static boost::random::uniform_int_distribution<>
374 const static boost::random::uniform_int_distribution<>
376 const static boost::random::uniform_int_distribution<>
378 const static boost::random::uniform_int_distribution<>
381 const static boost::random::uniform_int_distribution<>
383 const static boost::random::uniform_int_distribution<>
387 const static std::vector<std::string>
kTier;
388 const static std::vector<std::string>
kSite;
419 const boost::random::uniform_int_distribution<>
421 const boost::random::uniform_int_distribution<>
423 const boost::random::uniform_int_distribution<>
425 const boost::random::uniform_int_distribution<>
427 const boost::random::uniform_int_distribution<>
429 const boost::random::uniform_int_distribution<>
433 const boost::random::uniform_int_distribution<>
438 (
"Dep1")(
"Dep2")(
"Dep3")(
"Dep4");
440 (
"Tier1")(
"Tier2")(
"Tier3")(
"Tier4");
442 (
"App1")(
"App2")(
"App3")(
"App4");
444 (
"Site1")(
"Site2")(
"Site3")(
"Site4");
446 (
"Label1")(
"Label2")(
"Label3")(
"Label4")(
"Label5");
447 const boost::random::uniform_int_distribution<>
449 const boost::random::uniform_int_distribution<>
451 const boost::random::uniform_int_distribution<>
453 const boost::random::uniform_int_distribution<>
461 int main(
int argc,
char *argv[]) {
462 bool log_local(
false), use_syslog(
false), log_flow(
false);
463 std::string log_category;
464 opt::options_description desc(
"Command line options");
466 (
"help",
"help message")
467 (
"collectors", opt::value<std::vector<std::string> >()->multitoken(
468 )->default_value(std::vector<std::string>(1,
"127.0.0.1:8086"),
470 "List of Collectors addresses in ip:port format")
471 (
"num_instances_per_generator", opt::value<int>()->default_value(10),
472 "Number of instances (virtual machines) per generator")
473 (
"num_networks", opt::value<int>()->default_value(100),
474 "Number of virtual networks")
475 (
"num_sessions_per_instance", opt::value<int>()->default_value(10),
476 "Number of sessions per instance")
478 opt::value<std::string>()->default_value(
"1.0.0.1"),
479 "Start IP address to be used for instances")
480 (
"http_server_port", opt::value<int>()->default_value(-1),
482 (
"generator_id", opt::value<int>()->default_value(0),
484 (
"num_generators", opt::value<int>()->default_value(1),
485 "Number of generators")
486 (
"num_vrouter_errors_per_second", opt::value<int>()->default_value(
488 "Number of VRouterErrror messages to send in one second")
489 (
"num_session_samples_per_second", opt::value<int>()->default_value(
491 "Number of session messages to send in one second")
492 (
"num_session_samples_in_message", opt::value<int>()->default_value(
494 "Number of session samples to send in one message")
495 (
"log_property_file", opt::value<std::string>()->default_value(
""),
496 "log4cplus property file name")
497 (
"log_files_count", opt::value<int>()->default_value(10),
498 "Maximum log file roll over index")
500 opt::value<long>()->default_value(10*1024*1024),
501 "Maximum size of the log file")
503 opt::value<std::string>()->default_value(log_category),
504 "Category filter for local logging of sandesh messages")
505 (
"log_file", opt::value<std::string>()->default_value(
"<stdout>"),
506 "Filename for the logs to be written to")
507 (
"log_level", opt::value<std::string>()->default_value(
"SYS_NOTICE"),
508 "Severity level for local logging of sandesh messages")
509 (
"log_local", opt::bool_switch(&log_local),
510 "Enable local logging of sandesh messages")
511 (
"use_syslog", opt::bool_switch(&use_syslog),
512 "Enable logging to syslog")
513 (
"syslog_facility", opt::value<std::string>()->default_value(
514 "LOG_LOCAL0"),
"Syslog facility to receive log lines")
515 (
"log_flow", opt::bool_switch(&log_flow),
516 "Enable local logging of flow sandesh messages")
517 (
"slo_destination", opt::value<std::vector<std::string> >()->multitoken(
518 )->default_value(std::vector<std::string>(1,
"collector"),
520 "List of destinations. valid values are collector, file, syslog")
521 (
"sampled_destination", opt::value<std::vector<std::string> >()->multitoken(
522 )->default_value(std::vector<std::string>(1,
"collector"),
524 "List of destinations. valid values are collector, file, syslog");
526 opt::variables_map var_map;
527 opt::store(opt::parse_command_line(argc, argv, desc), var_map);
528 opt::notify(var_map);
530 if (var_map.count(
"help")) {
531 std::cout << desc << std::endl;
536 std::string moduleid(g_vns_constants.ModuleNames.find(module)->second);
537 std::string log_property_file(
538 var_map[
"log_property_file"].as<std::string>());
539 if (log_property_file.size()) {
543 var_map[
"log_file_size"].as<long>(),
544 var_map[
"log_files_count"].as<int>(),
546 var_map[
"syslog_facility"].as<std::string>(),
549 var_map[
"log_level"].as<std::string>())));
552 var_map[
"log_category"].as<std::string>(),
553 var_map[
"log_level"].as<std::string>(),
false, log_flow);
555 std::vector<std::string> slo_destination(
556 var_map[
"slo_destination"].as<std::vector<std::string> >());
557 std::vector<std::string> sample_destination(
558 var_map[
"sampled_destination"].as<std::vector<std::string> >());
560 var_map[
"log_file_size"].as<long>(),
561 var_map[
"log_files_count"].as<int>(),
562 var_map[
"syslog_facility"].as<std::string>(),
567 var_map[
"log_file_size"].as<long>(),
568 var_map[
"log_files_count"].as<int>(),
569 var_map[
"syslog_facility"].as<std::string>(),
575 int gen_id(var_map[
"generator_id"].as<int>());
576 int ngens(var_map[
"num_generators"].as<int>());
578 int num_instances(var_map[
"num_instances_per_generator"].as<int>());
579 int num_networks(var_map[
"num_networks"].as<int>());
581 g_vns_constants.Module2NodeType.find(module)->second);
582 std::string node_type_name(
583 g_vns_constants.NodeTypeNames.find(node_type)->second);
584 int http_server_port(var_map[
"http_server_port"].as<int>());
585 std::vector<std::string> collectors(
586 var_map[
"collectors"].as<std::vector<std::string> >());
588 boost::system::error_code ec;
589 std::string hostname(boost::asio::ip::host_name(ec));
591 LOG(ERROR,
"Hostname FAILED: " << ec);
595 int gen_factor = num_networks / num_instances;
596 if (gen_factor == 0) {
597 LOG(ERROR,
"Number of virtual networks(" << num_networks <<
") should "
598 "be greater than number of instances per generator(" <<
599 num_instances <<
")");
602 int start_vn((gen_id % gen_factor) * num_instances);
603 int end_vn(((gen_id % gen_factor) + 1) * num_instances);
604 int other_vn_adj(num_networks / 2);
606 if (gen_id >= other_vn_adj) {
607 other_vn = gen_id - other_vn_adj;
609 other_vn = gen_id + other_vn_adj;
611 int instance_iterations((num_instances + num_networks - 1) / num_networks);
612 int num_ips_per_vn(((ngens * num_instances) + num_networks - 1) /
614 std::string start_ip(var_map[
"start_ip_address"].as<std::string>());
615 boost::asio::ip::address_v4 start_ip_address(
616 boost::asio::ip::address_v4::from_string(start_ip.c_str(), ec));
618 LOG(ERROR,
"IP Address (" << start_ip <<
") FAILED: " << ec);
621 std::vector<uint32_t> ip_vns;
622 for (
int num = 0;
num < num_networks;
num++) {
623 ip_vns.push_back(start_ip_address.to_ulong() +
624 num_ips_per_vn *
num);
626 int start_ip_index(gen_id * num_instances / num_networks);
629 int num_sessions_per_instance(var_map[
"num_sessions_per_instance"].as<int>());
630 int num_session_samples_per_sec(
631 var_map[
"num_session_samples_per_second"].as<int>());
632 int num_session_samples_in_message(
633 var_map[
"num_session_samples_in_message"].as<int>());
634 int num_vrouter_error_messages_per_sec(
635 var_map[
"num_vrouter_errors_per_second"].as<int>());
637 MockGenerator mock_generator(hostname, moduleid, node_type_name,
638 instance_id, http_server_port, start_vn, end_vn, other_vn,
639 num_networks, instance_iterations, collectors, ip_vns, start_ip_index,
640 num_vrouter_error_messages_per_sec,
641 num_sessions_per_instance, num_session_samples_per_sec,
642 num_session_samples_in_message, &evm);
643 mock_generator.Run();
log4cplus::LogLevel SandeshLevelTolog4Level(SandeshLevel::type slevel)
static int session_counter_
static const boost::random::uniform_int_distribution dProtocols
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
static const int kFlowPktsPerSec
static SandeshLevel::type StringToLevel(std::string level)
boost::uuids::random_generator u_rgen_
static const std::vector< std::string > kTier
static const int kNumSessionSamplesInMessage
static const std::vector< string > kLabels
static const boost::random::uniform_int_distribution dLabels
static const int kMaxPorts
static void SetLoggingParams(bool enable_local_log, std::string category, std::string level, bool enable_trace_print=false, bool enable_flow_log=false, bool enable_session_syslog=false)
boost::asio::ip::address IpAddress
static const std::vector< std::string > kApplication
const int num_session_samples_per_sec_
bool stringToInteger(const std::string &str, NumberType &num)
std::map< SessionIpPortProtocol, SessionAggInfo > SessionAggMap
static const std::vector< std::string > kSite
static const std::string kVnPrefix
static const boost::random::uniform_int_distribution dDirection
static const int kFlowMsgIntvlInSec
static const boost::random::uniform_int_distribution dPort
std::vector< SessionEndpoint > sessions_
std::string Description() const
const int num_session_per_vm_
friend class SendMessageTask
int GetTaskId(const std::string &name)
static const std::vector< int > kPorts
static const int kUveMsgIntvlInSec
const std::vector< std::string > collectors_
static const int kBytesPerPacket
static const std::string integerToString(const NumberType &num)
std::string Description() const
const std::string hostname_
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
SendMessageTask(MockGenerator *mock_generator, int task_id, int task_instance)
const int num_vrouter_error_messages_per_sec_
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
static const std::vector< int > kProtocols
SendSessionTask(MockGenerator *mock_generator, int task_id, int task_instance)
static const int kOtherVnPktsPerSec
MockGenerator(std::string &hostname, std::string &module_name, std::string &node_type_name, std::string &instance_id, int http_server_port, int start_vn, int end_vn, int other_vn, int num_vns, int vm_iterations, std::vector< std::string > &collectors, std::vector< uint32_t > &ip_vns, int ip_start_index, int num_vrouter_error_messages_per_sec, int num_sessions_per_vm, int num_session_samples_per_sec, int num_session_samples_in_message, EventManager *evm)
static const boost::random::uniform_int_distribution dIps
boost::asio::ip::address_v4 Ip4Address
static const boost::random::uniform_int_distribution dTagIdx
static const int kNumSessionSamplesPerSec
const std::string module_name_
static const boost::random::uniform_int_distribution dBytesPerPacket
const int ip_start_index_
const std::string node_type_name_
static bool InitGenerator(const std::string &module, const std::string &source, const std::string &node_type, const std::string &instance_id, EventManager *evm, unsigned short http_port, const std::vector< std::string > &collectors, SandeshContext *client_context=NULL, DerivedStats ds=DerivedStats(), const SandeshConfig &config=SandeshConfig())
static const int kNumVRouterErrorMessagesPerSec
static uint64_t UTCTimestampUsec()
boost::random::mt19937 rgen_
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
static const std::string kVmPrefix
const std::string instance_id_
const int num_session_samples_in_message_
#define LOG(_Level, _Msg)
static const boost::random::uniform_int_distribution dOtherVnPktsPerSec
static void set_logger_appender(const std::string &file_name, long max_file_size, int max_backup_index, const std::string &syslog_facility, const std::vector< std::string > &destn, const std::string &ident, bool is_sampled_logger)
const std::vector< uint32_t > ip_vns_
static const boost::random::uniform_int_distribution dFlowPktsPerSec
const int http_server_port_
Task is a wrapper over tbb::task to support policies.
static const std::vector< std::string > kDeployment
static void set_send_to_collector_flags(const std::vector< std::string > &sampled_destination, const std::vector< std::string > &slo_destination)
static const boost::random::uniform_int_distribution dNPorts
static const boost::random::uniform_int_distribution dClientSession