From 0751153613bfd2e39cf28e83bbe04b76641d222f Mon Sep 17 00:00:00 2001
From: Lunny Xiao <xiaolunwen@gmail.com>
Date: Thu, 21 Feb 2019 08:54:05 +0800
Subject: [PATCH] refactor issue indexer, add some testing and fix a bug
 (#6131)

* refactor issue indexer, add some testing and fix a bug

* fix error copyright year on comment header

* issues indexer package import keep consistent
---
 .gitignore                              |   1 +
 models/issue.go                         |   5 +
 models/issue_indexer.go                 | 148 ------------------------
 models/unit_tests.go                    |   4 -
 modules/indexer/issues/indexer.go       | 148 ++++++++++++++++++++++++
 modules/indexer/issues/indexer_test.go  |  51 ++++++++
 modules/indexer/issues/queue_disk.go    |  19 +--
 modules/notification/indexer/indexer.go |  17 +--
 routers/api/v1/repo/issue.go            |   3 +-
 routers/init.go                         |   3 +-
 routers/repo/issue.go                   |   3 +-
 11 files changed, 231 insertions(+), 171 deletions(-)
 delete mode 100644 models/issue_indexer.go
 create mode 100644 modules/indexer/issues/indexer_test.go

diff --git a/.gitignore b/.gitignore
index 941ec41e04..2fe0134f7d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -62,6 +62,7 @@ coverage.all
 /integrations/pgsql.ini
 /integrations/mssql.ini
 /node_modules
+/modules/indexer/issues/indexers
 
 
 # Snapcraft
diff --git a/models/issue.go b/models/issue.go
index 835c6cf9fc..8de26c2b15 100644
--- a/models/issue.go
+++ b/models/issue.go
@@ -1231,6 +1231,11 @@ func getIssueIDsByRepoID(e Engine, repoID int64) ([]int64, error) {
 	return ids, err
 }
 
+// GetIssueIDsByRepoID returns all issue ids by repo id
+func GetIssueIDsByRepoID(repoID int64) ([]int64, error) {
+	return getIssueIDsByRepoID(x, repoID)
+}
+
 // GetIssuesByIDs return issues with the given IDs.
 func GetIssuesByIDs(issueIDs []int64) ([]*Issue, error) {
 	return getIssuesByIDs(x, issueIDs)
diff --git a/models/issue_indexer.go b/models/issue_indexer.go
deleted file mode 100644
index d02b7164da..0000000000
--- a/models/issue_indexer.go
+++ /dev/null
@@ -1,148 +0,0 @@
-// Copyright 2017 The Gitea Authors. All rights reserved.
-// Use of this source code is governed by a MIT-style
-// license that can be found in the LICENSE file.
-
-package models
-
-import (
-	"fmt"
-
-	"code.gitea.io/gitea/modules/indexer/issues"
-	"code.gitea.io/gitea/modules/log"
-	"code.gitea.io/gitea/modules/setting"
-	"code.gitea.io/gitea/modules/util"
-)
-
-var (
-	// issueIndexerUpdateQueue queue of issue ids to be updated
-	issueIndexerUpdateQueue issues.Queue
-	issueIndexer            issues.Indexer
-)
-
-// InitIssueIndexer initialize issue indexer
-func InitIssueIndexer() error {
-	var populate bool
-	switch setting.Indexer.IssueType {
-	case "bleve":
-		issueIndexer = issues.NewBleveIndexer(setting.Indexer.IssuePath)
-		exist, err := issueIndexer.Init()
-		if err != nil {
-			return err
-		}
-		populate = !exist
-	default:
-		return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
-	}
-
-	var err error
-	switch setting.Indexer.IssueIndexerQueueType {
-	case setting.LevelQueueType:
-		issueIndexerUpdateQueue, err = issues.NewLevelQueue(
-			issueIndexer,
-			setting.Indexer.IssueIndexerQueueDir,
-			setting.Indexer.IssueIndexerQueueBatchNumber)
-		if err != nil {
-			return err
-		}
-	case setting.ChannelQueueType:
-		issueIndexerUpdateQueue = issues.NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
-	default:
-		return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
-	}
-
-	go issueIndexerUpdateQueue.Run()
-
-	if populate {
-		go populateIssueIndexer()
-	}
-
-	return nil
-}
-
-// populateIssueIndexer populate the issue indexer with issue data
-func populateIssueIndexer() {
-	for page := 1; ; page++ {
-		repos, _, err := SearchRepositoryByName(&SearchRepoOptions{
-			Page:        page,
-			PageSize:    RepositoryListDefaultPageSize,
-			OrderBy:     SearchOrderByID,
-			Private:     true,
-			Collaborate: util.OptionalBoolFalse,
-		})
-		if err != nil {
-			log.Error(4, "SearchRepositoryByName: %v", err)
-			continue
-		}
-		if len(repos) == 0 {
-			return
-		}
-
-		for _, repo := range repos {
-			is, err := Issues(&IssuesOptions{
-				RepoIDs:  []int64{repo.ID},
-				IsClosed: util.OptionalBoolNone,
-				IsPull:   util.OptionalBoolNone,
-			})
-			if err != nil {
-				log.Error(4, "Issues: %v", err)
-				continue
-			}
-			if err = IssueList(is).LoadDiscussComments(); err != nil {
-				log.Error(4, "LoadComments: %v", err)
-				continue
-			}
-			for _, issue := range is {
-				UpdateIssueIndexer(issue)
-			}
-		}
-	}
-}
-
-// UpdateIssueIndexer add/update an issue to the issue indexer
-func UpdateIssueIndexer(issue *Issue) {
-	var comments []string
-	for _, comment := range issue.Comments {
-		if comment.Type == CommentTypeComment {
-			comments = append(comments, comment.Content)
-		}
-	}
-	issueIndexerUpdateQueue.Push(&issues.IndexerData{
-		ID:       issue.ID,
-		RepoID:   issue.RepoID,
-		Title:    issue.Title,
-		Content:  issue.Content,
-		Comments: comments,
-	})
-}
-
-// DeleteRepoIssueIndexer deletes repo's all issues indexes
-func DeleteRepoIssueIndexer(repo *Repository) {
-	var ids []int64
-	ids, err := getIssueIDsByRepoID(x, repo.ID)
-	if err != nil {
-		log.Error(4, "getIssueIDsByRepoID failed: %v", err)
-		return
-	}
-
-	if len(ids) <= 0 {
-		return
-	}
-
-	issueIndexerUpdateQueue.Push(&issues.IndexerData{
-		IDs:      ids,
-		IsDelete: true,
-	})
-}
-
-// SearchIssuesByKeyword search issue ids by keywords and repo id
-func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
-	var issueIDs []int64
-	res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
-	if err != nil {
-		return nil, err
-	}
-	for _, r := range res.Hits {
-		issueIDs = append(issueIDs, r.ID)
-	}
-	return issueIDs, nil
-}
diff --git a/models/unit_tests.go b/models/unit_tests.go
index f87dd7ee96..28cd91215e 100644
--- a/models/unit_tests.go
+++ b/models/unit_tests.go
@@ -44,10 +44,6 @@ func MainTest(m *testing.M, pathToGiteaRoot string) {
 		fatalTestError("Error creating test engine: %v\n", err)
 	}
 
-	if err = InitIssueIndexer(); err != nil {
-		fatalTestError("Error InitIssueIndexer: %v\n", err)
-	}
-
 	setting.AppURL = "https://try.gitea.io/"
 	setting.RunUser = "runuser"
 	setting.SSH.Port = 3000
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index c31006d0dd..41af5c36b6 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -4,6 +4,15 @@
 
 package issues
 
+import (
+	"fmt"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/setting"
+	"code.gitea.io/gitea/modules/util"
+)
+
 // IndexerData data stored in the issue indexer
 type IndexerData struct {
 	ID       int64
@@ -34,3 +43,142 @@ type Indexer interface {
 	Delete(ids ...int64) error
 	Search(kw string, repoID int64, limit, start int) (*SearchResult, error)
 }
+
+var (
+	// issueIndexerUpdateQueue queue of issue ids to be updated
+	issueIndexerUpdateQueue Queue
+	issueIndexer            Indexer
+)
+
+// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
+// all issue index done.
+func InitIssueIndexer(syncReindex bool) error {
+	var populate bool
+	switch setting.Indexer.IssueType {
+	case "bleve":
+		issueIndexer = NewBleveIndexer(setting.Indexer.IssuePath)
+		exist, err := issueIndexer.Init()
+		if err != nil {
+			return err
+		}
+		populate = !exist
+	default:
+		return fmt.Errorf("unknow issue indexer type: %s", setting.Indexer.IssueType)
+	}
+
+	var err error
+	switch setting.Indexer.IssueIndexerQueueType {
+	case setting.LevelQueueType:
+		issueIndexerUpdateQueue, err = NewLevelQueue(
+			issueIndexer,
+			setting.Indexer.IssueIndexerQueueDir,
+			setting.Indexer.IssueIndexerQueueBatchNumber)
+		if err != nil {
+			return err
+		}
+	case setting.ChannelQueueType:
+		issueIndexerUpdateQueue = NewChannelQueue(issueIndexer, setting.Indexer.IssueIndexerQueueBatchNumber)
+	default:
+		return fmt.Errorf("Unsupported indexer queue type: %v", setting.Indexer.IssueIndexerQueueType)
+	}
+
+	go issueIndexerUpdateQueue.Run()
+
+	if populate {
+		if syncReindex {
+			populateIssueIndexer()
+		} else {
+			go populateIssueIndexer()
+		}
+	}
+
+	return nil
+}
+
+// populateIssueIndexer populate the issue indexer with issue data
+func populateIssueIndexer() {
+	for page := 1; ; page++ {
+		repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
+			Page:        page,
+			PageSize:    models.RepositoryListDefaultPageSize,
+			OrderBy:     models.SearchOrderByID,
+			Private:     true,
+			Collaborate: util.OptionalBoolFalse,
+		})
+		if err != nil {
+			log.Error(4, "SearchRepositoryByName: %v", err)
+			continue
+		}
+		if len(repos) == 0 {
+			return
+		}
+
+		for _, repo := range repos {
+			is, err := models.Issues(&models.IssuesOptions{
+				RepoIDs:  []int64{repo.ID},
+				IsClosed: util.OptionalBoolNone,
+				IsPull:   util.OptionalBoolNone,
+			})
+			if err != nil {
+				log.Error(4, "Issues: %v", err)
+				continue
+			}
+			if err = models.IssueList(is).LoadDiscussComments(); err != nil {
+				log.Error(4, "LoadComments: %v", err)
+				continue
+			}
+			for _, issue := range is {
+				UpdateIssueIndexer(issue)
+			}
+		}
+	}
+}
+
+// UpdateIssueIndexer add/update an issue to the issue indexer
+func UpdateIssueIndexer(issue *models.Issue) {
+	var comments []string
+	for _, comment := range issue.Comments {
+		if comment.Type == models.CommentTypeComment {
+			comments = append(comments, comment.Content)
+		}
+	}
+	issueIndexerUpdateQueue.Push(&IndexerData{
+		ID:       issue.ID,
+		RepoID:   issue.RepoID,
+		Title:    issue.Title,
+		Content:  issue.Content,
+		Comments: comments,
+	})
+}
+
+// DeleteRepoIssueIndexer deletes repo's all issues indexes
+func DeleteRepoIssueIndexer(repo *models.Repository) {
+	var ids []int64
+	ids, err := models.GetIssueIDsByRepoID(repo.ID)
+	if err != nil {
+		log.Error(4, "getIssueIDsByRepoID failed: %v", err)
+		return
+	}
+
+	if len(ids) <= 0 {
+		return
+	}
+
+	issueIndexerUpdateQueue.Push(&IndexerData{
+		IDs:      ids,
+		IsDelete: true,
+	})
+}
+
+// SearchIssuesByKeyword search issue ids by keywords and repo id
+func SearchIssuesByKeyword(repoID int64, keyword string) ([]int64, error) {
+	var issueIDs []int64
+	res, err := issueIndexer.Search(keyword, repoID, 1000, 0)
+	if err != nil {
+		return nil, err
+	}
+	for _, r := range res.Hits {
+		issueIDs = append(issueIDs, r.ID)
+	}
+	return issueIDs, nil
+}
diff --git a/modules/indexer/issues/indexer_test.go b/modules/indexer/issues/indexer_test.go
new file mode 100644
index 0000000000..1b6bdec53e
--- /dev/null
+++ b/modules/indexer/issues/indexer_test.go
@@ -0,0 +1,51 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package issues
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/setting"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func fatalTestError(fmtStr string, args ...interface{}) {
+	fmt.Fprintf(os.Stderr, fmtStr, args...)
+	os.Exit(1)
+}
+
+func TestMain(m *testing.M) {
+	models.MainTest(m, filepath.Join("..", "..", ".."))
+}
+
+func TestSearchIssues(t *testing.T) {
+	assert.NoError(t, models.PrepareTestDatabase())
+
+	os.RemoveAll(setting.Indexer.IssueIndexerQueueDir)
+	os.RemoveAll(setting.Indexer.IssuePath)
+	if err := InitIssueIndexer(true); err != nil {
+		fatalTestError("Error InitIssueIndexer: %v\n", err)
+	}
+
+	time.Sleep(10 * time.Second)
+
+	ids, err := SearchIssuesByKeyword(1, "issue2")
+	assert.NoError(t, err)
+	assert.EqualValues(t, []int64{2}, ids)
+
+	ids, err = SearchIssuesByKeyword(1, "first")
+	assert.NoError(t, err)
+	assert.EqualValues(t, []int64{1}, ids)
+
+	ids, err = SearchIssuesByKeyword(1, "for")
+	assert.NoError(t, err)
+	assert.EqualValues(t, []int64{1, 2, 3, 5}, ids)
+}
diff --git a/modules/indexer/issues/queue_disk.go b/modules/indexer/issues/queue_disk.go
index 97e9a3d965..97bacdf99d 100644
--- a/modules/indexer/issues/queue_disk.go
+++ b/modules/indexer/issues/queue_disk.go
@@ -42,18 +42,21 @@ func (l *LevelQueue) Run() error {
 	var i int
 	var datas = make([]*IndexerData, 0, l.batchNumber)
 	for {
-		bs, err := l.queue.RPop()
-		if err != nil {
-			log.Error(4, "RPop: %v", err)
-			time.Sleep(time.Millisecond * 100)
-			continue
-		}
-
 		i++
 		if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
 			l.indexer.Index(datas)
 			datas = make([]*IndexerData, 0, l.batchNumber)
 			i = 0
+			continue
+		}
+
+		bs, err := l.queue.RPop()
+		if err != nil {
+			if err != levelqueue.ErrNotFound {
+				log.Error(4, "RPop: %v", err)
+			}
+			time.Sleep(time.Millisecond * 100)
+			continue
 		}
 
 		if len(bs) <= 0 {
@@ -69,7 +72,7 @@ func (l *LevelQueue) Run() error {
 			continue
 		}
 
-		log.Trace("LedisLocalQueue: task found: %#v", data)
+		log.Trace("LevelQueue: task found: %#v", data)
 
 		if data.IsDelete {
 			if data.ID > 0 {
diff --git a/modules/notification/indexer/indexer.go b/modules/notification/indexer/indexer.go
index 66d483c017..45752e411a 100644
--- a/modules/notification/indexer/indexer.go
+++ b/modules/notification/indexer/indexer.go
@@ -6,6 +6,7 @@ package indexer
 
 import (
 	"code.gitea.io/gitea/models"
+	issue_indexer "code.gitea.io/gitea/modules/indexer/issues"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/notification/base"
 )
@@ -35,16 +36,16 @@ func (r *indexerNotifier) NotifyCreateIssueComment(doer *models.User, repo *mode
 			issue.Comments = append(issue.Comments, comment)
 		}
 
-		models.UpdateIssueIndexer(issue)
+		issue_indexer.UpdateIssueIndexer(issue)
 	}
 }
 
 func (r *indexerNotifier) NotifyNewIssue(issue *models.Issue) {
-	models.UpdateIssueIndexer(issue)
+	issue_indexer.UpdateIssueIndexer(issue)
 }
 
 func (r *indexerNotifier) NotifyNewPullRequest(pr *models.PullRequest) {
-	models.UpdateIssueIndexer(pr.Issue)
+	issue_indexer.UpdateIssueIndexer(pr.Issue)
 }
 
 func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comment, oldContent string) {
@@ -67,7 +68,7 @@ func (r *indexerNotifier) NotifyUpdateComment(doer *models.User, c *models.Comme
 			}
 		}
 
-		models.UpdateIssueIndexer(c.Issue)
+		issue_indexer.UpdateIssueIndexer(c.Issue)
 	}
 }
 
