[sr-dev] git:andrei/tcp_tls_changes: io_wait: fix kqueue io_wait_add & POLLIN

Andrei Pelinescu-Onciul andrei at iptel.org
Fri Jun 18 15:10:33 CEST 2010


Module: sip-router
Branch: andrei/tcp_tls_changes
Commit: d01e95b109558e283d91674858af959db9f83f59
URL:    http://git.sip-router.org/cgi-bin/gitweb.cgi/sip-router/?a=commit;h=d01e95b109558e283d91674858af959db9f83f59

Author: Andrei Pelinescu-Onciul <andrei at iptel.org>
Committer: Andrei Pelinescu-Onciul <andrei at iptel.org>
Date:   Fri Jun 18 15:01:01 2010 +0200

io_wait: fix kqueue io_wait_add & POLLIN

A "goto error" was placed outside the error handling "if",
causing every io_watch_add() call that tried to enable write
watching on a new FD to fail (fortunately this kind of
io_watch_add() usage is rare: write watching is usually enabled
via io_watch_chg() on FDs that were already io_watch_add()'ed
for reading).

Only POLL_KQUEUE was affected by this bug; it is the default
polling method on all *BSD systems and on Darwin.

---

 io_wait.h |  105 +++++++++++++++++++++++++++++++++++++------------------------
 1 files changed, 64 insertions(+), 41 deletions(-)

diff --git a/io_wait.h b/io_wait.h
index 01df1e6..e04efd8 100644
--- a/io_wait.h
+++ b/io_wait.h
@@ -148,7 +148,9 @@ struct io_wait_handler{
 	int flags;
 	struct fd_map* fd_hash;
 	int fd_no; /*  current index used in fd_array and the passed size for 
-				   ep_array & kq_array*/
+				   ep_array (for kq_array at least
+				    max(twice the size, kq_changes_size) should be
+				   be passed). */
 	int max_fd_no; /* maximum fd no, is also the size of fd_array,
 						       fd_hash  and ep_array*/
 	/* common stuff for POLL, SIGIO_RT and SELECT
@@ -170,6 +172,7 @@ struct io_wait_handler{
 	struct kevent* kq_array;   /* used for the eventlist*/
 	struct kevent* kq_changes; /* used for the changelist */
 	size_t kq_nchanges;
+	size_t kq_array_size;   /* array size */
 	size_t kq_changes_size; /* size of the changes array */
 #endif
 #ifdef HAVE_DEVPOLL
@@ -259,34 +262,33 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
 again:
 		n=kevent(h->kq_fd, h->kq_changes, h->kq_nchanges, 0, 0, &tspec);
 		if (unlikely(n == -1)){
-			if (likely(errno == EBADF)) {
+			if (unlikely(errno == EINTR)) goto again;
+			else {
+				/* for a detailed explanation of what follows see
+				   io_wait_loop_kqueue EV_ERROR case */
+				if (unlikely(!(errno == EBADF || errno == ENOENT)))
+					BUG("kq_ev_change: kevent flush changes failed"
+							" (unexpected error): %s [%d]\n",
+							strerror(errno), errno);
+					/* ignore error even if it's not a EBADF/ENOENT */
 				/* one of the file descriptors is bad, probably already
 				   closed => try to apply changes one-by-one */
 				for (r = 0; r < h->kq_nchanges; r++) {
 retry2:
 					n = kevent(h->kq_fd, &h->kq_changes[r], 1, 0, 0, &tspec);
 					if (n==-1) {
-						if (errno == EBADF)
-							continue; /* skip over it */
-						if (errno == EINTR)
+						if (unlikely(errno == EINTR))
 							goto retry2;
-						LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
-									" failed: %s [%d]\n",
-										strerror(errno), errno);
-						/* shift the array */
-						memmove(&h->kq_changes[0], &h->kq_changes[r+1],
-									sizeof(h->kq_changes[0])*
-										(h->kq_nchanges-r-1));
-						h->kq_nchanges-=(r+1);
-						return -1;
+					/* for a detailed explanation of what follows see
+						io_wait_loop_kqueue EV_ERROR case */
+						if (unlikely(!(errno == EBADF || errno == ENOENT)))
+							BUG("kq_ev_change: kevent flush changes failed:"
+									" (unexpected error) %s [%d] (%d/%d)\n",
+										strerror(errno), errno,
+										r, h->kq_nchanges);
+						continue; /* skip over it */
 					}
 				}
-			} else if (errno == EINTR) goto again;
-			else {
-				LOG(L_ERR, "ERROR: io_watch_add: kevent flush changes"
-						" failed: %s [%d]\n", strerror(errno), errno);
-				h->kq_nchanges=0; /* reset changes array */
-				return -1;
 			}
 		}
 		h->kq_nchanges=0; /* changes array is empty */
@@ -499,7 +501,7 @@ again2:
 		case POLL_KQUEUE:
 			if (likely( events & POLLIN)){
 				if (unlikely(kq_ev_change(h, fd, EVFILT_READ, EV_ADD, e)==-1))
-				goto error;
+					goto error;
 			}
 			if (unlikely( events & POLLOUT)){
 				if (unlikely(kq_ev_change(h, fd, EVFILT_WRITE, EV_ADD, e)==-1))
@@ -507,8 +509,8 @@ again2:
 					if (likely(events & POLLIN)){
 						kq_ev_change(h, fd, EVFILT_READ, EV_DELETE, 0);
 					}
+					goto error;
 				}
-				goto error;
 			}
 			break;
 #endif
