ae.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. /* A simple event-driven programming library. Originally I wrote this code
  2. * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
  3. * it in form of a library for easy reuse.
  4. *
  5. * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are met:
  10. *
  11. * * Redistributions of source code must retain the above copyright notice,
  12. * this list of conditions and the following disclaimer.
  13. * * Redistributions in binary form must reproduce the above copyright
  14. * notice, this list of conditions and the following disclaimer in the
  15. * documentation and/or other materials provided with the distribution.
  16. * * Neither the name of Redis nor the names of its contributors may be used
  17. * to endorse or promote products derived from this software without
  18. * specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  21. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  22. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  23. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  24. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  25. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  26. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  27. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  28. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  29. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  30. * POSSIBILITY OF SUCH DAMAGE.
  31. */
  32. #include "../inc/cs.h"
  33. #include "../inc/ae.h"
  34. /* Include the best multiplexing layer supported by this system.
  35. * The following should be ordered by performances, descending. */
  36. #ifdef HAVE_EVPORT
  37. #include "ev/ae_evport.c"
  38. #else
  39. #ifdef HAVE_EPOLL
  40. #include "ev/ae_epoll.c"
  41. #else
  42. #ifdef HAVE_KQUEUE
  43. #include "ev/ae_kqueue.c"
  44. #else
  45. #include "ev/ae_select.c"
  46. #endif
  47. #endif
  48. #endif
  49. CS_API aeEventLoop *aeCreateEventLoop(int setsize) {
  50. aeEventLoop *eventLoop;
  51. int i;
  52. if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
  53. eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
  54. eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
  55. if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
  56. eventLoop->setsize = setsize;
  57. eventLoop->lastTime = time(NULL);
  58. eventLoop->timeEventHead = NULL;
  59. eventLoop->timeEventNextId = 0;
  60. eventLoop->stop = 0;
  61. eventLoop->maxfd = -1;
  62. eventLoop->beforesleep = NULL;
  63. eventLoop->aftersleep = NULL;
  64. if (aeApiCreate(eventLoop) == -1) goto err;
  65. /* Events with mask == AE_NONE are not set. So let's initialize the
  66. * vector with it. */
  67. for (i = 0; i < setsize; i++)
  68. eventLoop->events[i].mask = AE_NONE;
  69. return eventLoop;
  70. err:
  71. if (eventLoop) {
  72. zfree(eventLoop->events);
  73. zfree(eventLoop->fired);
  74. zfree(eventLoop);
  75. }
  76. return NULL;
  77. }
  78. /* Return the current set size. */
  79. CS_API int aeGetSetSize(aeEventLoop *eventLoop) {
  80. return eventLoop->setsize;
  81. }
  82. /* Resize the maximum set size of the event loop.
  83. * If the requested set size is smaller than the current set size, but
  84. * there is already a file descriptor in use that is >= the requested
  85. * set size minus one, AE_ERR is returned and the operation is not
  86. * performed at all.
  87. *
  88. * Otherwise AE_OK is returned and the operation is successful. */
  89. CS_API int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
  90. int i;
  91. if (setsize == eventLoop->setsize) return AE_OK;
  92. if (eventLoop->maxfd >= setsize) return AE_ERR;
  93. if (aeApiResize(eventLoop,setsize) == -1) return AE_ERR;
  94. eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
  95. eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
  96. eventLoop->setsize = setsize;
  97. /* Make sure that if we created new slots, they are initialized with
  98. * an AE_NONE mask. */
  99. for (i = eventLoop->maxfd+1; i < setsize; i++)
  100. eventLoop->events[i].mask = AE_NONE;
  101. return AE_OK;
  102. }
  103. CS_API void aeDeleteEventLoop(aeEventLoop *eventLoop) {
  104. aeApiFree(eventLoop);
  105. zfree(eventLoop->events);
  106. zfree(eventLoop->fired);
  107. zfree(eventLoop);
  108. }
  109. CS_API void aeStop(aeEventLoop *eventLoop) {
  110. eventLoop->stop = 1;
  111. }
  112. CS_API int aeCreateFileEvent(
  113. aeEventLoop *eventLoop,
  114. int fd,
  115. int mask,
  116. aeFileProc *proc,
  117. void *clientData) {
  118. if (fd >= eventLoop->setsize) {
  119. errno = ERANGE;
  120. return AE_ERR;
  121. }
  122. aeFileEvent *fe = &eventLoop->events[fd];
  123. if (aeApiAddEvent(eventLoop, fd, mask) == -1)
  124. return AE_ERR;
  125. fe->mask |= mask;
  126. if (mask & AE_READABLE) fe->rfileProc = proc;
  127. if (mask & AE_WRITABLE) fe->wfileProc = proc;
  128. fe->clientData = clientData;
  129. if (fd > eventLoop->maxfd)
  130. eventLoop->maxfd = fd;
  131. return AE_OK;
  132. }
  133. CS_API void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
  134. if (fd >= eventLoop->setsize) return;
  135. aeFileEvent *fe = &eventLoop->events[fd];
  136. if (fe->mask == AE_NONE) return;
  137. aeApiDelEvent(eventLoop, fd, mask);
  138. fe->mask = fe->mask & (~mask);
  139. if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
  140. /* Update the max fd */
  141. int j;
  142. for (j = eventLoop->maxfd-1; j >= 0; j--)
  143. if (eventLoop->events[j].mask != AE_NONE) break;
  144. eventLoop->maxfd = j;
  145. }
  146. }
  147. CS_API int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
  148. if (fd >= eventLoop->setsize) return 0;
  149. aeFileEvent *fe = &eventLoop->events[fd];
  150. return fe->mask;
  151. }
  152. static void aeGetTime(long *seconds, long *milliseconds) {
  153. #ifdef _WIN32
  154. struct _timeb tb;
  155. memset(&tb, 0, sizeof(struct _timeb));
  156. _ftime_s(&tb);
  157. (*seconds) = tb.time;
  158. (*milliseconds) = tb.millitm;
  159. #else
  160. struct timeval tv;
  161. gettimeofday(&tv, NULL);
  162. *seconds = tv.tv_sec;
  163. *milliseconds = tv.tv_usec/1000;
  164. #endif
  165. }
  166. static void aeAddMillisecondsToNow(
  167. long long milliseconds, long *sec, long *ms) {
  168. long cur_sec, cur_ms, when_sec, when_ms;
  169. aeGetTime(&cur_sec, &cur_ms);
  170. when_sec = cur_sec + milliseconds/1000;
  171. when_ms = cur_ms + milliseconds%1000;
  172. if (when_ms >= 1000) {
  173. when_sec ++;
  174. when_ms -= 1000;
  175. }
  176. *sec = when_sec;
  177. *ms = when_ms;
  178. }
  179. CS_API long long aeCreateTimeEvent(
  180. aeEventLoop *eventLoop,
  181. long long milliseconds,
  182. aeTimeProc *proc,
  183. void *clientData,
  184. aeEventFinalizerProc *finalizerProc) {
  185. long long id = eventLoop->timeEventNextId++;
  186. aeTimeEvent *te;
  187. te = zmalloc(sizeof(*te));
  188. if (te == NULL) return AE_ERR;
  189. te->id = id;
  190. aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
  191. te->timeProc = proc;
  192. te->finalizerProc = finalizerProc;
  193. te->clientData = clientData;
  194. te->next = eventLoop->timeEventHead;
  195. eventLoop->timeEventHead = te;
  196. return id;
  197. }
  198. CS_API int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) {
  199. aeTimeEvent *te = eventLoop->timeEventHead;
  200. while(te) {
  201. if (te->id == id) {
  202. te->id = AE_DELETED_EVENT_ID;
  203. return AE_OK;
  204. }
  205. te = te->next;
  206. }
  207. return AE_ERR; /* NO event with the specified ID found */
  208. }
  209. /* Search the first timer to fire.
  210. * This operation is useful to know how many time the select can be
  211. * put in sleep without to delay any event.
  212. * If there are no timers NULL is returned.
  213. *
  214. * Note that's O(N) since time events are unsorted.
  215. * Possible optimizations (not needed by Redis so far, but...):
  216. * 1) Insert the event in order, so that the nearest is just the head.
  217. * Much better but still insertion or deletion of timers is O(N).
  218. * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
  219. */
  220. static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) {
  221. aeTimeEvent *te = eventLoop->timeEventHead;
  222. aeTimeEvent *nearest = NULL;
  223. while(te) {
  224. if (!nearest || te->when_sec < nearest->when_sec ||
  225. (te->when_sec == nearest->when_sec &&
  226. te->when_ms < nearest->when_ms))
  227. nearest = te;
  228. te = te->next;
  229. }
  230. return nearest;
  231. }
  232. /* Process time events */
  233. static int processTimeEvents(aeEventLoop *eventLoop) {
  234. int processed = 0;
  235. aeTimeEvent *te, *prev;
  236. long long maxId;
  237. time_t now = time(NULL);
  238. /* If the system clock is moved to the future, and then set back to the
  239. * right value, time events may be delayed in a random way. Often this
  240. * means that scheduled operations will not be performed soon enough.
  241. *
  242. * Here we try to detect system clock skews, and force all the time
  243. * events to be processed ASAP when this happens: the idea is that
  244. * processing events earlier is less dangerous than delaying them
  245. * indefinitely, and practice suggests it is. */
  246. if (now < eventLoop->lastTime) {
  247. te = eventLoop->timeEventHead;
  248. while(te) {
  249. te->when_sec = 0;
  250. te = te->next;
  251. }
  252. }
  253. eventLoop->lastTime = now;
  254. prev = NULL;
  255. te = eventLoop->timeEventHead;
  256. maxId = eventLoop->timeEventNextId-1;
  257. while(te) {
  258. long now_sec, now_ms;
  259. long long id;
  260. /* Remove events scheduled for deletion. */
  261. if (te->id == AE_DELETED_EVENT_ID) {
  262. aeTimeEvent *next = te->next;
  263. if (prev == NULL)
  264. eventLoop->timeEventHead = te->next;
  265. else
  266. prev->next = te->next;
  267. if (te->finalizerProc)
  268. te->finalizerProc(eventLoop, te->clientData);
  269. zfree(te);
  270. te = next;
  271. continue;
  272. }
  273. /* Make sure we don't process time events created by time events in
  274. * this iteration. Note that this check is currently useless: we always
  275. * add new timers on the head, however if we change the implementation
  276. * detail, this check may be useful again: we keep it here for future
  277. * defense. */
  278. if (te->id > maxId) {
  279. te = te->next;
  280. continue;
  281. }
  282. aeGetTime(&now_sec, &now_ms);
  283. if (now_sec > te->when_sec ||
  284. (now_sec == te->when_sec && now_ms >= te->when_ms))
  285. {
  286. int retval;
  287. id = te->id;
  288. retval = te->timeProc(eventLoop, id, te->clientData);
  289. processed++;
  290. if (retval != AE_NOMORE) {
  291. aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
  292. } else {
  293. te->id = AE_DELETED_EVENT_ID;
  294. }
  295. }
  296. prev = te;
  297. te = te->next;
  298. }
  299. return processed;
  300. }
  301. /* Process every pending time event, then every pending file event
  302. * (that may be registered by time event callbacks just processed).
  303. * Without special flags the function sleeps until some file event
  304. * fires, or when the next time event occurs (if any).
  305. *
  306. * If flags is 0, the function does nothing and returns.
  307. * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
  308. * if flags has AE_FILE_EVENTS set, file events are processed.
  309. * if flags has AE_TIME_EVENTS set, time events are processed.
  310. * if flags has AE_DONT_WAIT set the function returns ASAP until all
  311. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
  312. * the events that's possible to process without to wait are processed.
  313. *
  314. * The function returns the number of events processed. */
  315. CS_API int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
  316. int processed = 0, numevents;
  317. /* Nothing to do? return ASAP */
  318. if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
  319. /* Note that we want call select() even if there are no
  320. * file events to process as long as we want to process time
  321. * events, in order to sleep until the next time event is ready
  322. * to fire. */
  323. if (eventLoop->maxfd != -1 ||
  324. ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
  325. int j;
  326. aeTimeEvent *shortest = NULL;
  327. struct timeval tv, *tvp;
  328. if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
  329. shortest = aeSearchNearestTimer(eventLoop);
  330. if (shortest) {
  331. long now_sec, now_ms;
  332. aeGetTime(&now_sec, &now_ms);
  333. tvp = &tv;
  334. /* How many milliseconds we need to wait for the next
  335. * time event to fire? */
  336. long long ms =
  337. (shortest->when_sec - now_sec)*1000 +
  338. shortest->when_ms - now_ms;
  339. if (ms > 0) {
  340. tvp->tv_sec = ms/1000;
  341. tvp->tv_usec = (ms % 1000)*1000;
  342. } else {
  343. tvp->tv_sec = 0;
  344. tvp->tv_usec = 0;
  345. }
  346. } else {
  347. /* If we have to check for events but need to return
  348. * ASAP because of AE_DONT_WAIT we need to set the timeout
  349. * to zero */
  350. if (flags & AE_DONT_WAIT) {
  351. tv.tv_sec = tv.tv_usec = 0;
  352. tvp = &tv;
  353. } else {
  354. /* Otherwise we can block */
  355. tvp = NULL; /* wait forever */
  356. }
  357. }
  358. /* Call the multiplexing API, will return only on timeout or when
  359. * some event fires. */
  360. numevents = aeApiPoll(eventLoop, tvp);
  361. /* After sleep callback. */
  362. if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
  363. eventLoop->aftersleep(eventLoop);
  364. for (j = 0; j < numevents; j++) {
  365. aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
  366. int mask = eventLoop->fired[j].mask;
  367. int fd = eventLoop->fired[j].fd;
  368. int rfired = 0;
  369. /* note the fe->mask & mask & ... code: maybe an already processed
  370. * event removed an element that fired and we still didn't
  371. * processed, so we check if the event is still valid. */
  372. if (fe->mask & mask & AE_READABLE) {
  373. rfired = 1;
  374. fe->rfileProc(eventLoop,fd,fe->clientData,mask);
  375. }
  376. if (fe->mask & mask & AE_WRITABLE) {
  377. if (!rfired || fe->wfileProc != fe->rfileProc)
  378. fe->wfileProc(eventLoop,fd,fe->clientData,mask);
  379. }
  380. processed++;
  381. }
  382. }
  383. /* Check time events */
  384. if (flags & AE_TIME_EVENTS)
  385. processed += processTimeEvents(eventLoop);
  386. return processed; /* return the number of processed file/time events */
  387. }
  388. /* Wait for milliseconds until the given file descriptor becomes
  389. * writable/readable/exception */
  390. CS_API int aeWait(int fd, int mask, long long milliseconds) {
  391. #ifdef _WIN32
  392. struct timeval tv;
  393. fd_set rfds, wfds, efds;
  394. int retmask = 0, retval;
  395. tv.tv_sec = (long)(milliseconds/1000);
  396. tv.tv_usec = (milliseconds%1000)*1000;
  397. FD_ZERO(&rfds);
  398. FD_ZERO(&wfds);
  399. FD_ZERO(&efds);
  400. if (mask & AE_READABLE) FD_SET(fd,&rfds);
  401. if (mask & AE_WRITABLE) FD_SET(fd,&wfds);
  402. if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) {
  403. if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE;
  404. if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE;
  405. return retmask;
  406. } else {
  407. return retval;
  408. }
  409. #else
  410. struct pollfd pfd;
  411. int retmask = 0, retval;
  412. memset(&pfd, 0, sizeof(pfd));
  413. pfd.fd = fd;
  414. if (mask & AE_READABLE) pfd.events |= POLLIN;
  415. if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
  416. if ((retval = poll(&pfd, 1, milliseconds))== 1) {
  417. if (pfd.revents & POLLIN) retmask |= AE_READABLE;
  418. if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
  419. if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
  420. if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
  421. return retmask;
  422. } else {
  423. return retval;
  424. }
  425. #endif
  426. }
  427. CS_API void aeMain(aeEventLoop *eventLoop) {
  428. eventLoop->stop = 0;
  429. while (!eventLoop->stop) {
  430. if (eventLoop->beforesleep != NULL)
  431. eventLoop->beforesleep(eventLoop);
  432. aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
  433. }
  434. }
  435. CS_API char *aeGetApiName(void) {
  436. return aeApiName();
  437. }
  438. CS_API void aeSetBeforeSleepProc(
  439. aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
  440. eventLoop->beforesleep = beforesleep;
  441. }
  442. CS_API void aeSetAfterSleepProc(
  443. aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
  444. eventLoop->aftersleep = aftersleep;
  445. }