Skip to content

Commit 29251fe

Browse files
authored
store: Postings fetching optimizations (#2294)
* Avoid fetching duplicate keys. Simplified groups with add/remove keys. Signed-off-by: Peter Štibraný <[email protected]>
* Added shortcuts. Signed-off-by: Peter Štibraný <[email protected]>
* Optimize away fetching of ALL postings, if possible. Only remove postings for each key once. Signed-off-by: Peter Štibraný <[email protected]>
* Don't do individual index.Without, but merge them first. Signed-off-by: Peter Štibraný <[email protected]>
* Don't use map for fetching postings, but return slice instead. This is in line with original code. Using a map was nicer, but more expensive in terms of allocations and hashing labels. Signed-off-by: Peter Štibraný <[email protected]>
* Renamed 'all' to 'allRequested'. Signed-off-by: Peter Štibraný <[email protected]>
* Typo. Signed-off-by: Peter Štibraný <[email protected]>
* Make linter happy. Signed-off-by: Peter Štibraný <[email protected]>
* Added comment to fetchPostings. Signed-off-by: Peter Štibraný <[email protected]>
* Group vars. Signed-off-by: Peter Štibraný <[email protected]>
* Comments. Signed-off-by: Peter Štibraný <[email protected]>
* Use allPostings and emptyPostings variables for special cases. Signed-off-by: Peter Štibraný <[email protected]>
* Unify terminology to "special All postings". Signed-off-by: Peter Štibraný <[email protected]>
* Address feedback. Signed-off-by: Peter Štibraný <[email protected]>
* Added CHANGELOG.md entry. Signed-off-by: Peter Štibraný <[email protected]>
* Fix check for empty group. Signed-off-by: Peter Štibraný <[email protected]>
* Comment. Signed-off-by: Peter Štibraný <[email protected]>
* Special All postings is now added as a new group. No special handling required anymore. Signed-off-by: Peter Štibraný <[email protected]>
* Updated comment. Signed-off-by: Peter Štibraný <[email protected]>
1 parent 594fed9 commit 29251fe

File tree

2 files changed

+129
-89
lines changed

2 files changed

+129
-89
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
2828
Since there are no consistency guarantees provided by some Object Storage providers, this PR adds a consistent lock-free way of dealing with Object Storage irrespective of the choice of object storage. In order to achieve this co-ordination, blocks are not deleted directly. Instead, blocks are marked for deletion by uploading `deletion-mark.json` file for the block that was chosen to be deleted. This file contains unix time of when the block was marked for deletion.
2929

3030
- [#2090](https://github.com/thanos-io/thanos/issues/2090) *breaking* Downsample command: the `downsample` command has moved as the `thanos bucket` sub-command, and cannot be called via `thanos downsample` any more.
31+
- [#2294](https://github.com/thanos-io/thanos/pull/2294) store: optimizations for fetching postings. Queries using `=~".*"` matchers or negation matchers (`!=...` or `!~...`) benefit the most.
3132

3233
## [v0.11.0](https://github.com/thanos-io/thanos/releases/tag/v0.11.0) - 2020.03.02
3334

pkg/store/bucket.go

+128-89
Original file line numberDiff line numberDiff line change
@@ -1310,7 +1310,12 @@ func newBucketIndexReader(ctx context.Context, block *bucketBlock) *bucketIndexR
13101310
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
13111311
// single label name=value.
13121312
func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, error) {
1313-
var postingGroups []*postingGroup
1313+
var (
1314+
postingGroups []*postingGroup
1315+
allRequested = false
1316+
hasAdds = false
1317+
keys []labels.Label
1318+
)
13141319

13151320
// NOTE: Derived from tsdb.PostingsForMatchers.
13161321
for _, m := range ms {
@@ -1320,23 +1325,71 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
13201325
return nil, errors.Wrap(err, "toPostingGroup")
13211326
}
13221327

1328+
// If this group adds nothing, it's an empty group. We can shortcut this, since intersection with empty
1329+
// postings would return no postings anyway.
1330+
// E.g. label="non-existing-value" returns empty group.
1331+
if !pg.addAll && len(pg.addKeys) == 0 {
1332+
return nil, nil
1333+
}
1334+
13231335
postingGroups = append(postingGroups, pg)
1336+
allRequested = allRequested || pg.addAll
1337+
hasAdds = hasAdds || len(pg.addKeys) > 0
1338+
1339+
// Postings returned by fetchPostings will be in the same order as keys
1340+
// so it's important that we iterate them in the same order later.
1341+
// We don't have any other way of pairing keys and fetched postings.
1342+
keys = append(keys, pg.addKeys...)
1343+
keys = append(keys, pg.removeKeys...)
13241344
}
13251345

13261346
if len(postingGroups) == 0 {
13271347
return nil, nil
13281348
}
13291349

1330-
if err := r.fetchPostings(postingGroups); err != nil {
1350+
// We only need special All postings if there are no other adds. If there are, we can skip fetching
1351+
// special All postings completely.
1352+
if allRequested && !hasAdds {
1353+
// add group with label to fetch "special All postings".
1354+
name, value := index.AllPostingsKey()
1355+
allPostingsLabel := labels.Label{Name: name, Value: value}
1356+
1357+
postingGroups = append(postingGroups, newPostingGroup(true, []labels.Label{allPostingsLabel}, nil))
1358+
keys = append(keys, allPostingsLabel)
1359+
}
1360+
1361+
fetchedPostings, err := r.fetchPostings(keys)
1362+
if err != nil {
13311363
return nil, errors.Wrap(err, "get postings")
13321364
}
13331365

1334-
var postings []index.Postings
1366+
// Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys
1367+
// again, and this is exactly the same order as before (when building the groups), so we can simply
1368+
// use one incrementing index to fetch postings from returned slice.
1369+
postingIndex := 0
1370+
1371+
var groupAdds, groupRemovals []index.Postings
13351372
for _, g := range postingGroups {
1336-
postings = append(postings, g.Postings())
1373+
// We cannot add empty set to groupAdds, since they are intersected.
1374+
if len(g.addKeys) > 0 {
1375+
toMerge := make([]index.Postings, 0, len(g.addKeys))
1376+
for _, l := range g.addKeys {
1377+
toMerge = append(toMerge, checkNilPosting(l, fetchedPostings[postingIndex]))
1378+
postingIndex++
1379+
}
1380+
1381+
groupAdds = append(groupAdds, index.Merge(toMerge...))
1382+
}
1383+
1384+
for _, l := range g.removeKeys {
1385+
groupRemovals = append(groupRemovals, checkNilPosting(l, fetchedPostings[postingIndex]))
1386+
postingIndex++
1387+
}
13371388
}
13381389

1339-
ps, err := index.ExpandPostings(index.Intersect(postings...))
1390+
result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...))
1391+
1392+
ps, err := index.ExpandPostings(result)
13401393
if err != nil {
13411394
return nil, errors.Wrap(err, "expand")
13421395
}
@@ -1352,150 +1405,136 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er
13521405
return ps, nil
13531406
}
13541407

