@@ -48,25 +48,24 @@ class IPCBase:
48
48
connection = None # type: _IPCHandle
49
49
50
50
def __init__ (self , name : str , timeout : Optional [float ]) -> None :
51
- self .READ_SIZE = 100000
52
51
self .name = name
53
52
self .timeout = timeout
54
53
55
- def read (self ) -> bytes :
54
+ def read (self , size : int = 100000 ) -> bytes :
56
55
"""Read bytes from an IPC connection until its empty."""
57
56
bdata = bytearray ()
58
57
if sys .platform == 'win32' :
59
58
while True :
60
- ov , err = _winapi .ReadFile (self .connection , self . READ_SIZE , overlapped = True )
59
+ ov , err = _winapi .ReadFile (self .connection , size , overlapped = True )
61
60
# TODO: remove once typeshed supports Literal types
62
61
assert isinstance (ov , _winapi .Overlapped )
63
62
assert isinstance (err , int )
64
63
try :
65
- if err != 0 :
66
- assert err == _winapi .ERROR_IO_PENDING
64
+ if err == _winapi .ERROR_IO_PENDING :
67
65
timeout = int (self .timeout * 1000 ) if self .timeout else _winapi .INFINITE
68
66
res = _winapi .WaitForSingleObject (ov .event , timeout )
69
- assert res == _winapi .WAIT_OBJECT_0
67
+ if res != _winapi .WAIT_OBJECT_0 :
68
+ raise IPCException ("Bad result from I/O wait: {}" .format (res ))
70
69
except BaseException :
71
70
ov .cancel ()
72
71
raise
@@ -77,11 +76,14 @@ def read(self) -> bytes:
77
76
if err == 0 :
78
77
# we are done!
79
78
break
79
+ elif err == _winapi .ERROR_MORE_DATA :
80
+ # read again
81
+ continue
80
82
elif err == _winapi .ERROR_OPERATION_ABORTED :
81
83
raise IPCException ("ReadFile operation aborted." )
82
84
else :
83
85
while True :
84
- more = self .connection .recv (self . READ_SIZE )
86
+ more = self .connection .recv (size )
85
87
if not more :
86
88
break
87
89
bdata .extend (more )
@@ -96,16 +98,18 @@ def write(self, data: bytes) -> None:
96
98
assert isinstance (ov , _winapi .Overlapped )
97
99
assert isinstance (err , int )
98
100
try :
99
- if err != 0 :
100
- assert err == _winapi .ERROR_IO_PENDING
101
+ if err == _winapi .ERROR_IO_PENDING :
101
102
timeout = int (self .timeout * 1000 ) if self .timeout else _winapi .INFINITE
102
103
res = _winapi .WaitForSingleObject (ov .event , timeout )
103
- assert res == _winapi .WAIT_OBJECT_0
104
+ if res != _winapi .WAIT_OBJECT_0 :
105
+ raise IPCException ("Bad result from I/O wait: {}" .format (res ))
106
+ elif err != 0 :
107
+ raise IPCException ("Failed writing to pipe with error: {}" .format (err ))
104
108
except BaseException :
105
109
ov .cancel ()
106
110
raise
107
111
bytes_written , err = ov .GetOverlappedResult (True )
108
- assert err == 0
112
+ assert err == 0 , err
109
113
assert bytes_written == len (data )
110
114
except WindowsError as e :
111
115
raise IPCException ("Failed to write with error: {}" .format (e .winerror ))
0 commit comments