24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
#import "OFStreamObserver_select.h"
#import "OFStream.h"
#import "OFArray.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"
/*
 * On Windows builds (_WIN32 defined), every close(x) written in this file is
 * rewritten by the preprocessor into closesocket(x).
 */
#ifdef _WIN32
# define close(sock) closesocket(sock)
#endif
/*
 * Flags describing one queued change. _processQueue switches on the bitwise
 * OR of one action flag (QUEUE_ADD / QUEUE_REMOVE) and one direction flag
 * (QUEUE_READ / QUEUE_WRITE), giving the four values 0..3.
 */
enum {
	/* Action: bit 0 */
	QUEUE_ADD    = 0x0,
	QUEUE_REMOVE = 0x1,
	/* Direction: bit 1 */
	QUEUE_READ   = 0x0,
	QUEUE_WRITE  = 0x2
};
@implementation OFStreamObserver_select
/*
 * Sets up the descriptor sets used for select(): all three sets start out
 * empty, then cancelFD[0] is registered for reading and nFDs is set to
 * cancelFD[0] + 1.
 */
- init
{
	self = [super init];

	/*
	 * POSIX requires an fd_set to be initialized with FD_ZERO before
	 * FD_SET / FD_CLR / FD_ISSET are applied to it. exceptFDs is modified
	 * by the add/remove helpers of this class, so clear it explicitly
	 * here instead of relying on the object's memory being zero-filled.
	 */
	FD_ZERO(&readFDs);
	FD_ZERO(&writeFDs);
	FD_ZERO(&exceptFDs);

	FD_SET(cancelFD[0], &readFDs);
	/* nFDs must be one greater than the highest descriptor being watched. */
	nFDs = cancelFD[0] + 1;

	return self;
}
/*
 * Marks the stream's file descriptor in the given set and in exceptFDs, and
 * raises nFDs when the descriptor is not yet below it.
 */
- (void)_addStream: (OFStream*)stream
	 withFDSet: (fd_set*)FDSet
{
	OFAutoreleasePool *localPool = [[OFAutoreleasePool alloc] init];
	int fd = [stream fileDescriptor];

	FD_SET(fd, FDSet);
	FD_SET(fd, &exceptFDs);

	/* Keep nFDs at least fd + 1. */
	if (nFDs <= fd)
		nFDs = fd + 1;

	[localPool release];
}
/*
 * Clears the stream's file descriptor from FDSet. The descriptor is also
 * cleared from exceptFDs, but only if it is not set in otherFDSet.
 */
- (void)_removeStream: (OFStream*)stream
	    withFDSet: (fd_set*)FDSet
	   otherFDSet: (fd_set*)otherFDSet
{
	int fd = [stream fileDescriptor];

	FD_CLR(fd, FDSet);

	/* Still present in the other set: leave exceptFDs untouched. */
	if (FD_ISSET(fd, otherFDSet))
		return;

	FD_CLR(fd, &exceptFDs);
}
/*
 * Applies every change currently stored in queue / queueInfo while holding
 * the lock on queue: each entry adds a stream to, or removes it from, the
 * read or write stream list and the matching fd_set. The processed entries
 * are then dropped from both arrays.
 */
