Skip to content

Commit 85d6b80

Browse files
DNVindhyasergiitk
authored andcommitted
gcp-observability: update observability logging proto (#9608)
1 parent 615e820 commit 85d6b80

15 files changed

+588
-691
lines changed

gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import io.grpc.gcp.observability.interceptors.LogHelper;
3636
import io.grpc.gcp.observability.logging.GcpLogSink;
3737
import io.grpc.gcp.observability.logging.Sink;
38-
import io.grpc.internal.TimeProvider;
3938
import io.opencensus.common.Duration;
4039
import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
4140
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
@@ -79,8 +78,8 @@ public static synchronized GcpObservability grpcInit() throws IOException {
7978
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
8079
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
8180
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
82-
observabilityConfig.getFlushMessageCount(), SERVICES_TO_EXCLUDE);
83-
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
81+
SERVICES_TO_EXCLUDE);
82+
LogHelper helper = new LogHelper(sink);
8483
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
8584
instance = grpcInit(sink, observabilityConfig,
8685
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),

gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ public interface ObservabilityConfig {
3636
/** Get destination project ID - where logs will go. */
3737
String getDestinationProjectId();
3838

39-
/** Get message count threshold to flush - flush once message count is reached. */
40-
Long getFlushMessageCount();
41-
4239
/** Get filters set for logging. */
4340
List<LogFilter> getLogFilters();
4441

gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
4545
private boolean enableCloudMonitoring = false;
4646
private boolean enableCloudTracing = false;
4747
private String destinationProjectId = null;
48-
private Long flushMessageCount = null;
4948
private List<LogFilter> logFilters;
5049
private List<EventType> eventTypes;
5150
private Sampler sampler;
@@ -87,7 +86,6 @@ private void parseConfig(Map<String, ?> config) {
8786
enableCloudTracing = value;
8887
}
8988
destinationProjectId = JsonUtil.getString(config, "destination_project_id");
90-
flushMessageCount = JsonUtil.getNumberAsLong(config, "flush_message_count");
9189
List<?> rawList = JsonUtil.getList(config, "log_filters");
9290
if (rawList != null) {
9391
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
@@ -102,7 +100,7 @@ private void parseConfig(Map<String, ?> config) {
102100
List<String> jsonEventTypes = JsonUtil.checkStringList(rawList);
103101
ImmutableList.Builder<EventType> eventTypesBuilder = new ImmutableList.Builder<>();
104102
for (String jsonEventType : jsonEventTypes) {
105-
eventTypesBuilder.add(convertEventType(jsonEventType));
103+
eventTypesBuilder.add(EventType.valueOf(jsonEventType));
106104
}
107105
this.eventTypes = eventTypesBuilder.build();
108106
}
@@ -136,28 +134,6 @@ private void parseConfig(Map<String, ?> config) {
136134
}
137135
}
138136

139-
private EventType convertEventType(String val) {
140-
switch (val) {
141-
case "GRPC_CALL_UNKNOWN":
142-
return EventType.GRPC_CALL_UNKNOWN;
143-
case "GRPC_CALL_REQUEST_HEADER":
144-
return EventType.GRPC_CALL_REQUEST_HEADER;
145-
case "GRPC_CALL_RESPONSE_HEADER":
146-
return EventType.GRPC_CALL_RESPONSE_HEADER;
147-
case "GRPC_CALL_REQUEST_MESSAGE":
148-
return EventType.GRPC_CALL_REQUEST_MESSAGE;
149-
case "GRPC_CALL_RESPONSE_MESSAGE":
150-
return EventType.GRPC_CALL_RESPONSE_MESSAGE;
151-
case "GRPC_CALL_TRAILER":
152-
return EventType.GRPC_CALL_TRAILER;
153-
case "GRPC_CALL_HALF_CLOSE":
154-
return EventType.GRPC_CALL_HALF_CLOSE;
155-
case "GRPC_CALL_CANCEL":
156-
return EventType.GRPC_CALL_CANCEL;
157-
default:
158-
throw new IllegalArgumentException("Unknown event type value:" + val);
159-
}
160-
}
161137

162138
private LogFilter parseJsonLogFilter(Map<String, ?> logFilterMap) {
163139
return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"),
@@ -185,11 +161,6 @@ public String getDestinationProjectId() {
185161
return destinationProjectId;
186162
}
187163

188-
@Override
189-
public Long getFlushMessageCount() {
190-
return flushMessageCount;
191-
}
192-
193164
@Override
194165
public List<LogFilter> getLogFilters() {
195166
return logFilters;

gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/InternalLoggingChannelInterceptor.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
8585
CallOptions callOptions, Channel next) {
8686

8787
final AtomicLong seq = new AtomicLong(1);
88-
final String rpcId = UUID.randomUUID().toString();
88+
final String callId = UUID.randomUUID().toString();
8989
final String authority = next.authority();
9090
final String serviceName = method.getServiceName();
9191
final String methodName = method.getBareMethodName();
@@ -105,24 +105,24 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT
105105

106106
@Override
107107
public void start(Listener<RespT> responseListener, Metadata headers) {
108-
// Event: EventType.GRPC_CALL_REQUEST_HEADER
108+
// Event: EventType.CLIENT_HEADER
109109
// The timeout should reflect the time remaining when the call is started, so compute
110110
// remaining time here.
111111
final Duration timeout = deadline == null ? null
112112
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
113113

114-
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) {
114+
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)) {
115115
try {
116-
helper.logRequestHeader(
116+
helper.logClientHeader(
117117
seq.getAndIncrement(),
118118
serviceName,
119119
methodName,
120120
authority,
121121
timeout,
122122
headers,
123123
maxHeaderBytes,
124-
EventLogger.LOGGER_CLIENT,
125-
rpcId,
124+
EventLogger.CLIENT,
125+
callId,
126126
null);
127127
} catch (Exception e) {
128128
// Catching generic exceptions instead of specific ones for all the events.
@@ -139,19 +139,20 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
139139
new SimpleForwardingClientCallListener<RespT>(responseListener) {
140140
@Override
141141
public void onMessage(RespT message) {
142-
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
143-
EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE;
142+
// Event: EventType.SERVER_MESSAGE
143+
EventType responseMessageType = EventType.SERVER_MESSAGE;
144144
if (filterHelper.isEventToBeLogged(responseMessageType)) {
145145
try {
146146
helper.logRpcMessage(
147147
seq.getAndIncrement(),
148148
serviceName,
149149
methodName,
150+
authority,
150151
responseMessageType,
151152
message,
152153
maxMessageBytes,
153-
EventLogger.LOGGER_CLIENT,
154-
rpcId);
154+
EventLogger.CLIENT,
155+
callId);
155156
} catch (Exception e) {
156157
logger.log(Level.SEVERE, "Unable to log response message", e);
157158
}
@@ -161,17 +162,18 @@ public void onMessage(RespT message) {
161162

162163
@Override
163164
public void onHeaders(Metadata headers) {
164-
// Event: EventType.GRPC_CALL_RESPONSE_HEADER
165-
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) {
165+
// Event: EventType.SERVER_HEADER
166+
if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) {
166167
try {
167-
helper.logResponseHeader(
168+
helper.logServerHeader(
168169
seq.getAndIncrement(),
169170
serviceName,
170171
methodName,
172+
authority,
171173
headers,
172174
maxHeaderBytes,
173-
EventLogger.LOGGER_CLIENT,
174-
rpcId,
175+
EventLogger.CLIENT,
176+
callId,
175177
LogHelper.getPeerAddress(getAttributes()));
176178
} catch (Exception e) {
177179
logger.log(Level.SEVERE, "Unable to log response header", e);
@@ -182,18 +184,19 @@ public void onHeaders(Metadata headers) {
182184

183185
@Override
184186
public void onClose(Status status, Metadata trailers) {
185-
// Event: EventType.GRPC_CALL_TRAILER
186-
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) {
187+
// Event: EventType.SERVER_TRAILER
188+
if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) {
187189
try {
188190
helper.logTrailer(
189191
seq.getAndIncrement(),
190192
serviceName,
191193
methodName,
194+
authority,
192195
status,
193196
trailers,
194197
maxHeaderBytes,
195-
EventLogger.LOGGER_CLIENT,
196-
rpcId,
198+
EventLogger.CLIENT,
199+
callId,
197200
LogHelper.getPeerAddress(getAttributes()));
198201
} catch (Exception e) {
199202
logger.log(Level.SEVERE, "Unable to log trailer", e);
@@ -207,19 +210,20 @@ public void onClose(Status status, Metadata trailers) {
207210

208211
@Override
209212
public void sendMessage(ReqT message) {
210-
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE
211-
EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE;
213+
// Event: EventType.CLIENT_MESSAGE
214+
EventType requestMessageType = EventType.CLIENT_MESSAGE;
212215
if (filterHelper.isEventToBeLogged(requestMessageType)) {
213216
try {
214217
helper.logRpcMessage(
215218
seq.getAndIncrement(),
216219
serviceName,
217220
methodName,
221+
authority,
218222
requestMessageType,
219223
message,
220224
maxMessageBytes,
221-
EventLogger.LOGGER_CLIENT,
222-
rpcId);
225+
EventLogger.CLIENT,
226+
callId);
223227
} catch (Exception e) {
224228
logger.log(Level.SEVERE, "Unable to log request message", e);
225229
}
@@ -229,15 +233,16 @@ public void sendMessage(ReqT message) {
229233

230234
@Override
231235
public void halfClose() {
232-
// Event: EventType.GRPC_CALL_HALF_CLOSE
233-
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) {
236+
// Event: EventType.CLIENT_HALF_CLOSE
237+
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) {
234238
try {
235239
helper.logHalfClose(
236240
seq.getAndIncrement(),
237241
serviceName,
238242
methodName,
239-
EventLogger.LOGGER_CLIENT,
240-
rpcId);
243+
authority,
244+
EventLogger.CLIENT,
245+
callId);
241246
} catch (Exception e) {
242247
logger.log(Level.SEVERE, "Unable to log half close", e);
243248
}
@@ -247,15 +252,16 @@ public void halfClose() {
247252

248253
@Override
249254
public void cancel(String message, Throwable cause) {
250-
// Event: EventType.GRPC_CALL_CANCEL
251-
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) {
255+
// Event: EventType.CANCEL
256+
if (filterHelper.isEventToBeLogged(EventType.CANCEL)) {
252257
try {
253258
helper.logCancel(
254259
seq.getAndIncrement(),
255260
serviceName,
256261
methodName,
257-
EventLogger.LOGGER_CLIENT,
258-
rpcId);
262+
authority,
263+
EventLogger.CLIENT,
264+
callId);
259265
} catch (Exception e) {
260266
logger.log(Level.SEVERE, "Unable to log cancel", e);
261267
}

0 commit comments

Comments
 (0)