Index: ObjFW.xcodeproj/project.pbxproj ================================================================== --- ObjFW.xcodeproj/project.pbxproj +++ ObjFW.xcodeproj/project.pbxproj @@ -269,10 +269,12 @@ 4B90B7A3133AD87D00BD33CB /* OFBindFailedException.m in Sources */ = {isa = PBXBuildFile; fileRef = 4B90B799133AD87D00BD33CB /* OFBindFailedException.m */; }; 4B90B7A4133AD87D00BD33CB /* OFConnectionFailedException.h in Headers */ = {isa = PBXBuildFile; fileRef = 4B90B79A133AD87D00BD33CB /* OFConnectionFailedException.h */; settings = {ATTRIBUTES = (Public, ); }; }; 4B90B7A5133AD87D00BD33CB /* OFConnectionFailedException.m in Sources */ = {isa = PBXBuildFile; fileRef = 4B90B79B133AD87D00BD33CB /* OFConnectionFailedException.m */; }; 4B90B7A6133AD87D00BD33CB /* OFListenFailedException.h in Headers */ = {isa = PBXBuildFile; fileRef = 4B90B79C133AD87D00BD33CB /* OFListenFailedException.h */; settings = {ATTRIBUTES = (Public, ); }; }; 4B90B7A7133AD87D00BD33CB /* OFListenFailedException.m in Sources */ = {isa = PBXBuildFile; fileRef = 4B90B79D133AD87D00BD33CB /* OFListenFailedException.m */; }; + 4B9361A81511000C00DCD16B /* OFThreadPool.h in Headers */ = {isa = PBXBuildFile; fileRef = 4B9361A61511000C00DCD16B /* OFThreadPool.h */; settings = {ATTRIBUTES = (Public, ); }; }; + 4B9361A91511000C00DCD16B /* OFThreadPool.m in Sources */ = {isa = PBXBuildFile; fileRef = 4B9361A71511000C00DCD16B /* OFThreadPool.m */; }; 4B989C2F13771A3700109A30 /* OFSerialization.h in Headers */ = {isa = PBXBuildFile; fileRef = 4B989C2E13771A3700109A30 /* OFSerialization.h */; settings = {ATTRIBUTES = (Public, ); }; }; 4B9BB7BD141CDE2D000AD1CC /* OFArray_adjacentSubarray.h in Headers */ = {isa = PBXBuildFile; fileRef = 4B9BB7B9141CDE2D000AD1CC /* OFArray_adjacentSubarray.h */; }; 4B9BB7BE141CDE2D000AD1CC /* OFArray_adjacentSubarray.m in Sources */ = {isa = PBXBuildFile; fileRef = 4B9BB7BA141CDE2D000AD1CC /* OFArray_adjacentSubarray.m */; }; 4B9BB7BF141CDE2D000AD1CC /* OFArray_subarray.h in Headers 
*/ = {isa = PBXBuildFile; fileRef = 4B9BB7BB141CDE2D000AD1CC /* OFArray_subarray.h */; settings = {ATTRIBUTES = (Public, ); }; }; 4B9BB7C0141CDE2D000AD1CC /* OFArray_subarray.m in Sources */ = {isa = PBXBuildFile; fileRef = 4B9BB7BC141CDE2D000AD1CC /* OFArray_subarray.m */; }; @@ -617,10 +619,12 @@ 4B90B799133AD87D00BD33CB /* OFBindFailedException.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = OFBindFailedException.m; path = src/exceptions/OFBindFailedException.m; sourceTree = ""; }; 4B90B79A133AD87D00BD33CB /* OFConnectionFailedException.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = OFConnectionFailedException.h; path = src/exceptions/OFConnectionFailedException.h; sourceTree = ""; }; 4B90B79B133AD87D00BD33CB /* OFConnectionFailedException.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = OFConnectionFailedException.m; path = src/exceptions/OFConnectionFailedException.m; sourceTree = ""; }; 4B90B79C133AD87D00BD33CB /* OFListenFailedException.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = OFListenFailedException.h; path = src/exceptions/OFListenFailedException.h; sourceTree = ""; }; 4B90B79D133AD87D00BD33CB /* OFListenFailedException.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = OFListenFailedException.m; path = src/exceptions/OFListenFailedException.m; sourceTree = ""; }; + 4B9361A61511000C00DCD16B /* OFThreadPool.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = OFThreadPool.h; path = src/OFThreadPool.h; sourceTree = ""; }; + 4B9361A71511000C00DCD16B /* OFThreadPool.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = OFThreadPool.m; path = src/OFThreadPool.m; sourceTree = ""; }; 4B981CDE116F71DD00294DB7 /* OFSeekableStream.h */ = {isa = 
PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = OFSeekableStream.h; path = src/OFSeekableStream.h; sourceTree = ""; }; 4B981CDF116F71DD00294DB7 /* OFSeekableStream.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = OFSeekableStream.m; path = src/OFSeekableStream.m; sourceTree = ""; }; 4B989C2E13771A3700109A30 /* OFSerialization.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = OFSerialization.h; path = src/OFSerialization.h; sourceTree = ""; }; 4B99250F12E0780000215DBE /* OFHTTPRequest.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; name = OFHTTPRequest.h; path = src/OFHTTPRequest.h; sourceTree = ""; }; 4B99251012E0780000215DBE /* OFHTTPRequest.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; name = OFHTTPRequest.m; path = src/OFHTTPRequest.m; sourceTree = ""; }; @@ -997,10 +1001,12 @@ 4B6799821099E7C50041064A /* OFTCPSocket.m */, 4BD653C3143B8489006182F0 /* OFTCPSocket+SOCKS5.h */, 4BD653C4143B8489006182F0 /* OFTCPSocket+SOCKS5.m */, 4B6799831099E7C50041064A /* OFThread.h */, 4B6799841099E7C50041064A /* OFThread.m */, + 4B9361A61511000C00DCD16B /* OFThreadPool.h */, + 4B9361A71511000C00DCD16B /* OFThreadPool.m */, 4BA02BA015041F5900002F84 /* OFTLSSocket.h */, 4B4A61F212DF5EA20048F3F2 /* OFURL.h */, 4B4A61F312DF5EA20048F3F2 /* OFURL.m */, 4BF1BCCE11C9663F0025511F /* OFXMLAttribute.h */, 4BF1BCCF11C9663F0025511F /* OFXMLAttribute.m */, @@ -1176,10 +1182,11 @@ 4B3D23DA1337FCB000DD29B8 /* OFString+URLEncoding.h in Headers */, 4B3D23DB1337FCB000DD29B8 /* OFString+XMLEscaping.h in Headers */, 4B3D23DC1337FCB000DD29B8 /* OFString+XMLUnescaping.h in Headers */, 4B3D23DD1337FCB000DD29B8 /* OFTCPSocket.h in Headers */, 4B3D23DE1337FCB000DD29B8 /* OFThread.h in Headers */, + 4B9361A81511000C00DCD16B /* OFThreadPool.h in Headers */, 4BA02BA215041F5900002F84 /* OFTLSSocket.h in 
Headers */, 4B3D23DF1337FCB000DD29B8 /* OFURL.h in Headers */, 4B3D23E01337FCB000DD29B8 /* OFXMLAttribute.h in Headers */, 4B49EA6D143B3A090005BBC6 /* OFXMLCDATA.h in Headers */, 4B49EA6F143B3A090005BBC6 /* OFXMLCharacters.h in Headers */, @@ -1471,10 +1478,11 @@ 4B3D23AA1337FC0D00DD29B8 /* OFString+XMLUnescaping.m in Sources */, 4B552555147AA5DB0003BF47 /* OFString_UTF8.m in Sources */, 4B3D23AB1337FC0D00DD29B8 /* OFTCPSocket.m in Sources */, 4BD653C6143B8489006182F0 /* OFTCPSocket+SOCKS5.m in Sources */, 4B3D23AC1337FC0D00DD29B8 /* OFThread.m in Sources */, + 4B9361A91511000C00DCD16B /* OFThreadPool.m in Sources */, 4B3D23AD1337FC0D00DD29B8 /* OFURL.m in Sources */, 4B3D23AE1337FC0D00DD29B8 /* OFXMLAttribute.m in Sources */, 4B49EA6E143B3A090005BBC6 /* OFXMLCDATA.m in Sources */, 4B49EA70143B3A090005BBC6 /* OFXMLCharacters.m in Sources */, 4B49EA72143B3A090005BBC6 /* OFXMLComment.m in Sources */, Index: src/Makefile ================================================================== --- src/Makefile +++ src/Makefile @@ -48,10 +48,11 @@ OFString+URLEncoding.m \ OFString+XMLEscaping.m \ OFString+XMLUnescaping.m \ OFTCPSocket.m \ ${OFTHREAD_M} \ + OFThreadPool.m \ OFURL.m \ OFXMLAttribute.m \ OFXMLCDATA.m \ OFXMLCharacters.m \ OFXMLComment.m \ ADDED src/OFThreadPool.h Index: src/OFThreadPool.h ================================================================== --- src/OFThreadPool.h +++ src/OFThreadPool.h @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2008, 2009, 2010, 2011, 2012 + * Jonathan Schleifer + * + * All rights reserved. + * + * This file is part of ObjFW. It may be distributed under the terms of the + * Q Public License 1.0, which can be found in the file LICENSE.QPL included in + * the packaging of this file. + * + * Alternatively, it may be distributed under the terms of the GNU General + * Public License, either version 2 or 3, which can be found in the file + * LICENSE.GPLv2 or LICENSE.GPLv3 respectively included in the packaging of this + * file. 
#import "OFObject.h"

