Thanos is built to be a set of components that can be composed into a highly available metrics system with unlimited storage capacity. But to achieve true HA, we need to ensure that tenants in our write path cannot push too much data and cause issues. Limits need to be in place so that our ingestion systems maintain their level of Quality of Service (QoS) and only block the tenant that exceeds its limits.
Alongside limiting, we also need to track and configure those limits in order to use them reliably.
We run multiple Thanos Receive replicas in a hashring topology to ingest metric data from multiple tenants via remote write requests and to distribute the load evenly. This allows us to process these requests scalably and write tenant data into a local Receive TSDB for Thanos Querier to query. We also replicate write requests across multiple (usually three) Receive replicas, so that we can still ingest data even when some replicas are unavailable.
While this composes a scalable and highly available system, a sudden increase in load, i.e. an increase in active series from any tenant, can push Receive past its capacity and cause incidents.
We could automatically scale out horizontally during such load spikes (once this is implemented), but it is not yet safe to do so, and a complete solution cannot rely on unbounded cost scaling in any case. Thus, some form of limits must be put in place to prevent such issues from occurring and causing incidents.
Thanos Receive uses Prometheus TSDB and internally creates a separate TSDB instance for each of its tenants. When a Receive replica gets a remote write request, it loops through the timeseries, hashes the labels with the tenant name as a prefix, and forwards the remote write request to the appropriate Receive nodes. Upon receiving samples in a remote write request from a tenant, a Receive node appends the samples to the in-memory head block of that tenant.
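As a rough illustration of this distribution step, here is a minimal Go sketch (not the actual Thanos code) of hashing a series' labels with the tenant name as a prefix and picking a hashring endpoint; the choice of xxhash and the helper names are assumptions of this sketch.

```go
package sketch

import "github.com/cespare/xxhash/v2"

// Label is a minimal stand-in for a Prometheus label pair.
type Label struct{ Name, Value string }

// hashSeries hashes a series' labels with the tenant name as a prefix, so that
// the same series from the same tenant always lands on the same endpoint.
func hashSeries(tenant string, lset []Label) uint64 {
	h := xxhash.New()
	_, _ = h.WriteString(tenant) // tenant name as prefix
	for _, l := range lset {
		_, _ = h.WriteString(l.Name)
		_, _ = h.WriteString(l.Value)
	}
	return h.Sum64()
}

// pickEndpoint selects a replica for the hashed series; real hashrings are
// more elaborate, this only illustrates the even-distribution idea.
func pickEndpoint(endpoints []string, hash uint64) string {
	return endpoints[hash%uint64(len(endpoints))]
}
```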
We can leverage these per-tenant TSDB instances and generate statistics from each tenant's head block, which give us an accurate count of that tenant's active (head) series. This count is also exposed as a metric.
Thus, any remote write request that would push a tenant above its limit can be failed completely, with a 429 status code and an appropriate error message; in some of the approaches below we can even check whether a request would increase the number of active series above the configured limit before ingesting it. Partially accepting write requests might lead to confusing results and error semantics, so we propose to avoid this and rely on client-side retries.
There are, however, a few challenges to this, as tenant metric data is distributed and replicated across multiple Thanos Receive replicas. Also, with a multi-replica setup, we have the concept of per-replica-tenant and per-tenant limits, which can be defined as,
Per-replica-tenant limit: The limit imposed for active series per tenant, per replica of Thanos Receive. An initial implementation of this is already WIP in this PR. This can also be treated as the active series limit for non-hashring topology or single replica Receive setups.
Per-tenant limit: The overall limit imposed for active series per tenant across all replicas of Thanos Receive. This is essentially what this proposal is for.
In general, we would need three measures to impose a limit: the configured limit itself (globalSeriesLimit), the current number of active series of the tenant (currentSeries), and the increase in active series that ingesting the incoming request would cause (increaseOnRequest).
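A tiny Go sketch of how these three measures relate; the struct and field names are only illustrative and mirror the terms used throughout the rest of this proposal.

```go
package sketch

// tenantLimitState groups the three measures needed to impose a limit.
type tenantLimitState struct {
	globalSeriesLimit uint64 // the configured per-tenant limit
	currentSeries     uint64 // active (head) series of the tenant across replicas
	increaseOnRequest uint64 // new series the incoming remote write request would add
}

// withinLimit is the check that every approach below ultimately performs.
func (s tenantLimitState) withinLimit() bool {
	return s.currentSeries+s.increaseOnRequest <= s.globalSeriesLimit
}
```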
There are a few ways in which we can achieve the outlined goals of this proposal and get the above measurements to impose a limit. The order of approaches is based on preference.
We could leverage any meta-monitoring solution, which in the context of this proposal means any Prometheus Query API-compatible solution that is capable of consuming the metrics exposed by all Thanos Receive instances. Such a query endpoint allows us to get the (scrape-interval old) number of active series per tenant from TSDB metrics like prometheus_tsdb_head_series, and to limit based on that value.
This approach would add validation logic within Receive Router or RouterIngestor modes and can be optionally enabled via flags.
With such an approach, we do not need to calculate an increase based on requests, as this is handled by Receive's instrumentation and the meta-monitoring solution. We only need to query the latest HEAD series value for a tenant, summed across all Receive instances, and limit remote write requests if the result of the instant query is greater than the configured limit.
The current number of active series for each tenant can be cached in a map, which is updated by a meta-monitoring query that is executed periodically. This cached value is what latestCurrentSeries refers to below.
So if a user configures a per-tenant limit, say globalSeriesLimit, the resulting limiting equation is simply globalSeriesLimit >= latestCurrentSeries, which is checked on each request.
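A minimal Go sketch of this meta-monitoring based check, assuming a reachable Prometheus-compatible query endpoint, a job label of thanos-receive, and a tenant label on prometheus_tsdb_head_series; none of these specifics are mandated by Thanos, and the reference PR linked below is the authoritative implementation.

```go
package sketch

import (
	"context"
	"sync"
	"time"

	"github.com/prometheus/client_golang/api"
	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
)

type metaMonitoringLimiter struct {
	api                 promv1.API
	mtx                 sync.RWMutex
	latestCurrentSeries map[string]float64 // tenant -> head series, refreshed periodically
}

func newMetaMonitoringLimiter(addr string) (*metaMonitoringLimiter, error) {
	c, err := api.NewClient(api.Config{Address: addr})
	if err != nil {
		return nil, err
	}
	return &metaMonitoringLimiter{
		api:                 promv1.NewAPI(c),
		latestCurrentSeries: map[string]float64{},
	}, nil
}

// refresh runs an instant query against meta-monitoring and caches the per-tenant result.
func (l *metaMonitoringLimiter) refresh(ctx context.Context) error {
	val, _, err := l.api.Query(ctx,
		`sum by (tenant) (prometheus_tsdb_head_series{job="thanos-receive"})`, time.Now())
	if err != nil {
		return err
	}
	vec, ok := val.(model.Vector)
	if !ok {
		return nil
	}
	l.mtx.Lock()
	defer l.mtx.Unlock()
	for _, s := range vec {
		l.latestCurrentSeries[string(s.Metric["tenant"])] = float64(s.Value)
	}
	return nil
}

// allow implements globalSeriesLimit >= latestCurrentSeries for one tenant.
func (l *metaMonitoringLimiter) allow(tenant string, globalSeriesLimit float64) bool {
	l.mtx.RLock()
	defer l.mtx.RUnlock()
	return l.latestCurrentSeries[tenant] <= globalSeriesLimit
}
```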
This is the simplest solution that can be implemented within Thanos Receive and helps us achieve best-effort limiting and stability. The fact that it does not rely on inter-Receive communication, which is very complex to implement, makes it a pragmatic solution.
A full-fledged reference implementation of this can be found here: https://github.com/thanos-io/thanos/pull/5520.
There are a few alternatives to what is proposed above,
We can implement some new endpoints on Thanos Receive.
Firstly, we can take advantage of the api/v1/status/tsdb endpoint, which is exposed by Prometheus TSDB and has also been implemented in Thanos Receive (PR which utilizes tenant headers to get local tenant TSDB stats in Receive).
In its current implementation, it can provide stats for each local tenant TSDB, which include a measure of active series (head series). It can also return stats for all tenants in a Receive instance (PR). We can merge this across replicas to get the total number of active series a tenant has.
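A rough Go sketch of such merging for a single tenant follows; the JSON shape (data.headStats.numSeries) follows the Prometheus TSDB status endpoint, while the per-replica URLs and the use of the default THANOS-TENANT header are assumptions of this sketch.

```go
package sketch

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// tsdbStatus models just the part of the api/v1/status/tsdb response we need.
type tsdbStatus struct {
	Data struct {
		HeadStats struct {
			NumSeries uint64 `json:"numSeries"`
		} `json:"headStats"`
	} `json:"data"`
}

// currentSeries sums a tenant's head series across all Receive replicas.
func currentSeries(replicaURLs []string, tenant string) (uint64, error) {
	var total uint64
	for _, u := range replicaURLs {
		req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/api/v1/status/tsdb", u), nil)
		if err != nil {
			return 0, err
		}
		req.Header.Set("THANOS-TENANT", tenant) // ask for this tenant's local TSDB stats

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return 0, err
		}
		var st tsdbStatus
		err = json.NewDecoder(resp.Body).Decode(&st)
		resp.Body.Close()
		if err != nil {
			return 0, err
		}
		total += st.Data.HeadStats.NumSeries
	}
	return total, nil
}
```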
Furthermore, we also have each tenant’s Appendable in multitsdb, which returns a Prometheus storage.Appender, which can in turn give us a storage.GetRef interface. This helps us know if a TSDB has a cached reference for a particular set of labels in its HEAD.
This GetRef interface returns a SeriesRef when a set of labels is passed to it. If the SeriesRef is 0, that set of labels is not cached, and any sample with that set of labels will create a new active series. This data can also be fetched from a new endpoint like api/v1/getrefmap and merged across replicas.
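A sketch of how such api/v1/getrefmap responses could be merged across replicas to compute increaseOnRequest; the endpoint and the refMap shape are the hypothetical ones described in this proposal, not an existing Thanos API.

```go
package sketch

// refMap maps a stringified labelset to the SeriesRef a replica returned for it.
type refMap map[string]uint64

// increaseOnRequest counts labelsets that no replica has cached (SeriesRef == 0
// everywhere), i.e. the series that would be newly created by this request.
func increaseOnRequest(perReplica []refMap) uint64 {
	cached := map[string]bool{}
	all := map[string]struct{}{}
	for _, rm := range perReplica {
		for lset, ref := range rm {
			all[lset] = struct{}{}
			if ref != 0 {
				cached[lset] = true
			}
		}
	}
	var increase uint64
	for lset := range all {
		if !cached[lset] {
			increase++
		}
	}
	return increase
}
```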
This approach would add validation logic within the Receive Router, which we can call a "Validator". This can be optionally enabled via flags, and a Validator can be used in front of a Receive hashring. This is where we can get data from hashring Receivers and merge it to limit remote write requests.
The implementation would be as follows,
1. A limit is set, i.e. globalSeriesLimit.
2. On getting a remote write request, query the api/v1/status/tsdb endpoint on each replica with the all_tenants=true query parameter and merge the count of active series of the tenant, i.e. currentSeries.
3. Call a new endpoint, api/v1/getrefmap, which, when provided with a tenant id and a remote write request, returns a map of SeriesRef and labelsets.
4. Merge the count of labelsets with SeriesRef == 0 across all replicas. This is the increase in the number of active series if the remote write request is ingested, i.e. increaseOnRequest.
5. So, the limiting equation in this case becomes globalSeriesLimit >= currentSeries + increaseOnRequest (see the sketch after this list).
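A short Go sketch of the resulting check in the Validator, assuming currentSeries and increaseOnRequest were obtained as described in the list above; the function name and wiring are illustrative only.

```go
package sketch

import "net/http"

// checkGlobalLimit enforces globalSeriesLimit >= currentSeries + increaseOnRequest
// and rejects the whole remote write request with a 429 otherwise.
func checkGlobalLimit(w http.ResponseWriter, tenant string, globalSeriesLimit, currentSeries, increaseOnRequest uint64) bool {
	if currentSeries+increaseOnRequest > globalSeriesLimit {
		http.Error(w, "tenant "+tenant+" exceeded active series limit", http.StatusTooManyRequests)
		return false
	}
	return true
}
```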
We treat api/v1/status/tsdb and api/v1/getrefmap as two different endpoints throughout this proposal, but exposing a single gRPC API that combines the two might be more suitable here, for example,
```protobuf
/// Limit represents an API on Thanos Receive, which is used for limiting remote write requests based on active series.
service Limit {
  /// Status returns various cardinality statistics about any Receive tenant TSDB.
  rpc Status(Tenant) returns (TSDBStatus);

  /// GetRefMap returns a map of ZLabelSet and SeriesRef.
  rpc GetRefMap(WriteRequest) returns (SeriesRefMap);
}

message SeriesRefMap {
  map<ZLabelSet, uint64> series_ref_map = 1 [(gogoproto.nullable) = false];
}
```
A noted drawback of this approach is that the merged number of active series obtained from api/v1/status/tsdb can be inaccurate.
We can implement the same new endpoints as mentioned in the previous approach on Thanos Receive, but perform the merging and checking operations on each Receive node in the hashring, i.e. change the existing Router and Ingestor modes to handle the same limiting logic.
The implementation would be as follows,
1. A limit is set, i.e. globalSeriesLimit.
2. On getting a remote write request, each Receive node queries api/v1/status/tsdb of the other replicas for the particular tenant and merges the count of HEAD series, i.e. currentSeries.
3. It also calls api/v1/getrefmap, which, when provided with a tenant id and a remote write request, returns a map of SeriesRef and labelsets.
4. It then merges the count of labelsets with SeriesRef == 0 across all replicas. This is the increase in the number of active series if the remote write request is ingested, i.e. increaseOnRequest.
5. So, the limiting equation in this case is the same as before: globalSeriesLimit >= currentSeries + increaseOnRequest.
The option of using gRPC instead of two API calls each time is also valid here.
As with the previous approach, a noted drawback is that the merged number of active series from api/v1/status/tsdb can be inaccurate.
An alternative could be to not limit active series globally at all and make do with local (per-replica) limits only.
Consistent hashing might be implemented and the problems with sharding sorted out, which would make adding Receive replicas to the hashring a non-disruptive operation. Solutions like HPA could then be used, making scale-up/down operations easy enough that such limits might not be needed.
Another option is to not implement this within Thanos, but rather in some other API gateway-like component that can parse remote write requests, maintain running counts of active series for all tenants, and limit based on those counts. A particular example of a project where this could be implemented is Observatorium.