123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- package main
- import (
- "encoding/json"
- "log"
- mu "mfw/util"
- "net"
- "os"
- qu "qfw/util"
- "sync"
- "time"
- )
- var (
- sysconfig map[string]interface{} //配置文件
- mgo, qy_mgo *MongodbSim //mongodb操作对象
- udpclient mu.UdpClient //udp对象
- udplock, dataLock sync.Mutex //udp锁
- coll_name, qy_coll_name string //表名
- isTest bool //是否测试
- Ext_Type, Ext_From map[string]interface{} //抽取来源,方式分
- buyer_score, s_winner_score map[string]interface{}
- budget_score, bidamount_score map[string]interface{}
- projectname_score, projectcode_score map[string]interface{}
- )
- //mgo-配置等
- func initMgo() {
- isTest = sysconfig["isTest"].(bool)
- mconf := sysconfig["mongodb"].(map[string]interface{})
- log.Println(mconf)
- mgo = &MongodbSim{
- MongodbAddr: mconf["addrName"].(string),
- DbName: mconf["dbName"].(string),
- Size: qu.IntAllDef(mconf["pool"], 10),
- }
- mgo.InitPool()
- qy_mconf := sysconfig["qy_mongodb"].(map[string]interface{})
- qy_mgo = &MongodbSim{
- MongodbAddr: qy_mconf["qy_addrName"].(string),
- DbName: qy_mconf["qy_dbName"].(string),
- Size: qu.IntAllDef(qy_mconf["pool"], 10),
- UserName: qy_mconf["qy_username"].(string),
- Password: qy_mconf["qy_password"].(string),
- }
- qy_mgo.InitPool()
- coll_name = mconf["collName"].(string)
- qy_coll_name = qy_mconf["qy_collName"].(string)
- }
- //初始化打分
- func initScore() {
- Ext_Type = sysconfig["ext_type"].(map[string]interface{})
- Ext_From = sysconfig["ext_from"].(map[string]interface{})
- buyer_score = sysconfig["buyer_score"].(map[string]interface{})
- s_winner_score = sysconfig["s_winner_score"].(map[string]interface{})
- budget_score = sysconfig["budget_score"].(map[string]interface{})
- bidamount_score = sysconfig["bidamount_score"].(map[string]interface{})
- projectname_score = sysconfig["projectname_score"].(map[string]interface{})
- projectcode_score = sysconfig["projectcode_score"].(map[string]interface{})
- }
- //初始化
- func init() {
- qu.ReadConfig(&sysconfig) //加载配置文件
- initMgo()
- initScore()
- }
- func mainT() {
- updport := sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- time.Sleep(99999 * time.Hour)
- }
- //调试流程
- func main() {
- sid := "1f0000000000000000000000"
- eid := "9f0000000000000000000000"
- log.Println(sid, "---", eid)
- mapinfo := map[string]interface{}{}
- if sid == "" || eid == "" {
- log.Println("sid,eid参数不能为空")
- os.Exit(0)
- }
- mapinfo["gtid"] = sid
- mapinfo["lteid"] = eid
- startFieldScoreTask(mapinfo)
- }
- //打分流程-方法
- func startFieldScoreTask(mapInfo map[string]interface{}) {
- log.Println("开始字段规则链...评分流程")
- defer qu.Catch()
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("查询条件:", q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- updateFieldScore, total := [][]map[string]interface{}{}, 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%1000 == 0 {
- log.Println("当前数量:", total)
- }
- //验证初始分字段
- source := *qu.ObjToMap(tmp["field_source"])
- f_s := dealWithFieldSourceScore(source)
- //更新集合
- update_dict := make(map[string]interface{}, 0)
- subtype := qu.ObjToString(tmp["subtype"])
- buyer_s := buyerFieldScore(tmp, f_s["buyer"])
- update_dict["buyer"] = buyer_s
- budget_s := budgetFieldScore(tmp, f_s["budget"])
- update_dict["budget"] = budget_s
- projectname_s := projectnameFieldScore(tmp, f_s["projectname"])
- update_dict["projectname"] = projectname_s
- projectcode_s := projectcodeFieldScore(tmp, f_s["projectcode"])
- update_dict["projectcode"] = projectcode_s
- if subtype == "中标" || subtype == "成交" || subtype == "合同" {
- s_winner_s := winnerFieldScore(tmp, f_s["s_winner"])
- update_dict["s_winner"] = s_winner_s
- bidamount_s := bidamountFieldScore(tmp, f_s["bidamount"])
- update_dict["bidamount"] = bidamount_s
- }
- updateFieldScore = append(updateFieldScore, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "field_score": update_dict,
- },
- },
- })
- if len(updateFieldScore) >= 200 {
- mgo.UpSertBulk(coll_name, updateFieldScore...)
- updateFieldScore = [][]map[string]interface{}{}
- }
- tmp = make(map[string]interface{})
- }
- if len(updateFieldScore) > 0 {
- mgo.UpSertBulk(coll_name, updateFieldScore...)
- }
- log.Println("field score is over - 总计数量", total)
- }
- //udp监听
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case mu.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- if sid == "" || eid == "" {
- log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
- } else {
- udpinfo, _ := mapInfo["key"].(string)
- go udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra)
- udplock.Lock()
- startFieldScoreTask(mapInfo)
- udplock.Unlock()
- }
- }
- case mu.OP_NOOP: //下个节点回应
- str := string(data)
- log.Println("节点回应:", str)
- }
- }
|