@@ -55,6 +55,7 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
55
55
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
56
56
// occuring.
57
57
private boolean retrying = false ;
58
+ private boolean checkingForLastChunk = false ;
58
59
59
60
boolean isRetrying () {
60
61
return retrying ;
@@ -64,129 +65,141 @@ StorageObject getStorageObject() {
64
65
return storageObject ;
65
66
}
66
67
68
+ private StorageObject transmitChunk (
69
+ int chunkOffset , int chunkLength , long position , boolean last ) {
70
+ return getOptions ()
71
+ .getStorageRpcV1 ()
72
+ .writeWithResponse (getUploadId (), getBuffer (), chunkOffset , position , chunkLength , last );
73
+ }
74
+
75
+ private long getRemotePosition () {
76
+ return getOptions ().getStorageRpcV1 ().getCurrentUploadOffset (getUploadId ());
77
+ }
78
+
79
+ private StorageObject getRemoteStorageObject () {
80
+ return getOptions ().getStorageRpcV1 ().get (getEntity ().toPb (), null );
81
+ }
82
+
83
+ private StorageException unrecoverableState (
84
+ int chunkOffset , int chunkLength , long localPosition , long remotePosition , boolean last ) {
85
+ StringBuilder sb = new StringBuilder ();
86
+ sb .append ("Unable to recover in upload.\n " );
87
+ sb .append (
88
+ "This may be a symptom of multiple clients uploading to the same upload session.\n \n " );
89
+ sb .append ("For debugging purposes:\n " );
90
+ sb .append ("uploadId: " ).append (getUploadId ()).append ('\n' );
91
+ sb .append ("chunkOffset: " ).append (chunkOffset ).append ('\n' );
92
+ sb .append ("chunkLength: " ).append (chunkLength ).append ('\n' );
93
+ sb .append ("localOffset: " ).append (localPosition ).append ('\n' );
94
+ sb .append ("remoteOffset: " ).append (remotePosition ).append ('\n' );
95
+ sb .append ("lastChunk: " ).append (last ).append ("\n \n " );
96
+ return new StorageException (0 , sb .toString ());
97
+ }
98
+
99
+ // Retriable interruption occurred.
100
+ // Variables:
101
+ // chunk = getBuffer()
102
+ // localNextByteOffset == getPosition()
103
+ // chunkSize = getChunkSize()
104
+ //
105
+ // Case 1: localNextByteOffset == remoteNextByteOffset:
106
+ // Retrying the entire chunk
107
+ //
108
+ // Case 2: localNextByteOffset < remoteNextByteOffset
109
+ // && driftOffset < chunkSize:
110
+ // Upload progressed and localNextByteOffset is not in-sync with
111
+ // remoteNextByteOffset and driftOffset is less than chunkSize.
112
+ // driftOffset must be less than chunkSize for it to retry using
113
+ // chunk maintained in memory.
114
+ // Find the driftOffset by subtracting localNextByteOffset from
115
+ // remoteNextByteOffset.
116
+ // Use driftOffset to determine where to restart from using the chunk in
117
+ // memory.
118
+ //
119
+ // Case 3: localNextByteOffset < remoteNextByteOffset
120
+ // && driftOffset == chunkSize:
121
+ // Special case of Case 2.
122
+ // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
123
+ // to the next chunk.
124
+ //
125
+ // Case 4: localNextByteOffset < remoteNextByteOffset
126
+ // && driftOffset > chunkSize:
127
+ // Throw exception as remoteNextByteOffset has drifted beyond the retriable
128
+ // chunk maintained in memory. This is not possible unless there's multiple
129
+ // clients uploading to the same resumable upload session.
130
+ //
131
+ // Case 5: localNextByteOffset > remoteNextByteOffset:
132
+ // For completeness, this case is not possible because it would require retrying
133
+ // a 400 status code which is not allowed.
134
+ //
135
+ // Case 6: remoteNextByteOffset==-1 && last == true
136
+ // Upload is complete and retry occurred in the "last" chunk. Data sent was
137
+ // received by the service.
138
+ //
139
+ // Case 7: remoteNextByteOffset==-1 && last == false && !checkingForLastChunk
140
+ // Not last chunk and are not checkingForLastChunk, allow for the client to
141
+ // catch up to final chunk which meets
142
+ // Case 6.
143
+ //
144
+ // Case 8: remoteNextByteOffset==-1 && last == false && checkingForLastChunk
145
+ // Not last chunk and checkingForLastChunk means this is the second time we
146
+ // hit this case, meaning the upload was completed by a different client.
147
+ //
148
+ // Case 9: Only possible if the client local offset continues beyond the remote
149
+ // offset which is not possible.
150
+ //
67
151
@ Override
68
- protected void flushBuffer (final int length , final boolean last ) {
152
+ protected void flushBuffer (final int length , final boolean lastChunk ) {
69
153
try {
70
154
runWithRetries (
71
155
callable (
72
156
new Runnable () {
73
157
@ Override
74
158
public void run () {
159
+ // Get remote offset from API
160
+ final long localPosition = getPosition ();
161
+ // For each request it should be possible to retry from its location in this code
162
+ final long remotePosition = isRetrying () ? getRemotePosition () : getPosition ();
163
+ final int chunkOffset = (int ) (remotePosition - localPosition );
164
+ final int chunkLength = length - chunkOffset ;
165
+ final boolean uploadAlreadyComplete = remotePosition == -1 ;
166
+ // Enable isRetrying state to reduce number of calls to getRemotePosition()
75
167
if (!isRetrying ()) {
76
- // Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
77
168
retrying = true ;
169
+ }
170
+ if (uploadAlreadyComplete && lastChunk ) {
171
+ // Case 6
172
+ // Request object metadata if not available
173
+ if (storageObject == null ) {
174
+ storageObject = getRemoteStorageObject ();
175
+ }
176
+ // Verify that with the final chunk we match the blob length
177
+ if (storageObject .getSize ().longValue () != getPosition () + length ) {
178
+ throw unrecoverableState (
179
+ chunkOffset , chunkLength , localPosition , remotePosition , lastChunk );
180
+ }
181
+ retrying = false ;
182
+ } else if (uploadAlreadyComplete && !lastChunk && !checkingForLastChunk ) {
183
+ // Case 7
184
+ // Make sure this is the second to last chunk.
185
+ checkingForLastChunk = true ;
186
+ // Continue onto next chunk in case this is the last chunk
187
+ } else if (localPosition <= remotePosition && chunkOffset < getChunkSize ()) {
188
+ // Case 1 && Case 2
189
+ // We are in a position to send a chunk
78
190
storageObject =
79
- getOptions ()
80
- .getStorageRpcV1 ()
81
- .writeWithResponse (
82
- getUploadId (), getBuffer (), 0 , getPosition (), length , last );
191
+ transmitChunk (chunkOffset , chunkLength , remotePosition , lastChunk );
192
+ retrying = false ;
193
+ } else if (localPosition < remotePosition && chunkOffset == getChunkSize ()) {
194
+ // Case 3
195
+ // Continue to next chunk to catch up with remotePosition we are one chunk
196
+ // behind
197
+ retrying = false ;
83
198
} else {
84
- // Retriable interruption occurred.
85
- // Variables:
86
- // chunk = getBuffer()
87
- // localNextByteOffset == getPosition()
88
- // chunkSize = getChunkSize()
89
- //
90
- // Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
91
- // we are retrying from first chunk start from 0 offset.
92
- //
93
- // Case 2: localNextByteOffset == remoteNextByteOffset:
94
- // Special case of Case 1 when a chunk is retried.
95
- //
96
- // Case 3: localNextByteOffset < remoteNextByteOffset
97
- // && driftOffset < chunkSize:
98
- // Upload progressed and localNextByteOffset is not in-sync with
99
- // remoteNextByteOffset and driftOffset is less than chunkSize.
100
- // driftOffset must be less than chunkSize for it to retry using
101
- // chunk maintained in memory.
102
- // Find the driftOffset by subtracting localNextByteOffset from
103
- // remoteNextByteOffset.
104
- // Use driftOffset to determine where to restart from using the chunk in
105
- // memory.
106
- //
107
- // Case 4: localNextByteOffset < remoteNextByteOffset
108
- // && driftOffset == chunkSize:
109
- // Special case of Case 3.
110
- // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
111
- // to the next chunk.
112
- //
113
- // Case 5: localNextByteOffset < remoteNextByteOffset
114
- // && driftOffset > chunkSize:
115
- // Throw exception as remoteNextByteOffset has drifted beyond the retriable
116
- // chunk maintained in memory. This is not possible unless there's multiple
117
- // clients uploading to the same resumable upload session.
118
- //
119
- // Case 6: localNextByteOffset > remoteNextByteOffset:
120
- // For completeness, this case is not possible because it would require retrying
121
- // a 400 status code which is not allowed.
122
- //
123
- // Case 7: remoteNextByteOffset==-1 && last == true
124
- // Upload is complete and retry occurred in the "last" chunk. Data sent was
125
- // received by the service.
126
- //
127
- // Case 8: remoteNextByteOffset==-1 && last == false
128
- // Upload was completed by another client because this retry did not occur
129
- // during the last chunk.
130
- //
131
- // Get remote offset from API
132
- long remoteNextByteOffset =
133
- getOptions ().getStorageRpcV1 ().getCurrentUploadOffset (getUploadId ());
134
- long localNextByteOffset = getPosition ();
135
- int driftOffset = (int ) (remoteNextByteOffset - localNextByteOffset );
136
- int retryChunkLength = length - driftOffset ;
137
-
138
- if (localNextByteOffset == 0 && remoteNextByteOffset == 0
139
- || localNextByteOffset == remoteNextByteOffset ) {
140
- // Case 1 and 2
141
- storageObject =
142
- getOptions ()
143
- .getStorageRpcV1 ()
144
- .writeWithResponse (
145
- getUploadId (), getBuffer (), 0 , getPosition (), length , last );
146
- } else if (localNextByteOffset < remoteNextByteOffset
147
- && driftOffset < getChunkSize ()) {
148
- // Case 3
149
- storageObject =
150
- getOptions ()
151
- .getStorageRpcV1 ()
152
- .writeWithResponse (
153
- getUploadId (),
154
- getBuffer (),
155
- driftOffset ,
156
- remoteNextByteOffset ,
157
- retryChunkLength ,
158
- last );
159
- } else if (localNextByteOffset < remoteNextByteOffset
160
- && driftOffset == getChunkSize ()) {
161
- // Case 4
162
- // Continue to next chunk
163
- retrying = false ;
164
- return ;
165
- } else if (localNextByteOffset < remoteNextByteOffset
166
- && driftOffset > getChunkSize ()) {
167
- // Case 5
168
- StringBuilder sb = new StringBuilder ();
169
- sb .append (
170
- "Remote offset has progressed beyond starting byte offset of next chunk." );
171
- sb .append (
172
- "This may be a symptom of multiple clients uploading to the same upload session.\n \n " );
173
- sb .append ("For debugging purposes:\n " );
174
- sb .append ("uploadId: " ).append (getUploadId ()).append ('\n' );
175
- sb .append ("localNextByteOffset: " ).append (localNextByteOffset ).append ('\n' );
176
- sb .append ("remoteNextByteOffset: " ).append (remoteNextByteOffset ).append ('\n' );
177
- sb .append ("driftOffset: " ).append (driftOffset ).append ("\n \n " );
178
- throw new StorageException (0 , sb .toString ());
179
- } else if (remoteNextByteOffset == -1 && last ) {
180
- // Case 7
181
- retrying = false ;
182
- return ;
183
- } else if (remoteNextByteOffset == -1 && !last ) {
184
- // Case 8
185
- throw new StorageException (0 , "Resumable upload is already complete." );
186
- }
199
+ // Case 4 && Case 8 && Case 9
200
+ throw unrecoverableState (
201
+ chunkOffset , chunkLength , localPosition , remotePosition , lastChunk );
187
202
}
188
- // Request was successful and retrying state is now disabled.
189
- retrying = false ;
190
203
}
191
204
}),
192
205
getOptions ().getRetrySettings (),
0 commit comments