Revised OpenFlow Library  v0.6.0dev
 All Classes Files Functions Variables Friends Groups Pages
ciosrv.h
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 #ifndef CIOSRV_H
6 #define CIOSRV_H
7 
8 #include <set>
9 #include <list>
10 #include <bitset>
11 #include <utility>
12 #include <map>
13 #include <algorithm>
14 #include <vector>
15 #include <sys/time.h>
16 #include <sys/types.h>
17 #include <unistd.h>
18 #include <sys/select.h>
19 #include <errno.h>
20 #include <time.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <signal.h>
25 #include <sys/time.h>
26 #include <sys/resource.h>
27 #include <sys/stat.h>
28 
29 
30 #include "rofl/common/cmemory.h"
31 #include "rofl/common/logging.h"
32 #include "rofl/common/croflexception.h"
33 #include "rofl/common/thread_helper.h"
34 #include "rofl/common/cpipe.h"
35 #include "rofl/common/cevents.h"
36 #include "rofl/common/ctimers.h"
37 #include "rofl/common/ctimer.h"
38 
39 namespace rofl {
40 
41 class ciosrv;
42 
49 class cioloop {
50 public:
51 
56  static cioloop&
57  get_loop(pthread_t tid = 0) {
58  if (0 == tid) {
59  tid = pthread_self();
60  }
61  if (cioloop::threads.find(tid) == cioloop::threads.end()) {
62  cioloop::threads[tid] = new cioloop(tid);
63  }
64  return *(cioloop::threads[tid]);
65  };
66 
70  static void
71  drop_loop(pthread_t tid) {
72  if (cioloop::threads.find(tid) == cioloop::threads.end()) {
73  return;
74  }
75  delete cioloop::threads[tid];
76  cioloop::threads.erase(tid);
77  };
78 
79 public:
80 
84  void
85  run() {
86  cioloop::get_loop().run_loop();
87  };
88 
92  void
93  stop() {
94  if (cioloop::threads.find(tid) == cioloop::threads.end()) {
95  return;
96  }
97  cioloop::threads[tid]->keep_on_running = false;
98  {
99  RwLock lock(rfds_rwlock, RwLock::RWLOCK_WRITE);
100  rfds.clear();
101  }
102  {
103  RwLock lock(wfds_rwlock, RwLock::RWLOCK_WRITE);
104  wfds.clear();
105  }
106  {
107  RwLock lock(events_rwlock, RwLock::RWLOCK_WRITE);
108  events.clear();
109  }
110  {
111  RwLock lock(timers_rwlock, RwLock::RWLOCK_WRITE);
112  timers.clear();
113  }
114  if (tid != pthread_self()) {
115  //pipe.writemsg('1'); // wakeup main loop, just in case
116  }
117  };
118 
122  void
124  for (std::map<pthread_t, cioloop*>::iterator
125  it = cioloop::threads.begin(); it != cioloop::threads.end(); ++it) {
126  delete it->second;
127  }
128  cioloop::threads.clear();
129  };
130 
134  pthread_t
135  get_tid() const { return tid; };
136 
137 protected:
138 
139  friend class ciosrv;
140 
144  void
145  add_readfd(ciosrv* iosrv, int fd) {
146  RwLock lock(rfds_rwlock, RwLock::RWLOCK_WRITE);
147  rfds[fd] = iosrv;
148 
149  minrfd = (minrfd > (unsigned int)(fd+0)) ? (unsigned int)(fd+0) : minrfd;
150  maxrfd = (maxrfd < (unsigned int)(fd+1)) ? (unsigned int)(fd+1) : maxrfd;
151 
152  if (tid != pthread_self()) {
153  pipe.writemsg('1'); // wakeup main loop, just in case
154  }
155  };
156 
160  void
161  drop_readfd(ciosrv* iosrv, int fd) {
162  RwLock lock(rfds_rwlock, RwLock::RWLOCK_WRITE);
163  rfds[fd] = NULL;
164 
165  if (minrfd == (unsigned int)(fd+0)) {
166  minrfd = rfds.size();
167  for (unsigned int i = fd + 1; i < rfds.size(); i++) {
168  if (rfds[i] != NULL) {
169  minrfd = i;
170  break;
171  }
172  }
173  }
174 
175  if (maxrfd == (unsigned int)(fd+1)) {
176  maxrfd = 0;
177  for (unsigned int i = fd; i > 0; i--) {
178  if (rfds[i] != NULL) {
179  maxrfd = (i+1);
180  break;
181  }
182  }
183  }
184 
185  if (tid != pthread_self()) {
186  pipe.writemsg('1'); // wakeup main loop, just in case
187  }
188  };
189 
193  void
194  add_writefd(ciosrv* iosrv, int fd) {
195  RwLock lock(wfds_rwlock, RwLock::RWLOCK_WRITE);
196  wfds[fd] = iosrv;
197 
198  minwfd = (minwfd > (unsigned int)(fd+0)) ? (unsigned int)(fd+0) : minwfd;
199  maxwfd = (maxwfd < (unsigned int)(fd+1)) ? (unsigned int)(fd+1) : maxwfd;
200 
201  if (tid != pthread_self()) {
202  pipe.writemsg('1'); // wakeup main loop, just in case
203  }
204  };
205 
209  void
210  drop_writefd(ciosrv* iosrv, int fd) {
211  RwLock lock(wfds_rwlock, RwLock::RWLOCK_WRITE);
212  wfds[fd] = NULL;
213 
214  if (minwfd == (unsigned int)(fd+0)) {
215  minwfd = wfds.size();
216  for (unsigned int i = fd + 1; i < wfds.size(); i++) {
217  if (wfds[i] != NULL) {
218  minwfd = i;
219  break;
220  }
221  }
222  }
223 
224  if (maxwfd == (unsigned int)(fd+1)) {
225  maxwfd = 0;
226  for (unsigned int i = fd; i > 0; i--) {
227  if (wfds[i] != NULL) {
228  maxwfd = (i+1);
229  break;
230  }
231  }
232  }
233 
234  if (tid != pthread_self()) {
235  pipe.writemsg('1'); // wakeup main loop, just in case
236  }
237  };
238 
242  void
243  has_timer(ciosrv* iosrv) {
244  RwLock lock(timers_rwlock, RwLock::RWLOCK_WRITE);
245  timers[iosrv] = true;
246  if (tid != pthread_self()) {
247  pipe.writemsg('1'); // wakeup main loop, just in case
248  }
249  };
250 
254  void
255  has_no_timer(ciosrv *iosrv) {
256  RwLock lock(timers_rwlock, RwLock::RWLOCK_WRITE);
257  timers[iosrv] = false;
258  };
259 
263  void
264  has_event(ciosrv* iosrv) {
265  RwLock lock(events_rwlock, RwLock::RWLOCK_WRITE);
266  events[iosrv] = true;
267  if (tid != pthread_self()) {
268  pipe.writemsg('1'); // wakeup main loop, just in case
269  }
270  };
271 
275  void
276  has_no_event(ciosrv* iosrv) {
277  RwLock lock(events_rwlock, RwLock::RWLOCK_WRITE);
278  events[iosrv] = false;
279  };
280 
281 private:
282 
286  cioloop(pthread_t tid = 0) :
287  tid(tid),
288  keep_on_running(false) {
289 
290  if (0 == tid) {
291  this->tid = pthread_self();
292  }
293 
294  struct rlimit rlim;
295  if (getrlimit(RLIMIT_NOFILE, &rlim) < 0) {
296  throw eSysCall("getrlimit()");
297  }
298  RwLock rlock(rfds_rwlock, RwLock::RWLOCK_WRITE);
299  RwLock wlock(wfds_rwlock, RwLock::RWLOCK_WRITE);
300  for (unsigned int i = 0; i < rlim.rlim_cur; i++) {
301  rfds.push_back(NULL);
302  wfds.push_back(NULL);
303  }
304  minrfd = rfds.size();
305  maxrfd = 0;
306  minwfd = wfds.size();
307  maxwfd = 0;
308  };
309 
313  virtual
314  ~cioloop() {};
315 
319  cioloop(cioloop const& t) {
320  *this = t;
321  };
322 
326  cioloop&
327  operator= (cioloop const& t) {
328  if (this == &t)
329  return *this;
330  return *this;
331  };
332 
336  void
337  run_loop();
338 
339 
344  static void
345  child_sig_handler (int x);
346 
347 
348 
349 public:
350 
351  friend std::ostream&
352  operator<< (std::ostream& os, cioloop const& ioloop) {
353  os << indent(0) << "<cioloop tid:0x"
354  << std::hex << ioloop.get_tid() << std::dec << ">" << std::endl;
355 
356  {
357  RwLock lock(ioloop.rfds_rwlock, RwLock::RWLOCK_READ);
358  os << indent(2) << "<read-fds: ";
359  for (unsigned int i = 0; i < ioloop.rfds.size(); ++i) {
360  if (NULL != ioloop.rfds.at(i)) {
361  os << i << " ";
362  }
363  }
364  os << ">" << std::endl;
365  }
366 
367  {
368  RwLock lock(ioloop.wfds_rwlock, RwLock::RWLOCK_READ);
369  os << indent(2) << "<write-fds: ";
370  for (unsigned int i = 0; i < ioloop.wfds.size(); ++i) {
371  if (NULL != ioloop.wfds.at(i)) {
372  os << i << " ";
373  }
374  }
375  os << ">" << std::endl;
376  }
377 
378  {
379  RwLock lock(ioloop.timers_rwlock, RwLock::RWLOCK_READ);
380  os << indent(2) << "<instances with timer needs: ";
381  // locking?
382  for (std::map<ciosrv*, bool>::const_iterator
383  it = ioloop.timers.begin(); it != ioloop.timers.end(); ++it) {
384  os << it->first << ":" << it->second << " ";
385  }
386  os << ">" << std::endl;
387  }
388 
389  {
390  RwLock lock(ioloop.events_rwlock, RwLock::RWLOCK_READ);
391  os << indent(2) << "<instances with event needs: ";
392  // locking?
393  for (std::map<ciosrv*, bool>::const_iterator
394  it = ioloop.events.begin(); it != ioloop.events.end(); ++it) {
395  os << it->first << ":" << it->second << " ";
396  }
397  os << ">" << std::endl;
398  }
399 
400  return os;
401  };
402 
403 private:
404 
405  static PthreadRwLock threads_rwlock;
406  static std::map<pthread_t, cioloop*> threads;
407 
408  std::vector<ciosrv*> rfds;
409  mutable PthreadRwLock rfds_rwlock;
410  std::vector<ciosrv*> wfds;
411  mutable PthreadRwLock wfds_rwlock;
412  std::map<ciosrv*, bool> timers;
413  mutable PthreadRwLock timers_rwlock;
414  std::map<ciosrv*, bool> events;
415  mutable PthreadRwLock events_rwlock;
416 
417  cpipe pipe;
418  pthread_t tid;
419  bool keep_on_running;
420 
421  unsigned int minrfd; // lowest set readfd
422  unsigned int maxrfd; // highest set readfd
423  unsigned int minwfd; // lowest set writefd
424  unsigned int maxwfd; // highest set writefd
425 };
426 
427 
428 
429 
491 class ciosrv : public ctimer_env {
492 public:
493 
497  ciosrv(pthread_t tid = 0);
498 
502  virtual
503  ~ciosrv();
504 
508  ciosrv(const ciosrv& iosrv);
509 
513  ciosrv&
514  operator= (const ciosrv& iosrv);
515 
516 public:
517 
528  void
529  notify(const cevent& event) {
530  events.add_event(event);
531  cioloop::get_loop(get_thread_id()).has_event(this);
532  };
533 
539  pthread_t
541  { return tid; };
542 
543 protected:
544 
563  virtual void
565  const rofl::cevent& event)
566  {};
567 
575  virtual void
577  int fd)
578  {};
579 
587  virtual void
589  {};
590 
598  virtual void
600  {};
601 
610  virtual void
611  handle_timeout(int opaque, void *data = (void*)0)
612  {};
613 
616 protected:
617 
631  void
633  rfds.insert(fd);
634  cioloop::get_loop(get_thread_id()).add_readfd(this, fd);
635  };
636 
642  void
644  rfds.erase(fd);
645  cioloop::get_loop(get_thread_id()).drop_readfd(this, fd);
646  };
647 
655  void
657  wfds.insert(fd);
658  cioloop::get_loop(get_thread_id()).add_writefd(this, fd);
659  };
660 
666  void
668  wfds.erase(fd);
669  cioloop::get_loop(get_thread_id()).drop_writefd(this, fd);
670  };
671 
674 protected:
675 
679  ctimer
680  get_next_timer() {
681  return timers.get_next_timer();
682  };
683 
697  const rofl::ctimerid&
698  register_timer(int opaque, const rofl::ctimespec& timespec) {
699  if (timers.empty() || (get_thread_id() != pthread_self()))
700  cioloop::get_loop().has_timer(this);
701  return timers.add_timer(ctimer(this, opaque, timespec));
702  };
703 
713  const rofl::ctimerid&
714  reset_timer(const rofl::ctimerid& timer_id, const rofl::ctimespec& timespec) {
715  if (timers.empty() || (get_thread_id() != pthread_self()))
716  cioloop::get_loop().has_timer(this);
717  return timers.reset(timer_id, timespec);
718  };
719 
732  ctimerid&
733  restart_timer(rofl::ctimerid& timer_id, int opaque, const rofl::ctimespec& timespec) {
734  if (pending_timer(timer_id)) {
735  timer_id = reset_timer(timer_id, timespec);
736  } else {
737  timer_id = register_timer(opaque, timespec);
738  }
739  return timer_id;
740  };
741 
748  bool
749  pending_timer(const rofl::ctimerid& timer_id) {
750  return timers.pending(timer_id);
751  };
752 
758  void
759  cancel_timer(const rofl::ctimerid& timer_id) {
760  timers.cancel(timer_id);
761  if (timers.empty())
762  cioloop::get_loop().has_no_timer(this);
763  };
764 
769  void
771  timers.clear();
772  cioloop::get_loop().has_no_timer(this);
773  };
774 
779  void
781  events.clear();
782  cioloop::get_loop().has_no_event(this);
783  };
784 
787 private:
788 
789  friend class cioloop;
790 
794  void
795  __handle_event();
796 
800  void
801  __handle_revent(int fd) {
802  try {
803  handle_revent(fd);
804  } catch (eEventsNotFound& e) {/*do nothing*/}
805  };
806 
810  void
811  __handle_timeout() {
812  try {
813  ctimer timer = timers.get_expired_timer();
814  logging::trace << "[rofl-common][ciosrv][handle-timeout] timer: " << std::endl << timer;
815  handle_timeout(timer.get_opaque());
816  } catch (eTimersNotFound& e) {/*do nothing*/}
817  };
818 
819 public:
820 
821  friend std::ostream&
822  operator<< (std::ostream& os, const ciosrv& iosvc) {
823  os << indent(0) << "<ciosrv >" << std::endl;
824  os << indent(2) << "<rfds: ";
825  for (std::set<int>::const_iterator it = iosvc.rfds.begin(); it != iosvc.rfds.end(); ++it) {
826  os << (*it) << " ";
827  }
828  os << ">" << std::endl;
829  os << indent(2) << "<wfds: ";
830  for (std::set<int>::const_iterator it = iosvc.wfds.begin(); it != iosvc.wfds.end(); ++it) {
831  os << (*it) << " ";
832  }
833  os << ">" << std::endl;
834 
835  { indent i(2); os << iosvc.timers; }
836  { indent i(2); os << iosvc.events; }
837  return os;
838  };
839 
840 private:
841 
842  static PthreadRwLock ciolist_rwlock;
843  static std::set<ciosrv*> ciolist;
844 
845  pthread_t tid;
846  std::set<int> rfds;
847  std::set<int> wfds;
848  ctimers timers;
849  cevents events;
850 };
851 
852 
853 
854 
855 
856 }; // end of namespace
857 
858 #endif
const rofl::ctimerid & reset(const rofl::ctimerid &timer_id, const rofl::ctimespec &timespec)
Resets an existing timer identifier by its handle with a new timeout value.
Definition: ctimers.h:134
void cancel_timer(const rofl::ctimerid &timer_id)
Cancels a pending timer.
Definition: ciosrv.h:759
void clear()
Removes all timers from this timer list.
Definition: ctimers.h:214
void deregister_filedesc_w(int fd)
Deregisters a file descriptor from write events.
Definition: ciosrv.h:667
void stop()
Terminates cioloop instance running in thread identified by this->tid.
Definition: ciosrv.h:93
pthread_t get_thread_id() const
Returns the thread id stored in this ciosrv instance, i.e. the thread whose cioloop serves it.
Definition: ciosrv.h:540
Timer handle used by class rofl::cioloop.
Definition: ctimerid.h:21
void shutdown()
Terminates all running cioloop instances.
Definition: ciosrv.h:123
void register_filedesc_r(int fd)
Registers a file descriptor for read events.
Definition: ciosrv.h:632
void notify(const cevent &event)
Sends a notification to this ciosrv instance.
Definition: ciosrv.h:529
void cancel_all_events()
Cancels all pending events of this instance.
Definition: ciosrv.h:780
Single timer object in rofl-common.
Definition: ctimer.h:45
Environment expected by an instance of class rofl::ctimer.
Definition: ctimer.h:27
static cioloop & get_loop(pthread_t tid=0)
Returns reference to cioloop instance identified by thread id or if none is specified, cioloop of local thread.
Definition: ciosrv.h:57
virtual ~ciosrv()
Deallocates resources for this ciosrv object.
Definition: ciosrv.cc:46
void cancel_all_timers()
Cancels all pending timers of this instance.
Definition: ciosrv.h:770
void register_filedesc_w(int fd)
Registers a file descriptor for write events.
Definition: ciosrv.h:656
void cancel(const ctimerid &timer_id)
Removes a timer identified by the given timer handle from this timer list.
Definition: ctimers.h:200
virtual void handle_event(const rofl::cevent &event)
Handler for event notifications using cevent instances.
Definition: ciosrv.h:564
const rofl::ctimerid & reset_timer(const rofl::ctimerid &timer_id, const rofl::ctimespec &timespec)
Resets a running timer of type opaque.
Definition: ciosrv.h:714
virtual void handle_timeout(int opaque, void *data=(void *) 0)
Handler for timer events.
Definition: ciosrv.h:611
rofl::ctimer get_expired_timer()
Returns the next timer from timer list, when it has already expired.
Definition: ctimers.h:157
bool empty()
Checks whether this timer list contains no timers.
Definition: ctimers.h:88
const rofl::ctimerid & add_timer(const rofl::ctimer &t)
Inserts a new timer into the timer list.
Definition: ctimers.h:114
bool pending(const rofl::ctimerid &timer_id)
Checks whether a certain timer identified by the given handle is still pending.
Definition: ctimers.h:184
const rofl::ctimerid & register_timer(int opaque, const rofl::ctimespec &timespec)
Installs a new timer to fire in t seconds.
Definition: ciosrv.h:698
virtual void handle_revent(int fd)
Handler for read events on file descriptors.
Definition: ciosrv.h:576
Definition: thread_helper.h:88
bool pending_timer(const rofl::ctimerid &timer_id)
Checks for a pending timer of type opaque.
Definition: ciosrv.h:749
Single event used internally by class rofl::cioloop.
Definition: cevent.h:20
ciosrv(pthread_t tid=0)
Initializes all structures for this ciosrv object.
Definition: ciosrv.cc:31
ctimerid & restart_timer(rofl::ctimerid &timer_id, int opaque, const rofl::ctimespec &timespec)
Resets an existing or creates a new timer.
Definition: ciosrv.h:733
Defines an IO service loop for a single thread.
Definition: ciosrv.h:49
Base class for IO services.
Definition: ciosrv.h:491
Time specification used by class rofl::ctimer object.
Definition: ctimespec.h:25
rofl::ctimer get_next_timer()
Returns a copy of the next expiring timer in this timer list.
Definition: ctimers.h:99
static void drop_loop(pthread_t tid)
Deletes the cioloop instance registered for the given thread id, if one exists.
Definition: ciosrv.h:71
void deregister_filedesc_r(int fd)
Deregisters a file descriptor from read events.
Definition: ciosrv.h:643
virtual void handle_xevent(int fd)
Handler for exceptions on file descriptors.
Definition: ciosrv.h:599
virtual void handle_wevent(int fd)
Handler for write events on file descriptors.
Definition: ciosrv.h:588