@@ -91,18 +92,18 @@ func (r *indexerNotifier) NotifyDeleteComment(doer *models.User, comment *models
 			}
 		}
 		// reload comments to delete the old comment
-		models.UpdateIssueIndexer(comment.Issue)
+		issue_indexer.UpdateIssueIndexer(comment.Issue)
 	}
 }
 
 func (r *indexerNotifier) NotifyDeleteRepository(doer *models.User, repo *models.Repository) {
-	models.DeleteRepoIssueIndexer(repo)
+	issue_indexer.DeleteRepoIssueIndexer(repo)
 }
 
 func (r *indexerNotifier) NotifyIssueChangeContent(doer *models.User, issue *models.Issue, oldContent string) {
-	models.UpdateIssueIndexer(issue)
+	issue_indexer.UpdateIssueIndexer(issue)
 }
 
 func (r *indexerNotifier) NotifyIssueChangeTitle(doer *models.User, issue *models.Issue, oldTitle string) {
-	models.UpdateIssueIndexer(issue)
+	issue_indexer.UpdateIssueIndexer(issue)
 }
diff --git a/routers/api/v1/repo/issue.go b/routers/api/v1/repo/issue.go
index a129447c09..e63db4b6a4 100644
--- a/routers/api/v1/repo/issue.go
+++ b/routers/api/v1/repo/issue.go
@@ -13,6 +13,7 @@ import (
 
 	"code.gitea.io/gitea/models"
 	"code.gitea.io/gitea/modules/context"
+	issue_indexer "code.gitea.io/gitea/modules/indexer/issues"
 	"code.gitea.io/gitea/modules/notification"
 	"code.gitea.io/gitea/modules/setting"
 	"code.gitea.io/gitea/modules/util"
@@ -77,7 +78,7 @@ func ListIssues(ctx *context.APIContext) {
 	var labelIDs []int64
 	var err error
 	if len(keyword) > 0 {
-		issueIDs, err = models.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword)
+		issueIDs, err = issue_indexer.SearchIssuesByKeyword(ctx.Repo.Repository.ID, keyword)
 	}
 
 	if splitted := strings.Split(ctx.Query("labels"), ","); len(splitted) > 0 {
diff --git a/routers/init.go b/routers/init.go
index 1da21a351b..2fc3bb9668 100644
--- a/routers/init.go
+++ b/routers/init.go
@@ -15,6 +15,7 @@ import (
 	"code.gitea.io/gitea/modules/cache"
 	"code.gitea.io/gitea/modules/cron"
 	"code.gitea.io/gitea/modules/highlight"
+	issue_indexer "code.gitea.io/gitea/modules/indexer/issues"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/mailer"
 	"code.gitea.io/gitea/modules/markup"
@@ -90,7 +91,7 @@ func GlobalInit() {
 
 		// Booting long running goroutines.
 		cron.NewContext()
-		if err := models.InitIssueIndexer(); err != nil {
+		if err := issue_indexer.InitIssueIndexer(false); err != nil {
 			log.Fatal(4, "Failed to initialize issue indexer: %v", err)
 		}
 		models.InitRepoIndexer()
diff --git a/routers/repo/issue.go b/routers/repo/issue.go
index f0a6d1bd5f..33d3ec6a74 100644
--- a/routers/repo/issue.go
+++ b/routers/repo/issue.go
@@ -23,6 +23,7 @@ import (
 	"code.gitea.io/gitea/modules/auth"
 	"code.gitea.io/gitea/modules/base"
 	"code.gitea.io/gitea/modules/context"
+	issue_indexer "code.gitea.io/gitea/modules/indexer/issues"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/markup/markdown"
 	"code.gitea.io/gitea/modules/notification"
@@ -146,7 +147,7 @@ func issues(ctx *context.Context, milestoneID int64, isPullOption util.OptionalB
 
 	var issueIDs []int64
 	if len(keyword) > 0 {
-		issueIDs, err = models.SearchIssuesByKeyword(repo.ID, keyword)
+		issueIDs, err = issue_indexer.SearchIssuesByKeyword(repo.ID, keyword)
 		if err != nil {
 			ctx.ServerError("issueIndexer.Search", err)
 			return