main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. package main
  import (
  	"encoding/json"
  	"fmt"
  	"io/ioutil"
  	"log"
  	"net"
  	"net/http"
  	"net/url"
  	"strings"
  	"sync"
  	"time"

  	mu "mfw/util"
  	qu "qfw/util"
  )
  // S_Province is a province-level entry of the region lookup table.
  type S_Province struct {
  	// P_Name is the province name.
  	P_Name string
  }
  // S_City is a city-level entry of the region lookup table. It records the
  // city together with the province it belongs to.
  //
  // The field order (province, city) must not change: entries are built
  // with positional struct literals.
  type S_City struct {
  	P_Name, C_Name string // province name, city name
  }
  // S_District is a district-level entry of the region lookup table. It
  // records the district together with its city and province.
  //
  // The field order (province, city, district) must not change: entries are
  // built with positional struct literals.
  type S_District struct {
  	P_Name, C_Name, D_Name string // province name, city name, district name
  }
  27. var (
  28. Sysconfig map[string]interface{} //配置文件
  29. mconf map[string]interface{} //mongodb配置信息
  30. data_mgo, qy_mgo *MongodbSim
  31. bid_mgo *MongodbSim //mongodb操作对象
  32. udpclient mu.UdpClient //udp对象
  33. nextNode []map[string]interface{} //节点信息
  34. coll_name, qy_coll_name, jy_coll_name string
  35. check_lock sync.Mutex //更新锁
  36. check_thread int //线程数
  37. UpdateTask *updateInfo //更新池
  38. S_ProvinceDict map[string][]S_Province //省份-map
  39. S_CityDict map[string][]S_City //城市-map
  40. S_DistrictDict map[string][]S_District //区县-map
  41. //删除字段
  42. unset_dict = map[string]interface{}{"winner": 1, "s_winner": 1, "bidamount": 1, "winnerorder": 1}
  43. udplock, getasklock sync.Mutex
  44. taskList []map[string]interface{}
  45. //监控相关
  46. responselock sync.Mutex
  47. lastNodeResponse int64
  48. )
  49. // 初始化城市
  50. func initCheckCity() {
  51. //初始化-城市配置
  52. S_ProvinceDict = make(map[string][]S_Province, 0)
  53. S_CityDict = make(map[string][]S_City, 0)
  54. S_DistrictDict = make(map[string][]S_District, 0)
  55. q := map[string]interface{}{
  56. "town_code": map[string]interface{}{
  57. "$exists": 0,
  58. },
  59. }
  60. sess := qy_mgo.GetMgoConn()
  61. defer qy_mgo.DestoryMongoConn(sess)
  62. it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()
  63. total := 0
  64. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  65. if total%1000 == 0 {
  66. log.Println("当前数量:", total)
  67. }
  68. district_code := qu.IntAll(tmp["district_code"])
  69. city_code := qu.IntAll(tmp["city_code"])
  70. if district_code > 0 {
  71. province := qu.ObjToString(tmp["province"])
  72. city := qu.ObjToString(tmp["city"])
  73. district := qu.ObjToString(tmp["district"])
  74. data := S_District{province, city, district}
  75. if S_DistrictDict[district] == nil {
  76. S_DistrictDict[district] = []S_District{data}
  77. } else {
  78. arr := S_DistrictDict[district]
  79. arr = append(arr, data)
  80. S_DistrictDict[district] = arr
  81. }
  82. } else {
  83. if city_code > 0 {
  84. province := qu.ObjToString(tmp["province"])
  85. city := qu.ObjToString(tmp["city"])
  86. data := S_City{province, city}
  87. if S_CityDict[city] == nil {
  88. S_CityDict[city] = []S_City{data}
  89. } else {
  90. arr := S_CityDict[city]
  91. arr = append(arr, data)
  92. S_CityDict[city] = arr
  93. }
  94. } else {
  95. province := qu.ObjToString(tmp["province"])
  96. data := S_Province{province}
  97. if S_ProvinceDict[province] == nil {
  98. S_ProvinceDict[province] = []S_Province{data}
  99. } else {
  100. arr := S_ProvinceDict[province]
  101. arr = append(arr, data)
  102. S_ProvinceDict[province] = arr
  103. }
  104. }
  105. }
  106. tmp = make(map[string]interface{})
  107. }
  108. log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(S_ProvinceDict), len(S_CityDict), len(S_DistrictDict)))
  109. }
  110. // mgo-配置等
  111. func initMgo() {
  112. mconf = Sysconfig["mongodb"].(map[string]interface{})
  113. log.Println(mconf)
  114. data_mgo = &MongodbSim{
  115. MongodbAddr: mconf["addrName"].(string),
  116. DbName: mconf["dbName"].(string),
  117. Size: qu.IntAllDef(mconf["pool"], 10),
  118. }
  119. data_mgo.InitPool()
  120. qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{})
  121. qy_mgo = &MongodbSim{
  122. MongodbAddr: qy_mconf["qy_addrName"].(string),
  123. DbName: qy_mconf["qy_dbName"].(string),
  124. Size: qu.IntAllDef(qy_mconf["pool"], 10),
  125. UserName: qy_mconf["qy_username"].(string),
  126. Password: qy_mconf["qy_password"].(string),
  127. }
  128. qy_mgo.InitPool()
  129. bid_mgo = &MongodbSim{
  130. MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
  131. DbName: "qfw",
  132. Size: 10,
  133. UserName: "zhengkun",
  134. Password: "zk@123123",
  135. }
  136. bid_mgo.InitPool()
  137. coll_name = mconf["collName"].(string)
  138. qy_coll_name = qy_mconf["qy_collName"].(string)
  139. jy_coll_name = Sysconfig["jy_collName"].(string)
  140. nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  141. check_thread = qu.IntAll(Sysconfig["check_thread"])
  142. log.Println("mgo 等配置,加载完毕...")
  143. }
  144. // 初始化
  145. func init() {
  146. qu.ReadConfig(&Sysconfig) //加载配置文件
  147. log.Println(Sysconfig)
  148. if len(Sysconfig) == 0 {
  149. log.Fatal("读取配置文件失败", Sysconfig)
  150. }
  151. initMgo() //初始化mgo
  152. initCheckCity() //初始化城市
  153. //更新池
  154. UpdateTask = newUpdatePool()
  155. go UpdateTask.updateData()
  156. }
  157. func main() {
  158. lastNodeResponse = time.Now().Unix()
  159. updport := Sysconfig["udpport"].(string)
  160. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  161. udpclient.Listen(processUdpMsg)
  162. log.Println("Udp服务监听", updport)
  163. go getRepeatTask()
  164. go checkMailJob()
  165. go lastUdpJob()
  166. lock := make(chan bool)
  167. <-lock
  168. }
  169. // 开始审查数据
  170. func startCheckData(sid, eid string) {
  171. defer qu.Catch()
  172. q := map[string]interface{}{
  173. "_id": map[string]interface{}{
  174. "$gt": StringTOBsonId(sid),
  175. "$lte": StringTOBsonId(eid),
  176. },
  177. }
  178. check_pool := make(chan bool, check_thread)
  179. check_wg := &sync.WaitGroup{}
  180. sess := data_mgo.GetMgoConn()
  181. defer data_mgo.DestoryMongoConn(sess)
  182. it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
  183. total, isRepair := 0, 0
  184. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  185. if total%10000 == 0 {
  186. log.Println("当前数量:", total, isRepair, tmp["_id"])
  187. }
  188. update_id := map[string]interface{}{"_id": tmp["_id"]}
  189. check_pool <- true
  190. check_wg.Add(1)
  191. go func(tmp map[string]interface{}, update_id map[string]interface{}) {
  192. defer func() {
  193. <-check_pool
  194. check_wg.Done()
  195. }()
  196. //更新-
  197. update_check := make(map[string]interface{}, 0)
  198. //审查-城市-迁移
  199. //getCheckDataCity(tmp, &update_check)
  200. //审查-金额-迁移
  201. //getCheckDataBidamount(tmp, &update_check)
  202. //审查-分类-弃用
  203. //getCheckDataCategory(tmp,&update_check)
  204. //审查-发布时间
  205. getCheckDataPublishtime(tmp, &update_check)
  206. //审查-大模型与抽取
  207. getCheckDataAI(tmp, &update_check)
  208. //最终计算是否清洗
  209. update_dict := make(map[string]interface{}, 0)
  210. if len(update_check) > 0 {
  211. update_dict["$set"] = update_check
  212. }
  213. if len(update_dict) > 0 { //注意事项~更新key不能与删除key同时存在
  214. isRepair++
  215. UpdateTask.updatePool <- []map[string]interface{}{
  216. update_id,
  217. update_dict,
  218. }
  219. }
  220. }(tmp, update_id)
  221. tmp = make(map[string]interface{})
  222. }
  223. check_wg.Wait()
  224. log.Println("data_clean is over ", total, "~", isRepair)
  225. sendNextNode(sid, eid)
  226. }
  227. // udp监听
  228. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  229. switch act {
  230. case mu.OP_TYPE_DATA:
  231. var rep map[string]interface{}
  232. err := json.Unmarshal(data, &rep)
  233. if err != nil {
  234. log.Println(err)
  235. } else {
  236. sid, _ := rep["gtid"].(string)
  237. eid, _ := rep["lteid"].(string)
  238. stype := qu.ObjToString(rep["stype"])
  239. key := qu.ObjToString(rep["key"])
  240. if stype == "monitor" {
  241. log.Println("收到监测......")
  242. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  243. return
  244. }
  245. if sid == "" || eid == "" {
  246. log.Println("err", "sid=", sid, ",eid=", eid)
  247. return
  248. } else {
  249. lastNodeResponse = time.Now().Unix()
  250. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  251. udplock.Lock()
  252. taskList = append(taskList, map[string]interface{}{
  253. "sid": sid,
  254. "eid": eid,
  255. }) //插入任务
  256. log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
  257. udplock.Unlock()
  258. }
  259. }
  260. case mu.OP_NOOP: //下个节点回应
  261. log.Println("下节点回应:", string(data))
  262. udptaskmap.Delete(string(data))
  263. }
  264. }
  265. // 发送下阶段节点~
  266. func sendNextNode(sid string, eid string) {
  267. //更新记录状态
  268. updateProcessUdpIdsInfo(sid, eid)
  269. log.Println("判重任务完成...发送下节点udp...")
  270. for _, to := range nextNode {
  271. key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
  272. by, _ := json.Marshal(map[string]interface{}{
  273. "gtid": sid,
  274. "lteid": eid,
  275. "stype": qu.ObjToString(to["stype"]),
  276. "key": key,
  277. })
  278. addr := &net.UDPAddr{
  279. IP: net.ParseIP(to["addr"].(string)),
  280. Port: qu.IntAll(to["port"]),
  281. }
  282. node := &udpNode{by, addr, time.Now().Unix(), 0}
  283. udptaskmap.Store(key, node)
  284. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  285. }
  286. }
  287. // 更新流程记录id段落
  288. func updateProcessUdpIdsInfo(sid string, eid string) {
  289. query := map[string]interface{}{
  290. "gtid": map[string]interface{}{
  291. "$gte": sid,
  292. },
  293. "lteid": map[string]interface{}{
  294. "$lte": eid,
  295. },
  296. }
  297. task_coll := "bidding_processing_ids"
  298. datas, _ := bid_mgo.Find(task_coll, query, nil, nil)
  299. if len(datas) > 0 {
  300. log.Println("开始更新流程段落记录~~", len(datas), "段")
  301. for _, v := range datas {
  302. up_id := BsonTOStringId(v["_id"])
  303. if up_id != "" {
  304. update := map[string]interface{}{
  305. "$set": map[string]interface{}{
  306. "dataprocess": 4,
  307. "updatetime": time.Now().Unix(),
  308. },
  309. }
  310. bid_mgo.UpdateById(task_coll, up_id, update)
  311. log.Println("流程段落记录~~更新完毕~", update)
  312. }
  313. }
  314. } else {
  315. log.Println("未查询到记录id段落~", query)
  316. }
  317. }
  // httpDo posts detail as the form field "detail" to
  // http://127.0.0.1:9991/get and logs the response body.
  //
  // detail is URL-encoded before it is sent, so values containing
  // characters such as '&', '=', '+', '%' or spaces reach the server
  // unchanged. The request is abandoned after 60 seconds.
  //
  // It returns an error when the request cannot be built or sent, or when
  // the response body cannot be read; otherwise nil.
  func httpDo(detail string) (e error) {
  	// Encode the value; plain concatenation corrupts reserved characters.
  	form := url.Values{}
  	form.Set("detail", detail)

  	req, err := http.NewRequest("POST", "http://127.0.0.1:9991/get",
  		strings.NewReader(form.Encode()))
  	if err != nil {
  		return err
  	}
  	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

  	// Without a timeout a stalled server would block the caller forever.
  	client := &http.Client{Timeout: 60 * time.Second}
  	resp, err := client.Do(req)
  	if err != nil {
  		return err
  	}
  	defer resp.Body.Close()

  	body, err := ioutil.ReadAll(resp.Body)
  	if err != nil {
  		return err
  	}
  	log.Println("put ", string(body))
  	return nil
  }
  338. // 监听-获取-分发清洗任务
  339. func getRepeatTask() {
  340. for {
  341. if len(taskList) > 0 {
  342. getasklock.Lock()
  343. len_list := len(taskList)
  344. if len_list > 1 {
  345. first_id := qu.ObjToString(taskList[0]["sid"])
  346. end_id := qu.ObjToString(taskList[len_list-1]["eid"])
  347. if first_id != "" && end_id != "" {
  348. taskList = taskList[len_list:]
  349. log.Println("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
  350. startCheckData(first_id, end_id)
  351. } else {
  352. log.Println("合并段落~错误~正常取段落~~~")
  353. mapInfo := taskList[0]
  354. if mapInfo != nil {
  355. taskList = taskList[1:]
  356. log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
  357. sid := qu.ObjToString(mapInfo["sid"])
  358. eid := qu.ObjToString(mapInfo["eid"])
  359. startCheckData(sid, eid)
  360. }
  361. }
  362. } else {
  363. mapInfo := taskList[0]
  364. if mapInfo != nil {
  365. taskList = taskList[1:]
  366. log.Println("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
  367. sid := qu.ObjToString(mapInfo["sid"])
  368. eid := qu.ObjToString(mapInfo["eid"])
  369. startCheckData(sid, eid)
  370. }
  371. }
  372. getasklock.Unlock()
  373. } else {
  374. time.Sleep(10 * time.Second)
  375. }
  376. }
  377. }
  378. func lastUdpJob() {
  379. for {
  380. responselock.Lock()
  381. if time.Now().Unix()-lastNodeResponse >= 1800 {
  382. lastNodeResponse = time.Now().Unix() //重置时间
  383. sendErrMailApi("数据清洗~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入清洗增量流程...相关人员检查..."))
  384. }
  385. responselock.Unlock()
  386. time.Sleep(300 * time.Second)
  387. }
  388. }