1408+
// postingGroup keeps posting keys for single matcher. Logical result of the group is:
1409+
// If addAll is set: special All postings minus postings for removeKeys labels. No need to merge postings for addKeys in this case.
1410+
// If addAll is not set: Merge of postings for "addKeys" labels minus postings for removeKeys labels
1411+
// This computation happens in ExpandedPostings.
13551412
type postingGroup struct {
1356-
keys labels.Labels
1357-
postings []index.Postings
1358-
1359-
aggregate func(postings []index.Postings) index.Postings
1413+
addAll bool
1414+
addKeys []labels.Label
1415+
removeKeys []labels.Label
13601416
}
13611417

1362-
func newPostingGroup(keys labels.Labels, aggr func(postings []index.Postings) index.Postings) *postingGroup {
1418+
func newPostingGroup(addAll bool, addKeys, removeKeys []labels.Label) *postingGroup {
13631419
return &postingGroup{
1364-
keys: keys,
1365-
postings: make([]index.Postings, len(keys)),
1366-
aggregate: aggr,
1420+
addAll: addAll,
1421+
addKeys: addKeys,
1422+
removeKeys: removeKeys,
13671423
}
13681424
}
13691425

1370-
func (p *postingGroup) Fill(i int, posting index.Postings) {
1371-
p.postings[i] = posting
1372-
}
1373-
1374-
func (p *postingGroup) Postings() index.Postings {
1375-
if len(p.keys) == 0 {
1376-
return index.EmptyPostings()
1426+
func checkNilPosting(l labels.Label, p index.Postings) index.Postings {
1427+
if p == nil {
1428+
// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
1429+
return index.ErrPostings(errors.Errorf("postings is nil for %s. It was never fetched.", l))
13771430
}
1378-
1379-
for i, posting := range p.postings {
1380-
if posting == nil {
1381-
// This should not happen. Debug for https://github.com/thanos-io/thanos/issues/874.
1382-
return index.ErrPostings(errors.Errorf("at least one of %d postings is nil for %s. It was never fetched.", i, p.keys[i]))
1383-
}
1384-
}
1385-
1386-
return p.aggregate(p.postings)
1387-
}
1388-
1389-
func merge(p []index.Postings) index.Postings {
1390-
return index.Merge(p...)
1431+
return p
13911432
}
13921433

1393-
func allWithout(p []index.Postings) index.Postings {
1394-
return index.Without(p[0], index.Merge(p[1:]...))
1395-
}
1434+
var (
1435+
allPostingsGroup = newPostingGroup(true, nil, nil)
1436+
emptyPostingsGroup = newPostingGroup(false, nil, nil)
1437+
)
13961438

13971439
// NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication.
13981440
func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) {
1399-
var matchingLabels labels.Labels
1441+
// This matches any label value, and also series that don't have this label at all.
1442+
if m.Type == labels.MatchRegexp && (m.Value == ".*" || m.Value == "^.*$") {
1443+
return allPostingsGroup, nil
1444+
}
1445+
1446+
// NOT matching any value = match nothing. We can shortcut this easily.
1447+
if m.Type == labels.MatchNotRegexp && (m.Value == ".*" || m.Value == "^.*$") {
1448+
return emptyPostingsGroup, nil
1449+
}
14001450

14011451
// If the matcher selects an empty value, it selects all the series which don't
14021452
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
14031453
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555.
14041454
if m.Matches("") {
1405-
allName, allValue := index.AllPostingsKey()
1406-
1407-
matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue})
14081455
vals, err := lvalsFn(m.Name)
14091456
if err != nil {
14101457
return nil, err
14111458
}
1459+
1460+
var toRemove []labels.Label
14121461
for _, val := range vals {
14131462
if !m.Matches(val) {
1414-
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
1463+
toRemove = append(toRemove, labels.Label{Name: m.Name, Value: val})
14151464
}
14161465
}
14171466

1418-
if len(matchingLabels) == 1 {
1419-
// This is known hack to return all series.
1420-
// Ask for x != <not existing value>. Allow for that as Prometheus does,
1421-
// even though it is expensive.
1422-
return newPostingGroup(matchingLabels, merge), nil
1423-
}
1424-
1425-
return newPostingGroup(matchingLabels, allWithout), nil
1467+
return newPostingGroup(true, nil, toRemove), nil
14261468
}
14271469

14281470
// Fast-path for equal matching.
14291471
if m.Type == labels.MatchEqual {
1430-
return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge), nil
1472+
return newPostingGroup(false, []labels.Label{{Name: m.Name, Value: m.Value}}, nil), nil
14311473
}
14321474

