OtlpMetricsCollectorService.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.rocketmq.exporter.otlp;

import java.util.ArrayList;
import java.util.List;

import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint.ValueCase;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.prometheus.client.Collector.MetricFamilySamples;
import io.prometheus.client.Collector.MetricFamilySamples.Sample;
import io.prometheus.client.Collector.Type;
import org.apache.rocketmq.exporter.model.common.TwoTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class OtlpMetricsCollectorService extends MetricsServiceGrpc.MetricsServiceImplBase  {

    private final List<MetricFamilySamples> otlpMfs = new ArrayList<>();

    private static final String LABEL_BUCKET_BOUND = "le";

    private final static Logger log = LoggerFactory.getLogger(OtlpMetricsCollectorService.class);

    @Override
    public void export(ExportMetricsServiceRequest request,
        StreamObserver<ExportMetricsServiceResponse> responseObserver) {
        log.info("receive oltp metrics export request...");
        try {
            List<MetricFamilySamples> newMfs = new ArrayList<>();
            collectMetrics(request, newMfs);
            synchronized (otlpMfs) {
                otlpMfs.clear();
                otlpMfs.addAll(newMfs);
            }
        } catch (Exception e) {
            log.error("Unexpected error when exporting otlp metrics, request={}", request, e);
            responseObserver.onError(e);
        }
        responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance());
        responseObserver.onCompleted();
    }

    public void collectOtlpMetrics(List<MetricFamilySamples> mfs) {
        synchronized (otlpMfs) {
            mfs.addAll(otlpMfs);
        }
    }

    private void collectMetrics(ExportMetricsServiceRequest request, List<MetricFamilySamples> mfs) {
        final List<ResourceMetrics> resourceMetricsList = request.getResourceMetricsList();
        for (ResourceMetrics resourceMetrics : resourceMetricsList) {
            final List<ScopeMetrics> scopeMetricsList = resourceMetrics.getScopeMetricsList();
            for (ScopeMetrics scopeMetrics : scopeMetricsList) {
                final List<Metric> metricList = scopeMetrics.getMetricsList();
                for (Metric metric : metricList) {
                    String name = metric.getName();
                    switch (metric.getDataCase()) {
                        case GAUGE: {
                            final List<NumberDataPoint> pointList = metric.getGauge().getDataPointsList();
                            List<Sample> samples = new ArrayList<>();
                            for (NumberDataPoint point : pointList) {
                                TwoTuple<List<String>, List<String>> labelNamesAndValues = getLabelNamesAndValues(point.getAttributesList());
                                double pointValue = ValueCase.AS_DOUBLE == point.getValueCase() ? point.getAsDouble() : point.getAsInt();
                                Sample sample = new Sample(name, labelNamesAndValues.getFirst(), labelNamesAndValues.getSecond(), pointValue);
                                samples.add(sample);
                            }
                            MetricFamilySamples metricFamily = new MetricFamilySamples(name, Type.GAUGE, name, samples);
                            mfs.add(metricFamily);
                            break;
                        }
                        case SUM: {
                            final List<NumberDataPoint> pointList = metric.getSum().getDataPointsList();
                            List<Sample> samples = new ArrayList<>();
                            for (NumberDataPoint point : pointList) {
                                TwoTuple<List<String>, List<String>> labelNamesAndValues = getLabelNamesAndValues(point.getAttributesList());
                                double pointValue = ValueCase.AS_DOUBLE == point.getValueCase() ? point.getAsDouble() : point.getAsInt();
                                Sample sample = new Sample(name, labelNamesAndValues.getFirst(), labelNamesAndValues.getSecond(), pointValue);
                                samples.add(sample);
                            }
                            MetricFamilySamples metricFamily = new MetricFamilySamples(name, Type.COUNTER, name, samples);
                            mfs.add(metricFamily);
                            break;
                        }
                        case HISTOGRAM: {
                            final List<HistogramDataPoint> pointList = metric.getHistogram().getDataPointsList();
                            List<Sample> samples = new ArrayList<>();
                            for (HistogramDataPoint point : pointList) {
                                TwoTuple<List<String>, List<String>> labelNamesAndValues = getLabelNamesAndValues(point.getAttributesList());
                                List<String> labelNames = labelNamesAndValues.getFirst();
                                List<String> labelValues = labelNamesAndValues.getSecond();
                                int boundCount = point.getExplicitBoundsList().size();
                                for (int i = 0; i < boundCount; i++) {
                                    Double bound = point.getExplicitBoundsList().get(i);
                                    List<String> keys = new ArrayList<>(labelNames);
                                    keys.add(LABEL_BUCKET_BOUND);
                                    List<String> values = new ArrayList<>(labelValues);
                                    values.add(String.valueOf(bound));
                                    Long count = point.getBucketCountsList().get(i);
                                    Sample sample = new Sample(name + "_bucket", keys, values, count);
                                    samples.add(sample);
                                }
                                List<String> keys = new ArrayList<>(labelNames);
                                keys.add(LABEL_BUCKET_BOUND);
                                List<String> values = new ArrayList<>(labelValues);
                                values.add("+Inf");
                                Long count = point.getBucketCountsList().get(boundCount);
                                Sample sample = new Sample(name + "_bucket", keys, values, count);
                                samples.add(sample);
                                // count
                                samples.add(new Sample(name + "_count", labelNames, labelValues, point.getCount()));
                                // sum
                                samples.add(new Sample(name + "_sum", labelNames, labelValues, point.getSum()));
                            }
                            MetricFamilySamples metricFamily = new MetricFamilySamples(name, Type.HISTOGRAM, name, samples);
                            mfs.add(metricFamily);
                            break;
                        }
                        default:
                            break;
                    }
                }
            }
        }
    }

    private TwoTuple<List<String>, List<String>> getLabelNamesAndValues(List<KeyValue> attributesList) {
        List<String> labelNames = new ArrayList<>();
        List<String> labelValues = new ArrayList<>();
        for (KeyValue keyValue : attributesList) {
            String key = keyValue.getKey();
            String value = keyValue.getValue().getStringValue();
            labelNames.add(key);
            labelValues.add(value);
        }
        return new TwoTuple<>(labelNames, labelValues);
    }
}