Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve compaction #149

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 101 additions & 29 deletions pkg/kine/drivers/generic/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const otelName = "generic"
var (
otelTracer trace.Tracer
otelMeter metric.Meter
setCompactRevCnt metric.Int64Counter
compactCnt metric.Int64Counter
getRevisionCnt metric.Int64Counter
deleteRevCnt metric.Int64Counter
currentRevCnt metric.Int64Counter
Expand All @@ -36,7 +36,7 @@ func init() {
var err error
otelTracer = otel.Tracer(otelName)
otelMeter = otel.Meter(otelName)
setCompactRevCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.compact", otelName), metric.WithDescription("Number of compact requests"))
compactCnt, err = otelMeter.Int64Counter(fmt.Sprintf("%s.compact", otelName), metric.WithDescription("Number of compact requests"))
if err != nil {
logrus.WithError(err).Warning("Otel failed to create create counter")
}
Expand Down Expand Up @@ -103,16 +103,12 @@ var (

revisionIntervalSQL = `
SELECT (
SELECT crkv.prev_revision
FROM kine AS crkv
WHERE crkv.name = 'compact_rev_key'
ORDER BY prev_revision
DESC LIMIT 1
SELECT MAX(prev_revision)
FROM kine
WHERE name = 'compact_rev_key'
) AS low, (
SELECT id
SELECT MAX(id)
FROM kine
ORDER BY id
DESC LIMIT 1
) AS high`
)

Expand Down Expand Up @@ -145,6 +141,7 @@ type Generic struct {
AfterSQLPrefix string
AfterSQL string
DeleteSQL string
CompactSQL string
UpdateCompactSQL string
InsertSQL string
FillSQL string
Expand Down Expand Up @@ -268,7 +265,7 @@ func Open(ctx context.Context, driverName, dataSourceName string, paramCharacter

UpdateCompactSQL: q(`
UPDATE kine
SET prev_revision = ?
SET prev_revision = max(prev_revision, ?)
WHERE name = 'compact_rev_key'`, paramCharacter, numbered),

InsertLastInsertIDSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value)
Expand Down Expand Up @@ -479,16 +476,105 @@ func (d *Generic) Create(ctx context.Context, key string, value []byte, ttl int6
return result.LastInsertId()
}

// Compact compacts to revision the database. After the call,
louiseschmidtgen marked this conversation as resolved.
Show resolved Hide resolved
// any request for a version older than revision will return
// a compacted error.
func (d *Generic) Compact(ctx context.Context, revision int64) (err error) {
compactCnt.Add(ctx, 1)
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.Compact", otelName))
defer func() {
span.RecordError(err)
span.End()
}()
compactStart, currentRevision, err := d.GetCompactRevision(ctx)
if err != nil {
return err
}
span.SetAttributes(
attribute.Int64("compact_start", compactStart),
attribute.Int64("current_revision", currentRevision), attribute.Int64("revision", revision),
)
if compactStart >= revision {
return nil // Nothing to compact.
}
if revision > currentRevision {
revision = currentRevision
}

for retryCount := 0; retryCount < maxRetries; retryCount++ {
err = d.tryCompact(ctx, compactStart, revision)
if err == nil || d.Retry == nil || !d.Retry(err) {
break
}
}
return err
}

func (d *Generic) tryCompact(ctx context.Context, start, end int64) (err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.tryCompact", otelName))
defer func() {
span.RecordError(err)
span.End()
}()
span.SetAttributes(attribute.Int64("start", start), attribute.Int64("end", end))

tx, err := d.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
if err := tx.Rollback(); err != nil {
logrus.WithError(err).Trace("can't rollback compaction")
}
}()

// This query adds `created = 0` as a condition
// to mitigate a bug in vXXX where newly created
// keys had `prev_revision = max(id)` instead of 0.
// Even with the bug fixed, we still need to check
// for that in older rows and if older peers are
// still running. Given that we are not yet using
// an index, it won't change much in performance
// however, it should be included in a covering
// index if ever necessary.
_, err = tx.ExecContext(ctx, `
DELETE FROM kine
WHERE id IN (
SELECT prev_revision
FROM kine
WHERE name != 'compact_rev_key'
AND created = 0
AND prev_revision != 0
AND ? < id AND id <= ?
)
`, start, end)
if err != nil {
louiseschmidtgen marked this conversation as resolved.
Show resolved Hide resolved
return err
}

_, err = tx.ExecContext(ctx, `
DELETE FROM kine
WHERE deleted = 1
AND ? < id AND id <= ?
`, start, end)
if err != nil {
return err
}

_, err = tx.ExecContext(ctx, d.UpdateCompactSQL, end)
if err != nil {
return err
}
return tx.Commit()
}

func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error) {
getCompactRevCnt.Add(ctx, 1)
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.get_compact_revision", otelName))
var compact, target sql.NullInt64
start := time.Now()
var err error
defer func() {
if err == sql.ErrNoRows {
err = nil
}
span.RecordError(err)
recordOpResult("revision_interval_sql", err, start)
recordTxResult("revision_interval_sql", err)
Expand All @@ -511,7 +597,7 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error)
if err := rows.Err(); err != nil {
return 0, 0, err
}
return 0, 0, nil
return 0, 0, fmt.Errorf("cannot get compact revision: aggregate query returned no rows")
}

if err := rows.Scan(&compact, &target); err != nil {
Expand All @@ -521,20 +607,6 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, int64, error)
return compact.Int64, target.Int64, err
}