@@ -1116,19 +1118,20 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
 	do {
 again:
 		n=kevent(h->kq_fd, h->kq_changes, apply_changes,  h->kq_array,
-					h->fd_no, &tspec);
+					h->kq_array_size, &tspec);
 		if (unlikely(n==-1)){
-			if (errno==EINTR) goto again; /* signal, ignore it */
-			else if (errno==EBADF) {
+			if (unlikely(errno==EINTR)) goto again; /* signal, ignore it */
+			else {
+				/* for a detailed explanation of what follows see below
+				   the EV_ERROR case */
+				if (unlikely(!(errno==EBADF || errno==ENOENT)))
+					BUG("io_wait_loop_kqueue: kevent: unexpected error"
+						" %s [%d]\n", strerror(errno), errno);
 				/* some of the FDs in kq_changes are bad (already closed)
 				   and there is not enough space in kq_array to return all
 				   of them back */
-				apply_changes = h->fd_no;
+				apply_changes = h->kq_array_size;
 				goto again;
-			}else{
-				LOG(L_ERR, "ERROR: io_wait_loop_kqueue: kevent:"
-						" %s [%d]\n", strerror(errno), errno);
-				goto error;
 			}
 		}
 		/* remove applied changes */
@@ -1148,14 +1151,13 @@ again:
 					r, n, h->kq_array[r].ident, (long)h->kq_array[r].udata,
 					h->kq_array[r].flags);
 #endif
-			if (unlikely((h->kq_array[r].flags & EV_ERROR) &&
-							(h->kq_array[r].data == EBADF ||
-							 h->kq_array[r].udata == 0))){
+			if (unlikely((h->kq_array[r].flags & EV_ERROR) ||
+							 h->kq_array[r].udata == 0)){
 				/* error in changes: we ignore it if it has to do with a
 				   bad fd or update==0. It can be caused by trying to remove an
 				   already closed fd: race between adding something to the
-				   changes array, close() and applying the changes.
-				   E.g. for ser tcp: tcp_main sends a fd to child fore reading
+				   changes array, close() and applying the changes (EBADF).
+				   E.g. for ser tcp: tcp_main sends a fd to child for reading
 				    => deletes it from the watched fds => the changes array
 					will contain an EV_DELETE for it. Before the changes
 					are applied (they are at the end of the main io_wait loop,
@@ -1163,6 +1165,16 @@ again:
 					to tcp_main by a sender (send fail) is processed and causes
 					the fd to be closed. When the changes are applied =>
 					error for the EV_DELETE attempt of a closed fd.
+					Something similar can happen when a fd is scheduled
+					for removal, is close()'ed before being removed and
+					re-opened(a new sock. get the same fd). When the
+					watched fd changes will be applied the fd will be valid
+					(so no EBADF), but it's not already watch => ENOENT.
+					We report a BUG for the other errors (there's nothing
+					constructive we can do if we get an error we don't know 
+					how to handle), but apart from that we ignore it in the
+					idea that it is better apply the rest of the changes,
+					rather then dropping all of them.
 				*/
 				/*
 					example EV_ERROR for trying to delete a read watched fd,
@@ -1176,9 +1188,12 @@ again:
 						udata = 0x0
 					}
 				*/
-				if (h->kq_array[r].data != EBADF)
-					LOG(L_INFO, "INFO: io_wait_loop_kqueue: kevent error on "
-							"fd %ld: %s [%ld]\n", (long)h->kq_array[r].ident,
+				if (h->kq_array[r].data != EBADF &&
+						h->kq_array[r].data != ENOENT)
+					BUG("io_wait_loop_kqueue: kevent unexpected error on "
+							"fd %ld udata %lx: %s [%ld]\n",
+							(long)h->kq_array[r].ident,
+							(long)h->kq_array[r].udata,
 							strerror(h->kq_array[r].data),
 							(long)h->kq_array[r].data);
 			}else{
@@ -1186,20 +1201,28 @@ again:
 				if (likely(h->kq_array[r].filter==EVFILT_READ)){
 					revents=POLLIN |
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
-						(((int)!(h->kq_array[r].flags & EV_ERROR)-1)&POLLERR);
+						(((int)!((h->kq_array[r].flags & EV_EOF) &&
+								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
 					while(fm->type && (fm->events & revents) && 
 							(handle_io(fm, revents, -1)>0) && repeat);
 				}else if (h->kq_array[r].filter==EVFILT_WRITE){
 					revents=POLLOUT |
 						(((int)!(h->kq_array[r].flags & EV_EOF)-1)&POLLHUP) |
-						(((int)!(h->kq_array[r].flags & EV_ERROR)-1)&POLLERR);
+						(((int)!((h->kq_array[r].flags & EV_EOF) &&
+								 	h->kq_array[r].fflags != 0) - 1)&POLLERR);
 					while(fm->type && (fm->events & revents) && 
 							(handle_io(fm, revents, -1)>0) && repeat);
+				}else{
+					BUG("io_wait_loop_kqueue: unknown filter: kqueue: event "
+							"%d/%d: fd=%d, filter=%d, flags=0x%x, fflags=0x%x,"
+							" data=%lx, udata=%lx\n",
+					r, n, h->kq_array[r].ident, h->kq_array[r].filter,
+					h->kq_array[r].flags, h->kq_array[r].fflags, 
+					(long)h->kq_array[r].data, (long)h->kq_array[r].udata);
 				}
 			}
 		}
 	} while(unlikely(orig_changes));
-error:
 	return n;
 }
 #endif




More information about the sr-dev mailing list