ae_evport.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. /* ae.c module for illumos event ports.
  2. *
  3. * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. *
  8. * * Redistributions of source code must retain the above copyright notice,
  9. * this list of conditions and the following disclaimer.
  10. * * Redistributions in binary form must reproduce the above copyright
  11. * notice, this list of conditions and the following disclaimer in the
  12. * documentation and/or other materials provided with the distribution.
  13. * * Neither the name of Redis nor the names of its contributors may be used
  14. * to endorse or promote products derived from this software without
  15. * specific prior written permission.
  16. *
  17. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  18. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  21. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  22. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  23. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  24. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  25. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  26. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  27. * POSSIBILITY OF SUCH DAMAGE.
  28. */
  29. #include "../../inc/ae.h"
  30. #include <assert.h>
  31. #include <errno.h>
  32. #include <port.h>
  33. #include <poll.h>
  34. #include <sys/types.h>
  35. #include <sys/time.h>
  36. #include <stdio.h>
  /*
   * Debug switch for this backend: when non-zero, the functions in this file
   * trace their port operations to stderr.
   */
  static int evport_debug = 0;
  38. /*
  39. * This file implements the ae API using event ports, present on Solaris-based
  40. * systems since Solaris 10. Using the event port interface, we associate file
  41. * descriptors with the port. Each association also includes the set of poll(2)
  42. * events that the consumer is interested in (e.g., POLLIN and POLLOUT).
  43. *
  44. * There's one tricky piece to this implementation: when we return events via
  45. * aeApiPoll, the corresponding file descriptors become dissociated from the
  46. * port. This is necessary because poll events are level-triggered, so if the
  47. * fd didn't become dissociated, it would immediately fire another event since
  48. * the underlying state hasn't changed yet. We must re-associate the file
  49. * descriptor, but only after we know that our caller has actually read from it.
  50. * The ae API does not tell us exactly when that happens, but we do know that
  51. * it must happen by the time aeApiPoll is called again. Our solution is to
  52. * keep track of the last fds returned by aeApiPoll and re-associate them next
  53. * time aeApiPoll is invoked.
  54. *
  55. * To summarize, in this module, each fd association is EITHER (a) represented
  56. * only via the in-kernel association OR (b) represented by pending_fds and
  57. * pending_masks. (b) is only true for the last fds we returned from aeApiPoll,
  58. * and only until we enter aeApiPoll again (at which point we restore the
  59. * in-kernel association).
  60. */
  /*
   * Largest number of events requested from a single port_getn() call, and
   * therefore also the capacity of the pending-fd bookkeeping below.
   */
  #define MAX_EVENT_BATCHSZ 512

  /*
   * Per-event-loop state for the event port backend.
   *
   * The pending_* arrays are parallel: entry i describes one fd handed out by
   * the most recent aeApiPoll() that still has to be re-associated with the
   * port.  A pending_fds entry of -1 marks a slot whose fd has been deleted
   * in the meantime.
   */
  typedef struct aeApiState {
      int portfd;                           /* event port descriptor */
      int npending;                         /* number of slots in use below */
      int pending_fds[MAX_EVENT_BATCHSZ];   /* fds awaiting re-association */
      int pending_masks[MAX_EVENT_BATCHSZ]; /* AE_* mask to restore per fd */
  } aeApiState;
  68. static int aeApiCreate(aeEventLoop *eventLoop) {
  69. int i;
  70. aeApiState *state = zmalloc(sizeof(aeApiState));
  71. if (!state) return -1;
  72. state->portfd = port_create();
  73. if (state->portfd == -1) {
  74. zfree(state);
  75. return -1;
  76. }
  77. state->npending = 0;
  78. for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
  79. state->pending_fds[i] = -1;
  80. state->pending_masks[i] = AE_NONE;
  81. }
  82. eventLoop->apidata = state;
  83. return 0;
  84. }
  85. static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
  86. /* Nothing to resize here. */
  87. return 0;
  88. }
  89. static void aeApiFree(aeEventLoop *eventLoop) {
  90. aeApiState *state = eventLoop->apidata;
  91. close(state->portfd);
  92. zfree(state);
  93. }
  94. static int aeApiLookupPending(aeApiState *state, int fd) {
  95. int i;
  96. for (i = 0; i < state->npending; i++) {
  97. if (state->pending_fds[i] == fd)
  98. return (i);
  99. }
  100. return (-1);
  101. }
  102. /*
  103. * Helper function to invoke port_associate for the given fd and mask.
  104. */
  105. static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
  106. int events = 0;
  107. int rv, err;
  108. if (mask & AE_READABLE)
  109. events |= POLLIN;
  110. if (mask & AE_WRITABLE)
  111. events |= POLLOUT;
  112. if (evport_debug)
  113. fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
  114. rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
  115. (void *)(uintptr_t)mask);
  116. err = errno;
  117. if (evport_debug)
  118. fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));
  119. if (rv == -1) {
  120. fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));
  121. if (err == EAGAIN)
  122. fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
  123. }
  124. return rv;
  125. }
  126. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  127. aeApiState *state = eventLoop->apidata;
  128. int fullmask, pfd;
  129. if (evport_debug)
  130. fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
  131. /*
  132. * Since port_associate's "events" argument replaces any existing events, we
  133. * must be sure to include whatever events are already associated when
  134. * we call port_associate() again.
  135. */
  136. fullmask = mask | eventLoop->events[fd].mask;
  137. pfd = aeApiLookupPending(state, fd);
  138. if (pfd != -1) {
  139. /*
  140. * This fd was recently returned from aeApiPoll. It should be safe to
  141. * assume that the consumer has processed that poll event, but we play
  142. * it safer by simply updating pending_mask. The fd will be
  143. * re-associated as usual when aeApiPoll is called again.
  144. */
  145. if (evport_debug)
  146. fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
  147. state->pending_masks[pfd] |= fullmask;
  148. return 0;
  149. }
  150. return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
  151. }
  152. static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
  153. aeApiState *state = eventLoop->apidata;
  154. int fullmask, pfd;
  155. if (evport_debug)
  156. fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
  157. pfd = aeApiLookupPending(state, fd);
  158. if (pfd != -1) {
  159. if (evport_debug)
  160. fprintf(stderr, "deleting event from pending fd %d\n", fd);
  161. /*
  162. * This fd was just returned from aeApiPoll, so it's not currently
  163. * associated with the port. All we need to do is update
  164. * pending_mask appropriately.
  165. */
  166. state->pending_masks[pfd] &= ~mask;
  167. if (state->pending_masks[pfd] == AE_NONE)
  168. state->pending_fds[pfd] = -1;
  169. return;
  170. }
  171. /*
  172. * The fd is currently associated with the port. Like with the add case
  173. * above, we must look at the full mask for the file descriptor before
  174. * updating that association. We don't have a good way of knowing what the
  175. * events are without looking into the eventLoop state directly. We rely on
  176. * the fact that our caller has already updated the mask in the eventLoop.
  177. */
  178. fullmask = eventLoop->events[fd].mask;
  179. if (fullmask == AE_NONE) {
  180. /*
  181. * We're removing *all* events, so use port_dissociate to remove the
  182. * association completely. Failure here indicates a bug.
  183. */
  184. if (evport_debug)
  185. fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);
  186. if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
  187. perror("aeApiDelEvent: port_dissociate");
  188. abort(); /* will not return */
  189. }
  190. } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
  191. fullmask) != 0) {
  192. /*
  193. * ENOMEM is a potentially transient condition, but the kernel won't
  194. * generally return it unless things are really bad. EAGAIN indicates
  195. * we've reached an resource limit, for which it doesn't make sense to
  196. * retry (counter-intuitively). All other errors indicate a bug. In any
  197. * of these cases, the best we can do is to abort.
  198. */
  199. abort(); /* will not return */
  200. }
  201. }
  202. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  203. aeApiState *state = eventLoop->apidata;
  204. struct timespec timeout, *tsp;
  205. int mask, i;
  206. uint_t nevents;
  207. port_event_t event[MAX_EVENT_BATCHSZ];
  208. /*
  209. * If we've returned fd events before, we must re-associate them with the
  210. * port now, before calling port_get(). See the block comment at the top of
  211. * this file for an explanation of why.
  212. */
  213. for (i = 0; i < state->npending; i++) {
  214. if (state->pending_fds[i] == -1)
  215. /* This fd has since been deleted. */
  216. continue;
  217. if (aeApiAssociate("aeApiPoll", state->portfd,
  218. state->pending_fds[i], state->pending_masks[i]) != 0) {
  219. /* See aeApiDelEvent for why this case is fatal. */
  220. abort();
  221. }
  222. state->pending_masks[i] = AE_NONE;
  223. state->pending_fds[i] = -1;
  224. }
  225. state->npending = 0;
  226. if (tvp != NULL) {
  227. timeout.tv_sec = tvp->tv_sec;
  228. timeout.tv_nsec = tvp->tv_usec * 1000;
  229. tsp = &timeout;
  230. } else {
  231. tsp = NULL;
  232. }
  233. /*
  234. * port_getn can return with errno == ETIME having returned some events (!).
  235. * So if we get ETIME, we check nevents, too.
  236. */
  237. nevents = 1;
  238. if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
  239. tsp) == -1 && (errno != ETIME || nevents == 0)) {
  240. if (errno == ETIME || errno == EINTR)
  241. return 0;
  242. /* Any other error indicates a bug. */
  243. perror("aeApiPoll: port_get");
  244. abort();
  245. }
  246. state->npending = nevents;
  247. for (i = 0; i < nevents; i++) {
  248. mask = 0;
  249. if (event[i].portev_events & POLLIN)
  250. mask |= AE_READABLE;
  251. if (event[i].portev_events & POLLOUT)
  252. mask |= AE_WRITABLE;
  253. eventLoop->fired[i].fd = event[i].portev_object;
  254. eventLoop->fired[i].mask = mask;
  255. if (evport_debug)
  256. fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
  257. (int)event[i].portev_object, mask);
  258. state->pending_fds[i] = event[i].portev_object;
  259. state->pending_masks[i] = (uintptr_t)event[i].portev_user;
  260. }
  261. return nevents;
  262. }
  /*
   * Name of this multiplexing backend, as a constant string.
   */
  static char *aeApiName(void) {
      char *backend = "evport";

      return backend;
  }