// SetCompactRevision executes UpdateCompactSQL with the given revision and
// returns the error from that statement, if any. Each call increments the
// setCompactRevCnt counter and is traced in its own span.
func (d *Generic) SetCompactRevision(ctx context.Context, revision int64) error {
	var err error
	setCompactRevCnt.Add(ctx, 1)
	ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.set_compact_revision", otelName))
	defer func() {
		// The error must be recorded before the span is ended; updates
		// made to a span after End are discarded.
		span.RecordError(err)
		span.End()
	}()
	span.SetAttributes(attribute.Int64("revision", revision))

	_, err = d.execute(ctx, "update_compact_sql", d.UpdateCompactSQL, revision)
	return err
}

func (d *Generic) GetRevision(ctx context.Context, revision int64) (*sql.Rows, error) {
var err error
getRevisionCnt.Add(ctx, 1)
Expand Down
163 changes: 41 additions & 122 deletions pkg/kine/logstructured/sqllog/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

const (
SupersededCount = 100
otelName = "sqllog"
SupersededCount = 100
compactBatchSize = 1000
Copy link
Contributor

@louiseschmidtgen louiseschmidtgen Jul 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a compaction txn timeout?

bschimke95 marked this conversation as resolved.
Show resolved Hide resolved
otelName = "sqllog"
)

var (
Expand Down Expand Up @@ -68,7 +69,7 @@ type Dialect interface {
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
DeleteRevision(ctx context.Context, revision int64) error
GetCompactRevision(ctx context.Context) (int64, int64, error)
SetCompactRevision(ctx context.Context, revision int64) error
Compact(ctx context.Context, revision int64) error
Fill(ctx context.Context, revision int64) error
IsFill(key string) bool
GetSize(ctx context.Context) (int64, error)
Expand Down Expand Up @@ -139,138 +140,44 @@ func (s *SQLLog) compactStart(ctx context.Context) error {

// DoCompact makes a single compaction run when called. It is intended to be called
// from test functions that have access to the backend.
func (s *SQLLog) DoCompact(ctx context.Context) error {
if err := s.compactStart(ctx); err != nil {
return fmt.Errorf("failed to initialise compaction: %v", err)
}

nextEnd, _ := s.d.CurrentRevision(ctx)
_, err := s.compactor(ctx, nextEnd)

return err
}

func (s *SQLLog) compactor(ctx context.Context, nextEnd int64) (int64, error) {
var err error
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.compactor", otelName))
func (s *SQLLog) DoCompact(ctx context.Context) (err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.DoCompact", otelName))
defer func() {
span.RecordError(err)
span.End()
}()
span.SetAttributes(attribute.Int64("nextEnd", nextEnd))

currentRev, err := s.d.CurrentRevision(ctx)
span.SetAttributes(attribute.Int64("currentRev", currentRev))
if err != nil {
logrus.Errorf("failed to get current revision: %v", err)
return nextEnd, fmt.Errorf("failed to get current revision: %v", err)
if err := s.compactStart(ctx); err != nil {
return fmt.Errorf("failed to initialise compaction: %v", err)
}

cursor, _, err := s.d.GetCompactRevision(ctx)
// When compaction runs as a background operation, it should not
// take too much time away from queries and other foreground work.
// For that reason, compaction is done in small batches. Given that
// this logic runs every second, in steady state it usually takes
// only a couple of batches to keep pace.
start, target, err := s.d.GetCompactRevision(ctx)
if err != nil {
logrus.Errorf("failed to get compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to get compact revision: %v", err)
return err
}
span.SetAttributes(attribute.Int64("cursor", cursor))

end := nextEnd
nextEnd = currentRev

// NOTE(neoaggelos): Ignoring the last 1000 revisions causes the following CNCF conformance test to fail.
span.SetAttributes(attribute.Int64("start", start))
// NOTE: Upstream is ignoring the last 1000 revisions, however that causes the following CNCF conformance test to fail.
// This is because of low activity, where the created list is part of the last 1000 revisions and is not compacted.
// Link to failing test: https://github.com/kubernetes/kubernetes/blob/f2cfbf44b1fb482671aedbfff820ae2af256a389/test/e2e/apimachinery/chunking.go#L144
// To address this, we only ignore the last 100 revisions instead
end = end - SupersededCount

savedCursor := cursor
// Purposefully start at the current and redo the current as
// it could have failed before actually compacting
compactCnt.Add(ctx, 1)
span.AddEvent(fmt.Sprintf("start compaction from %d to %d", cursor, end))
for ; cursor <= end; cursor++ {
rows, err := s.d.GetRevision(ctx, cursor)
if err != nil {
logrus.Errorf("failed to get revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to get revision %d: %v", cursor, err)
}

events, err := RowsToEvents(rows)
if err != nil {
logrus.Errorf("failed to convert to events: %v", err)
return nextEnd, fmt.Errorf("failed to convert to events: %v", err)
target -= SupersededCount
span.SetAttributes(attribute.Int64("target", target))
for start < target {
batchRevision := start + compactBatchSize
if batchRevision > target {
batchRevision = target
}

if len(events) == 0 {
continue
}

event := events[0]

if event.KV.Key == "compact_rev_key" {
span.AddEvent("skip compact_rev_key")
// don't compact the compact key
continue
}

setRev := false
if event.PrevKV != nil && event.PrevKV.ModRevision != 0 {
if savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
span.AddEvent(fmt.Sprintf("failed to record compact revision: %v", err))
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
savedCursor = cursor
setRev = true
}

if err := s.d.DeleteRevision(ctx, event.PrevKV.ModRevision); err != nil {
span.AddEvent(fmt.Sprintf("failed to delete revision %d", event.PrevKV.ModRevision))
logrus.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
return nextEnd, fmt.Errorf("failed to delete revision %d: %v", event.PrevKV.ModRevision, err)
}
}

if event.Delete {
if !setRev && savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
savedCursor = cursor
}

if err := s.d.DeleteRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to delete current revision %d: %v", cursor, err)
return nextEnd, fmt.Errorf("failed to delete current revision %d: %v", cursor, err)
}
}
}

if savedCursor != cursor {
if err := s.d.SetCompactRevision(ctx, cursor); err != nil {
logrus.Errorf("failed to record compact revision: %v", err)
return nextEnd, fmt.Errorf("failed to record compact revision: %v", err)
}
}
span.SetAttributes(attribute.Int64("new-nextEnd", nextEnd), attribute.Int64("cursor-ended:", cursor))
return nextEnd, nil
}

func (s *SQLLog) compact() {
var nextEnd int64
t := time.NewTicker(s.d.GetCompactInterval())
nextEnd, _ = s.d.CurrentRevision(s.ctx)

for {
select {
case <-s.ctx.Done():
return
case <-t.C:
if err := s.d.Compact(s.ctx, batchRevision); err != nil {
return err
}

nextEnd, _ = s.compactor(s.ctx, nextEnd)
start = batchRevision
}
return nil
}

func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
Expand Down Expand Up @@ -442,7 +349,19 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {

go func() {
defer s.wg.Done()
s.compact()

t := time.NewTicker(s.d.GetCompactInterval())

for {
select {
case <-s.ctx.Done():
return
case <-t.C:
if err := s.DoCompact(s.ctx); err != nil {
logrus.WithError(err).Trace("compaction failed")
}
}
}
}()

go func() {
Expand Down
Loading
Loading