Differences From Artifact [3eaeff6179]:
- File src/OFStreamObserver.m — part of check-in [52102a2906] at 2011-04-01 16:06:48 on branch trunk — Check nfds against OPEN_MAX before calling poll. (user: js, size: 7433) [annotate] [blame] [check-ins using]
To Artifact [bf14750714]:
- File
src/OFStreamObserver.m
— part of check-in
[35aab77af3]
at
2011-04-01 16:54:57
on branch trunk
— Make OFStreamObserver thread-safe.
It does not cancel the currently running -[observe] yet when changing
the set of streams to observe. (user: js, size: 9282) [annotate] [blame] [check-ins using]
︙ | ︙ | |||
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | #include "config.h" #define OF_STREAM_OBSERVER_M #include <string.h> #ifdef OF_HAVE_POLL # include <poll.h> #endif #import "OFStreamObserver.h" #import "OFDataArray.h" #import "OFArray.h" #import "OFDictionary.h" #import "OFStream.h" #import "OFNumber.h" #import "OFAutoreleasePool.h" #import "OFOutOfRangeException.h" @implementation OFStreamObserver + observer { return [[[self alloc] init] autorelease]; } - init { self = [super init]; @try { readStreams = [[OFMutableArray alloc] init]; writeStreams = [[OFMutableArray alloc] init]; #ifdef OF_HAVE_POLL fds = [[OFDataArray alloc] initWithItemSize: sizeof(struct pollfd)]; fdToStream = [[OFMutableDictionary alloc] init]; #else FD_ZERO(&readfds); FD_ZERO(&writefds); | > > > > > > > > > > > | 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | #include "config.h" #define OF_STREAM_OBSERVER_M #include <string.h> #include <assert.h> #ifdef OF_HAVE_POLL # include <poll.h> #endif #import "OFStreamObserver.h" #import "OFDataArray.h" #import "OFArray.h" #import "OFDictionary.h" #import "OFStream.h" #import "OFNumber.h" #import "OFAutoreleasePool.h" #import "OFOutOfRangeException.h" enum { QUEUE_ADD = 0, QUEUE_REMOVE = 1, QUEUE_READ = 0, QUEUE_WRITE = 2 }; @implementation OFStreamObserver + observer { return [[[self alloc] init] autorelease]; } - init { self = [super init]; @try { readStreams = [[OFMutableArray alloc] init]; writeStreams = [[OFMutableArray alloc] init]; queue = [[OFMutableArray alloc] init]; queueInfo = [[OFMutableArray alloc] init]; #ifdef OF_HAVE_POLL fds = [[OFDataArray alloc] initWithItemSize: sizeof(struct pollfd)]; fdToStream = [[OFMutableDictionary alloc] init]; #else FD_ZERO(&readfds); FD_ZERO(&writefds); |
︙ | ︙ | |||
64 65 66 67 68 69 70 71 72 73 74 75 76 77 | } - (void)dealloc { [(id)delegate release]; [readStreams release]; [writeStreams release]; #ifdef OF_HAVE_POLL [fdToStream release]; [fds release]; #endif [super dealloc]; } | > > | 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | } - (void)dealloc { [(id)delegate release]; [readStreams release]; [writeStreams release]; [queue release]; [queueInfo release]; #ifdef OF_HAVE_POLL [fdToStream release]; [fds release]; #endif [super dealloc]; } |
︙ | ︙ | |||
173 174 175 176 177 178 179 | if (!FD_ISSET(fd, other_fdset)) FD_CLR(fd, &exceptfds); } #endif - (void)addStreamToObserveForReading: (OFStream*)stream { | > > > > | > | | < < | < < < > > > > | > | | < < | < < < > > | > > > | > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | | | | < | < < > | | | | | | | > > > > > > > > > | > > | 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 | if (!FD_ISSET(fd, other_fdset)) FD_CLR(fd, &exceptfds); } #endif - (void)addStreamToObserveForReading: (OFStream*)stream { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_READ]; @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } [pool release]; } - (void)addStreamToObserveForWriting: (OFStream*)stream { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; OFNumber *qi = [OFNumber numberWithInt: QUEUE_ADD | QUEUE_WRITE]; @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } [pool release]; } - (void)removeStreamToObserveForReading: (OFStream*)stream { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_READ]; @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } [pool release]; } - (void)removeStreamToObserveForWriting: (OFStream*)stream { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; OFNumber *qi = [OFNumber numberWithInt: QUEUE_REMOVE | QUEUE_WRITE]; @synchronized (queue) { [queue addObject: stream]; [queueInfo addObject: qi]; } [pool release]; } - (void)_processQueue { @synchronized (queue) { OFStream **queue_c = [queue cArray]; OFNumber **queueInfo_c = [queueInfo cArray]; size_t i, count = [queue count]; for (i = 0; i < count; i++) { switch ([queueInfo_c[i] intValue]) { case QUEUE_ADD | QUEUE_READ: [readStreams addObject: queue_c[i]]; #ifdef OF_HAVE_POLL [self _addStream: queue_c[i] withEvents: POLLIN]; #else [self _addStream: queue_c[i] withFDSet: &readfds]; #endif break; case QUEUE_ADD | QUEUE_WRITE: [writeStreams addObject: queue_c[i]]; #ifdef OF_HAVE_POLL [self _addStream: queue_c[i] withEvents: POLLOUT]; #else [self _addStream: queue_c[i] withFDSet: &writefds]; #endif break; case QUEUE_REMOVE | QUEUE_READ: [readStreams removeObjectIdenticalTo: queue_c[i]]; #ifdef OF_HAVE_POLL [self _removeStream: queue_c[i] withEvents: POLLIN]; #else [self _removeStream: queue_c[i] withFDSet: &readfds otherFDSet: &writefds]; #endif break; case QUEUE_REMOVE | QUEUE_WRITE: [writeStreams removeObjectIdenticalTo: queue_c[i]]; #ifdef OF_HAVE_POLL [self _removeStream: queue_c[i] withEvents: POLLOUT]; #else [self _removeStream: queue_c[i] withFDSet: &writefds otherFDSet: &readfds]; #endif break; default: assert(0); } } [queue removeNObjects: count]; [queueInfo removeNObjects: count]; } } - (void)observe { [self observeWithTimeout: -1]; } - (BOOL)observeWithTimeout: (int)timeout { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; BOOL foundInCache = NO; OFStream **cArray; size_t i, count; #ifdef OF_HAVE_POLL struct pollfd *fds_c; size_t nfds; #else fd_set readfds_; fd_set writefds_; fd_set exceptfds_; struct timeval tv; #endif [self _processQueue]; cArray = [readStreams cArray]; count = [readStreams count]; for (i = 0; i < count; i++) { if (cArray[i]->cache != NULL) { [delegate streamDidBecomeReadyForReading: cArray[i]]; |
︙ | ︙ |