- (void)_processQueue
{
	@synchronized (queue) {
		OFStream **streams = [queue cArray];
		OFNumber **actions = [queueInfo cArray];
		size_t pending = [queue count];
		size_t idx;

		for (idx = 0; idx < pending; idx++) {
			OFStream *stream = streams[idx];

			/* The stored number is an action flag ORed with a direction flag. */
			switch ([actions[idx] intValue]) {
			case QUEUE_ADD | QUEUE_READ:
				[readStreams addObject: stream];
				[self _addStream: stream
				       withFDSet: &readFDs];
				break;
			case QUEUE_ADD | QUEUE_WRITE:
				[writeStreams addObject: stream];
				[self _addStream: stream
				       withFDSet: &writeFDs];
				break;
			case QUEUE_REMOVE | QUEUE_READ:
				[readStreams removeObjectIdenticalTo: stream];
				[self _removeStream: stream
					  withFDSet: &readFDs
					 otherFDSet: &writeFDs];
				break;
			case QUEUE_REMOVE | QUEUE_WRITE:
				[writeStreams removeObjectIdenticalTo: stream];
				[self _removeStream: stream
					  withFDSet: &writeFDs
					 otherFDSet: &readFDs];
				break;
			default:
				/* Any other value is not a valid queue entry. */
				assert(0);
			}
		}

		/* Drop exactly the entries that were handled above. */
		[queue removeNObjects: pending];
		[queueInfo removeNObjects: pending];
	}
}
- (BOOL)observeWithTimeout: (int)timeout
{
OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
OFStream **cArray;
fd_set readFDs_;
|
<
<
<
<
<
<
<
<
<
<
<
<
|
<
<
|
|
|
|
<
<
<
<
<
|
<
<
|
<
|
<
|
<
<
<
<
<
<
|
<
<
<
<
|
|
<
|
<
<
<
|
>
|
<
<
<
<
|
<
<
|
|
<
<
<
<
|
<
|
<
<
<
<
|
<
<
<
|
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
#import "OFStreamObserver_select.h"
#import "OFStream.h"
#import "OFArray.h"
#import "OFNumber.h"
#import "OFAutoreleasePool.h"
@implementation OFStreamObserver_select
/*
 * Sets up the descriptor sets used for select(): all three sets start out
 * empty, then cancelFD[0] is registered for reading.
 */
- init
{
	self = [super init];

	/*
	 * POSIX requires an fd_set to be initialized with FD_ZERO before
	 * FD_SET / FD_CLR / FD_ISSET are applied to it. exceptFDs is modified
	 * by the add/remove methods of this class, so clear it explicitly
	 * here instead of relying on the object's memory being zero-filled.
	 */
	FD_ZERO(&readFDs);
	FD_ZERO(&writeFDs);
	FD_ZERO(&exceptFDs);

	FD_SET(cancelFD[0], &readFDs);

	return self;
}
/*
 * Marks the stream's file descriptor in readFDs and in exceptFDs.
 */
- (void)_addStreamToObserveForReading: (OFStream*)stream
{
	int descriptor = [stream fileDescriptor];

	FD_SET(descriptor, &readFDs);
	FD_SET(descriptor, &exceptFDs);
}
/*
 * Marks the stream's file descriptor in writeFDs and in exceptFDs.
 */
- (void)_addStreamToObserveForWriting: (OFStream*)stream
{
	int descriptor = [stream fileDescriptor];

	FD_SET(descriptor, &writeFDs);
	FD_SET(descriptor, &exceptFDs);
}
/*
 * Clears the stream's file descriptor from readFDs. It is cleared from
 * exceptFDs as well, unless it is still set in writeFDs.
 */
- (void)_removeStreamToObserveForReading: (OFStream*)stream
{
	int descriptor = [stream fileDescriptor];

	FD_CLR(descriptor, &readFDs);

	/* Still set in writeFDs: leave exceptFDs untouched. */
	if (FD_ISSET(descriptor, &writeFDs))
		return;

	FD_CLR(descriptor, &exceptFDs);
}
/*
 * Clears the stream's file descriptor from writeFDs. It is cleared from
 * exceptFDs as well, unless it is still set in readFDs.
 */
- (void)_removeStreamToObserveForWriting: (OFStream*)stream
{
	int descriptor = [stream fileDescriptor];

	FD_CLR(descriptor, &writeFDs);

	/* Still set in readFDs: leave exceptFDs untouched. */
	if (FD_ISSET(descriptor, &readFDs))
		return;

	FD_CLR(descriptor, &exceptFDs);
}
- (BOOL)observeWithTimeout: (int)timeout
{
OFAutoreleasePool *pool = [[OFAutoreleasePool alloc] init];
OFStream **cArray;
fd_set readFDs_;
|
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
writeFDs_ = writeFDs;
exceptFDs_ = exceptFDs;
# endif
time.tv_sec = timeout / 1000;
time.tv_usec = (timeout % 1000) * 1000;
if (select(nFDs, &readFDs_, &writeFDs_, &exceptFDs_,
(timeout != -1 ? &time : NULL)) < 1)
return NO;
if (FD_ISSET(cancelFD[0], &readFDs_)) {
char buffer;
#ifndef _WIN32
assert(read(cancelFD[0], &buffer, 1) > 0);
|
|
|
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
writeFDs_ = writeFDs;
exceptFDs_ = exceptFDs;
# endif
time.tv_sec = timeout / 1000;
time.tv_usec = (timeout % 1000) * 1000;
if (select((int)maxFD + 1, &readFDs_, &writeFDs_, &exceptFDs_,
(timeout != -1 ? &time : NULL)) < 1)
return NO;
if (FD_ISSET(cancelFD[0], &readFDs_)) {
char buffer;
#ifndef _WIN32
assert(read(cancelFD[0], &buffer, 1) > 0);
|