Index: src/Makefile
==================================================================
--- src/Makefile
+++ src/Makefile
@@ -142,10 +142,11 @@
 		socket.m		\
 		${USE_SRCS_IPX}
 SRCS_THREADS = OFCondition.m		\
 	       OFMutex.m		\
 	       OFRecursiveMutex.m	\
+	       OFThreadPool.m		\
 	       condition.m		\
 	       mutex.m			\
 	       thread.m			\
 	       tlskey.m
 SRCS_WINDOWS = OFWin32ConsoleStdIOStream.m	\
ADDED src/OFThreadPool.h
Index: src/OFThreadPool.h
==================================================================
--- src/OFThreadPool.h
+++ src/OFThreadPool.h
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2008-2021 Jonathan Schleifer
+ *
+ * All rights reserved.
+ *
+ * This file is part of ObjFW. It may be distributed under the terms of the
+ * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
+ * the packaging of this file.
+ *
+ * Alternatively, it may be distributed under the terms of the GNU General
+ * Public License, either version 2 or 3, which can be found in the file
+ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
+ * file.
+ */
+
+#import "OFObject.h"
+
+OF_ASSUME_NONNULL_BEGIN
+
+/** @file */
+
+#ifdef OF_HAVE_BLOCKS
+/**
+ * @brief A block for a job which should be executed in a thread pool.
+ */
+typedef void (^of_thread_pool_block_t)(void);
+#endif
+
+@class OFCondition;
+@class OFList OF_GENERIC(ObjectType);
+@class OFMutableArray OF_GENERIC(ObjectType);
+@class OFThreadPoolJob;
+
+/**
+ * @class OFThreadPool OFThreadPool.h ObjFW/OFThreadPool.h
+ *
+ * @brief A class providing a pool of reusable threads.
+ *
+ * @note When the thread pool is released, all threads will terminate after
+ *	 they finish the job they are currently processing.
+ */
+OF_SUBCLASSING_RESTRICTED
+@interface OFThreadPool: OFObject
+{
+	size_t _size;
+	OFMutableArray *_threads;
+	volatile int _count;
+#ifdef OF_THREAD_POOL_M
+@public
+#endif
+	OFList *_queue;
+	OFCondition *_queueCondition;
+	volatile int _doneCount;
+	OFCondition *_countCondition;
+}
+
+/**
+ * @brief The size of the thread pool.
+ */
+@property (readonly, nonatomic) size_t size;
+
+/**
+ * @brief Returns a new thread pool with one thread for each core in the system.
+ *
+ * @warning If for some reason the number of cores in the system could not be
+ *	    determined, the pool will only have one thread!
+ *
+ * @return A new thread pool with one thread for each core in the system
+ */
++ (instancetype)threadPool;
+
+/**
+ * @brief Returns a new thread pool with the specified number of threads.
+ *
+ * @param size The number of threads for the pool
+ * @return A new thread pool with the specified number of threads
+ */
++ (instancetype)threadPoolWithSize: (size_t)size;
+
+/**
+ * @brief Initializes an already allocated OFThreadPool with the specified
+ *	  number of threads.
+ *
+ * @warning A pool with a size of 0 has no thread to process jobs: dispatched
+ *	    jobs are never executed and @ref waitUntilDone never returns once
+ *	    a job has been dispatched!
+ *
+ * @param size The number of threads for the pool
+ * @return An initialized OFThreadPool with the specified number of threads
+ */
+- (instancetype)initWithSize: (size_t)size OF_DESIGNATED_INITIALIZER;
+
+/**
+ * @brief Executes the specified selector on the specified target with the
+ *	  specified object as soon as a thread is ready.
+ *
+ * @param target The target on which to perform the selector
+ * @param selector The selector to perform on the target
+ * @param object The object with which the selector is performed on the target
+ */
+- (void)dispatchWithTarget: (id)target
+		  selector: (SEL)selector
+		    object: (nullable id)object;
+
+#ifdef OF_HAVE_BLOCKS
+/**
+ * @brief Executes the specified block as soon as a thread is ready.
+ *
+ * @param block The block to execute
+ */
+- (void)dispatchWithBlock: (of_thread_pool_block_t)block;
+#endif
+
+/**
+ * @brief Waits until all jobs are done.
+ */
+- (void)waitUntilDone;
+@end
+
+OF_ASSUME_NONNULL_END
ADDED src/OFThreadPool.m
Index: src/OFThreadPool.m
==================================================================
--- src/OFThreadPool.m
+++ src/OFThreadPool.m
@@ -0,0 +1,404 @@
+/*
+ * Copyright (c) 2008-2021 Jonathan Schleifer
+ *
+ * All rights reserved.
+ *
+ * This file is part of ObjFW. It may be distributed under the terms of the
+ * Q Public License 1.0, which can be found in the file LICENSE.QPL included in
+ * the packaging of this file.
+ *
+ * Alternatively, it may be distributed under the terms of the GNU General
+ * Public License, either version 2 or 3, which can be found in the file
+ * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this
+ * file.
+ */
+
+#include "config.h"
+
+#define OF_THREAD_POOL_M
+
+#import "OFThreadPool.h"
+#import "OFArray.h"
+#import "OFList.h"
+#import "OFThread.h"
+#import "OFCondition.h"
+#import "OFSystemInfo.h"
+
+/*
+ * A single unit of work for the thread pool: either a target, selector and
+ * object or, if blocks are available, a block.
+ */
+OF_DIRECT_MEMBERS
+@interface OFThreadPoolJob: OFObject
+{
+	id _target;
+	SEL _selector;
+	id _object;
+#ifdef OF_HAVE_BLOCKS
+	of_thread_pool_block_t _block;
+#endif
+}
+
+- (instancetype)initWithTarget: (id)target
+		      selector: (SEL)selector
+			object: (id)object;
+#ifdef OF_HAVE_BLOCKS
+- (instancetype)initWithBlock: (of_thread_pool_block_t)block;
+#endif
+- (void)perform;
+@end
+
+@implementation OFThreadPoolJob
+- (instancetype)initWithTarget: (id)target
+		      selector: (SEL)selector
+			object: (id)object
+{
+	self = [super init];
+
+	@try {
+		_target = [target retain];
+		_selector = selector;
+		_object = [object retain];
+	} @catch (id e) {
+		[self release];
+		@throw e;
+	}
+
+	return self;
+}
+
+#ifdef OF_HAVE_BLOCKS
+- (instancetype)initWithBlock: (of_thread_pool_block_t)block
+{
+	self = [super init];
+
+	@try {
+		_block = [block copy];
+	} @catch (id e) {
+		[self release];
+		@throw e;
+	}
+
+	return self;
+}
+#endif
+
+- (void)dealloc
+{
+	[_target release];
+	[_object release];
+#ifdef OF_HAVE_BLOCKS
+	[_block release];
+#endif
+
+	[super dealloc];
+}
+
+- (void)perform
+{
+#ifdef OF_HAVE_BLOCKS
+	if (_block != NULL)
+		_block();
+	else
+#endif
+		[_target performSelector: _selector withObject: _object];
+}
+@end
+
+/*
+ * A worker thread of the pool. It shares the job queue, both conditions and
+ * the done counter with the pool it was initialized with.
+ */
+OF_DIRECT_MEMBERS
+@interface OFThreadPoolThread: OFThread
+{
+	OFList *_queue;
+	OFCondition *_queueCondition, *_countCondition;
+@public
+	volatile bool _terminate;
+	volatile int *_doneCount;
+}
+
++ (instancetype)threadWithThreadPool: (OFThreadPool *)threadPool;
+- (instancetype)initWithThreadPool: (OFThreadPool *)threadPool;
+@end
+
+@implementation OFThreadPoolThread
++ (instancetype)threadWithThreadPool: (OFThreadPool *)threadPool
+{
+	return [[(OFThreadPoolThread *)[self alloc]
+	    initWithThreadPool: threadPool] autorelease];
+}
+
+- (instancetype)initWithThreadPool: (OFThreadPool *)threadPool
+{
+	self = [super init];
+
+	@try {
+		_queue = [threadPool->_queue retain];
+		_queueCondition = [threadPool->_queueCondition retain];
+		_countCondition = [threadPool->_countCondition retain];
+		/*
+		 * This points into the pool, which is not retained. It
+		 * is only dereferenced while holding _countCondition
+		 * and after checking _terminate, which the pool sets
+		 * under that lock when it is deallocated.
+		 */
+		_doneCount = &threadPool->_doneCount;
+	} @catch (id e) {
+		[self release];
+		@throw e;
+	}
+
+	return self;
+}
+
+- (void)dealloc
+{
+	[_queue release];
+	[_queueCondition release];
+	[_countCondition release];
+
+	[super dealloc];
+}
+
+/*
+ * Takes jobs off the shared queue and performs them until _terminate is set.
+ */
+- (id)main
+{
+	void *pool;
+
+	if (_terminate)
+		return nil;
+
+	pool = objc_autoreleasePoolPush();
+
+	for (;;) {
+		OFThreadPoolJob *job;
+
+		[_queueCondition lock];
+		@try {
+			of_list_object_t *listObject;
+
+			if (_terminate) {
+				objc_autoreleasePoolPop(pool);
+				return nil;
+			}
+
+			listObject = _queue.firstListObject;
+
+			while (listObject == NULL) {
+				[_queueCondition wait];
+
+				if (_terminate) {
+					objc_autoreleasePoolPop(pool);
+					return nil;
+				}
+
+				listObject = _queue.firstListObject;
+			}
+
+			job = [[listObject->object retain] autorelease];
+			[_queue removeListObject: listObject];
+		} @finally {
+			[_queueCondition unlock];
+		}
+
+		if (_terminate) {
+			objc_autoreleasePoolPop(pool);
+			return nil;
+		}
+
+		[job perform];
+
+		if (_terminate) {
+			objc_autoreleasePoolPop(pool);
+			return nil;
+		}
+
+		objc_autoreleasePoolPop(pool);
+		pool = objc_autoreleasePoolPush();
+
+		[_countCondition lock];
+		@try {
+			if (_terminate) {
+				objc_autoreleasePoolPop(pool);
+				return nil;
+			}
+
+			(*_doneCount)++;
+
+			/*
+			 * Wake up all waiters: more than one thread
+			 * might be blocked in -[waitUntilDone].
+			 */
+			[_countCondition broadcast];
+		} @finally {
+			[_countCondition unlock];
+		}
+	}
+}
+@end
+
+@implementation OFThreadPool
++ (instancetype)threadPool
+{
+	return [[[self alloc] init] autorelease];
+}
+
++ (instancetype)threadPoolWithSize: (size_t)size
+{
+	return [[[self alloc] initWithSize: size] autorelease];
+}
+
+- (instancetype)init
+{
+	return [self initWithSize: [OFSystemInfo numberOfCPUs]];
+}
+
+- (instancetype)initWithSize: (size_t)size
+{
+	self = [super init];
+
+	@try {
+		_size = size;
+		_threads = [[OFMutableArray alloc] init];
+		_queue = [[OFList alloc] init];
+		_queueCondition = [[OFCondition alloc] init];
+		_countCondition = [[OFCondition alloc] init];
+
+		for (size_t i = 0; i < size; i++) {
+			void *pool = objc_autoreleasePoolPush();
+
+			OFThreadPoolThread *thread =
+			    [OFThreadPoolThread threadWithThreadPool: self];
+
+			[_threads addObject: thread];
+
+			objc_autoreleasePoolPop(pool);
+		}
+
+		/*
+		 * We need to start the threads in a separate loop to make sure
+		 * _threads is not modified anymore to prevent a race condition.
+		 */
+		for (size_t i = 0; i < size; i++)
+			[[_threads objectAtIndex: i] start];
+	} @catch (id e) {
+		[self release];
+		@throw e;
+	}
+
+	return self;
+}
+
+- (void)dealloc
+{
+	[_queueCondition lock];
+	@try {
+		[_countCondition lock];
+		@try {
+			for (OFThreadPoolThread *thread in _threads)
+				thread->_terminate = true;
+		} @finally {
+			[_countCondition unlock];
+		}
+
+		[_queueCondition broadcast];
+	} @finally {
+		[_queueCondition unlock];
+	}
+
+	[_threads release];
+	[_queue release];
+	[_queueCondition release];
+	[_countCondition release];
+
+	[super dealloc];
+}
+
+- (void)of_dispatchJob: (OFThreadPoolJob *)job OF_DIRECT
+{
+	volatile bool enqueued = false;
+
+	/*
+	 * Count the job before it becomes visible to the threads, so that
+	 * _doneCount can never get ahead of _count.
+	 */
+	[_countCondition lock];
+	_count++;
+	[_countCondition unlock];
+
+	@try {
+		[_queueCondition lock];
+		@try {
+			[_queue appendObject: job];
+			enqueued = true;
+			[_queueCondition signal];
+		} @finally {
+			[_queueCondition unlock];
+		}
+	} @catch (id e) {
+		/*
+		 * A job that never made it into the queue is never counted as
+		 * done, so take it out of _count again. Otherwise
+		 * -[waitUntilDone] would block forever.
+		 */
+		if (!enqueued) {
+			[_countCondition lock];
+			_count--;
+			[_countCondition broadcast];
+			[_countCondition unlock];
+		}
+
+		@throw e;
+	}
+}
+
+- (void)waitUntilDone
+{
+	for (;;) {
+		[_countCondition lock];
+		@try {
+			if (_doneCount == _count)
+				return;
+
+			[_countCondition wait];
+		} @finally {
+			[_countCondition unlock];
+		}
+	}
+}
+
+- (void)dispatchWithTarget: (id)target
+		  selector: (SEL)selector
+		    object: (id)object
+{
+	OFThreadPoolJob *job = [[OFThreadPoolJob alloc] initWithTarget: target
+							      selector: selector
+								object: object];
+	@try {
+		[self of_dispatchJob: job];
+	} @finally {
+		[job release];
+	}
+}
+
+#ifdef OF_HAVE_BLOCKS
+- (void)dispatchWithBlock: (of_thread_pool_block_t)block
+{
+	OFThreadPoolJob *job = [[OFThreadPoolJob alloc] initWithBlock: block];
+	@try {
+		[self of_dispatchJob: job];
+	} @finally {
+		[job release];
+	}
+}
+#endif
+
+- (size_t)size
+{
+	return _size;
+}
+@end
Index: src/ObjFW.h
==================================================================
--- src/ObjFW.h
+++ src/ObjFW.h
@@ -247,10 +247,11 @@
 #ifdef OF_HAVE_THREADS
 # import "thread.h"
 # import "tlskey.h"
 # import "mutex.h"
 # import "condition.h"
+# import "OFThreadPool.h"
 # import "OFMutex.h"
 # import "OFRecursiveMutex.h"
 # import "OFCondition.h"
 #endif
 