@@ -186,55 +186,67 @@ delegate = delegate_; } - (void)addStreamForReading: (OFStream*)stream { +#ifdef OF_THREADS [mutex lock]; @try { +#endif int qi = QUEUE_ADD | QUEUE_READ; int fd = [stream fileDescriptorForReading]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; +#ifdef OF_THREADS } @finally { [mutex unlock]; } +#endif [self cancel]; } - (void)addStreamForWriting: (OFStream*)stream { +#ifdef OF_THREADS [mutex lock]; @try { +#endif int qi = QUEUE_ADD | QUEUE_WRITE; int fd = [stream fileDescriptorForWriting]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; +#ifdef OF_THREADS } @finally { [mutex unlock]; } +#endif [self cancel]; } - (void)removeStreamForReading: (OFStream*)stream { +#ifdef OF_THREADS [mutex lock]; @try { +#endif int qi = QUEUE_REMOVE | QUEUE_READ; int fd = [stream fileDescriptorForReading]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; +#ifdef OF_THREADS } @finally { [mutex unlock]; } +#endif #ifndef _WIN32 OF_ENSURE(write(cancelFD[1], "", 1) > 0); #else OF_ENSURE(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, @@ -242,21 +254,25 @@ #endif } - (void)removeStreamForWriting: (OFStream*)stream { +#ifdef OF_THREADS [mutex lock]; @try { +#endif int qi = QUEUE_REMOVE | QUEUE_WRITE; int fd = [stream fileDescriptorForWriting]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; +#ifdef OF_THREADS } @finally { [mutex unlock]; } +#endif #ifndef _WIN32 OF_ENSURE(write(cancelFD[1], "", 1) > 0); #else OF_ENSURE(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, @@ -288,12 +304,14 @@ abort(); } - (void)OF_processQueue { +#ifdef OF_THREADS [mutex lock]; @try { +#endif OFStream **queueObjects = [queue objects]; int *queueInfoItems = [queueInfo items]; int *queueFDsItems = [queueFDs items]; size_t i, count = [queue count]; @@ -350,13 +368,15 @@ } [queue removeAllObjects]; [queueInfo removeAllItems]; [queueFDs removeAllItems]; +#ifdef OF_THREADS } @finally { [mutex unlock]; } +#endif } - (void)observe { [self observeWithTimeout: -1];