#ifdef OF_HAVE_BLOCKS
/**
 * \brief A block which can be dispatched to an OFThreadPool.
 *
 * \param object The object that was specified when the block was dispatched,
 *               or nil if none was specified
 */
typedef void (^of_thread_pool_block_t)(id object);
#endif

@class OFMutableArray;
@class OFList;
@class OFCondition;
@class OFThreadPoolJob;

/**
 * \brief A class providing a pool of reusable threads.
 */
@interface OFThreadPool: OFObject
{
    /* Number of threads in the pool. */
    size_t size;
    /* The worker threads of the pool. */
    OFMutableArray *threads;
    /* Number of jobs dispatched so far. */
    int count;
/*
 * The following instance variables are public because the worker threads
 * access them directly.
 */
@public
    /* Jobs that were dispatched, but not yet picked up by a thread. */
    OFList *queue;
    /* Protects queue and is signaled when a job is appended to it. */
    OFCondition *queueCondition;
    /* Number of jobs which have been performed so far. */
    int doneCount;
    /* Protects doneCount and is signaled when a job has been performed. */
    OFCondition *countCondition;
}

/**
 * \brief Returns a new thread pool with one thread for each core in the system.
 *
 * \warning If for some reason the number of cores in the system could not be
 *          determined, the pool will only have one thread!
 *
 * \return A new thread pool with one thread for each core in the system
 */
