diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java new file mode 100644 index 00000000000..127166c8cd8 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java @@ -0,0 +1,138 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.delta.kernel.internal.metrics;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import io.delta.kernel.metrics.MetricsReport;
+import io.delta.kernel.metrics.SnapshotMetricsResult;
+import io.delta.kernel.metrics.SnapshotReport;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Defines JSON serializers for {@link MetricsReport} types.
+ *
+ * This class only exposes static methods and has a private constructor, so it is never
+ * instantiated.
+ */
+public final class MetricsReportSerializers {
+
+  /////////////////
+  // Public APIs //
+  /////////////////
+
+  /**
+   * Serializes a {@link SnapshotReport} to a JSON string
+   *
+   * @param snapshotReport the report to serialize
+   * @return the JSON produced for the report by {@link SnapshotReportSerializer}
+   * @throws JsonProcessingException if the object mapper fails to write the report
+   */
+  public static String serializeSnapshotReport(SnapshotReport snapshotReport)
+      throws JsonProcessingException {
+    return OBJECT_MAPPER.writeValueAsString(snapshotReport);
+  }
+
+  /////////////////////////////////
+  // Private fields and methods //
+  ////////////////////////////////
+
+  // Single shared mapper with the custom SnapshotReport serializer registered on it
+  private static final ObjectMapper OBJECT_MAPPER =
+      new ObjectMapper()
+          .registerModule(
+              new SimpleModule()
+                  .addSerializer(SnapshotReport.class, new SnapshotReportSerializer()));
+
+  private MetricsReportSerializers() {}
+
+  /////////////////
+  // Serializers //
+  ////////////////
+
+  /** Jackson serializer that writes a {@link SnapshotReport} as a flat JSON object. */
+  static class SnapshotReportSerializer extends StdSerializer<SnapshotReport> {
+
+    SnapshotReportSerializer() {
+      super(SnapshotReport.class);
+    }
+
+    /**
+     * Writes the report as one JSON object with the fields {@code tablePath}, {@code
+     * operationType}, {@code reportUUID}, {@code version}, {@code providedTimestamp}, {@code
+     * snapshotMetrics} and {@code exception}, in that order. Empty optionals become JSON nulls and
+     * a present exception is written as its {@code toString()} value.
+     *
+     * @param snapshotReport the report to write
+     * @param gen generator that receives the JSON output
+     * @param provider unused by this serializer
+     * @throws IOException if the generator fails to write
+     */
+    @Override
+    public void serialize(
+        SnapshotReport snapshotReport, JsonGenerator gen, SerializerProvider provider)
+        throws IOException {
+      gen.writeStartObject();
+      gen.writeStringField("tablePath", snapshotReport.tablePath());
+      gen.writeStringField("operationType", snapshotReport.operationType());
+      gen.writeStringField("reportUUID", snapshotReport.reportUUID().toString());
+      writeOptionalField(
+          "version", snapshotReport.version(), item -> gen.writeNumberField("version", item), gen);
+      writeOptionalField(
+          "providedTimestamp",
+          snapshotReport.providedTimestamp(),
+          item -> gen.writeNumberField("providedTimestamp", item),
+          gen);
+      // The nested metrics object is written by hand, so emit its field name first
+      gen.writeFieldName("snapshotMetrics");
+      writeSnapshotMetrics(snapshotReport.snapshotMetrics(), gen);
+      writeOptionalField(
+          "exception",
+          snapshotReport.exception(),
+          item -> gen.writeStringField("exception", item.toString()),
+          gen);
+      gen.writeEndObject();
+    }
+
+    /**
+     * Writes the snapshot metrics as a nested JSON object. The caller is expected to have written
+     * the enclosing field name already.
+     */
+    private void writeSnapshotMetrics(SnapshotMetricsResult snapshotMetrics, JsonGenerator gen)
+        throws IOException {
+      gen.writeStartObject();
+      writeOptionalField(
+          "timestampToVersionResolutionDuration",
+          snapshotMetrics.timestampToVersionResolutionDuration(),
+          item -> gen.writeNumberField("timestampToVersionResolutionDuration", item),
+          gen);
+      // NOTE(review): the JSON key says "loadProtocolAndMetadata" while the accessor is named
+      // loadInitialDeltaActionsDuration; the test suite expects this key - confirm the naming.
+      gen.writeNumberField(
+          "loadProtocolAndMetadataDuration", snapshotMetrics.loadInitialDeltaActionsDuration());
+      gen.writeEndObject();
+    }
+  }
+
+  //////////////////////////////////
+  // Helper fx for serialization //
+  /////////////////////////////////
+
+  /**
+   * For an optional item - If it is empty, writes out a null value - If it is non-empty, writes the
+   * items value using the provided nonNullConsumer
+   *
+   * Note that {@code fieldName} is only used for the null case; when the item is present the
+   * consumer is responsible for writing both the field name and the value.
+   *
+   * @param <T> type of the optional's value
+   * @param fieldName name of the field to write
+   * @param item optional item
+   * @param nonNullConsumer consumes an items non-null value
+   * @param gen generator used to write the null field
+   * @throws IOException if the generator or the consumer fails to write
+   */
+  private static <T> void writeOptionalField(
+      String fieldName,
+      Optional<T> item,
+      ConsumerThrowsIOException<T> nonNullConsumer,
+      JsonGenerator gen)
+      throws IOException {
+    if (item.isPresent()) {
+      nonNullConsumer.accept(item.get());
+    } else {
+      gen.writeNullField(fieldName);
+    }
+  }
+
+  // Need to create custom consumer so we can propagate the IOException without wrapping it
+  @FunctionalInterface
+  private interface ConsumerThrowsIOException<T> {
+    void accept(T t) throws IOException;
+  }
+}
diff --git
a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala new file mode 100644 index 00000000000..51dc3b884e2 --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala @@ -0,0 +1,105 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.delta.kernel.internal.metrics
+
+import java.util.Optional
+
+import io.delta.kernel.metrics.SnapshotReport
+import org.scalatest.funsuite.AnyFunSuite
+
+/** Checks the JSON string produced by [[MetricsReportSerializers]] for snapshot reports. */
+class MetricsReportSerializerSuite extends AnyFunSuite {
+
+  /**
+   * Renders an optional value the way the expected JSON spells it: `null` when empty, a
+   * double-quoted string for String values, and `toString` for anything else.
+   */
+  private def optionToString[T](option: Optional[T]): String = {
+    if (option.isPresent) {
+      if (option.get().isInstanceOf[String]) {
+        s""""${option.get()}"""" // For string objects wrap with quotes
+      } else {
+        option.get().toString
+      }
+    } else {
+      "null"
+    }
+  }
+
+  /**
+   * Builds the expected JSON from the report's own accessors and asserts that
+   * `MetricsReportSerializers.serializeSnapshotReport` returns exactly that string.
+   */
+  def testSnapshotReport(snapshotReport: SnapshotReport): Unit = {
+    val timestampToVersionResolutionDuration = optionToString(
+      snapshotReport.snapshotMetrics().timestampToVersionResolutionDuration())
+    val loadProtocolAndMetadataDuration =
+      snapshotReport.snapshotMetrics().loadInitialDeltaActionsDuration()
+    val exception: Optional[String] = snapshotReport.exception().map(_.toString)
+    // The margin-stripped lines are joined without newlines to form compact JSON
+    val expectedJson =
+      s"""
+         |{"tablePath":"${snapshotReport.tablePath()}",
+         |"operationType":"Snapshot",
+         |"reportUUID":"${snapshotReport.reportUUID()}",
+         |"version":${optionToString(snapshotReport.version())},
+         |"providedTimestamp":${optionToString(snapshotReport.providedTimestamp())},
+         |"snapshotMetrics":{
+         |"timestampToVersionResolutionDuration":${timestampToVersionResolutionDuration},
+         |"loadProtocolAndMetadataDuration":${loadProtocolAndMetadataDuration}
+         |},
+         |"exception":${optionToString(exception)}
+         |}
+         |""".stripMargin.replaceAll("\n", "")
+    assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport))
+  }
+
+  test("SnapshotReport serializer") {
+    // Report with every optional field populated
+    val snapshotMetrics1 = new SnapshotMetrics()
+    snapshotMetrics1.timestampToVersionResolutionDuration.record(10)
+    snapshotMetrics1.loadProtocolAndMetadataDuration.record(1000)
+    val exception = new RuntimeException("something something failed")
+
+    val snapshotReport1 = new SnapshotReportImpl(
+      "/table/path",
+      Optional.of(1),
+      Optional.of(0),
+      snapshotMetrics1,
+      Optional.of(exception)
+    )
+
+    // Manually check expected JSON
+    val expectedJson =
+      s"""
+         |{"tablePath":"/table/path",
+         |"operationType":"Snapshot",
+         |"reportUUID":"${snapshotReport1.reportUUID()}",
+         |"version":1,
+         |"providedTimestamp":0,
+         |"snapshotMetrics":{
+         |"timestampToVersionResolutionDuration":10,
+         |"loadProtocolAndMetadataDuration":1000
+         |},
+         |"exception":"$exception"
+         |}
+         |""".stripMargin.replaceAll("\n", "")
+    assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport1))
+
+    // Check with test function
+    testSnapshotReport(snapshotReport1)
+
+    // Empty options for all possible fields
+    val snapshotMetrics2 = new SnapshotMetrics()
+    val snapshotReport2 = new SnapshotReportImpl(
+      "/table/path",
+      Optional.empty(),
+      Optional.empty(),
+      snapshotMetrics2,
+      Optional.empty()
+    )
+    testSnapshotReport(snapshotReport2)
+  }
+}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
index 610b9d62a55..0eac963d701 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
@@ -16,6 +16,8 @@
 package io.delta.kernel.defaults.engine;
 
 import io.delta.kernel.engine.*;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 
