jianghan 2 ay önce
ebeveyn
işleme
e6f1f4afee
8 değiştirilmiş dosya ile 88 ekleme ve 588 silme
  1. 32 35
      src/front/front.go
  2. 29 34
      src/front/server.go
  3. 3 3
      src/front/user.go
  4. 6 11
      src/main.go
  5. 12 11
      src/util/config.go
  6. 0 423
      src/util/elasticSim.go
  7. 0 61
      src/util/influxdb.go
  8. 6 10
      src/util/util.go

+ 32 - 35
src/front/front.go

@@ -1,37 +1,34 @@
 package front
 
 import (
+	"dataCheck/util"
+	"fmt"
 	"github.com/shopspring/decimal"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
 	"math"
-	"strconv"
-
-	//"container/list"
-	"fmt"
-	qu "qfw/util"
-	"qfw/util/redis"
-	"sync/atomic"
-
-	mgo "mongodb"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
-	"util"
 )
 
-//查找未被标注的数据id
+// 查找未被标注的数据id
 func GetNoCheckedId(id, coll string) (string, bool) {
 	defer qu.Catch()
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gt": mgo.StringTOBsonId(id),
+			"$gt": mongodb.StringTOBsonId(id),
 		},
 	}
 	sess := util.MgoM.GetMgoConn()
 	defer util.MgoM.DestoryMongoConn(sess)
 	it := sess.DB(util.MgoM.DbName).C(coll).Find(q).Sort("_id").Select(map[string]interface{}{"_id": 1}).Iter()
 	for tmp := make(map[string]interface{}); it.Next(&tmp); {
-		id = mgo.BsonIdToSId(tmp["_id"])
+		id = mongodb.BsonIdToSId(tmp["_id"])
 		exists, err := redis.Exists("extcheck", coll+"_"+id)
 		if err == nil && !exists {
 			return id, exists
@@ -41,7 +38,7 @@ func GetNoCheckedId(id, coll string) (string, bool) {
 	return id, true
 }
 
-//查询列表数据
+// 查询列表数据
 func getListInfo(coll string, query map[string]interface{}, currentpage int) []map[string]interface{} {
 	start := (currentpage - 1) * 50
 	infoList, _ := util.MgoM.Find(coll, query, `{"_id":1}`, `{"_id":1,"title":1,"detail":1,"site":1,"href":1,"ck_data":1}`, false, start, 50)
@@ -56,7 +53,7 @@ func getListInfo(coll string, query map[string]interface{}, currentpage int) []m
 		if !strings.HasPrefix(href, "http") {
 			v["href"] = "http://" + href
 		}
-		v["_id"] = mgo.BsonIdToSId(v["_id"])
+		v["_id"] = mongodb.BsonIdToSId(v["_id"])
 		(*infoList)[k] = v
 		v["num"] = k + 1 + start
 	}
@@ -74,7 +71,7 @@ func getDetail(id, coll string) map[string]interface{} {
 	// 	}
 	// }
 	info["detail"] = qu.ObjToString(info["title"]) + "</br>" + qu.ObjToString(info["detail"])
-	info["_id"] = mgo.BsonIdToSId(info["_id"])
+	info["_id"] = mongodb.BsonIdToSId(info["_id"])
 	//ck_pclisext, _ := info["ck_pclisext"].(bool)
 	ck_pclistag, _ := info["ck_pclistag"].(bool)
 	//ck_wodrisext, _ := info["ck_wodrisext"].(bool)
@@ -127,7 +124,7 @@ func getDetail(id, coll string) map[string]interface{} {
 	return rep
 }
 
-//获取公告关联信息
+// 获取公告关联信息
 func setOtherInfo(info map[string]interface{}) (result, moreArr []map[string]interface{}) {
 	if otherInfo, ok := info["info"].([]interface{}); ok && len(otherInfo) > 0 {
 		//中标,成交、合同、招标(不含变更)
@@ -227,7 +224,7 @@ func DealData(tmpLen int, publishtime float64, tmp []map[string]interface{}, mor
 	return
 }
 
-//拼装中标候选人
+// 拼装中标候选人
 func setWorderMap(info map[string]interface{}) ([]interface{}, []map[string]interface{}) {
 	//基本参数--中标候选人
 	winnerorder, _ := util.Config.Biaozhu["winnerorder"].([]interface{})
@@ -265,7 +262,7 @@ func setWorderMap(info map[string]interface{}) ([]interface{}, []map[string]inte
 	return worders, isNewAndStatus
 }
 
-//拼装标的物
+// 拼装标的物
 func setPurchasingMap(info map[string]interface{}) ([]interface{}, []map[string]interface{}) {
 	purchasinglist, _ := util.Config.Biaozhu["purchasinglist"].([]interface{})
 	purchasinglists := []interface{}{}
@@ -309,7 +306,7 @@ func setPurchasingMap(info map[string]interface{}) ([]interface{}, []map[string]
 	return purchasinglists, isNewAndStatus
 }
 
-//拼装子包信息
+// 拼装子包信息
 func setPaceMap(info map[string]interface{}) ([]map[string]interface{}, []string, []bool) {
 	var confpack []interface{}
 	confpack, _ = util.Config.Biaozhu["package"].([]interface{})
@@ -404,7 +401,7 @@ func setPaceMap(info map[string]interface{}) ([]map[string]interface{}, []string
 	return packages, sortpackskey, isNewPkgArr
 }
 
-//拼装抽取common值
+// 拼装抽取common值
 func setExtComMap(info map[string]interface{}) ([]interface{}, []interface{}, []interface{}) {
 	//基本参数
 	common, _ := util.Config.Biaozhu["common"].([]interface{})
@@ -488,7 +485,7 @@ func setExtComMap(info map[string]interface{}) ([]interface{}, []interface{}, []
 	return common, timeplace, other
 }
 
-//标注基本字段
+// 标注基本字段
 func BzJBZD(content []interface{}, set, unset, errset map[string]interface{}) {
 	info, _ := content[0].(map[string]interface{})
 	if uInputs, ok := info["uInput"].([]interface{}); ok {
@@ -532,7 +529,7 @@ func BzJBZD(content []interface{}, set, unset, errset map[string]interface{}) {
 	}
 }
 
-//标注时间地点
+// 标注时间地点
 func BzSJDD(content []interface{}, set, unset, errset map[string]interface{}) {
 	info, _ := content[0].(map[string]interface{})
 	if uInputs, ok := info["uInput"].([]interface{}); ok {
@@ -567,7 +564,7 @@ func BzSJDD(content []interface{}, set, unset, errset map[string]interface{}) {
 	// qu.Debug("unset---", unset)
 }
 
-//标注标的信息
+// 标注标的信息
 func BzBDXX(content []interface{}, set, unset, errset map[string]interface{}, isext, istag bool, status int) {
 	//qu.Debug("是否抽取:", status, isext, len(content), errset)
 	if status == -1 {
@@ -632,7 +629,7 @@ func BzBDXX(content []interface{}, set, unset, errset map[string]interface{}, is
 	}
 }
 
-//标注多包信息
+// 标注多包信息
 func BzDBXX(content []interface{}, set, unset, errset map[string]interface{}, isext bool, status int) {
 	//qu.Debug("是否抽取:", status, isext, len(content), errset)
 	if status == -1 {
@@ -751,7 +748,7 @@ func BzDBXX(content []interface{}, set, unset, errset map[string]interface{}, is
 	// qu.Debug("errset---", errset)
 }
 
-//标注中标候选人信息
+// 标注中标候选人信息
 func BzZBHXRXX(content []interface{}, set, unset, errset map[string]interface{}, isext bool, status int) {
 	if status == -1 {
 		return
@@ -811,7 +808,7 @@ func BzZBHXRXX(content []interface{}, set, unset, errset map[string]interface{},
 	}
 }
 
-//标注其余信息
+// 标注其余信息
 func BzQYXX(content []interface{}, set, unset, errset map[string]interface{}) {
 	info, _ := content[0].(map[string]interface{})
 	if uInputs, ok := info["uInput"].([]interface{}); ok {
@@ -874,7 +871,7 @@ func mapIntAdd(k, val string, tmp map[string]map[string]int) map[string]map[stri
 	return tmp
 }
 
-//通过id查询数据
+// 通过id查询数据
 func GetDataById(coll string, ids []string, stype string, tmp map[string]map[string]interface{}) (bool, string, int64) {
 	defer qu.Catch()
 	success := true
@@ -987,7 +984,7 @@ func GetDataById(coll string, ids []string, stype string, tmp map[string]map[str
 	return success, msg, n
 }
 
-//通过id查询数据
+// 通过id查询数据
 func GetDataById1(coll string, ids []string, stype string, tmp map[string]map[string]interface{}) (bool, string, int64) {
 	defer qu.Catch()
 	success := true
@@ -1152,7 +1149,7 @@ func GetDataById1(coll string, ids []string, stype string, tmp map[string]map[st
 	return success, msg, n
 }
 
-//更新数据
+// 更新数据
 func UpdateMarkColl(bidData, markData *map[string]interface{}) {
 	defer qu.Catch()
 	ck_data := qu.IntAll((*markData)["ck_data"])
@@ -1174,11 +1171,11 @@ func UpdateMarkColl(bidData, markData *map[string]interface{}) {
 	}
 }
 
-//获取当前数据下一条的id
+// 获取当前数据下一条的id
 func GetNextDataId(id, coll string, query map[string]interface{}) string {
 	nextIdQuery := map[string]interface{}{
 		"_id": map[string]interface{}{
-			"$gt": mgo.StringTOBsonId(id),
+			"$gt": mongodb.StringTOBsonId(id),
 		},
 	}
 	for k, v := range query {
@@ -1187,12 +1184,12 @@ func GetNextDataId(id, coll string, query map[string]interface{}) string {
 
 	one, _ := util.MgoM.Find(coll, nextIdQuery, `{"_id":1}`, `{"_id":1}`, true, 0, 1)
 	if one != nil && len(*one) == 1 {
-		return mgo.BsonIdToSId((*one)[0]["_id"])
+		return mongodb.BsonIdToSId((*one)[0]["_id"])
 	}
 	return id
 }
 
-//获取已标注和数据总数的信息
+// 获取已标注和数据总数的信息
 func GetCheckedAndAllDataInfo(query map[string]interface{}, coll string) (int, int) {
 	allCount := util.MgoM.Count(coll, query)
 	ckDataQuery := map[string]interface{}{
@@ -1207,7 +1204,7 @@ func GetCheckedAndAllDataInfo(query map[string]interface{}, coll string) (int, i
 	return checkedCount, allCount
 }
 
-//查询表中已标数据的标注人
+// 查询表中已标数据的标注人
 func GetLabeler(coll string) (labeler []string) {
 	defer qu.Catch()
 	sess := util.MgoM.GetMgoConn()

+ 29 - 34
src/front/server.go

@@ -3,25 +3,20 @@ package front
 
 import (
 	"context"
+	"dataCheck/util"
 	"encoding/json"
-	"io/ioutil"
-
-	//"container/list"
 	"fmt"
-	qu "qfw/util"
-	"qfw/util/redis"
-
-	mgo "mongodb"
+	"github.com/go-xweb/xweb"
+	es "github.com/olivere/elastic/v7"
+	"github.com/tealeg/xlsx"
+	"go.mongodb.org/mongo-driver/bson"
+	"io/ioutil"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
 	"strings"
 	"sync"
 	"time"
-	"util"
-
-	"github.com/go-xweb/xweb"
-	"go.mongodb.org/mongo-driver/bson"
-
-	"github.com/tealeg/xlsx"
-	es "gopkg.in/olivere/elastic.v7"
 )
 
 type Front struct {
@@ -66,7 +61,7 @@ var LABELER, AUDITOR, ADMIN = 1, 2, 3 //标注人员,审核人员,管理员
 		4:删除
 */
 
-//列表
+// 列表
 func (i *Front) ListInfo() error {
 	//loginuser := i.GetSession("loginuser").(string)
 	spidercode := i.GetString("spidercode")
@@ -77,7 +72,7 @@ func (i *Front) ListInfo() error {
 	max := i.GetString("maxval")
 	hasno, _ := i.GetBool("hasno")
 	notag, _ := i.GetBool("notag")
-	pagenum, _ := i.GetInteger("pagenum")
+	pagenum, _ := i.GetInt("pagenum")
 	labeler := i.GetString("labeler")
 	if pagenum == 0 { //页码
 		pagenum = 1
@@ -150,7 +145,7 @@ func (i *Front) ListInfo() error {
 	i.SetSession("coll", coll)   //session中存入查询表
 	i.SetSession("query", query) //session中存入查询条件
 
-	listData := getListInfo(coll, query, pagenum)
+	listData := getListInfo(coll, query, int(pagenum))
 	i.T["list"] = listData
 	i.T["type"] = stype
 	i.T["field"] = field
@@ -180,7 +175,7 @@ func (i *Front) ListInfo() error {
 	return i.Render("list.html", &i.T)
 }
 
-//判断id是否正在被标记
+// 判断id是否正在被标记
 func (i *Front) CheckId() {
 	defer qu.Catch()
 	CheckLock.Lock()
@@ -203,14 +198,14 @@ func (i *Front) CheckId() {
 	i.ServeJson(map[string]interface{}{"msg": "数据校验表出错!"})
 }
 
-//标注
+// 标注
 func (i *Front) Biaozhu() error {
 	b := false
 	obj := []map[string]interface{}{}
 	//ispackage := i.GetString("ispackage")
 	key := i.GetString("key")
 	_id := i.GetString("_id")
-	stype, _ := i.GetInteger("stype")
+	stype, _ := i.GetInt("stype")
 	err := json.Unmarshal([]byte(key), &obj)
 	if err != nil {
 		i.ServeJson(b)
@@ -412,7 +407,7 @@ func (i *Front) Biaozhu() error {
 	return nil
 }
 
-//查询信息
+// 查询信息
 func (i *Front) Detail(id string) error {
 	coll, _ := i.GetSession("coll").(string)
 	//if coll != "" {
@@ -466,7 +461,7 @@ func (i *Front) Detail(id string) error {
 	return i.Render("detail.html", &i.T)
 }
 
-//通过excel表格导入
+// 通过excel表格导入
 func (i *Front) ImportByExcel() {
 	defer qu.Catch()
 	//success := false
@@ -549,12 +544,12 @@ func (i *Front) GetEsCount() {
 	if json.Unmarshal([]byte(estext), &esJson) != nil || len(esJson) == 0 {
 		msg = "Es语句错误"
 	} else {
-		count = util.Es.Count(util.Index, util.Itype, estext)
+		count = util.Es.Count(util.Index, estext)
 	}
 	i.ServeJson(map[string]interface{}{"count": count, "msg": msg})
 }
 
-//通过es语句导入
+// 通过es语句导入
 func (i *Front) ImportByEs() {
 	defer qu.Catch()
 	//success := false
@@ -571,7 +566,7 @@ func (i *Front) ImportByEs() {
 	ch := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
-	escount := util.Es.Count(util.Index, util.Itype, estext)
+	escount := util.Es.Count(util.Index, estext)
 	qu.Debug("查询总数:", escount)
 	if escount > 0 {
 		//查询条件类型转换
@@ -640,7 +635,7 @@ func (i *Front) ImportByEs() {
 	i.ServeJson(map[string]interface{}{"msg": msg})
 }
 
-//同步数据
+// 同步数据
 func (i *Front) SyncMarked() {
 	syncColl := i.GetString("coll")
 	sess := util.MgoM.GetMgoConn()
@@ -696,7 +691,7 @@ func (i *Front) SyncMarked() {
 	i.ServeJson(map[string]interface{}{"msg": msg, "flag": success})
 }
 
-//将marked表中ck_data:2更新为0
+// 将marked表中ck_data:2更新为0
 func (i *Front) MarkedInit() {
 	set := map[string]interface{}{
 		"$set": map[string]interface{}{
@@ -707,17 +702,17 @@ func (i *Front) MarkedInit() {
 	i.ServeJson(map[string]interface{}{"success": b})
 }
 
-//标注完成
+// 标注完成
 func (i *Front) FinishCheck() {
 	i.Render("finish.html")
 }
 
-//错误页面
+// 错误页面
 func (i *Front) ErrCheck() {
 	i.Render("err.html")
 }
 
-//统计抽查
+// 统计抽查
 func (i *Front) Tj() error {
 	comm := map[string]map[string]int{}
 	comm_win := map[string]map[string]int{}
@@ -776,13 +771,13 @@ func (i *Front) Tj() error {
 	return i.Render("tj.html", &i.T)
 }
 
-//标错列表
+// 标错列表
 func (i *Front) Elist() error {
 	attrname := i.GetString("attrname")
 	common := util.Config.Biaozhu["common"]
 	elist, _ := util.MgoM.Find(util.Config.Totablel, `{"ck_`+attrname+`":"0"}`, `{"_id":1}`, `{"_id":1}`, false, -1, -1)
 	for _, v := range *elist {
-		v["_id"] = mgo.BsonIdToSId(v["_id"])
+		v["_id"] = mongodb.BsonIdToSId(v["_id"])
 	}
 	i.T["elist"] = *elist
 	i.T["attrname"] = attrname
@@ -1087,10 +1082,10 @@ func (i *Front) ReviewNext() {
 	defer qu.Catch()
 	id := i.GetString("id")
 	coll := i.GetSession("coll").(string)
-	q := bson.M{"_id": bson.M{"$gt": mgo.StringTOBsonId(id)}}
+	q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(id)}}
 	info, b := util.MgoM.Find(coll, q, bson.M{"_id": 1}, bson.M{"_id": 1}, false, 0, 1)
 	if b && len(*info) > 0 {
-		nxid := mgo.BsonIdToSId((*info)[0]["_id"])
+		nxid := mongodb.BsonIdToSId((*info)[0]["_id"])
 		i.ServeJson(bson.M{"id": nxid, "exists": true})
 	} else {
 		i.ServeJson(bson.M{"exists": false})

+ 3 - 3
src/front/user.go

@@ -1,11 +1,11 @@
 package front
 
 import (
-	qu "qfw/util"
-	"util"
+	"dataCheck/util"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 )
 
-//登录
+// 登录
 func (i *Front) Login() error {
 	if i.Method() == "GET" {
 		return i.Render("login.html")

+ 6 - 11
src/main.go

@@ -2,19 +2,14 @@
 package main
 
 import (
-	//"crypto/md5"
-	//"encoding/hex"
-	_ "filter"
-	"front"
+	_ "dataCheck/filter"
+	"dataCheck/front"
+	"dataCheck/util"
+	"github.com/go-xweb/xweb"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
 	"log"
-
-	"qfw/util/redis"
 	"time"
-
-	qu "qfw/util"
-	"util"
-
-	"github.com/go-xweb/xweb"
 )
 
 // func MD5(appid, t, secret string) string {

+ 12 - 11
src/util/config.go

@@ -4,12 +4,13 @@
 package util
 
 import (
-	mgo "mongodb"
-	qu "qfw/util"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
 	"sort"
 )
 
-//系统配置
+// 系统配置
 type config struct {
 	Port          string                 `json:"port"`
 	Dbaddress     string                 `json:"dbaddress"`
@@ -35,14 +36,14 @@ const BIDDINGSTARTID = "5a862f0640d2d9bbe88e3cec"
 
 var (
 	Config   config
-	MgoB     *mgo.MongodbSim //bidding
+	MgoB     *mongodb.MongodbSim //bidding
 	BidColl1 string
 	BidColl2 string
-	MgoE     *mgo.MongodbSim //extract
+	MgoE     *mongodb.MongodbSim //extract
 	ExtColl1 string
 	ExtColl2 string
-	MgoM     *mgo.MongodbSim //标注
-	Es       *Elastic        //es
+	MgoM     *mongodb.MongodbSim //标注
+	Es       *elastic.Elastic    //es
 	Index    string
 	Itype    string
 	//
@@ -56,7 +57,7 @@ var (
 
 func InitConfig() {
 	//标注
-	MgoM = &mgo.MongodbSim{
+	MgoM = &mongodb.MongodbSim{
 		MongodbAddr: Config.Dbaddress,
 		DbName:      Config.Dbname,
 		Size:        Config.Size,
@@ -67,7 +68,7 @@ func InitConfig() {
 	bid := Config.Bidding
 	BidColl1 = qu.ObjToString(bid["coll1"])
 	BidColl2 = qu.ObjToString(bid["coll2"])
-	MgoB = &mgo.MongodbSim{
+	MgoB = &mongodb.MongodbSim{
 		MongodbAddr: qu.ObjToString(bid["addr"]),
 		DbName:      qu.ObjToString(bid["db"]),
 		Size:        qu.IntAll(bid["size"]),
@@ -80,7 +81,7 @@ func InitConfig() {
 	ext := Config.Extract
 	ExtColl1 = qu.ObjToString(ext["coll1"])
 	ExtColl2 = qu.ObjToString(ext["coll2"])
-	MgoE = &mgo.MongodbSim{
+	MgoE = &mongodb.MongodbSim{
 		MongodbAddr: qu.ObjToString(ext["addr"]),
 		DbName:      qu.ObjToString(ext["db"]),
 		Size:        qu.IntAll(ext["size"]),
@@ -91,7 +92,7 @@ func InitConfig() {
 	es := Config.Elas
 	Index = qu.ObjToString(es["index"])
 	Itype = qu.ObjToString(es["itype"])
-	Es = &Elastic{
+	Es = &elastic.Elastic{
 		S_esurl:  qu.ObjToString(es["addr"]),
 		I_size:   qu.IntAll(es["pool"]),
 		Username: qu.ObjToString(es["user"]),

+ 0 - 423
src/util/elasticSim.go

@@ -1,423 +0,0 @@
-package util
-
-import (
-	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	es "gopkg.in/olivere/elastic.v7"
-	"log"
-	"qfw/util"
-	"runtime"
-	"strings"
-	"sync"
-	"time"
-)
-
-type Elastic struct {
-	S_esurl      string
-	I_size       int
-	Addrs        []string
-	Pool         chan *es.Client
-	lastTime     int64
-	lastTimeLock sync.Mutex
-	ntimeout     int
-	Username     string
-	Password     string
-}
-
-func (e *Elastic) InitElasticSize() {
-	e.Pool = make(chan *es.Client, e.I_size)
-	for _, s := range strings.Split(e.S_esurl, ",") {
-		e.Addrs = append(e.Addrs, s)
-	}
-	log.Println(e.Password, e.Username)
-	for i := 0; i < e.I_size; i++ {
-		client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
-		e.Pool <- client
-	}
-}
-
-//关闭连接
-func (e *Elastic) DestoryEsConn(client *es.Client) {
-	select {
-	case e.Pool <- client:
-		break
-	case <-time.After(time.Second * 1):
-		if client != nil {
-			client.Stop()
-		}
-		client = nil
-	}
-}
-
-func (e *Elastic) GetEsConn() *es.Client {
-	select {
-	case c := <-e.Pool:
-		if c == nil || !c.IsRunning() {
-			log.Println("new esclient.", len(e.Pool))
-			client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
-				es.SetSniff(false))
-			if err == nil && client.IsRunning() {
-				return client
-			}
-		}
-		return c
-	case <-time.After(time.Second * 4):
-		//超时
-		e.ntimeout++
-		e.lastTimeLock.Lock()
-		defer e.lastTimeLock.Unlock()
-		//12秒后允许创建链接
-		c := time.Now().Unix() - e.lastTime
-		if c > 12 {
-			e.lastTime = time.Now().Unix()
-			log.Println("add client..", len(e.Pool))
-			c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
-			go func() {
-				for i := 0; i < 2; i++ {
-					client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetSniff(false))
-					e.Pool <- client
-				}
-			}()
-			return c
-		}
-		return nil
-	}
-}
-
-func (e *Elastic) Get(index, query string) *[]map[string]interface{} {
-	client := e.GetEsConn()
-	defer func() {
-		go e.DestoryEsConn(client)
-	}()
-	var res []map[string]interface{}
-	if client != nil {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Println("[E]", r)
-				for skip := 1; ; skip++ {
-					_, file, line, ok := runtime.Caller(skip)
-					if !ok {
-						break
-					}
-					go log.Printf("%v,%v\n", file, line)
-				}
-			}
-		}()
-		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
-		if err != nil {
-			log.Println("从ES查询出错", err.Error())
-			return nil
-		}
-		if searchResult.Hits != nil {
-			resNum := len(searchResult.Hits.Hits)
-			if resNum < 5000 {
-				res = make([]map[string]interface{}, resNum)
-				for i, hit := range searchResult.Hits.Hits {
-					parseErr := json.Unmarshal(hit.Source, &res[i])
-					if parseErr == nil && hit.Highlight != nil && res[i] != nil {
-						res[i]["highlight"] = map[string][]string(hit.Highlight)
-					}
-				}
-			} else {
-				log.Println("查询结果太多,查询到:", resNum, "条")
-			}
-		}
-	}
-	return &res
-}
-
-//关闭elastic
-func (e *Elastic) Close() {
-	for i := 0; i < e.I_size; i++ {
-		cli := <-e.Pool
-		cli.Stop()
-		cli = nil
-	}
-	e.Pool = nil
-	e = nil
-}
-
-//获取连接
-//func (e *Elastic) GetEsConn() (c *es.Client) {
-//	defer util.Catch()
-//	select {
-//	case c = <-e.Pool:
-//		if c == nil || !c.IsRunning() {
-//			client, err := es.NewClient(es.SetURL(addrs...),
-//				es.SetMaxRetries(2), es.SetSniff(false))
-//			if err == nil && client.IsRunning() {
-//				return client
-//			}
-//			return nil
-//		}
-//		return
-//	case <-time.After(time.Second * 7):
-//		//超时
-//		ntimeout++
-//		log.Println("timeout times:", ntimeout)
-//		return nil
-//	}
-//}
-
-func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
-	client := e.GetEsConn()
-	defer e.DestoryEsConn(client)
-	if client != nil {
-		req := client.Bulk()
-		for _, v := range obj {
-			//if isDelBefore {
-			//	req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
-			//}
-			id := util.ObjToString(v["_id"])
-			delete(v, "_id")
-			req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(v))
-		}
-		_, err := req.Do(context.Background())
-		if err != nil {
-			log.Println("批量保存到ES出错", err.Error())
-		}
-	}
-}
-
-//根据id删除索引对象
-func (e *Elastic) DelById(index, itype, id string) bool {
-	client := e.GetEsConn()
-	defer e.DestoryEsConn(client)
-	b := false
-	if client != nil {
-		var err error
-		_, err = client.Delete().Index(index).Type(itype).Id(id).Do(context.Background())
-		if err != nil {
-			log.Println("更新检索出错:", err.Error())
-		} else {
-			b = true
-		}
-	}
-	return b
-}
-
-func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
-	client := e.GetEsConn()
-	defer e.DestoryEsConn(client)
-	var res []map[string]interface{}
-	if client != nil {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Println("[E]", r)
-				for skip := 1; ; skip++ {
-					_, file, line, ok := runtime.Caller(skip)
-					if !ok {
-						break
-					}
-					go log.Printf("%v,%v\n", file, line)
-				}
-			}
-		}()
-		searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
-		if err != nil {
-			log.Println("从ES查询出错", err.Error())
-			return nil
-		}
-
-		if searchResult.Hits != nil {
-			resNum := len(searchResult.Hits.Hits)
-			util.Debug(resNum)
-			res = make([]map[string]interface{}, resNum)
-			for i, hit := range searchResult.Hits.Hits {
-				json.Unmarshal(hit.Source, &res[i])
-			}
-		}
-	}
-	return &res
-}
-
-//func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
-//	client := e.GetEsConn()
-//	defer e.DestoryEsConn(client)
-//	if client != nil {
-//		defer func() {
-//			if r := recover(); r != nil {
-//				log.Println("[E]", r)
-//				for skip := 1; ; skip++ {
-//					_, file, line, ok := runtime.Caller(skip)
-//					if !ok {
-//						break
-//					}
-//					go log.Printf("%v,%v\n", file, line)
-//				}
-//			}
-//		}()
-//		query := `{"query":{"term":{"_id":"` + id + `"}}`
-//		if len(fields) > 0 {
-//			query = query + `,"_source":[` + fields + `]`
-//		}
-//		query = query + "}"
-//		searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
-//		if err != nil {
-//			log.Println("从ES查询出错", err.Error())
-//			return nil
-//		}
-//		var res map[string]interface{}
-//		if searchResult.Hits != nil {
-//			resNum := len(searchResult.Hits.Hits)
-//			if resNum == 1 {
-//				res = make(map[string]interface{})
-//				for _, hit := range searchResult.Hits.Hits {
-//					json.Unmarshal(*hit.Source., &res)
-//				}
-//				return &res
-//			}
-//		}
-//	}
-//	return nil
-//}
-
-func (e *Elastic) Count(index, itype string, query interface{}) int64 {
-	client := e.GetEsConn()
-	defer e.DestoryEsConn(client)
-	if client != nil {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Println("[E]", r)
-				for skip := 1; ; skip++ {
-					_, file, line, ok := runtime.Caller(skip)
-					if !ok {
-						break
-					}
-					go log.Printf("%v,%v\n", file, line)
-				}
-			}
-		}()
-		var qq es.Query
-		if qi, ok2 := query.(es.Query); ok2 {
-			qq = qi
-		}
-		n, err := client.Count(index).Query(qq).Do(context.Background())
-		if err != nil {
-			log.Println("统计出错", err.Error())
-		}
-
-		return n
-	}
-	return 0
-}
-
-//更新一个字段
-//func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
-//	client := e.GetEsConn()
-//	defer e.DestoryEsConn(client)
-//	if client != nil {
-//		defer func() {
-//			if r := recover(); r != nil {
-//				log.Println("[E]", r)
-//				for skip := 1; ; skip++ {
-//					_, file, line, ok := runtime.Caller(skip)
-//					if !ok {
-//						break
-//					}
-//					go log.Printf("%v,%v\n", file, line)
-//				}
-//			}
-//		}()
-//		for _, data := range update {
-//			id := data["id"]
-//			updateStr := data["updateStr"]
-//			if id != "" && updateStr != "" {
-//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
-//				if err != nil {
-//					log.Println("更新检索出错:", err.Error())
-//				}
-//			} else {
-//				log.Println("数据错误")
-//			}
-//		}
-//	}
-//}
-
-//更新多个字段
-//func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
-//	client := e.GetEsConn()
-//	defer e.DestoryEsConn(client)
-//	if client != nil {
-//		defer func() {
-//			if r := recover(); r != nil {
-//				log.Println("[E]", r)
-//				for skip := 1; ; skip++ {
-//					_, file, line, ok := runtime.Caller(skip)
-//					if !ok {
-//						break
-//					}
-//					go log.Printf("%v,%v\n", file, line)
-//				}
-//			}
-//		}()
-//		for _, arr := range arrs {
-//			id := arr[0]["id"].(string)
-//			update := arr[1]["update"].([]string)
-//			for _, str := range update {
-//				_, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
-//				if err != nil {
-//					log.Println("更新检索出错:", err.Error())
-//				}
-//			}
-//		}
-//	}
-//}
-
-// UpdateBulk 批量修改文档
-func (e *Elastic) UpdateBulk(index, itype string, docs ...[]map[string]interface{}) {
-	client := e.GetEsConn()
-	defer e.DestoryEsConn(client)
-	bulkService := client.Bulk().Index(index).Refresh("true")
-	bulkService.Type(itype)
-	for _, d := range docs {
-		id := d[0]["_id"].(string)
-		doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
-		bulkService.Add(doc)
-	}
-	_, err := bulkService.Do(context.Background())
-	if err != nil {
-		fmt.Printf("UpdateBulk all success err is %v\n", err)
-	}
-	//if len(res.Failed()) > 0 {
-	//	fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
-	//}
-}
-
-// UpsertBulk 批量修改文档(不存在则插入)
-func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
-	client := e.GetEsConn()
-	defer e.DestoryEsConn(client)
-	bulkService := client.Bulk().Index(index).Refresh("true")
-	bulkService.Type("bidding")
-	for i := range ids {
-		doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
-		bulkService.Add(doc)
-	}
-	res, err := bulkService.Do(context.Background())
-	if err != nil {
-		return err
-	}
-	if len(res.Failed()) > 0 {
-		return errors.New(res.Failed()[0].Error.Reason)
-	}
-	return nil
-}
-
-// 批量删除
-func (e *Elastic) DeleteBulk(index string, ids []string) {
-	client := e.GetEsConn()
-	defer e.DestoryEsConn(client)
-	bulkService := client.Bulk().Index(index).Refresh("true")
-	bulkService.Type("bidding")
-	for i := range ids {
-		req := es.NewBulkDeleteRequest().Id(ids[i])
-		bulkService.Add(req)
-	}
-	res, err := bulkService.Do(context.Background())
-	if err != nil {
-		fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
-	}
-}

+ 0 - 61
src/util/influxdb.go

@@ -1,61 +0,0 @@
-package util
-
-import (
-	"github.com/influxdata/influxdb-client"
-	"time"
-)
-
-var influxdburl string
-
-//
-func InitInfluxdb(url string) {
-	influxdburl = url
-}
-
-//
-func InsertInto(dbname string, measurement string, tags []influxdb.Tag, fields map[string]interface{}, timestamp time.Time, rp /*数据保留策略,默认autogen*/ string) error {
-	client, err := influxdb.NewClient(influxdburl)
-	if err != nil {
-		return err
-	}
-	writer := client.Writer()
-	writer.Database = dbname
-	writer.RetentionPolicy = rp
-	pt := influxdb.Point{
-		Name:   measurement,
-		Tags:   tags,
-		Fields: fields,
-		Time:   timestamp,
-	}
-
-	if _, err := pt.WriteTo(writer); err != nil {
-		return err
-	}
-	return nil
-}
-
-//查询接口
-func Search(dbname string, fn func(row influxdb.Row) error, query string, queryoption ...influxdb.QueryOption) error {
-	client, err := influxdb.NewClient(influxdburl)
-	if err != nil {
-		return err
-	}
-
-	querier := client.Querier()
-	querier.Database = dbname
-	cur, err := querier.Select(query, queryoption...)
-	if err != nil {
-		return err
-	}
-	defer cur.Close()
-	result, err := cur.NextSet()
-	if err != nil {
-		return err
-	}
-	series, err := result.NextSeries()
-	if err != nil {
-		return err
-	}
-	influxdb.EachRow(series, fn)
-	return nil
-}

+ 6 - 10
src/util/util.go

@@ -3,18 +3,15 @@ package util
 
 import (
 	"fmt"
+	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 	"io/ioutil"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"math/rand"
 	"os"
-	"qfw/util"
 	"regexp"
 	"strconv"
 	"strings"
 	"time"
-
-	//"unicode/utf8"
-
-	"github.com/aliyun/aliyun-oss-go-sdk/oss"
 )
 
 var regNum *regexp.Regexp
@@ -55,7 +52,6 @@ var (
 	numberReg = regexp.MustCompile("[一二三四五六七八九十0-9]+")
 )
 
-//
 func ToNumber(value string, defaultNum int) int {
 	value = numberReg.FindString(value)
 	if value == "" {
@@ -71,7 +67,7 @@ func ToNumber(value string, defaultNum int) int {
 	return defaultNum
 }
 
-//中文数字转换
+// 中文数字转换
 func ChineseNumberToInt(value string) int {
 	result, temp, count := 0, 1, 0
 	cnArr := []string{"一", "二", "三", "四", "五", "六", "七", "八", "九"}
@@ -179,7 +175,7 @@ func PackageNumberConvert(val string) string {
 	return result
 }
 
-//罗马数字转换
+// 罗马数字转换
 func romeNumberToInt(value string) int {
 	for k, v := range []rune("ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ") {
 		if value == string(v) {
@@ -189,7 +185,7 @@ func romeNumberToInt(value string) int {
 	return -1
 }
 
-//带圆圈的数字换行
+// 带圆圈的数字换行
 func circleNumberToInt(value string) int {
 	for k, v := range []rune("①②③④⑤⑥⑦⑧⑨⑩⑪⑫⑬⑭⑮⑯⑰⑱⑲⑳") {
 		if value == string(v) {
@@ -199,7 +195,7 @@ func circleNumberToInt(value string) int {
 	return -1
 }
 
-//oss
+// oss
 var (
 	ossEndpoint        = "oss-cn-beijing.aliyuncs.com" //正式环境用:oss-cn-beijing-internal.aliyuncs.com 测试:oss-cn-beijing.aliyuncs.com
 	ossAccessKeyId     = "LTAI4G5x9aoZx8dDamQ7vfZi"