Owners:
Related Tickets:
Currently, within thanos, the only way to measure query latency is to look at the http_request_duration_seconds
on the query
and query_range
HTTP handlers.
There’s a few reason why measuring the latency on the query and query_range endpoints is a poor indicator of how much ‘work’ Thanos is doing. Thanos will fan-out the ‘select’ phase of the query to each of its store targets (other stores, sidecars etc.) streaming the series from each target over the network, before executing the query on the resulting stream. This means that we actually touch far more series over the network than is returned to the final query response.
An ideal metric for query performance must include the total number of samples and series retrieved remotely before the query is executed.
Mercifully, we already capture these stats in tracing spans from the query path. Unfortunately, traces are sampled and are difficult to query reliably for SLI’s. My suggestion is that we add a new histogram, thanos_store_query_duration_seconds
that persists the total elapsed time for the query, with partitions(labels) for series_le
and samples_le
attached to each observation to allow for querying for a particular dimension (e.g. thanos_query_duration_seconds
for the 0.99
request duration quantile with > 1,000,000 samples
returned). Ideally we would represent this with an N-dimensional histogram, but this is not possible with prometheus.
As the storepb.SeriesStatsCounter
already trackes this information in thanos/store/proxy.go
, we will have to write an aggregator in the same file that can sum the series/samples for each ‘query’. As one thanos query is translated into multiple, fan out queries and aggregated into a single response, we will need to do this once for all the queries here.
Yes, in fact if we used the unique samples/series I suspect this metric would be totally useless. Instead we should define a set of 5-10 distinct ’t-shirt size’ buckets for the samples/series dimensions.
e.g. Exponential Scale
Samples:
s <= 1000 samples, m <= 10_000 samples, l <= 100_000 samples, xl > 1_000_000 samples
Series:
s <= 10 series, m <= 100 series, m <= 1000 series, l <= 10_000 series, xl <= 100_000 series
So if a query returns 50,000 samples
spanning 10 series
in 32s
it would be an observation of 32s
with the labels sample-size=l, series-size=s
.
This would allow us to define query SLI’s with respect to a specific number of series/samples and mean response time
e.g. 90% of queries for up to 1,000,000 samples and up to 10 series complete in less than 2s.
This is my primary concern. Given the countless permutations of node size, data distribution and thanos topologies that might influence the query sizes and response times, it is unlikely any set thresholds will be appropriate for all deployments of Thanos. As a result, the ’t-shirt sizes’ will have to be configurable (possibly via runtime args) with some sensible defaults to allow users to tweak them. The obvious caveat being that if this metric is recorded, any changes to the bucketing would render previous data corrupted/incomprehensible. I would like guidance here if possible.
Suggested flags for configuring query performance histogram quantiles
// Buckets for labelling query performance metrics with respect to duration of query
--query.telemetry.request-duration-seconds-quantiles = [ 0.1, 0.25, 0.75, 1.25, 1.75, 2.5, 3, 5, 10 ]
// Buckets for labelling query performance metrics with respect to number of samples
--query.telemetry.sample-quantiles = [ 100, 1_000, 10_000, 100_000, 1_000_000 ]
// Buckets for labelling query performance metrics with respect to number of series
--query.telemetry.series-quantiles = [ 10, 100, 1000, 10_000, 100_000 ]
With our new metric we:
Series
request level so that our observations include allseriesStats
merging mutex for each incoming response, so maintaining a central seriesStats
reference and passing it into each of the proxied store requests is out of the questionPromQL engine accepts a storage.Queryable
which denotes the source of series data in a particular query/query engine instance. The implementation of the thanos query store proxy implements this interface, providing an API for aggregating series across disparate store targets registered to the store instance.
As the ‘shape’ of the query is a consequence of our storage.Queryable
implementation (backed by the proxy store API), there is no way to pass the SeriesStats
through the PromQL engine query exec in the Thanos query path without changing the Querier
prometheus interface.
Example of how upstream Querier
interface change would look like if we included the series stats (or some generic representation of SelectFnStats):
// Querier provides querying access over time series data of a fixed time range.
type Querier interface {
LabelQuerier
// Select returns a set of series that matches the given label matchers.
// Caller can specify if it requires returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help in optimising select, but it's up to implementation how this is used if used at all.
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, SeriesStats)
}
By amending the prometheus storage.Querier
interface to include the SeriesStats
(or some form of it) alongside the SeriesSet
when a Select
is performed, all Queriers
must return stats alongside selects (which may be a good thing but a breaking API change). I want to explore doing this in upstream Prometheus, but the below implementation is an intermediate step to see if this approach is useful at all in capturing the select phase shape/latencies independently.
Due to the limitations of the prom Querier API, we can instead use the reporter pattern and override the QueryableCreator
constructor to take an extra function parameter that exfiltrates the series stats from the Select
function and submits the query time observation in the API query handler.
tl;dr: Longer term to capture the entire query path by amending the Prometheus Querier API to return some stats alongside the query, and creating this generic metric inside the Prometheus PromQL engine. Short term, pass a func parameter to the Queryable constructor for the proxy StoreAPI querier that will exfiltrate the SeriesStats
, circumventing PromQL engine.
First we would create a new SeriesQueryPerformanceCalculator
for aggregating/tracking the SeriesStatsCounters
for each fanned out query
go pseudo:
type SeriesQueryPerformanceMetricsAggregator struct {
QueryDuration *prometheus.HistogramVec
SeriesLeBuckets []float64
SamplesLeBuckets []float64
SeriesStats storepb.SeriesStatsCounter
}
func NewSeriesQueryPerformanceMetricsAggregator(reg prometheus.Registerer) *SeriesQueryPerformanceMetrics {
return &SeriesQueryPerformanceMetrics{
QueryDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_query_duration_seconds",
Help: "duration of the thanos store select phase for a query",
Buckets: []float64{0.1, 0.25, 0.5, 1, 2, 3, 5}, // These quantiles will be passed via commandline arg
}, []string{"series_le", "samples_le"}),
SeriesStats: storepb.SeriesStatsCounter{},
}
}
// Aggregator for merging `storepb.SeriesStatsCounter` for each incoming fanned out query
func (s *SeriesQueryPerformanceMetricsAggregator) Aggregate(seriesStats *storepb.SeriesStatsCounter) {
s.SeriesStats.Count(seriesStats)
}
// Commit the aggregated SeriesStatsCounter as an observation
func (s *SeriesQueryPerformanceMetricsAggregator) Observe(duration float64) {
// Bucket matching for series/labels matchSeriesBucket/matchSamplesBucket => float64, float64
seriesLeBucket := s.findBucket(s.SeriesStats.Series, &s.SeriesLeBuckets)
samplesLeBucket := s.findBucket(s.SeriesStats.Samples, &s.SamplesLeBuckets)
s.QueryDuration.With(prometheus.Labels{
"series_le": strconv.Itoa(seriesLeBucket),
"samples_le": strconv.Itoa(samplesLeBucket),
}).Observe(duration)
}
// Determine the appropriate bucket for a given value relative to a set of quantiles
func (s *SeriesQueryPerformanceMetricsAggregator) findBucket(value int, quantiles *[]float64) int
Current query fanout logic:
for _, st := range s.stores() {
// [Error handling etc.]
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, reqLogger, span, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses, seriesStatsCh))
}
Propagating the SeriesStats
via storepb.SeriesServer
:
type seriesServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
storepb.Store_SeriesServer
ctx context.Context
seriesSet []storepb.Series
seriesSetStats storepb.SeriesStatsCounter
warnings []string
}
func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
if r.GetWarning() != "" {
s.warnings = append(s.warnings, r.GetWarning())
return nil
}
if r.GetSeries() != nil {
s.seriesSet = append(s.seriesSet, *r.GetSeries())
// For each appended series, increment the seriesStats
s.seriesSetStats.Count(r.GetSeries())
return nil
}
// Unsupported field, skip.
return nil
}
Now that the SeriesStats
are propagated into the storepb.SeriesServer
, we can ammend the selectFn
function to return a tuple of (storage.SeriesSet, storage.SeriesSetCounter, error)
Ammending the QueryableCreator to provide a func parameter:
type SeriesStatsReporter func(seriesStats storepb.SeriesStatsCounter)
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool, seriesStatsReporter SeriesStatsReporter) storage.Queryable
// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration, seriesStatsReporter SeriesStatsReporter) QueryableCreator {
duration := promauto.With(
extprom.WrapRegistererWithPrefix("concurrent_selects_", reg),
).NewHistogram(gate.DurationHistogramOpts)
return func(deduplicate bool, replicaLabels []string, storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
storeDebugMatchers: storeDebugMatchers,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
gateProviderFn: func() gate.Gate {
return gate.InstrumentGateDuration(duration, promgate.New(maxConcurrentSelects))
},
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
seriesStatsReporter: seriesStatsReporter,
}
}
}
Injecting the reporter into the qapi Queryable static constructor:
var (
ssmtx sync.Mutex
seriesStats []storepb.SeriesStatsCounter
)
seriesStatsReporter := func(ss storepb.SeriesStatsCounter) {
ssmtx.Lock()
defer ssmtx.Unlock()
seriesStats = append(seriesStats, ss)
}
qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, false, seriesStatsReporter),
r.FormValue("query"),
start,
end,
step,
)
In summary, we will:
seriesServer
to keep track of all SeriesStats
for each series pushed to itqapi.queryableCreate
to take a SeriesStatsReporter
func parameter that will exfiltrate the seriesStats from the Thanos Proxy StoreAPIthanos_query_duration_seconds
in the queryRange
API handlerSeriesStats
to aforementioned metricthanos_query_duration_seconds
histogram after bucketing samples_le/series_le bucketsseriesStats
to the SeriesResponse
and aggregate it in the responseCh
, this would enable us to propagate the seriesStats further up/in other components if need be