+ threadPool;

/**
 * \brief Returns a new thread pool with the specified number of threads.
 *
 * \warning A pool with a size of 0 has no threads, so jobs dispatched to it
 *          are queued, but never performed!
 *
 * \param size The number of threads for the pool
 * \return A new thread pool with the specified number of threads
 */
+ threadPoolWithSize: (size_t)size;

/**
 * \brief Initializes an already allocated OFThreadPool with one thread for
 *        each core in the system.
 *
 * \warning If for some reason the number of cores in the system could not be
 *          determined, the pool will only have one thread!
 *
 * \return An initialized OFThreadPool with one thread for each core in the
 *         system
 */
- init;

/**
 * \brief Initializes an already allocated OFThreadPool with the specified
 *        number of threads.
 *
 * \warning A pool with a size of 0 has no threads, so jobs dispatched to it
 *          are queued, but never performed!
 *
 * \param size The number of threads for the pool
 * \return An initialized OFThreadPool with the specified number of threads
 */
- initWithSize: (size_t)size;

/**
 * \brief Executes the specified selector on the specified target with the
 *        specified object as soon as a thread is ready.
 *
 * \param target The target on which to perform the selector
 * \param selector The selector to perform on the target
 * \param object The object with which the selector is performed on the target
 */
- (void)dispatchWithTarget: (id)target
                  selector: (SEL)selector
                    object: (id)object;

#ifdef OF_HAVE_BLOCKS
/**
 * \brief Executes the specified block as soon as a thread is ready.
 *
 * The block is called with nil as its argument.
 *
 * \param block The block to execute
 */
- (void)dispatchWithBlock: (of_thread_pool_block_t)block;

