22
22
import org .apache .hadoop .hdds .conf .OzoneConfiguration ;
23
23
import org .apache .hadoop .hdds .conf .StorageUnit ;
24
24
import org .apache .hadoop .hdds .protocol .DatanodeDetails ;
25
- import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos ;
26
25
import org .apache .hadoop .hdds .scm .OzoneClientConfig ;
27
26
import org .apache .hadoop .hdds .scm .XceiverClientManager ;
28
27
import org .apache .hadoop .hdds .scm .XceiverClientMetrics ;
47
46
import org .junit .jupiter .api .Test ;
48
47
import org .junit .jupiter .api .Timeout ;
49
48
50
- import java .io .IOException ;
51
49
import java .nio .ByteBuffer ;
52
50
import java .util .List ;
53
51
import java .util .UUID ;
54
52
import java .util .concurrent .TimeUnit ;
55
53
56
54
import static java .nio .charset .StandardCharsets .UTF_8 ;
55
+ import static org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos .Type .PutBlock ;
56
+ import static org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos .Type .WriteChunk ;
57
57
import static org .apache .hadoop .hdds .scm .ScmConfigKeys .OZONE_SCM_STALENODE_INTERVAL ;
58
58
import static org .assertj .core .api .Assertions .assertThat ;
59
59
import static org .junit .jupiter .api .Assertions .assertEquals ;
@@ -77,13 +77,6 @@ public class TestBlockDataStreamOutput {
77
77
private static String keyString ;
78
78
private static final DatanodeVersion DN_OLD_VERSION = DatanodeVersion .SEPARATE_RATIS_PORTS_AVAILABLE ;
79
79
80
- /**
81
- * Create a MiniDFSCluster for testing.
82
- * <p>
83
- * Ozone is made active by setting OZONE_ENABLED = true
84
- *
85
- * @throws IOException
86
- */
87
80
@ BeforeAll
88
81
public static void init () throws Exception {
89
82
chunkSize = 100 ;
@@ -120,7 +113,7 @@ public static void init() throws Exception {
120
113
client = OzoneClientFactory .getRpcClient (conf );
121
114
objectStore = client .getObjectStore ();
122
115
keyString = UUID .randomUUID ().toString ();
123
- volumeName = "testblockoutputstream " ;
116
+ volumeName = "testblockdatastreamoutput " ;
124
117
bucketName = volumeName ;
125
118
objectStore .createVolume (volumeName );
126
119
objectStore .getVolume (volumeName ).createBucket (bucketName );
@@ -130,9 +123,6 @@ static String getKeyName() {
130
123
return UUID .randomUUID ().toString ();
131
124
}
132
125
133
- /**
134
- * Shutdown MiniDFSCluster.
135
- */
136
126
@ AfterAll
137
127
public static void shutdown () {
138
128
IOUtils .closeQuietly (client );
@@ -166,6 +156,11 @@ public void testMultiBlockWrite() throws Exception {
166
156
}
167
157
168
158
static void testWrite (int dataLength ) throws Exception {
159
+ XceiverClientMetrics metrics =
160
+ XceiverClientManager .getXceiverClientMetrics ();
161
+ long pendingWriteChunkCount = metrics .getPendingContainerOpCountMetrics (WriteChunk );
162
+ long pendingPutBlockCount = metrics .getPendingContainerOpCountMetrics (PutBlock );
163
+
169
164
String keyName = getKeyName ();
170
165
OzoneDataStreamOutput key = createKey (
171
166
keyName , ReplicationType .RATIS , dataLength );
@@ -174,9 +169,19 @@ static void testWrite(int dataLength) throws Exception {
174
169
// now close the stream, It will update the key length.
175
170
key .close ();
176
171
validateData (keyName , data );
172
+
173
+ assertEquals (pendingPutBlockCount ,
174
+ metrics .getPendingContainerOpCountMetrics (PutBlock ));
175
+ assertEquals (pendingWriteChunkCount ,
176
+ metrics .getPendingContainerOpCountMetrics (WriteChunk ));
177
177
}
178
178
179
179
private void testWriteWithFailure (int dataLength ) throws Exception {
180
+ XceiverClientMetrics metrics =
181
+ XceiverClientManager .getXceiverClientMetrics ();
182
+ long pendingWriteChunkCount = metrics .getPendingContainerOpCountMetrics (WriteChunk );
183
+ long pendingPutBlockCount = metrics .getPendingContainerOpCountMetrics (PutBlock );
184
+
180
185
String keyName = getKeyName ();
181
186
OzoneDataStreamOutput key = createKey (
182
187
keyName , ReplicationType .RATIS , dataLength );
@@ -195,32 +200,50 @@ private void testWriteWithFailure(int dataLength) throws Exception {
195
200
key .close ();
196
201
String dataString = new String (data , UTF_8 );
197
202
validateData (keyName , dataString .concat (dataString ).getBytes (UTF_8 ));
203
+
204
+ assertEquals (pendingPutBlockCount ,
205
+ metrics .getPendingContainerOpCountMetrics (PutBlock ));
206
+ assertEquals (pendingWriteChunkCount ,
207
+ metrics .getPendingContainerOpCountMetrics (WriteChunk ));
198
208
}
199
209
200
210
@ Test
201
211
public void testPutBlockAtBoundary () throws Exception {
202
- int dataLength = 500 ;
212
+ int dataLength = maxFlushSize + 100 ;
203
213
XceiverClientMetrics metrics =
204
214
XceiverClientManager .getXceiverClientMetrics ();
205
- long putBlockCount = metrics .getContainerOpCountMetrics (
206
- ContainerProtos .Type .PutBlock );
207
- long pendingPutBlockCount = metrics .getPendingContainerOpCountMetrics (
208
- ContainerProtos .Type .PutBlock );
215
+ long writeChunkCount = metrics .getContainerOpCountMetrics (WriteChunk );
216
+ long putBlockCount = metrics .getContainerOpCountMetrics (PutBlock );
217
+ long pendingWriteChunkCount = metrics .getPendingContainerOpCountMetrics (WriteChunk );
218
+ long pendingPutBlockCount = metrics .getPendingContainerOpCountMetrics (PutBlock );
219
+ long totalOpCount = metrics .getTotalOpCount ();
220
+
209
221
String keyName = getKeyName ();
210
222
OzoneDataStreamOutput key = createKey (
211
223
keyName , ReplicationType .RATIS , 0 );
212
224
byte [] data =
213
225
ContainerTestHelper .getFixedLengthString (keyString , dataLength )
214
226
.getBytes (UTF_8 );
215
227
key .write (ByteBuffer .wrap (data ));
216
- assertThat (metrics .getPendingContainerOpCountMetrics (ContainerProtos . Type . PutBlock ))
228
+ assertThat (metrics .getPendingContainerOpCountMetrics (PutBlock ))
217
229
.isLessThanOrEqualTo (pendingPutBlockCount + 1 );
230
+ assertThat (metrics .getPendingContainerOpCountMetrics (WriteChunk ))
231
+ .isLessThanOrEqualTo (pendingWriteChunkCount + 5 );
218
232
key .close ();
219
233
// Since data length is 500 , first putBlock will be at 400(flush boundary)
220
234
// and the other at 500
221
- assertEquals (
222
- metrics .getContainerOpCountMetrics (ContainerProtos .Type .PutBlock ),
223
- putBlockCount + 2 );
235
+ assertEquals (putBlockCount + 2 ,
236
+ metrics .getContainerOpCountMetrics (PutBlock ));
237
+ // Each chunk is 100 so there will be 500 / 100 = 5 chunks.
238
+ assertEquals (writeChunkCount + 5 ,
239
+ metrics .getContainerOpCountMetrics (WriteChunk ));
240
+ assertEquals (totalOpCount + 7 ,
241
+ metrics .getTotalOpCount ());
242
+ assertEquals (pendingPutBlockCount ,
243
+ metrics .getPendingContainerOpCountMetrics (PutBlock ));
244
+ assertEquals (pendingWriteChunkCount ,
245
+ metrics .getPendingContainerOpCountMetrics (WriteChunk ));
246
+
224
247
validateData (keyName , data );
225
248
}
226
249
@@ -242,20 +265,22 @@ public void testMinPacketSize() throws Exception {
242
265
XceiverClientMetrics metrics =
243
266
XceiverClientManager .getXceiverClientMetrics ();
244
267
OzoneDataStreamOutput key = createKey (keyName , ReplicationType .RATIS , 0 );
245
- long writeChunkCount =
246
- metrics .getContainerOpCountMetrics ( ContainerProtos . Type . WriteChunk );
268
+ long writeChunkCount = metrics . getContainerOpCountMetrics ( WriteChunk );
269
+ long pendingWriteChunkCount = metrics .getPendingContainerOpCountMetrics ( WriteChunk );
247
270
byte [] data =
248
271
ContainerTestHelper .getFixedLengthString (keyString , chunkSize / 2 )
249
272
.getBytes (UTF_8 );
250
273
key .write (ByteBuffer .wrap (data ));
251
274
// minPacketSize= 100, so first write of 50 wont trigger a writeChunk
252
275
assertEquals (writeChunkCount ,
253
- metrics .getContainerOpCountMetrics (ContainerProtos . Type . WriteChunk ));
276
+ metrics .getContainerOpCountMetrics (WriteChunk ));
254
277
key .write (ByteBuffer .wrap (data ));
255
278
assertEquals (writeChunkCount + 1 ,
256
- metrics .getContainerOpCountMetrics (ContainerProtos . Type . WriteChunk ));
279
+ metrics .getContainerOpCountMetrics (WriteChunk ));
257
280
// now close the stream, It will update the key length.
258
281
key .close ();
282
+ assertEquals (pendingWriteChunkCount ,
283
+ metrics .getPendingContainerOpCountMetrics (WriteChunk ));
259
284
String dataString = new String (data , UTF_8 );
260
285
validateData (keyName , dataString .concat (dataString ).getBytes (UTF_8 ));
261
286
}
0 commit comments