@@ -53,6 +55,12 @@ public CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
     return new DefaultCommitCoordinatorClientHandler(hadoopConf, name, conf);
   }
 
+  /** Returns a single-element list holding a new {@link LoggingMetricsReporter}. */
+  @Override
+  public List<MetricsReporter> getMetricsReporters() {
+    return Collections.singletonList(new LoggingMetricsReporter());
+  }
+
   /**
    * Create an instance of {@link DefaultEngine}.
* diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java new file mode 100644 index 00000000000..7b987b45b46 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java @@ -0,0 +1,51 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.engine; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.delta.kernel.engine.MetricsReporter; +import io.delta.kernel.internal.metrics.MetricsReportSerializers; +import io.delta.kernel.internal.snapshot.SnapshotManager; +import io.delta.kernel.metrics.MetricsReport; +import io.delta.kernel.metrics.SnapshotReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link MetricsReporter} that logs the reports (as JSON) to Log4J at the info + * level. 
+ *
+ * Messages are emitted through the SLF4J {@link Logger} API, so they use {@code {}} placeholders
+ * rather than {@code String.format} specifiers.
+ */
+public class LoggingMetricsReporter implements MetricsReporter {
+
+  // Named after this class so the log category identifies the reporter itself.
+  // NOTE(review): the SnapshotManager import is no longer referenced by this class - drop it if
+  // nothing else needs it.
+  private static final Logger logger = LoggerFactory.getLogger(LoggingMetricsReporter.class);
+
+  /**
+   * Logs the given report at info level. A {@link SnapshotReport} is logged as JSON; for any other
+   * report type a message noting that serialization is unsupported is logged instead. A JSON
+   * serialization failure is logged together with the exception and is not rethrown.
+   *
+   * @param report the report to log
+   */
+  @Override
+  public void report(MetricsReport report) {
+    try {
+      if (report instanceof SnapshotReport) {
+        logger.info(
+            "SnapshotReport = {}",
+            MetricsReportSerializers.serializeSnapshotReport((SnapshotReport) report));
+      } else {
+        logger.info(
+            "{} = [{} does not support serializing this type of MetricReport]",
+            report.getClass(), this.getClass());
+      }
+    } catch (JsonProcessingException e) {
+      // The trailing Throwable has no placeholder, so SLF4J logs it with its stack trace
+      logger.info("Encountered exception while serializing report {}", report, e);
+    }
+  }
+}