/**
 * \brief Executes the specified block as soon as a thread is ready.
 *
 * \param block The block to execute
 * \param object The object to pass to the block
 */
- (void)dispatchWithBlock: (of_thread_pool_block_t)block
                   object: (id)object;
#endif

/**
 * \brief Waits until all jobs dispatched so far have been performed.
 */
- (void)waitUntilFinished;
@end
/*
 * TODO:
 *  - What to do when threads are running and the thread pool is deallocated?
 *  - How to tell the threads to terminate because they won't get new work?
 */

#include "config.h"

#import "OFThreadPool.h"
#import "OFArray.h"
#import "OFList.h"
#import "OFThread.h"
#import "OFAutoreleasePool.h"

/*
 * A single unit of work for an OFThreadPool: either a target / selector pair
 * or, if blocks are available, a block. Both take one object as argument.
 */
@interface OFThreadPoolJob: OFObject
{
    id target;
    SEL selector;
    id object;
#ifdef OF_HAVE_BLOCKS
    of_thread_pool_block_t block;
#endif
}

+ jobWithTarget: (id)target
       selector: (SEL)selector
         object: (id)object;
#ifdef OF_HAVE_BLOCKS
+ jobWithBlock: (of_thread_pool_block_t)block
        object: (id)object;
#endif
- initWithTarget: (id)target
        selector: (SEL)selector
          object: (id)object;
#ifdef OF_HAVE_BLOCKS
- initWithBlock: (of_thread_pool_block_t)block
         object: (id)object;
#endif
- (void)perform;
@end

@implementation OFThreadPoolJob
/* Returns an autoreleased job which performs selector on target with object. */
+ jobWithTarget: (id)target
       selector: (SEL)selector
         object: (id)object
{
    return [[[self alloc] initWithTarget: target
                                selector: selector
                                  object: object] autorelease];
}

#ifdef OF_HAVE_BLOCKS
/* Returns an autoreleased job which calls block with object. */
+ jobWithBlock: (of_thread_pool_block_t)block
        object: (id)object
{
    return [[(OFThreadPoolJob*)[self alloc]
        initWithBlock: block
               object: object] autorelease];
}
#endif

/*
 * Initializes the job with a target / selector pair. The target and the object
 * are retained for the lifetime of the job.
 */
- initWithTarget: (id)target_
        selector: (SEL)selector_
          object: (id)object_
{
    self = [super init];

    @try {
        target = [target_ retain];
        selector = selector_;
        object = [object_ retain];
    } @catch (id e) {
        [self release];
        @throw e;
    }

    return self;
}

#ifdef OF_HAVE_BLOCKS
/*
 * Initializes the job with a block. The object is retained for the lifetime of
 * the job.
 */
- initWithBlock: (of_thread_pool_block_t)block_
         object: (id)object_
{
    self = [super init];

    @try {
        /*
         * The block is performed later and on another thread, so it needs
         * to be copied: Retaining a block that still lives on the stack
         * does not move it to the heap.
         */
        block = [block_ copy];
        object = [object_ retain];
    } @catch (id e) {
        [self release];
        @throw e;
    }

    return self;
}
#endif

- (void)dealloc
{
    [target release];
    [object release];
#ifdef OF_HAVE_BLOCKS
    [block release];
#endif

    [super dealloc];
}

/*
 * Performs the job: Calls the block with the object if the job was created
 * with a block, otherwise performs the selector on the target with the object.
 */
- (void)perform
{
#ifdef OF_HAVE_BLOCKS
    if (block != nil)
        block(object);
    else
#endif
        /* The receiver is the target; the object is only the argument. */
        [target performSelector: selector
                     withObject: object];
}
@end

/*
 * A worker thread of an OFThreadPool. It takes jobs from the queue of its pool
 * and performs them.
 */
@interface OFThreadPoolThread: OFThread
{
    OFThreadPool *threadPool;
}

+ threadWithThreadPool: (OFThreadPool*)threadPool;
- initWithThreadPool: (OFThreadPool*)threadPool;
@end

@implementation OFThreadPoolThread
/* Returns an autoreleased worker thread for the specified pool. */
+ threadWithThreadPool: (OFThreadPool*)threadPool
{
    return [[[self alloc] initWithThreadPool: threadPool] autorelease];
}

