Skip to content

Commit f7fe30a

Browse files
authored
HDDS-11691. Support object tags in ObjectEndpointStreaming#put (apache#7543)
1 parent 9854591 commit f7fe30a

File tree

3 files changed

+60
-3
lines changed

3 files changed

+60
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations under
15+
* the License.
16+
*/
17+
18+
package org.apache.hadoop.ozone.s3.awssdk.v1;
19+
20+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
21+
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
22+
import org.apache.hadoop.ozone.OzoneConfigKeys;
23+
import org.apache.hadoop.ozone.om.OMConfigKeys;
24+
import org.junit.jupiter.api.AfterAll;
25+
import org.junit.jupiter.api.BeforeAll;
26+
import org.junit.jupiter.api.Timeout;
27+
28+
import java.io.IOException;
29+
30+
/**
31+
* Tests the AWS S3 SDK basic operations with OM Ratis enabled and Streaming Write Pipeline.
32+
*/
33+
@Timeout(300)
34+
public class TestS3SDKV1WithRatisStreaming extends AbstractS3SDKV1Tests {
35+
36+
@BeforeAll
37+
public static void init() throws Exception {
38+
OzoneConfiguration conf = new OzoneConfiguration();
39+
conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
40+
false);
41+
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
42+
conf.setBoolean(OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
43+
true);
44+
conf.setBoolean(OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, true);
45+
conf.setBoolean(OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED, true);
46+
// Ensure that all writes use datastream
47+
conf.set(OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD, "0MB");
48+
startCluster(conf);
49+
}
50+
51+
@AfterAll
52+
public static void shutdown() throws IOException {
53+
shutdownCluster();
54+
}
55+
}

hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ public Response put(
323323
perf.appendStreamMode();
324324
Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
325325
.put(bucket, keyPath, length, replicationConfig, chunkSize,
326-
customMetadata, digestInputStream, perf);
326+
customMetadata, tags, digestInputStream, perf);
327327
eTag = keyWriteResult.getKey();
328328
putLength = keyWriteResult.getValue();
329329
} else {

hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,13 @@ public static Pair<String, Long> put(
6161
OzoneBucket bucket, String keyPath,
6262
long length, ReplicationConfig replicationConfig,
6363
int chunkSize, Map<String, String> keyMetadata,
64+
Map<String, String> tags,
6465
DigestInputStream body, PerformanceStringBuilder perf)
6566
throws IOException, OS3Exception {
6667

6768
try {
6869
return putKeyWithStream(bucket, keyPath,
69-
length, chunkSize, replicationConfig, keyMetadata, body, perf);
70+
length, chunkSize, replicationConfig, keyMetadata, tags, body, perf);
7071
} catch (IOException ex) {
7172
LOG.error("Exception occurred in PutObject", ex);
7273
if (ex instanceof OMException) {
@@ -97,13 +98,14 @@ public static Pair<String, Long> putKeyWithStream(
9798
int bufferSize,
9899
ReplicationConfig replicationConfig,
99100
Map<String, String> keyMetadata,
101+
Map<String, String> tags,
100102
DigestInputStream body, PerformanceStringBuilder perf)
101103
throws IOException {
102104
long startNanos = Time.monotonicNowNanos();
103105
long writeLen;
104106
String eTag;
105107
try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
106-
length, replicationConfig, keyMetadata)) {
108+
length, replicationConfig, keyMetadata, tags)) {
107109
long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
108110
writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
109111
eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())

0 commit comments

Comments
 (0)