package main import ( "encoding/json" "log" mu "mfw/util" "net" "os" qu "qfw/util" "sync" "time" ) var ( sysconfig map[string]interface{} //配置文件 mgo, qy_mgo *MongodbSim //mongodb操作对象 udpclient mu.UdpClient //udp对象 udplock, dataLock sync.Mutex //udp锁 coll_name, qy_coll_name string //表名 isTest bool //是否测试 Ext_Type, Ext_From map[string]interface{} //抽取来源,方式分 buyer_score, s_winner_score map[string]interface{} budget_score, bidamount_score map[string]interface{} projectname_score, projectcode_score map[string]interface{} ) //mgo-配置等 func initMgo() { isTest = sysconfig["isTest"].(bool) mconf := sysconfig["mongodb"].(map[string]interface{}) log.Println(mconf) mgo = &MongodbSim{ MongodbAddr: mconf["addrName"].(string), DbName: mconf["dbName"].(string), Size: qu.IntAllDef(mconf["pool"], 10), } mgo.InitPool() qy_mconf := sysconfig["qy_mongodb"].(map[string]interface{}) qy_mgo = &MongodbSim{ MongodbAddr: qy_mconf["qy_addrName"].(string), DbName: qy_mconf["qy_dbName"].(string), Size: qu.IntAllDef(qy_mconf["pool"], 10), UserName: qy_mconf["qy_username"].(string), Password: qy_mconf["qy_password"].(string), } qy_mgo.InitPool() coll_name = mconf["collName"].(string) qy_coll_name = qy_mconf["qy_collName"].(string) } //初始化打分 func initScore() { Ext_Type = sysconfig["ext_type"].(map[string]interface{}) Ext_From = sysconfig["ext_from"].(map[string]interface{}) buyer_score = sysconfig["buyer_score"].(map[string]interface{}) s_winner_score = sysconfig["s_winner_score"].(map[string]interface{}) budget_score = sysconfig["budget_score"].(map[string]interface{}) bidamount_score = sysconfig["bidamount_score"].(map[string]interface{}) projectname_score = sysconfig["projectname_score"].(map[string]interface{}) projectcode_score = sysconfig["projectcode_score"].(map[string]interface{}) } //初始化 func init() { qu.ReadConfig(&sysconfig) //加载配置文件 initMgo() initScore() } func mainT() { updport := sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) } //调试流程 func main() { sid := "1f0000000000000000000000" eid := "9f0000000000000000000000" log.Println(sid, "---", eid) mapinfo := map[string]interface{}{} if sid == "" || eid == "" { log.Println("sid,eid参数不能为空") os.Exit(0) } mapinfo["gtid"] = sid mapinfo["lteid"] = eid startFieldScoreTask(mapinfo) } //打分流程-方法 func startFieldScoreTask(mapInfo map[string]interface{}) { log.Println("开始字段规则链...评分流程") defer qu.Catch() q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(mapInfo["gtid"].(string)), "$lte": StringTOBsonId(mapInfo["lteid"].(string)), }, } log.Println("查询条件:", q) sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter() updateFieldScore, total := [][]map[string]interface{}{}, 0 for tmp := make(map[string]interface{}); it.Next(&tmp); total++ { if total%1000 == 0 { log.Println("当前数量:", total) } //验证初始分字段 source := *qu.ObjToMap(tmp["field_source"]) f_s := dealWithFieldSourceScore(source) //更新集合 update_dict := make(map[string]interface{}, 0) subtype := qu.ObjToString(tmp["subtype"]) buyer_s := buyerFieldScore(tmp, f_s["buyer"]) update_dict["buyer"] = buyer_s budget_s := budgetFieldScore(tmp, f_s["budget"]) update_dict["budget"] = budget_s projectname_s := projectnameFieldScore(tmp, f_s["projectname"]) update_dict["projectname"] = projectname_s projectcode_s := projectcodeFieldScore(tmp, f_s["projectcode"]) update_dict["projectcode"] = projectcode_s if subtype == "中标" || subtype == "成交" || subtype == "合同" { s_winner_s := winnerFieldScore(tmp, f_s["s_winner"]) update_dict["s_winner"] = s_winner_s bidamount_s := bidamountFieldScore(tmp, f_s["bidamount"]) update_dict["bidamount"] = bidamount_s } updateFieldScore = append(updateFieldScore, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "field_score": update_dict, }, }, }) if len(updateFieldScore) >= 200 { mgo.UpSertBulk(coll_name, updateFieldScore...) updateFieldScore = [][]map[string]interface{}{} } tmp = make(map[string]interface{}) } if len(updateFieldScore) > 0 { mgo.UpSertBulk(coll_name, updateFieldScore...) } log.Println("field score is over - 总计数量", total) } //udp监听 func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case mu.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { sid, _ := mapInfo["gtid"].(string) eid, _ := mapInfo["lteid"].(string) if sid == "" || eid == "" { log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid) } else { udpinfo, _ := mapInfo["key"].(string) go udpclient.WriteUdp([]byte(udpinfo), mu.OP_NOOP, ra) udplock.Lock() startFieldScoreTask(mapInfo) udplock.Unlock() } } case mu.OP_NOOP: //下个节点回应 str := string(data) log.Println("节点回应:", str) } }