Comment: | Never block when the read buffer is non-empty
This was broken by 88f2f03. The problem only existed when something was |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | 0.8 |
Files: | files | file ages | folders |
SHA3-256: |
b84490ab4fcdac6125ac1c50e927ce73 |
User & Date: | js on 2016-03-20 12:05:46 |
Other Links: | branch diff | manifest | tags |
2016-03-20
| ||
14:19 | Use the locked queue for kqueue and epoll as well check-in: 85917ea0dd user: js tags: 0.8 | |
12:05 | Never block when the read buffer is non-empty check-in: b84490ab4f user: js tags: 0.8 | |
10:55 | undef __{unsafe_unretained,bridge,autoreleasing} check-in: ea5a19118a user: js tags: 0.8 | |
Modified src/OFKernelEventObserver+Private.h from [1372173ce7] to [32cc2734b2].
︙ | ︙ | |||
15 16 17 18 19 20 21 | */ #import "OFKernelEventObserver.h" OF_ASSUME_NONNULL_BEGIN @interface OFKernelEventObserver (OF_PRIVATE_CATEGORY) | | | 15 16 17 18 19 20 21 22 23 24 25 | */ #import "OFKernelEventObserver.h" OF_ASSUME_NONNULL_BEGIN @interface OFKernelEventObserver (OF_PRIVATE_CATEGORY) - (bool)OF_processReadBuffers; @end OF_ASSUME_NONNULL_END |
Modified src/OFKernelEventObserver.m from [74500121cd] to [b44fb07041].
︙ | ︙ | |||
220 221 222 223 224 225 226 | # else OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, (struct sockaddr*)&_cancelAddr, 8) > 0); # endif #endif } | | > | | > > > | > > > > > > | 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 | # else OF_ENSURE(sendto(_cancelFD[1], "", 1, 0, (struct sockaddr*)&_cancelAddr, 8) > 0); # endif #endif } - (bool)OF_processReadBuffers { id const *objects = [_readObjects objects]; size_t i, count = [_readObjects count]; bool foundInReadBuffer = false; for (i = 0; i < count; i++) { void *pool = objc_autoreleasePoolPush(); if ([objects[i] isKindOfClass: [OFStream class]] && [objects[i] hasDataInReadBuffer] && ![objects[i] OF_isWaitingForDelimiter]) { if ([_delegate respondsToSelector: @selector(objectIsReadyForReading:)]) [_delegate objectIsReadyForReading: objects[i]]; foundInReadBuffer = true; } objc_autoreleasePoolPop(pool); } /* * As long as we have data in the read buffer for any stream, we don't * want to block. */ return foundInReadBuffer; } @end |
Modified src/OFKernelEventObserver_LockedQueue.m from [abbca7e630] to [99ed534306].
︙ | ︙ | |||
166 167 168 169 170 171 172 173 174 175 176 177 178 179 | fileDescriptor: (int)fd { OF_UNRECOGNIZED_SELECTOR } - (void)OF_processQueue { #ifdef OF_HAVE_THREADS [_mutex lock]; @try { #endif int *queueActions = [_queueActions items]; int *queueFDs = [_queueFDs items]; id const *queueObjects = [_queueObjects objects]; | > > | 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 | fileDescriptor: (int)fd { OF_UNRECOGNIZED_SELECTOR } - (void)OF_processQueue { void *pool = objc_autoreleasePoolPush(); #ifdef OF_HAVE_THREADS [_mutex lock]; @try { #endif int *queueActions = [_queueActions items]; int *queueFDs = [_queueFDs items]; id const *queueObjects = [_queueObjects objects]; |
︙ | ︙ | |||
225 226 227 228 229 230 231 | [_queueFDs removeAllItems]; [_queueObjects removeAllObjects]; #ifdef OF_HAVE_THREADS } @finally { [_mutex unlock]; } #endif | | > > | 227 228 229 230 231 232 233 234 235 236 237 | [_queueFDs removeAllItems]; [_queueObjects removeAllObjects]; #ifdef OF_HAVE_THREADS } @finally { [_mutex unlock]; } #endif objc_autoreleasePoolPop(pool); } @end |
Modified src/OFKernelEventObserver_epoll.m from [1081329583] to [f08643e4a0].
︙ | ︙ | |||
205 206 207 208 209 210 211 | events: EPOLLOUT objectsArray: _writeObjects]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { OFNull *nullObject = [OFNull null]; | < | | < | 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 | events: EPOLLOUT objectsArray: _writeObjects]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { OFNull *nullObject = [OFNull null]; struct epoll_event eventList[EVENTLIST_SIZE]; int i, events; if ([self OF_processReadBuffers]) return; events = epoll_wait(_epfd, eventList, EVENTLIST_SIZE, (timeInterval != -1 ? timeInterval * 1000 : -1)); if (events < 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; |
︙ | ︙ |
Modified src/OFKernelEventObserver_kqueue.m from [a6eed99dff] to [80c8d89772].
︙ | ︙ | |||
201 202 203 204 205 206 207 | [_mutex unlock]; } #endif } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { | < > > > < < < < > > | 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 | [_mutex unlock]; } #endif } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { struct timespec timeout; struct kevent eventList[EVENTLIST_SIZE]; int i, events; if ([self OF_processReadBuffers]) return; timeout.tv_sec = (time_t)timeInterval; timeout.tv_nsec = lrint((timeInterval - timeout.tv_sec) * 1000000000); events = kevent(_kernelQueue, NULL, 0, eventList, EVENTLIST_SIZE, (timeInterval != -1 ? &timeout : NULL)); if (events < 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; for (i = 0; i < events; i++) { void *pool; if (eventList[i].flags & EV_ERROR) @throw [OFObserveFailedException exceptionWithObserver: self errNo: (int)eventList[i].data]; if (eventList[i].ident == _cancelFD[0]) { char buffer; |
︙ | ︙ |
Modified src/OFKernelEventObserver_poll.m from [71e7c91e05] to [7dfa12138b].
︙ | ︙ | |||
155 156 157 158 159 160 161 | [self OF_removeObject: object fileDescriptor: fd events: POLLOUT]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { | < > | | < > > | 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | [self OF_removeObject: object fileDescriptor: fd events: POLLOUT]; } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { struct pollfd *FDs; int events; size_t i, nFDs; [self OF_processQueue]; if ([self OF_processReadBuffers]) return; FDs = [_FDs items]; nFDs = [_FDs count]; #ifdef OPEN_MAX if (nFDs > OPEN_MAX) @throw [OFOutOfRangeException exception]; #endif events = poll(FDs, (nfds_t)nFDs, (int)(timeInterval != -1 ? timeInterval * 1000 : -1)); if (events < 0) @throw [OFObserveFailedException exceptionWithObserver: self errNo: errno]; for (i = 0; i < nFDs; i++) { assert(FDs[i].fd <= _maxFD); if (FDs[i].revents & POLLIN) { void *pool; if (FDs[i].fd == _cancelFD[0]) { char buffer; #ifdef OF_HAVE_PIPE OF_ENSURE(read(_cancelFD[0], &buffer, 1) == 1); #else OF_ENSURE(recvfrom(_cancelFD[0], &buffer, 1, |
︙ | ︙ | |||
209 210 211 212 213 214 215 | [_delegate objectIsReadyForReading: _FDToObject[FDs[i].fd]]; objc_autoreleasePoolPop(pool); } if (FDs[i].revents & POLLOUT) { | | | 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 | [_delegate objectIsReadyForReading: _FDToObject[FDs[i].fd]]; objc_autoreleasePoolPop(pool); } if (FDs[i].revents & POLLOUT) { void *pool = objc_autoreleasePoolPush(); if ([_delegate respondsToSelector: @selector(objectIsReadyForWriting:)]) [_delegate objectIsReadyForWriting: _FDToObject[FDs[i].fd]]; objc_autoreleasePoolPop(pool); } FDs[i].revents = 0; } } @end |
Modified src/OFKernelEventObserver_select.m from [8f5e338344] to [fe3f6e4f8e].
︙ | ︙ | |||
126 127 128 129 130 131 132 | #endif FD_CLR(fd, &_writeFDs); } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { | < > | | < | 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | #endif FD_CLR(fd, &_writeFDs); } - (void)observeForTimeInterval: (of_time_interval_t)timeInterval { id const *objects; fd_set readFDs; fd_set writeFDs; struct timeval timeout; int events; size_t i, count; [self OF_processQueue]; if ([self OF_processReadBuffers]) return; #ifdef FD_COPY FD_COPY(&_readFDs, &readFDs); FD_COPY(&_writeFDs, &writeFDs); #else readFDs = _readFDs; writeFDs = _writeFDs; |
︙ | ︙ | |||
182 183 184 185 186 187 188 | #endif } objects = [_readObjects objects]; count = [_readObjects count]; for (i = 0; i < count; i++) { | < < | | < < | | | 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | #endif } objects = [_readObjects objects]; count = [_readObjects count]; for (i = 0; i < count; i++) { void *pool = objc_autoreleasePoolPush(); int fd = [objects[i] fileDescriptorForReading]; if (FD_ISSET(fd, &readFDs) && [_delegate respondsToSelector: @selector(objectIsReadyForReading:)]) [_delegate objectIsReadyForReading: objects[i]]; objc_autoreleasePoolPop(pool); } objects = [_writeObjects objects]; count = [_writeObjects count]; for (i = 0; i < count; i++) { void *pool = objc_autoreleasePoolPush(); int fd = [objects[i] fileDescriptorForWriting]; if (FD_ISSET(fd, &writeFDs) && [_delegate respondsToSelector: @selector(objectIsReadyForWriting:)]) [_delegate objectIsReadyForWriting: objects[i]]; objc_autoreleasePoolPop(pool); } } @end |