Browse Source

add handlequery func

animeic 2 years ago
parent
commit
7a8d1bc510
5 changed files with 261 additions and 139 deletions
  1. 206 0
      boxcost/api/print.go
  2. 4 139
      boxcost/api/report.go
  3. 2 0
      boxcost/api/router.go
  4. 38 0
      boxcost/api/utils.go
  5. 11 0
      boxcost/db/model/aggregate-test.go

+ 206 - 0
boxcost/api/print.go

@@ -1,10 +1,22 @@
 package api
 
 import (
+	"box-cost/db/model"
+	"box-cost/db/repo"
+	randc "crypto/rand"
+	"encoding/json"
 	"fmt"
+	"log"
+	"math/big"
+	"math/rand"
+	"sort"
+	"sync"
 	"time"
+	"unsafe"
 
 	"github.com/gin-gonic/gin"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
 )
 
 func Printr(c *gin.Context, apictx *ApiSession) (interface{}, error) {
@@ -20,3 +32,197 @@ func Printr(c *gin.Context, apictx *ApiSession) (interface{}, error) {
 	return "success", nil
 
 }
+
+// 生成百万数据
+func GenData(_ *gin.Context, apictx *ApiSession) (interface{}, error) {
+	var wg sync.WaitGroup
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go insertData(apictx, &wg)
+	}
+	wg.Wait()
+
+	return true, nil
+}
+
+// 聚合查询百万数据
+func SearchData(_ *gin.Context, apictx *ApiSession) (interface{}, error) {
+	dataStatResult := make(chan bson.M)
+	var output ResultSlice
+	var wg sync.WaitGroup
+	// 82个gorutine 执行聚合
+	for i := 18; i < 100; i++ {
+		wg.Add(1)
+		go DataAggregate(apictx, i, dataStatResult, &wg)
+	}
+
+	// 从管道中获取聚合结果 保存在数组中
+	for value := range dataStatResult {
+		output = append(output, OutPut{
+			Age:       value["age"].(int32),
+			Sex:       value["sex"].(int32),
+			Total:     value["total"].(int32),
+			AvgSalary: value["avgSalary"].(float64),
+		})
+		if len(output) == 164 { // 如果大于164,不会跳出;如果小于164,wg.done != wg.add ,wg.wait阻塞
+			break
+		}
+	}
+
+	wg.Wait()
+	//倒序排列
+	sort.Sort(output)
+	for _, v := range output {
+		result, err := json.Marshal(&v)
+		if err != nil {
+			fmt.Println("json.marshal failed, err:", err)
+			return nil, err
+		}
+		fmt.Println(string(result))
+	}
+	return output, nil
+}
+
+func insertData(apictx *ApiSession, wg *sync.WaitGroup) {
+	collectName := "aggregate-test"
+	rowNum := 10000
+	for i := 0; i < rowNum; i++ {
+		repo.RepoAddDoc(apictx.CreateRepoCtx(), collectName, &model.AggregateTest{
+			Name:   randName(6),
+			Age:    randAge(),
+			Sex:    randSex(),
+			Salary: randSalary(),
+		})
+
+	}
+	wg.Done()
+}
+
+func randSex() *int {
+	result, _ := randc.Int(randc.Reader, big.NewInt(2))
+	sex := int(result.Int64())
+	return &sex
+}
+func randAge() int {
+	result, _ := randc.Int(randc.Reader, big.NewInt(82))
+	return int(result.Int64() + 18)
+}
+
+func randSalary() int {
+	result, _ := randc.Int(randc.Reader, big.NewInt(10000))
+
+	return int(result.Int64() + 2000)
+}
+
+const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+
+var src = rand.NewSource(time.Now().UnixNano())
+
+const (
+	// 6 bits to represent a letter index
+	letterIdBits = 6
+	// All 1-bits as many as letterIdBits
+	letterIdMask = 1<<letterIdBits - 1
+	letterIdMax  = 63 / letterIdBits
+)
+
+func randName(n int) string {
+	b := make([]byte, n)
+	// A rand.Int63() generates 63 random bits, enough for letterIdMax letters!
+	for i, cache, remain := n-1, src.Int63(), letterIdMax; i >= 0; {
+		if remain == 0 {
+			cache, remain = src.Int63(), letterIdMax
+		}
+		if idx := int(cache & letterIdMask); idx < len(letters) {
+			b[i] = letters[idx]
+			i--
+		}
+		cache >>= letterIdBits
+		remain--
+	}
+	return *(*string)(unsafe.Pointer(&b))
+}
+
+func genPipeline(age int) (bson.D, bson.D, bson.D) {
+
+	matchStage := bson.D{
+		{"$match", bson.D{
+			{"age",
+				bson.D{
+					{"$eq", age},
+				}},
+		}},
+	}
+	groupStage := bson.D{
+		{"$group", bson.D{
+			{"_id", bson.D{
+				{"age", "$age"},
+				{"sex", "$sex"},
+			}},
+			{"age", bson.D{
+				{"$first", "$age"},
+			}},
+			{"sex", bson.D{
+				{"$first", "$sex"},
+			}},
+			{"total", bson.D{
+				{"$sum", 1},
+			}},
+			{"avgSalary", bson.D{
+				{"$avg", "$salary"},
+			}},
+		}},
+	}
+	projectStage := bson.D{
+		{"$project", bson.D{
+			{"_id", 0},
+			{"age", 1},
+			{"sex", 1},
+			{"total", 1},
+			{"avgSalary", 1},
+		}},
+	}
+
+	return matchStage, groupStage, projectStage
+
+}
+
+func DataAggregate(apictx *ApiSession, age int, resultChan chan bson.M, wg *sync.WaitGroup) {
+	matchStage, groupStage, projectStage := genPipeline(age)
+
+	// opts := options.Aggregate().SetMaxTime(15 * time.Second)
+	cursor, err := apictx.Svc.Mongo.GetCollection("aggregate-test").Aggregate(apictx.CreateRepoCtx().Ctx, mongo.Pipeline{matchStage, groupStage, projectStage})
+	if err != nil {
+		log.Println(err)
+	}
+	//打印文档内容
+	var results []bson.M
+	if err = cursor.All(apictx.CreateRepoCtx().Ctx, &results); err != nil {
+		log.Println(err)
+	}
+	log.Printf("%#v\n", results)
+	for _, result := range results {
+		resultChan <- result
+	}
+
+	wg.Done()
+}
+
+type OutPut struct {
+	Age       int32   `json:"age"`
+	Sex       int32   `json:"sex"`
+	Total     int32   `json:"total"`
+	AvgSalary float64 `json:"avg_salary"`
+}
+
+type ResultSlice []OutPut
+
+func (a ResultSlice) Len() int { // 重写 Len() 方法
+	return len(a)
+}
+func (a ResultSlice) Swap(i, j int) { // 重写 Swap() 方法
+	a[i], a[j] = a[j], a[i]
+}
+func (a ResultSlice) Less(i, j int) bool { // 重写 Less() 方法, 从大到小排序
+	return a[j].Age < a[i].Age
+}

