Skip to content

Commit

Permalink
implement /replace in dcs-index-backend
Browse files Browse the repository at this point in the history
dcs-package-importer triggers loading the new shard (and deleting the
old one afterwards) after merging index files.
  • Loading branch information
stapelberg committed Oct 9, 2014
1 parent c2bf13c commit e7a4a05
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 7 deletions.
45 changes: 42 additions & 3 deletions cmd/dcs-index-backend/index-backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"log"
"net/http"
"os"
"path"
"path/filepath"
"runtime/pprof"
"time"
)
Expand Down Expand Up @@ -72,17 +72,56 @@ func Index(w http.ResponseWriter, r *http.Request) {
fmt.Printf("[%s] written in %v\n", id, t3.Sub(t2))
}

// XXX: theoretically, we need to make sure no Index() call is currently in
// progress since they may return incorrect result if “ix” suddenly points to a
// different index.
func Replace(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
newShard := r.Form.Get("shard")

file, err := os.Open(filepath.Dir(*indexPath))
if err != nil {
log.Fatal(err)
}
defer file.Close()
names, err := file.Readdirnames(-1)
if err != nil {
log.Fatal(err)
}

for _, name := range names {
if name == newShard {
newShard = filepath.Join(filepath.Dir(*indexPath), name)
// We verified the given argument refers to an index shard within
// this directory, so let’s load this shard.
oldIndex := ix
log.Printf("Trying to load %q\n", newShard)
ix = index.Open(newShard)
// Overwrite the old full shard with the new one. This is necessary
// so that the state is persistent across restarts and has the nice
// side-effect of cleaning up the old full shard.
if err := os.Rename(newShard, *indexPath); err != nil {
log.Fatal(err)
}
oldIndex.Close()
return
}
}

http.Error(w, "No such shard.", http.StatusInternalServerError)
}

func main() {
flag.Parse()
if *indexPath == "" {
log.Fatal("You need to specify a non-empty -index_path")
}
fmt.Println("Debian Code Search index-backend")

id = path.Base(*indexPath)

id = filepath.Base(*indexPath)
ix = index.Open(*indexPath)

http.HandleFunc("/index", Index)
http.HandleFunc("/replace", Replace)
log.Fatal(http.ListenAndServe(*listenAddress, nil))
}
22 changes: 22 additions & 0 deletions cmd/dcs-package-importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,28 @@ func mergeToShard() {
// log.Printf("merged in %v\n", t1.Sub(t0))
//}
log.Printf("merged into shard %s\n", tmpIndexPath.Name())

// If full.idx does not exist (i.e. on initial deployment), just move the
// new index to full.idx, the dcs-index-backend will not be running anyway.
fullIdxPath := filepath.Join(*unpackedPath, "full.idx")
if _, err := os.Stat(fullIdxPath); os.IsNotExist(err) {
if err := os.Rename(tmpIndexPath.Name(), fullIdxPath); err != nil {
log.Fatal(err)
}
return
}

// Replace the current index with the newly created index.
resp, err := http.Get(fmt.Sprintf("http://localhost:28081/replace?shard=%s", filepath.Base(tmpIndexPath.Name())))
if err != nil {
log.Fatal(err)
}

defer resp.Body.Close()
if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
log.Fatalf("dcs-index-backend /replace response: %+v (body: %s)\n", resp, body)
}
}

func indexPackage(pkg string) {
Expand Down
4 changes: 2 additions & 2 deletions index/mmap_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ func mmapFile(f *os.File) mmapData {
}
n := int(size)
if n == 0 {
return mmapData{f, nil}
return mmapData{f, nil, nil}
}
data, err := syscall.Mmap(int(f.Fd()), 0, (n+4095)&^4095, syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
log.Fatalf("mmap %s: %v", f.Name(), err)
}
return mmapData{f, data[:n]}
return mmapData{f, data[:n], data}
}
13 changes: 11 additions & 2 deletions index/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"os"
"runtime"
"sort"
"syscall"
)

const (
Expand Down Expand Up @@ -112,6 +113,13 @@ func Open(file string) *Index {
return ix
}

func (ix *Index) Close() {
if err := syscall.Munmap(ix.data.orig); err != nil {
log.Fatalf("munmap: %v", err)
}
ix.data.f.Close()
}

// slice returns the slice of index data starting at the given byte offset.
// If n >= 0, the slice must have length at least n and is truncated to length n.
func (ix *Index) slice(off uint32, n int) []byte {
Expand Down Expand Up @@ -428,8 +436,9 @@ func corrupt(file string) {

// An mmapData is mmap'ed read-only data from a file.
type mmapData struct {
f *os.File
d []byte
f *os.File
d []byte
orig []byte
}

// mmap maps the given file into memory.
Expand Down

0 comments on commit e7a4a05

Please sign in to comment.