/* Initializes the worker thread. The pool is referenced, but not retained. */
- initWithThreadPool: (OFThreadPool*)threadPool_
{
    self = [super init];

    threadPool = threadPool_;

    return self;
}

/*
 * Loops forever: Waits for a job in the queue of the pool, removes it from the
 * queue, performs it and then increases the pool's count of done jobs.
 */
- (id)main
{
    OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];

    for (;;) {
        OFThreadPoolJob *job;

        [threadPool->queueCondition lock];
        @try {
            of_list_object_t *listObject;

            listObject = [threadPool->queue firstListObject];

            /* Sleep until a job has been appended to the queue. */
            while (listObject == NULL) {
                [threadPool->queueCondition wait];
                listObject = [threadPool->queue firstListObject];
            }

            /*
             * Take ownership of the job before removing it from the
             * queue, so that it stays alive until the pool is drained
             * after the job has been performed.
             */
            job = [[listObject->object retain] autorelease];
            [threadPool->queue removeListObject: listObject];
        } @finally {
            [threadPool->queueCondition unlock];
        }

        /* The job is performed without holding any lock. */
        [job perform];
        [pool releaseObjects];

        /* Tell -[OFThreadPool waitUntilFinished] that a job is done. */
        [threadPool->countCondition lock];
        @try {
            threadPool->doneCount++;
            [threadPool->countCondition signal];
        } @finally {
            [threadPool->countCondition unlock];
        }
    }
}
@end

@implementation OFThreadPool
+ threadPool
{
    return [[[self alloc] init] autorelease];
}

+ threadPoolWithSize: (size_t)size
{
    return [[[self alloc] initWithSize: size] autorelease];
}

- init
{
    return [self initWithSize: of_num_cpus];
}

- initWithSize: (size_t)size_
{
    self = [super init];

    @try {
        OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
        size_t i;

        size = size_;
        threads = [[OFMutableArray alloc] init];
        queue = [[OFList alloc] init];
        queueCondition = [[OFCondition alloc] init];
        countCondition = [[OFCondition alloc] init];

        for (i = 0; i < size; i++) {
            OFThreadPoolThread *thread =
                [OFThreadPoolThread threadWithThreadPool: self];

            [threads addObject: thread];

            [pool releaseObjects];
        }

        /*
         * We need to start the threads in a separate loop to make sure
         * threads is not modified anymore to prevent a race condition.
         */
        for (i = 0; i < size; i++) {
            OFThreadPoolThread *thread = [threads objectAtIndex: i];

            [thread start];
        }

        [pool release];
    } @catch (id e) {
        [self release];
        @throw e;
    }

    return self;
}

- (void)dealloc
{
    [threads release];
    [queue release];
    [queueCondition release];
    [countCondition release];

    [super dealloc];
}

/*
 * Counts the job as dispatched, appends it to the queue and wakes up a thread
 * waiting for work.
 */
- (void)_dispatchJob: (OFThreadPoolJob*)job
{
    /*
     * The count is increased before the job is queued, so that the number
     * of done jobs can never exceed the number of dispatched jobs.
     */
    of_atomic_inc_int(&count);

    [queueCondition lock];
    @try {
        [queue appendObject: job];
        [queueCondition signal];
    } @finally {
        [queueCondition unlock];
    }
}

- (void)waitUntilFinished
{
    for (;;) {
        [countCondition lock];
        @try {
            /* Every job dispatched so far has been performed. */
            if (doneCount == count)
                return;

            /* Woken up each time a thread has performed a job. */
            [countCondition wait];
        } @finally {
            [countCondition unlock];
        }
    }
}

- (void)dispatchWithTarget: (id)target
                  selector: (SEL)selector
                    object: (id)object
{
    [self _dispatchJob: [OFThreadPoolJob jobWithTarget: target
                                              selector: selector
                                                object: object]];
}

#ifdef OF_HAVE_BLOCKS
- (void)dispatchWithBlock: (of_thread_pool_block_t)block
{
    [self _dispatchJob: [OFThreadPoolJob jobWithBlock: block
                                               object: nil]];
}

- (void)dispatchWithBlock: (of_thread_pool_block_t)block
                   object: (id)object
{
    [self _dispatchJob: [OFThreadPoolJob jobWithBlock: block
                                               object: object]];
}
#endif
@end