Преглед изворни кода

Merge branch 'dev3.4.1' of http://192.168.3.207:10080/qmx/jy-data-extract into dev3.4.1

* 'dev3.4.1' of http://192.168.3.207:10080/qmx/jy-data-extract:
  xg

# Conflicts:
#	udpcreateindex/src/main.go
Jianghan пре 4 година
родитељ
комит
6501e4230a
2 измењених фајлова са 55 додато и 9 уклоњено
  1. 51 8
      udpcreateindex/src/biddingall.go
  2. 4 1
      udpcreateindex/src/main.go

+ 51 - 8
udpcreateindex/src/biddingall.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"time"
 	//"fmt"
 	"log"
 	qutil "qfw/util"
@@ -234,6 +235,10 @@ func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
 				}
 				if len(cid) > 0 {
 					tmp["entidlist"] = cid
+					tmp_up := []map[string]interface{}{}
+					tmp_up = append(tmp_up, map[string]interface{}{"_id": tmp["_id"]})
+					tmp_up = append(tmp_up, map[string]interface{}{"$set": map[string]interface{}{"entidlist": cid}})
+					UpdataMgoCache <- tmp_up
 				}
 			}
 			//对projectscope字段的索引处理
@@ -479,7 +484,7 @@ func preNUm(data byte) int {
 	var mask byte = 0x80
 	var num int = 0
 	//8bit中首个0bit前有多少个1bits
-	for i:=0; i < 8; i++ {
+	for i := 0; i < 8; i++ {
 		if (data & mask) == mask {
 			num++
 			mask = mask >> 1
@@ -500,11 +505,11 @@ func isGBK(data []byte) bool {
 			continue
 		} else {
 			//大于127的使用双字节编码,落在gbk编码范围内的字符
-			if  data[i] >= 0x81 &&
+			if data[i] >= 0x81 &&
 				data[i] <= 0xfe &&
-				data[i + 1] >= 0x40 &&
-				data[i + 1] <= 0xfe &&
-				data[i + 1] != 0xf7 {
+				data[i+1] >= 0x40 &&
+				data[i+1] <= 0xfe &&
+				data[i+1] != 0xf7 {
 				i += 2
 				continue
 			} else {
@@ -517,7 +522,7 @@ func isGBK(data []byte) bool {
 
 func isUtf8(data []byte) bool {
 	i := 0
-	for i < len(data)  {
+	for i < len(data) {
 		if (data[i] & 0x80) == 0x00 {
 			// 0XXX_XXXX
 			i++
@@ -530,17 +535,55 @@ func isUtf8(data []byte) bool {
 			// 1111_110X 10XX_XXXX 10XX_XXXX 10XX_XXXX 10XX_XXXX 10XX_XXXX
 			// preNUm() 返回首个字节的8个bits中首个0bit前面1bit的个数,该数量也是该字符所使用的字节数
 			i++
-			for j := 0; j < num - 1; j++ {
+			for j := 0; j < num-1; j++ {
 				//判断后面的 num - 1 个字节是不是都是10开头
 				if (data[i] & 0xc0) != 0x80 {
 					return false
 				}
 				i++
 			}
-		} else  {
+		} else {
 			//其他情况说明不是utf-8
 			return false
 		}
 	}
 	return true
 }
+
+//更新extract表
+func UpdateExtract() {
+	qutil.Debug("Update Extract...")
+	extract := qutil.ObjToString(bidding["extractcollect"])
+	arru := make([][]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-UpdataMgoCache:
+			arru[indexu] = v
+			indexu++
+			if indexu == 200 {
+				SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					extractmgo.UpdateBulk(extract, arru...)
+				}(arru)
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				SP <- true
+				go func(arru [][]map[string]interface{}) {
+					defer func() {
+						<-SP
+					}()
+					extractmgo.UpdateBulk(extract, arru...)
+				}(arru[:indexu])
+				arru = make([][]map[string]interface{}, 200)
+				indexu = 0
+			}
+		}
+	}
+}

+ 4 - 1
udpcreateindex/src/main.go

@@ -43,6 +43,8 @@ var (
 
 	winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
 )
+var UpdataMgoCache = make(chan []map[string]interface{}, 1000)
+var SP = make(chan bool, 5)
 
 func init() {
 	util.ReadConfig(&Sysconfig)
@@ -181,7 +183,8 @@ func init() {
 }
 
 func main() {
-	//go task_index()
+	go task_index()
+	go UpdateExtract() //抽取表中新增entidlist字段
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
 	udpclient.Listen(processUdpMsg)