Overview
Comment: | Use OFData instead of a buffer for async writes
This avoids the entire problem of keeping the buffer alive until the |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
f218986f51ac032828395851b54b529a |
User & Date: | js on 2018-12-18 14:14:25 |
Other Links: | manifest | tags |
Context
2018-12-18
| ||
16:41 | Add -[OFStream asyncWriteString:] check-in: 6b35b78f94 user: js tags: trunk | |
14:14 | Use OFData instead of a buffer for async writes check-in: f218986f51 user: js tags: trunk | |
2018-12-11
| ||
22:57 | Include an exception in delegate methods check-in: 064dbe5127 user: js tags: trunk | |
Changes
Modified src/OFHTTPClient.m from [209c55758d] to [c6a81ba86b].
︙ | ︙ | |||
52 53 54 55 56 57 58 | #define REDIRECTS_DEFAULT 10 @interface OFHTTPClientRequestHandler: OFObject <OFTCPSocketDelegate> { @public OFHTTPClient *_client; OFHTTPRequest *_request; | < | 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | #define REDIRECTS_DEFAULT 10 @interface OFHTTPClientRequestHandler: OFObject <OFTCPSocketDelegate> { @public OFHTTPClient *_client; OFHTTPRequest *_request; unsigned int _redirects; id _context; bool _firstLine; OFString *_version; int _status; OFMutableDictionary OF_GENERIC(OFString *, OFString *) *_serverHeaders; } |
︙ | ︙ | |||
275 276 277 278 279 280 281 | return self; } - (void)dealloc { [_client release]; [_request release]; | < | 274 275 276 277 278 279 280 281 282 283 284 285 286 287 | return self; } - (void)dealloc { [_client release]; [_request release]; [_context release]; [_version release]; [_serverHeaders release]; [super dealloc]; } |
︙ | ︙ | |||
553 554 555 556 557 558 559 | [self raiseException: e]; ret = false; } return ret; } | | | | | | | < < < | | | | < | | | | > > | 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 | [self raiseException: e]; ret = false; } return ret; } - (OFData *)stream: (OF_KINDOF(OFStream *))stream didWriteData: (OFData *)data bytesWritten: (size_t)bytesWritten exception: (id)exception { if (exception != nil) { if ([exception isKindOfClass: [OFWriteFailedException class]] && ([exception errNo] == ECONNRESET || [exception errNo] == EPIPE)) { /* In case a keep-alive connection timed out */ [self closeAndReconnect]; return nil; } [self raiseException: exception]; return nil; } _firstLine = true; if ([[_request headers] objectForKey: @"Content-Length"] != nil) { [stream setDelegate: nil]; OFStream *requestBody = [[[OFHTTPClientRequestBodyStream alloc] initWithHandler: self socket: stream] autorelease]; if ([_client->_delegate respondsToSelector: @selector(client:wantsRequestBody:request:context:)]) [_client->_delegate client: _client wantsRequestBody: requestBody request: _request context: _context]; } else [stream asyncReadLine]; return nil; } - (void)handleSocket: (OFTCPSocket *)sock { /* * As a work around for a bug with split packets in lighttpd when using * HTTPS, we construct the complete request in a buffer string and then * send it all at once. * * We do not use the socket's write buffer in case we need to resend * the entire request (e.g. in case a keep-alive connection timed out). */ @try { OFString *requestString = constructRequestString(_request); OFData *requestData = [OFData dataWithItems: [requestString UTF8String] count: [requestString UTF8StringLength]]; [sock asyncWriteData: requestData]; } @catch (id e) { [self raiseException: e]; return; } } - (void)socket: (OF_KINDOF(OFTCPSocket *))sock |
︙ | ︙ |
Modified src/OFRunLoop+Private.h from [985334eabe] to [1b93fde180].
︙ | ︙ | |||
51 52 53 54 55 56 57 | + (void)of_addAsyncReadLineForStream: (OFStream <OFReadyForReadingObserving> *) stream encoding: (of_string_encoding_t)encoding mode: (of_run_loop_mode_t)mode delegate: (id <OFStreamDelegate>)delegate; + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream | < | | 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | + (void)of_addAsyncReadLineForStream: (OFStream <OFReadyForReadingObserving> *) stream encoding: (of_string_encoding_t)encoding mode: (of_run_loop_mode_t)mode delegate: (id <OFStreamDelegate>)delegate; + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream data: (OFData *)data mode: (of_run_loop_mode_t)mode delegate: (id <OFStreamDelegate>)delegate; + (void)of_addAsyncConnectForTCPSocket: (OFTCPSocket *)socket mode: (of_run_loop_mode_t)mode delegate: (id <OFTCPSocketDelegate_Private>) delegate; + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)socket |
︙ | ︙ | |||
93 94 95 96 97 98 99 | + (void)of_addAsyncReadLineForStream: (OFStream <OFReadyForReadingObserving> *) stream encoding: (of_string_encoding_t)encoding mode: (of_run_loop_mode_t)mode block: (of_stream_async_read_line_block_t)block; + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream | < | | 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | + (void)of_addAsyncReadLineForStream: (OFStream <OFReadyForReadingObserving> *) stream encoding: (of_string_encoding_t)encoding mode: (of_run_loop_mode_t)mode block: (of_stream_async_read_line_block_t)block; + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream data: (OFData *)data mode: (of_run_loop_mode_t)mode block: (of_stream_async_write_block_t)block; + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)socket mode: (of_run_loop_mode_t)mode block: (of_tcp_socket_async_accept_block_t) block; + (void)of_addAsyncReceiveForUDPSocket: (OFUDPSocket *)socket |
︙ | ︙ |
Modified src/OFRunLoop.m from [2b70c16414] to [7000e17b8e].
︙ | ︙ | |||
18 19 20 21 22 23 24 25 26 27 28 29 30 31 | #include "config.h" #include <assert.h> #include <errno.h> #import "OFRunLoop.h" #import "OFRunLoop+Private.h" #import "OFDictionary.h" #ifdef OF_HAVE_SOCKETS # import "OFKernelEventObserver.h" # import "OFTCPSocket.h" # import "OFTCPSocket+Private.h" #endif #import "OFThread.h" | > | 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | #include "config.h" #include <assert.h> #include <errno.h> #import "OFRunLoop.h" #import "OFRunLoop+Private.h" #import "OFData.h" #import "OFDictionary.h" #ifdef OF_HAVE_SOCKETS # import "OFKernelEventObserver.h" # import "OFTCPSocket.h" # import "OFTCPSocket+Private.h" #endif #import "OFThread.h" |
︙ | ︙ | |||
114 115 116 117 118 119 120 | @interface OFRunLoop_WriteQueueItem: OFRunLoop_QueueItem { @public # ifdef OF_HAVE_BLOCKS of_stream_async_write_block_t _block; # endif | | | | 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | @interface OFRunLoop_WriteQueueItem: OFRunLoop_QueueItem { @public # ifdef OF_HAVE_BLOCKS of_stream_async_write_block_t _block; # endif OFData *_data; size_t _writtenLength; } @end @interface OFRunLoop_ConnectQueueItem: OFRunLoop_QueueItem @end @interface OFRunLoop_AcceptQueueItem: OFRunLoop_QueueItem |
︙ | ︙ | |||
446 447 448 449 450 451 452 453 454 | @end @implementation OFRunLoop_WriteQueueItem - (bool)handleObject: (id)object { size_t length; id exception = nil; @try { | > > > > | | | | | > > > > | | | | | > > > > < > > > < | 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 | @end @implementation OFRunLoop_WriteQueueItem - (bool)handleObject: (id)object { size_t length; id exception = nil; size_t dataLength = [_data count] * [_data itemSize]; OFData *newData, *oldData; @try { const char *dataItems = [_data items]; length = [object writeBuffer: dataItems + _writtenLength length: dataLength - _writtenLength]; } @catch (id e) { length = 0; exception = e; } _writtenLength += length; if (_writtenLength != dataLength && exception == nil) return true; # ifdef OF_HAVE_BLOCKS if (_block != NULL) { newData = _block(object, _data, _writtenLength, exception); if (newData == nil) return false; oldData = _data; _data = [newData copy]; [oldData release]; _writtenLength = 0; return true; } else { # endif if (![_delegate respondsToSelector: @selector(stream:didWriteData:bytesWritten:exception:)]) return false; newData = [_delegate stream: object didWriteData: _data bytesWritten: _writtenLength exception: exception]; if (newData == nil) return false; oldData = _data; _data = [newData copy]; [oldData release]; _writtenLength = 0; return true; # ifdef OF_HAVE_BLOCKS } # endif } - (void)dealloc { [_data release]; # ifdef OF_HAVE_BLOCKS [_block release]; # endif [super dealloc]; } @end @implementation OFRunLoop_ConnectQueueItem - (bool)handleObject: (id)object { id exception = nil; int errNo; |
︙ | ︙ | |||
767 768 769 770 771 772 773 | queueItem->_delegate = [delegate retain]; queueItem->_encoding = encoding; }) } + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream | < | | < | 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 | queueItem->_delegate = [delegate retain]; queueItem->_encoding = encoding; }) } + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream data: (OFData *)data mode: (of_run_loop_mode_t)mode delegate: (id <OFStreamDelegate>)delegate { ADD_WRITE(OFRunLoop_WriteQueueItem, stream, mode, { queueItem->_delegate = [delegate retain]; queueItem->_data = [data copy]; }) } + (void)of_addAsyncConnectForTCPSocket: (OFTCPSocket *)stream mode: (of_run_loop_mode_t)mode delegate: (id <OFTCPSocketDelegate_Private>) delegate |
︙ | ︙ | |||
869 870 871 872 873 874 875 | queueItem->_block = [block copy]; queueItem->_encoding = encoding; }) } + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream | < | > < < | 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 | queueItem->_block = [block copy]; queueItem->_encoding = encoding; }) } + (void)of_addAsyncWriteForStream: (OFStream <OFReadyForWritingObserving> *) stream data: (OFData *)data mode: (of_run_loop_mode_t)mode block: (of_stream_async_write_block_t)block { ADD_WRITE(OFRunLoop_WriteQueueItem, stream, mode, { queueItem->_data = [data copy]; queueItem->_block = [block copy]; }) } + (void)of_addAsyncAcceptForTCPSocket: (OFTCPSocket *)stream mode: (of_run_loop_mode_t)mode block: (of_tcp_socket_async_accept_block_t)block { |
︙ | ︙ |
Modified src/OFStream.h from [ba57cd6e80] to [a9818d561e].
︙ | ︙ | |||
67 68 69 70 71 72 73 | typedef bool (^of_stream_async_read_line_block_t)(OF_KINDOF(OFStream *) stream, OFString *_Nullable line, id _Nullable exception); /*! * @brief A block which is called when data was written asynchronously to a * stream. * | < | < < | | | < < | | | 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | typedef bool (^of_stream_async_read_line_block_t)(OF_KINDOF(OFStream *) stream, OFString *_Nullable line, id _Nullable exception); /*! * @brief A block which is called when data was written asynchronously to a * stream. * * @param data The data which was written to the stream * @param bytesWritten The number of bytes which have been written. This * matches the length of the specified data on the * asynchronous write if no exception was encountered. * @param exception An exception which occurred while writing or `nil` on * success * @return The data to repeat the write with or nil if it should not repeat */ typedef OFData *_Nullable (^of_stream_async_write_block_t)( OF_KINDOF(OFStream *) stream, OFData *_Nonnull data, size_t bytesWritten, id _Nullable exception); #endif /*! * @protocol OFStreamDelegate OFStream.h ObjFW/OFStream.h * * A delegate for OFStream. |
︙ | ︙ | |||
126 127 128 129 130 131 132 | exception: (nullable id)exception; /*! * @brief This method is called when data was written asynchronously to a * stream. * * @param stream The stream to which data was written | | | < | > | < < | | | | | 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 | exception: (nullable id)exception; /*! * @brief This method is called when data was written asynchronously to a * stream. * * @param stream The stream to which data was written * @param data The data which was written to the stream * @param bytesWritten The number of bytes which have been written. This * matches the length of the specified data on the * asynchronous write if no exception was encountered. * @param exception An exception that occurred while writing, or nil on success * @return The data to repeat the write with or nil if it should not repeat */ - (nullable OFData *)stream: (OF_KINDOF(OFStream *))stream didWriteData: (OFData *)data bytesWritten: (size_t)bytesWritten exception: (nullable id)exception; @end /*! * @class OFStream OFStream.h ObjFW/OFStream.h * * @brief A base class for different types of streams. * |
︙ | ︙ | |||
956 957 958 959 960 961 962 | * length in non-blocking mode. */ - (size_t)writeBuffer: (const void *)buffer length: (size_t)length; #ifdef OF_HAVE_SOCKETS /*! | | | < < | < | | < < | < | | < < < | | > | | | < < | < | | | < < | | < < | < | | | 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 | * length in non-blocking mode. */ - (size_t)writeBuffer: (const void *)buffer length: (size_t)length; #ifdef OF_HAVE_SOCKETS /*! * @brief Asynchronously writes data into the stream. * * @note The stream must conform to @ref OFReadyForWritingObserving in order * for this to work! * * @param data The data which is written into the stream */ - (void)asyncWriteData: (OFData *)data; /*! * @brief Asynchronously writes data into the stream. * * @note The stream must conform to @ref OFReadyForWritingObserving in order * for this to work! * * @param data The data which is written into the stream * @param runLoopMode The run loop mode in which to perform the async write */ - (void)asyncWriteData: (OFData *)data runLoopMode: (of_run_loop_mode_t)runLoopMode; # ifdef OF_HAVE_BLOCKS /*! * @brief Asynchronously writes data into the stream. * * @note The stream must conform to @ref OFReadyForWritingObserving in order * for this to work! * * @param data The data which is written into the stream * @param block The block to call when the data has been written. It should * return the data for the next write with the same callback or * nil if it should not repeat. */ - (void)asyncWriteData: (OFData *)data block: (of_stream_async_write_block_t)block; /*! * @brief Asynchronously writes data into the stream. * * @note The stream must conform to @ref OFReadyForWritingObserving in order * for this to work! * * @param data The data which is written into the stream * @param runLoopMode The run loop mode in which to perform the async write * @param block The block to call when the data has been written. It should * return the data for the next write with the same callback or * nil if it should not repeat. */ - (void)asyncWriteData: (OFData *)data runLoopMode: (of_run_loop_mode_t)runLoopMode block: (of_stream_async_write_block_t)block; # endif #endif /*! * @brief Writes a uint8_t into the stream. * * @param int8 A uint8_t |
︙ | ︙ |
Modified src/OFStream.m from [a08f06103a] to [4879805b8b].
︙ | ︙ | |||
1162 1163 1164 1165 1166 1167 1168 | _writeBufferLength += length; return length; } } #ifdef OF_HAVE_SOCKETS | | < | < | | < | | < | < | | < | | | < | | | < | 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 | _writeBufferLength += length; return length; } } #ifdef OF_HAVE_SOCKETS - (void)asyncWriteData: (OFData *)data { [self asyncWriteData: data runLoopMode: of_run_loop_mode_default]; } - (void)asyncWriteData: (OFData *)data runLoopMode: (of_run_loop_mode_t)runLoopMode { OFStream <OFReadyForWritingObserving> *stream = (OFStream <OFReadyForWritingObserving> *)self; [OFRunLoop of_addAsyncWriteForStream: stream data: data mode: runLoopMode delegate: _delegate]; } # ifdef OF_HAVE_BLOCKS - (void)asyncWriteData: (OFData *)data block: (of_stream_async_write_block_t)block { [self asyncWriteData: data runLoopMode: of_run_loop_mode_default block: block]; } - (void)asyncWriteData: (OFData *)data runLoopMode: (of_run_loop_mode_t)runLoopMode block: (of_stream_async_write_block_t)block { OFStream <OFReadyForWritingObserving> *stream = (OFStream <OFReadyForWritingObserving> *)self; [OFRunLoop of_addAsyncWriteForStream: stream data: data mode: runLoopMode block: block]; } # endif #endif - (void)writeInt8: (uint8_t)int8 |
︙ | ︙ |
Modified src/OFTCPSocket.m from [dd4630f033] to [d516c427cd].
︙ | ︙ | |||
359 360 361 362 363 364 365 366 | socketAddresses:context: exception:) context: nil]; } - (void)sendSOCKS5Request { _SOCKS5State = SOCKS5_STATE_SEND_AUTHENTICATION; | > > > | < | | 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 | socketAddresses:context: exception:) context: nil]; } - (void)sendSOCKS5Request { OFData *data = [OFData dataWithItems: "\x05\x01\x00" count: 3]; _SOCKS5State = SOCKS5_STATE_SEND_AUTHENTICATION; [_socket asyncWriteData: data runLoopMode: [[OFRunLoop currentRunLoop] currentMode]]; } - (bool)stream: (OF_KINDOF(OFStream *))sock didReadIntoBuffer: (void *)buffer length: (size_t)length exception: (id)exception { |
︙ | ︙ | |||
415 416 417 418 419 420 421 | port[0] = _port >> 8; port[1] = _port & 0xFF; [_request addItems: port count: 2]; _SOCKS5State = SOCKS5_STATE_SEND_REQUEST; | | < | | 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 | port[0] = _port >> 8; port[1] = _port & 0xFF; [_request addItems: port count: 2]; _SOCKS5State = SOCKS5_STATE_SEND_REQUEST; [_socket asyncWriteData: _request runLoopMode: runLoopMode]; return false; case SOCKS5_STATE_READ_RESPONSE: response = buffer; if (response[0] != 5 || response[2] != 0) { _exception = [[OFConnectionFailedException alloc] initWithHost: _host |
︙ | ︙ | |||
519 520 521 522 523 524 525 | return false; default: assert(0); return false; } } | | | | | | | | | | 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 | return false; default: assert(0); return false; } } - (OFData *)stream: (OF_KINDOF(OFStream *))sock didWriteData: (OFData *)data bytesWritten: (size_t)bytesWritten exception: (id)exception { of_run_loop_mode_t runLoopMode; if (exception != nil) { _exception = [exception retain]; [self didConnect]; return nil; } runLoopMode = [[OFRunLoop currentRunLoop] currentMode]; switch (_SOCKS5State) { case SOCKS5_STATE_SEND_AUTHENTICATION: _SOCKS5State = SOCKS5_STATE_READ_VERSION; [_socket asyncReadIntoBuffer: _buffer exactLength: 2 runLoopMode: runLoopMode]; return nil; case SOCKS5_STATE_SEND_REQUEST: [_request release]; _request = nil; _SOCKS5State = SOCKS5_STATE_READ_RESPONSE; [_socket asyncReadIntoBuffer: _buffer exactLength: 4 runLoopMode: runLoopMode]; return nil; default: assert(0); return nil; } } @end @implementation OFTCPSocket_ConnectDelegate - (void)dealloc { |
︙ | ︙ |