14331475
vals, err := lvalsFn(m.Name)
14341476
if err != nil {
14351477
return nil, err
14361478
}
14371479

1480+
var toAdd []labels.Label
14381481
for _, val := range vals {
14391482
if m.Matches(val) {
1440-
matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val})
1483+
toAdd = append(toAdd, labels.Label{Name: m.Name, Value: val})
14411484
}
14421485
}
14431486

1444-
return newPostingGroup(matchingLabels, merge), nil
1487+
return newPostingGroup(false, toAdd, nil), nil
14451488
}
14461489

14471490
type postingPtr struct {
1448-
groupID int
1449-
keyID int
1450-
ptr index.Range
1491+
keyID int
1492+
ptr index.Range
14511493
}
14521494

14531495
// fetchPostings fill postings requested by posting groups.
1454-
func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
1496+
// It returns one postings for each key, in the same order.
1497+
// If postings for given key is not fetched, entry at given index will be nil.
1498+
func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings, error) {
14551499
var ptrs []postingPtr
14561500

1457-
// Fetch postings from the cache with a single call.
1458-
keys := make([]labels.Label, 0)
1459-
for _, g := range groups {
1460-
keys = append(keys, g.keys...)
1461-
}
1501+
output := make([]index.Postings, len(keys))
14621502

1503+
// Fetch postings from the cache with a single call.
14631504
fromCache, _ := r.block.indexCache.FetchMultiPostings(r.ctx, r.block.meta.ULID, keys)
14641505

14651506
// Iterate over all groups and fetch posting from cache.
14661507
// If we have a miss, mark key to be fetched in `ptrs` slice.
14671508
// Overlaps are well handled by partitioner, so we don't need to deduplicate keys.
1468-
for i, g := range groups {
1469-
for j, key := range g.keys {
1470-
// Get postings for the given key from cache first.
1471-
if b, ok := fromCache[key]; ok {
1472-
r.stats.postingsTouched++
1473-
r.stats.postingsTouchedSizeSum += len(b)
1474-
1475-
_, l, err := r.dec.Postings(b)
1476-
if err != nil {
1477-
return errors.Wrap(err, "decode postings")
1478-
}
1509+
for ix, key := range keys {
1510+
// Get postings for the given key from cache first.
1511+
if b, ok := fromCache[key]; ok {
1512+
r.stats.postingsTouched++
1513+
r.stats.postingsTouchedSizeSum += len(b)
14791514

1480-
g.Fill(j, l)
1481-
continue
1515+
_, l, err := r.dec.Postings(b)
1516+
if err != nil {
1517+
return nil, errors.Wrap(err, "decode postings")
14821518
}
14831519

1484-
// Cache miss; save pointer for actual posting in index stored in object store.
1485-
ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
1486-
if err == indexheader.NotFoundRangeErr {
1487-
// This block does not have any posting for given key.
1488-
g.Fill(j, index.EmptyPostings())
1489-
continue
1490-
}
1520+
output[ix] = l
1521+
continue
1522+
}
14911523

1492-
if err != nil {
1493-
return errors.Wrap(err, "index header PostingsOffset")
1494-
}
1524+
// Cache miss; save pointer for actual posting in index stored in object store.
1525+
ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value)
1526+
if err == indexheader.NotFoundRangeErr {
1527+
// This block does not have any posting for given key.
1528+
output[ix] = index.EmptyPostings()
1529+
continue
1530+
}
14951531

1496-
r.stats.postingsToFetch++
1497-
ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j})
1532+
if err != nil {
1533+
return nil, errors.Wrap(err, "index header PostingsOffset")
14981534
}
1535+
1536+
r.stats.postingsToFetch++
1537+
ptrs = append(ptrs, postingPtr{ptr: ptr, keyID: ix})
14991538
}
15001539

15011540
sort.Slice(ptrs, func(i, j int) bool {
@@ -1543,8 +1582,8 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
15431582
r.mtx.Lock()
15441583
// Return postings and fill LRU cache.
15451584
// Truncate first 4 bytes which are length of posting.
1546-
groups[p.groupID].Fill(p.keyID, newBigEndianPostings(pBytes[4:]))
1547-
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], pBytes)
1585+
output[p.keyID] = newBigEndianPostings(pBytes[4:])
1586+
r.block.indexCache.StorePostings(r.ctx, r.block.meta.ULID, keys[p.keyID], pBytes)
15481587

15491588
// If we just fetched it we still have to update the stats for touched postings.
15501589
r.stats.postingsTouched++
@@ -1555,7 +1594,7 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
15551594
})
15561595
}
15571596

1558-
return g.Wait()
1597+
return output, g.Wait()
15591598
}
15601599

15611600
func resizePostings(b []byte) ([]byte, error) {

0 commit comments

Comments
 (0)