@@ -30,11 +30,11 @@ #import "OFStream.h" #import "OFDataArray.h" #ifdef _WIN32 # import "OFTCPSocket.h" #endif -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS # import "OFMutex.h" #endif #ifdef HAVE_KQUEUE # import "OFStreamObserver_kqueue.h" @@ -146,11 +146,11 @@ maxFD = cancelFD[0]; FDToStream = [self allocMemoryWithSize: sizeof(OFStream*) count: maxFD + 1]; FDToStream[cancelFD[0]] = nil; -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS mutex = [[OFMutex alloc] init]; #endif } @catch (id e) { [self release]; @throw e; @@ -167,11 +167,11 @@ [readStreams release]; [writeStreams release]; [queue release]; [queueInfo release]; [queueFDs release]; -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS [mutex release]; #endif [super dealloc]; } @@ -186,67 +186,67 @@ delegate = delegate_; } - (void)addStreamForReading: (OFStream*)stream { -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS [mutex lock]; - @try { #endif + @try { int qi = QUEUE_ADD | QUEUE_READ; int fd = [stream fileDescriptorForReading]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; -#ifdef OF_THREADS } @finally { +#ifdef OF_HAVE_THREADS [mutex unlock]; - } #endif + } [self cancel]; } - (void)addStreamForWriting: (OFStream*)stream { -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS [mutex lock]; - @try { #endif + @try { int qi = QUEUE_ADD | QUEUE_WRITE; int fd = [stream fileDescriptorForWriting]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; -#ifdef OF_THREADS } @finally { +#ifdef OF_HAVE_THREADS [mutex unlock]; - } #endif + } [self cancel]; } - (void)removeStreamForReading: (OFStream*)stream { -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS [mutex lock]; - @try { #endif + @try { int qi = QUEUE_REMOVE | QUEUE_READ; int fd = [stream fileDescriptorForReading]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; -#ifdef OF_THREADS } @finally { +#ifdef OF_HAVE_THREADS [mutex unlock]; - } #endif + } #ifndef _WIN32 OF_ENSURE(write(cancelFD[1], "", 1) > 0); #else OF_ENSURE(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, @@ -254,25 +254,25 @@ #endif } - (void)removeStreamForWriting: (OFStream*)stream { -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS [mutex lock]; - @try { #endif + @try { int qi = QUEUE_REMOVE | QUEUE_WRITE; int fd = [stream fileDescriptorForWriting]; [queue addObject: stream]; [queueInfo addItem: &qi]; [queueFDs addItem: &fd]; -#ifdef OF_THREADS } @finally { +#ifdef OF_HAVE_THREADS [mutex unlock]; - } #endif + } #ifndef _WIN32 OF_ENSURE(write(cancelFD[1], "", 1) > 0); #else OF_ENSURE(sendto(cancelFD[1], "", 1, 0, (struct sockaddr*)&cancelAddr, @@ -304,14 +304,14 @@ abort(); } - (void)OF_processQueue { -#ifdef OF_THREADS +#ifdef OF_HAVE_THREADS [mutex lock]; - @try { #endif + @try { OFStream **queueObjects = [queue objects]; int *queueInfoItems = [queueInfo items]; int *queueFDsItems = [queueFDs items]; size_t i, count = [queue count]; @@ -368,15 +368,15 @@ } [queue removeAllObjects]; [queueInfo removeAllItems]; [queueFDs removeAllItems]; -#ifdef OF_THREADS } @finally { +#ifdef OF_HAVE_THREADS [mutex unlock]; - } #endif + } } - (void)observe { [self observeWithTimeout: -1];