Skip to content

Commit

Permalink
Improving compaction query
Browse files Browse the repository at this point in the history
  • Loading branch information
marco6 committed Jul 29, 2024
1 parent 2f99fc6 commit 1ee6dd4
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 159 deletions.
111 changes: 87 additions & 24 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const otelName = "generic"
var (
otelTracer trace.Tracer
otelMeter metric.Meter
setCompactRevCnt metric.Int64Counter
compactCnt metric.Int64Counter
getRevisionCnt metric.Int64Counter
deleteRevCnt metric.Int64Counter
currentRevCnt metric.Int64Counter
Expand All @@ -35,7 +35,7 @@ func init() {
var err error
otelTracer = otel.Tracer(otelName)
otelMeter = otel.Meter(otelName)
setCompactRevCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.compact", otelName), metric.WithDescription("Number of compact requests"))
compactCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.compact", otelName), metric.WithDescription("Number of compact requests"))
if err != nil {
logrus.WithError(err).Warning("Otel failed to create create counter")
}
Expand Down Expand Up @@ -104,11 +104,9 @@ var (

revisionIntervalSQL = `
SELECT (
SELECT prev_revision
SELECT MAX(prev_revision)
FROM kine
WHERE name = 'compact_rev_key'
ORDER BY prev_revision
DESC LIMIT 1
) AS low, (
SELECT MAX(id)
FROM kine
Expand Down Expand Up @@ -143,6 +141,7 @@ type Generic struct {
AfterSQLPrefix string
AfterSQL string
DeleteSQL string
CompactSQL string
UpdateCompactSQL string
InsertSQL string
FillSQL string
Expand Down Expand Up @@ -279,7 +278,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter

UpdateCompactSQL: q(`
UPDATE kine
SET prev_revision = ?
SET prev_revision = max(prev_revision, ?)
WHERE name = 'compact_rev_key'`, paramCharacter, numbered),

InsertLastInsertIDSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
Expand Down Expand Up @@ -440,16 +439,94 @@ func (d *Generic) Count(ctx context.Context, prefix, startKey string, revision i
return rev.Int64, id, err
}

// Compact compacts the database up to revision. After the call,
// any request for a version older than revision will return
// a compacted error.
//
// The requested revision is first clamped to the current revision reported
// by GetCompactRevision. If the resulting target is not past the recorded
// compact revision, there is nothing to do and nil is returned without
// touching the database. Otherwise the compaction transaction is attempted
// up to retryCount times, for as long as d.Retry is set and returns true for
// the error of the previous attempt. The error of the last attempt (or nil)
// is returned and recorded on the tracing span.
func (d *Generic) Compact(ctx context.Context, revision int64) (err error) {
	compactCnt.Add(ctx, 1)
	ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.compact", otelName))
	defer func() {
		span.RecordError(err)
		span.End()
	}()

	compactStart, currentRevision, err := d.GetCompactRevision(ctx)
	if err != nil {
		return err
	}

	// Clamp before deciding whether there is work to do: a target beyond
	// the current revision may collapse onto (or below) the compact
	// revision, in which case opening a transaction would be pointless.
	if revision > currentRevision {
		revision = currentRevision
	}
	if compactStart >= revision {
		return nil // Nothing to compact.
	}

	for i := 0; i < retryCount; i++ {
		err = d.tryCompact(ctx, compactStart, revision)
		if err == nil || d.Retry == nil || !d.Retry(err) {
			break
		}
	}
	return err
}

// tryCompact runs a single compaction attempt for the revisions in the
// interval (start, end] inside one transaction. It:
//
//  1. deletes every row that was superseded by a non-create row whose id
//     lies in the interval (the row referenced by its prev_revision);
//  2. deletes every row in the interval that is marked as deleted;
//  3. runs UpdateCompactSQL with end to record the new compact revision.
//
// The transaction is committed only if all three statements succeed;
// otherwise it is rolled back and the first error is returned unchanged so
// that the caller can decide whether to retry.
//
// NOTE(review): the two DELETE statements use literal `?` placeholders and
// are not passed through the q(...) translation used for the other queries
// built in Open, and the CompactSQL field of Generic is not used here.
// Confirm this is fine for every driver that uses numbered parameters.
func (d *Generic) tryCompact(ctx context.Context, start, end int64) error {
	tx, err := d.DB.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() {
		// Rollback is a safety net for the early-return paths. Once the
		// transaction has been committed (or already rolled back) it
		// returns sql.ErrTxDone, which is expected and not worth logging.
		if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
			logrus.WithError(err).Trace("can't rollback compaction")
		}
	}()

	// This query adds `created = 0` as a condition
	// to mitigate a bug in vXXX where newly created
	// keys had `prev_revision = max(id)` instead of 0.
	// Even with the bug fixed, we still need to check
	// for that in older rows and if older peers are
	// still running. Given that we are not yet using
	// an index, it won't change much in performance;
	// however, it should be included in a covering
	// index if ever necessary.
	_, err = tx.ExecContext(ctx, `
		DELETE FROM kine
		WHERE id IN (
			SELECT prev_revision
			FROM kine
			WHERE name != 'compact_rev_key'
				AND created = 0
				AND prev_revision != 0
				AND ? < id AND id <= ?
		)
	`, start, end)
	if err != nil {
		return err
	}

	// Rows flagged as deleted inside the interval are removed as well.
	_, err = tx.ExecContext(ctx, `
		DELETE FROM kine
		WHERE deleted = 1
			AND ? < id AND id <= ?
	`, start, end)
	if err != nil {
		return err
	}

	// Persist the new compact revision in the same transaction as the
	// deletions, so that both become visible atomically.
	_, err = tx.ExecContext(ctx, d.UpdateCompactSQL, end)
	if err != nil {
		return err
	}
	return tx.Commit()
}

func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error) {
getCompactRevCnt.Add(ctx, 1)
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.get_compact_revision", otelName))
var compact, target sql.NullInt64
start := time.Now()
var err error
defer func() {
if err == sql.ErrNoRows {
err = nil
}
span.RecordError(err)
recordOpResult("revision_interval_sql", err, start)
recordTxResult("revision_interval_sql", err)
Expand All @@ -472,7 +549,7 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error)
if err := rows.Err(); err != nil {
return 0, 0, err
}
return 0, 0, nil
return 0, 0, fmt.Errorf("cannot get compact revision: aggregate query returned no rows")
}

if err := rows.Scan(&compact, &target); err != nil {
Expand All @@ -482,20 +559,6 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error)
return compact.Int64, target.Int64, err
}

// SetCompactRevision executes UpdateCompactSQL with revision as its
// argument. It increments the setCompactRevCnt counter, tags the tracing
// span with the requested revision, and returns the error reported by the
// statement execution, if any.
func (d *Generic) SetCompactRevision(ctx context.Context, revision int64) error {
	var err error
	setCompactRevCnt.Add(ctx, 1)
	ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.set_compact_revision", otelName))
	defer func() {
		// The error must be recorded before the span is ended: updates
		// made to a span after End are not allowed and would be dropped.
		span.RecordError(err)
		span.End()
	}()
	span.SetAttributes(attribute.Int64("revision", revision))

	_, err = d.execute(ctx, "update_compact_sql", d.UpdateCompactSQL, revision)
	return err
}

func (d *Generic) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) {
var err error
getRevisionCnt.Add(ctx, 1)
Expand Down
163 changes: 39 additions & 124 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
"github.com/canonical/k8s-dqlite/pkg/kine/server"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

const (
SupersededCount = 100
otelName = "sqllog"
SupersededCount = 100
compactBatchSize = 1000
otelName = "sqllog"
)

var (
Expand Down Expand Up @@ -67,7 +67,7 @@ type Dialect interface {
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, int64, error)
SetCompactRevision(ctx context.Context, revision int64) error
Compact(ctx context.Context, revision int64) error
Fill(ctx context.Context, revision int64) error
IsFill(key string) bool
GetSize(ctx context.Context) (int64, error)
Expand Down Expand Up @@ -143,133 +143,36 @@ func (s *SQLLog) DoCompact(ctx context.Context) error {
return fmt.Errorf("failed to initialise compaction: %v", err)
}

nextEnd, _ := s.d.CurrentRevision(ctx)
_, err := s.compactor(ctx, nextEnd)

return err
}

func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
var err error
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.compactor", otelName))
defer func() {
span.RecordError(err)
span.End()
}()
span.SetAttributes(attribute.Int64("nextEnd", nextEnd))

currentRev, err := s.d.CurrentRevision(ctx)
span.SetAttributes(attribute.Int64("currentRev", currentRev))
if err != nil {
logrus.Errorf("failed to get current revision: %v", err)
return nextEnd, fmt.Errorf("failed to get current revision: %v", err)
}

cursor, _, err := s.d.GetCompactRevision(ctx)
if err != nil {
logrus.Errorf("failed to get compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to get compact revision: %v", err)
}
span.SetAttributes(attribute.Int64("cursor", cursor))

end := nextEnd
nextEnd = currentRev

// NOTE(neoaggelos): Ignoring the last 1000 revisions causes the following CNCF conformance test to fail.
// NOTE: Upstream is ignoring the last 1000 revisions, however that causes the following CNCF conformance test to fail.
// This is because of low activity, where the created list is part of the last 1000 revisions and is not compacted.
// Link to failing test: https://github.com/kubernetes/kubernetes/blob/f2cfbf44b1fb482671aedbfff820ae2af256a389/test/e2e/apimachinery/chunking.go#L144
// To address this, we only ignore the last 100 revisions instead
end = end - SupersededCount

savedCursor := cursor
// Purposefully start at the current and redo the current as
// it could have failed before actually compacting
compactCnt.Add(ctx, 1)
span.AddEvent(fmt.Sprintf("start compaction from %d to %d", cursor, end))
for ; cursor <= end; cursor++ {
rows, err := s.d.GetRevision(ctx, cursor)
if err != nil {
logrus.Errorf("failed to get revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to get revision %d: %v", cursor, err)
}

events, err := RowsToEvents(rows)
if err != nil {
logrus.Errorf("failed to convert to events: %v", err)
return nextEnd, fmt.Errorf("failed to convert to events: %v", err)
}

if len(events) == 0 {
continue
}

event := events[0]

if event.KV.Key == "compact_rev_key" {
span.AddEvent("skip compact_rev_key")
// don't compact the compact key
continue
}

setRev := false
if event.PrevKV != nil && event.PrevKV.ModRevision != 0 {
if savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
span.AddEvent(fmt.Sprintf("failed to record compact revision: %v", err))
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
savedCursor = cursor
setRev = true
}

if err := s.d.DeleteRevision(ctx, event.PrevKV.ModRevision); err != nil {
span.AddEvent(fmt.Sprintf("failed to delete revision %d", event.PrevKV.ModRevision))
logrus.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
return nextEnd, fmt.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
}
}

if event.Delete {
if !setRev && savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
savedCursor = cursor
}

if err := s.d.DeleteRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to delete current revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to delete current revision %d: %v", cursor, err)
}
}
}

if savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
rev, err := s.d.CurrentRevision(ctx)
if err != nil {
return err
}
span.SetAttributes(attribute.Int64("new-nextEnd", nextEnd), attribute.Int64("cursor-ended:", cursor))
return nextEnd, nil
return s.d.Compact(ctx, rev-SupersededCount)
}

func (s *SQLLog) compact() {
var nextEnd int64
t := time.NewTicker(s.d.GetCompactInterval())
nextEnd, _ = s.d.CurrentRevision(s.ctx)

for {
select {
case <-s.ctx.Done():
return
case <-t.C:
// compact runs one round of background compaction.
//
// When executing compaction as a background operation it is best not to
// take too much time away from queries and similar operations, so the work
// is split into batches of at most compactBatchSize revisions, each handled
// by a separate call to the dialect's Compact. When compaction keeps pace
// with writes, a round should usually need only a couple of batches.
//
// The most recent SupersededCount revisions are left uncompacted, matching
// what DoCompact does. The first error returned by GetCompactRevision or
// Compact aborts the round and is returned to the caller.
func (s *SQLLog) compact(ctx context.Context) error {
	start, target, err := s.d.GetCompactRevision(ctx)
	if err != nil {
		return err
	}

	// Keep the latest revisions around so that recently issued list and
	// watch requests are not invalidated by background compaction.
	target -= SupersededCount

	for start < target {
		// Never compact past the overall target, even when the last
		// batch is smaller than compactBatchSize.
		batchRevision := start + compactBatchSize
		if batchRevision > target {
			batchRevision = target
		}
		// Use the caller's context so that cancellation and tracing
		// follow the request that triggered this round.
		if err := s.d.Compact(ctx, batchRevision); err != nil {
			return err
		}
		start = batchRevision
	}
	return nil
}

func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down Expand Up @@ -418,7 +321,19 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {

go func() {
defer s.wg.Done()
s.compact()

t := time.NewTicker(s.d.GetCompactInterval())

for {
select {
case <-s.ctx.Done():
return
case <-t.C:
if err := s.compact(s.ctx); err != nil {
logrus.WithError(err).Trace("compaction failed")
}
}
}
}()

go func() {
Expand Down
Loading

0 comments on commit 1ee6dd4

Please sign in to comment.