+ 4 - 139
boxcost/api/report.go

@@ -8,8 +8,6 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/xuri/excelize/v2"
-	"go.mongodb.org/mongo-driver/bson"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 )
 
 // 统计报表 按时间范围,供应商  包装-计划(多选) 维度进行过滤,形成报表。可以下载
@@ -26,43 +24,10 @@ func Report(r *GinRouter) {
 func ReportProduceList(c *gin.Context, apictx *ApiSession) (interface{}, error) {
 	page, size, query := UtilQueryPageSize(c)
 	// 条件处理
-	query["status"] = "complete"
-	if _supplierId, ok := query["supplierId"]; ok {
-		delete(query, "supplierId")
-		supplierId, _ := primitive.ObjectIDFromHex(_supplierId.(string))
-		if !supplierId.IsZero() {
-			query["supplierId"] = supplierId
-
-		}
-	}
-
-	if _timeRange, ok := query["timeRange"]; ok {
-		timeRange, _ := _timeRange.([]interface{})
-
-		if len(timeRange) == 2 {
-			start, end := getTimeRange(timeRange[0].(string), timeRange[1].(string))
-			query["completeTime"] = bson.M{"$gte": start, "$lte": end}
-		}
-		delete(query, "timeRange")
-	}
-
-	if _planIds, ok := query["planIds"]; ok {
-		if len(_planIds.([]interface{})) > 0 {
-			planQuery := bson.A{}
-			for _, _planId := range _planIds.([]interface{}) {
-				planId, _ := primitive.ObjectIDFromHex(_planId.(string))
-				planQuery = append(planQuery, bson.M{"planId": planId})
-			}
-			query["$or"] = planQuery
-		}
-		delete(query, "planIds")
-	}
-	fmt.Println(query)
-
 	// 获取采购单符合条件的信息
 	return repo.RepoPageSearch(apictx.CreateRepoCtx(), &repo.PageSearchOptions{
 		CollectName: repo.CollectionBillProduce,
-		Query:       query,
+		Query:       handleReportQuery(query),
 		Page:        page,
 		Size:        size,
 	})
@@ -72,43 +37,9 @@ func ReportProduceList(c *gin.Context, apictx *ApiSession) (interface{}, error)
 // 采购单
 func ReportPurchaseList(c *gin.Context, apictx *ApiSession) (interface{}, error) {
 	page, size, query := UtilQueryPageSize(c)
-	// 条件处理
-	query["status"] = "complete"
-
-	if _supplierId, ok := query["supplierId"]; ok {
-		delete(query, "supplierId")
-		supplierId, _ := primitive.ObjectIDFromHex(_supplierId.(string))
-		if !supplierId.IsZero() {
-			query["supplierId"] = supplierId
-
-		}
-	}
-
-	if _timeRange, ok := query["timeRange"]; ok {
-		timeRange, _ := _timeRange.([]interface{})
-
-		if len(timeRange) == 2 {
-			start, end := getTimeRange(timeRange[0].(string), timeRange[1].(string))
-			query["completeTime"] = bson.M{"$gte": start, "$lte": end}
-		}
-		delete(query, "timeRange")
-	}
-
-	if _planIds, ok := query["planIds"]; ok {
-		if len(_planIds.([]interface{})) > 0 {
-			planQuery := bson.A{}
-			for _, _planId := range _planIds.([]interface{}) {
-				planId, _ := primitive.ObjectIDFromHex(_planId.(string))
-				planQuery = append(planQuery, bson.M{"planId": planId})
-			}
-			query["$or"] = planQuery
-		}
-		delete(query, "planIds")
-	}
-	// 获取采购单符合条件的信息
 	return repo.RepoPageSearch(apictx.CreateRepoCtx(), &repo.PageSearchOptions{
 		CollectName: repo.CollectionBillPurchase,
-		Query:       query,
+		Query:       handleReportQuery(query),
 		Page:        page,
 		Size:        size,
 	})
@@ -117,45 +48,11 @@ func ReportPurchaseList(c *gin.Context, apictx *ApiSession) (interface{}, error)
 
 func ReportProduceDownload(c *gin.Context, apictx *ApiSession) (interface{}, error) {
 	_, _, query := UtilQueryPageSize(c)
-	// 条件处理
-	query["status"] = "complete"
-
-	if _supplierId, ok := query["supplierId"]; ok {
-		delete(query, "supplierId")
-		supplierId, _ := primitive.ObjectIDFromHex(_supplierId.(string))
-		if !supplierId.IsZero() {
-			query["supplierId"] = supplierId
-
-		}
-	}
-
-	if _timeRange, ok := query["timeRange"]; ok {
-		timeRange, _ := _timeRange.([]interface{})
-
-		if len(timeRange) == 2 {
-			start, end := getTimeRange(timeRange[0].(string), timeRange[1].(string))
-			query["completeTime"] = bson.M{"$gte": start, "$lte": end}
-		}
-		delete(query, "timeRange")
-	}
-
-	if _planIds, ok := query["planIds"]; ok {
-		if len(_planIds.([]interface{})) > 0 {
-			planQuery := bson.A{}
-			for _, _planId := range _planIds.([]interface{}) {
-				planId, _ := primitive.ObjectIDFromHex(_planId.(string))
-				planQuery = append(planQuery, bson.M{"planId": planId})
-			}
-			query["$or"] = planQuery
-		}
-		delete(query, "planIds")
-	}
-
 	// 获取采符合条件的信息
 	produces := []model.ProduceBill{}
 	err := repo.RepoDocsSearch(apictx.CreateRepoCtx(), &repo.PageSearchOptions{
 		CollectName: repo.CollectionBillProduce,
-		Query:       query,
+		Query:       handleReportQuery(query),
 	}, &produces)
 	if err != nil || len(produces) < 1 {
 		return nil, errors.New("数据不存在")
@@ -240,43 +137,11 @@ func ReportProduceDownload(c *gin.Context, apictx *ApiSession) (interface{}, err
 
 func ReportPurchaseDownload(c *gin.Context, apictx *ApiSession) (interface{}, error) {
 	_, _, query := UtilQueryPageSize(c)
-	// 条件处理
-	query["status"] = "complete"
-
-	if _supplierId, ok := query["supplierId"]; ok {
-		delete(query, "supplierId")
-		supplierId, _ := primitive.ObjectIDFromHex(_supplierId.(string))
-		if !supplierId.IsZero() {
-			query["supplierId"] = supplierId
-		}
-	}
 
-	if _timeRange, ok := query["timeRange"]; ok {
-		timeRange, _ := _timeRange.([]interface{})
-
-		if len(timeRange) == 2 {
-			start, end := getTimeRange(timeRange[0].(string), timeRange[1].(string))
-			query["completeTime"] = bson.M{"$gte": start, "$lte": end}
-		}
-		delete(query, "timeRange")
-	}
-
-	if _planIds, ok := query["planIds"]; ok {
-		if len(_planIds.([]interface{})) > 0 {
-			planQuery := bson.A{}
-			for _, _planId := range _planIds.([]interface{}) {
-				planId, _ := primitive.ObjectIDFromHex(_planId.(string))
-				planQuery = append(planQuery, bson.M{"planId": planId})
-			}
-			query["$or"] = planQuery
-		}
-		delete(query, "planIds")
-	}
-	// 获取符合条件的信息
 	purchases := []model.PurchaseBill{}
 	err := repo.RepoDocsSearch(apictx.CreateRepoCtx(), &repo.PageSearchOptions{
 		CollectName: repo.CollectionBillPurchase,
-		Query:       query,
+		Query:       handleReportQuery(query),
 	}, &purchases)
 	if err != nil || len(purchases) < 1 {
 		return nil, errors.New("数据不存在")

+ 2 - 0
boxcost/api/router.go

@@ -15,6 +15,8 @@ func RegRouters(svc *Service) {
 	//数据存储
 	boxcost.POST("/save/policy", ServiceObsUploadPolicy)
 	boxcost.GET("/printr", Printr)
+	boxcost.GET("/genData", GenData)
+	boxcost.GET("/searchData", SearchData)
 
 	// 材料管理
 	Material(boxcost)

+ 38 - 0
boxcost/api/utils.go

@@ -124,3 +124,41 @@ func getTimeRange(startDate, endDate string) (start, end time.Time) {
 	end, _ = time.ParseInLocation("2006-01-02 15:04:05", endDateTime, loc)
 	return
 }
+
+func handleReportQuery(query map[string]interface{}) map[string]interface{} {
+	// 条件处理
+	query["status"] = "complete"
+	if _supplierId, ok := query["supplierId"]; ok {
+		delete(query, "supplierId")
+		fmt.Printf("id::::%#v\n", _supplierId)
+		supplierId, _ := primitive.ObjectIDFromHex(_supplierId.(string))
+		if !supplierId.IsZero() {
+			query["supplierId"] = supplierId
+
+		}
+	}
+
+	if _timeRange, ok := query["timeRange"]; ok {
+		timeRange, _ := _timeRange.([]interface{})
+
+		if len(timeRange) == 2 {
+			start, end := getTimeRange(timeRange[0].(string), timeRange[1].(string))
+			query["completeTime"] = bson.M{"$gte": start, "$lte": end}
+		}
+		delete(query, "timeRange")
+	}
+
+	if _planIds, ok := query["planIds"]; ok {
+		if len(_planIds.([]interface{})) > 0 {
+			planQuery := bson.A{}
+			for _, _planId := range _planIds.([]interface{}) {
+				planId, _ := primitive.ObjectIDFromHex(_planId.(string))
+				planQuery = append(planQuery, bson.M{"planId": planId})
+			}
+			query["$or"] = planQuery
+		}
+		delete(query, "planIds")
+	}
+	return query
+
+}

+ 11 - 0
boxcost/db/model/aggregate-test.go

@@ -0,0 +1,11 @@
+package model
+
+import "go.mongodb.org/mongo-driver/bson/primitive"
+
+type AggregateTest struct {
+	Id     primitive.ObjectID `bson:"_id,omitempty" json:"_id"`
+	Name   string             `bson:"name,omitempty" json:"name"`
+	Age    int                `bson:"age,omitempty" json:"age"`
+	Sex    *int               `bson:"sex,omitempty" json:"sex"`
+	Salary int                `bson:"salary,omitempty" json:"salary"`
+}