Index: src/OFThreadPool.h ================================================================== --- src/OFThreadPool.h +++ src/OFThreadPool.h @@ -25,20 +25,23 @@ @class OFCondition; @class OFThreadPoolJob; /** * \brief A class providing a pool of reusable threads. + * + * \note When the thread pool is released, all threads will terminate after + * they finish the job they are currently processing. */ @interface OFThreadPool: OFObject { size_t size; OFMutableArray *threads; - int count; + volatile int count; @public OFList *queue; OFCondition *queueCondition; - int doneCount; + volatile int doneCount; OFCondition *countCondition; } /** * \brief Returns a new thread pool with one thread for each core in the system. Index: src/OFThreadPool.m ================================================================== --- src/OFThreadPool.m +++ src/OFThreadPool.m @@ -12,16 +12,10 @@ * Public License, either version 2 or 3, which can be found in the file * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this * file. */ -/* - * TODO: - * - What to do when threads are running and the thread pool is deallocated? - * - How to tell the threads to terminate because they won't get new work? - */ - #include "config.h" #import "OFThreadPool.h" #import "OFArray.h" #import "OFList.h" @@ -134,11 +128,15 @@ } @end @interface OFThreadPoolThread: OFThread { - OFThreadPool *threadPool; + OFList *queue; + OFCondition *queueCondition, *countCondition; +@public + volatile BOOL terminate; + volatile int *doneCount; } + threadWithThreadPool: (OFThreadPool*)threadPool; - initWithThreadPool: (OFThreadPool*)threadPool; @end @@ -147,53 +145,102 @@ + threadWithThreadPool: (OFThreadPool*)threadPool { return [[[self alloc] initWithThreadPool: threadPool] autorelease]; } -- initWithThreadPool: (OFThreadPool*)threadPool_ +- initWithThreadPool: (OFThreadPool*)threadPool { self = [super init]; - threadPool = threadPool_; + @try { + queue = [threadPool->queue retain]; + queueCondition = [threadPool->queueCondition retain]; + countCondition = [threadPool->countCondition retain]; + doneCount = &threadPool->doneCount; + } @catch (id e) { + [self release]; + @throw e; + } return self; } + +- (void)dealloc +{ + [queue release]; + [queueCondition release]; + [countCondition release]; + + [super dealloc]; +} - (id)main { OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; + + if (terminate) { + [pool release]; + return nil; + } for (;;) { OFThreadPoolJob *job; - [threadPool->queueCondition lock]; + [queueCondition lock]; @try { of_list_object_t *listObject; - listObject = [threadPool->queue firstListObject]; + if (terminate) { + [pool release]; + return nil; + } + + listObject = [queue firstListObject]; while (listObject == NULL) { - [threadPool->queueCondition wait]; - listObject = - [threadPool->queue firstListObject]; + [queueCondition wait]; + + if (terminate) { + [pool release]; + return nil; + } + + listObject = [queue firstListObject]; } job = [[listObject->object retain] autorelease]; - [threadPool->queue removeListObject: listObject]; + [queue removeListObject: listObject]; } @finally { - [threadPool->queueCondition unlock]; + [queueCondition unlock]; + } + + if (terminate) { + [pool release]; + return nil; } [job perform]; + + if (terminate) { + [pool release]; + return nil; + } + [pool releaseObjects]; - [threadPool->countCondition lock]; + [countCondition lock]; @try { - threadPool->doneCount++; - [threadPool->countCondition signal]; + if (terminate) { + [pool release]; + return nil; + } + + (*doneCount)++; + + [countCondition signal]; } @finally { - [threadPool->countCondition unlock]; + [countCondition unlock]; } } } @end @@ -255,10 +302,30 @@ return self; } - (void)dealloc { + OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init]; + [queueCondition lock]; + @try { + [countCondition lock]; + @try { + OFEnumerator *enumerator = [threads objectEnumerator]; + OFThreadPoolThread *thread; + + while ((thread = [enumerator nextObject]) != nil) + thread->terminate = YES; + } @finally { + [countCondition unlock]; + } + + [queueCondition broadcast]; + } @finally { + [queueCondition unlock]; + } + [pool release]; + [threads release]; [queue release]; [queueCondition release]; [countCondition release];