// main.go 6.7 KB

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. mu "mfw/util"
  7. "net"
  8. qu "qfw/util"
  9. "sync"
  10. "time"
  11. )
  12. type Province struct {
  13. P_Name string
  14. }
  15. type City struct {
  16. P_Name string
  17. C_Name string
  18. }
  19. type District struct {
  20. P_Name string
  21. C_Name string
  22. D_Name string
  23. }
  24. var (
  25. Sysconfig map[string]interface{} //配置文件
  26. mconf map[string]interface{} //mongodb配置信息
  27. data_mgo,qy_mgo *MongodbSim //mongodb操作对象
  28. udpclient mu.UdpClient //udp对象
  29. nextNode []map[string]interface{} //节点信息
  30. coll_name,qy_coll_name,jy_coll_name string //表名
  31. check_lock sync.Mutex //更新锁
  32. check_thread int //线程数
  33. UpdateTask *updateInfo //更新池
  34. ProvinceDict map[string][]Province //省份-map
  35. CityDict map[string][]City //城市-map
  36. DistrictDict map[string][]District //区县-map
  37. )
  38. //初始化城市
  39. func initCheckCity() {
  40. //初始化-城市配置
  41. ProvinceDict = make(map[string][]Province,0)
  42. CityDict = make(map[string][]City,0)
  43. DistrictDict = make(map[string][]District,0)
  44. q := map[string]interface{}{
  45. "town_code":map[string]interface{}{
  46. "$exists":0,
  47. },
  48. }
  49. sess := qy_mgo.GetMgoConn()
  50. defer qy_mgo.DestoryMongoConn(sess)
  51. it := sess.DB(qy_mgo.DbName).C(jy_coll_name).Find(&q).Iter()
  52. total := 0
  53. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  54. if total%1000 == 0 {
  55. log.Println("当前数量:", total)
  56. }
  57. district_code := qu.IntAll(tmp["district_code"])
  58. city_code := qu.IntAll(tmp["city_code"])
  59. if district_code > 0 {
  60. province := qu.ObjToString(tmp["province"])
  61. city := qu.ObjToString(tmp["city"])
  62. district := qu.ObjToString(tmp["district"])
  63. data := District{province,city,district}
  64. if DistrictDict[district]==nil {
  65. DistrictDict[district] = []District{data}
  66. }else {
  67. arr := DistrictDict[district]
  68. arr = append(arr,data)
  69. DistrictDict[district] = arr
  70. }
  71. }else {
  72. if city_code>0 {
  73. province := qu.ObjToString(tmp["province"])
  74. city := qu.ObjToString(tmp["city"])
  75. data := City{province,city}
  76. if CityDict[city]==nil {
  77. CityDict[city] = []City{data}
  78. }else {
  79. arr := CityDict[city]
  80. arr = append(arr,data)
  81. CityDict[city] = arr
  82. }
  83. }else {
  84. province := qu.ObjToString(tmp["province"])
  85. data := Province{province}
  86. if ProvinceDict[province]==nil {
  87. ProvinceDict[province] = []Province{data}
  88. }else {
  89. arr := ProvinceDict[province]
  90. arr = append(arr,data)
  91. ProvinceDict[province] = arr
  92. }
  93. }
  94. }
  95. tmp = make(map[string]interface{})
  96. }
  97. log.Println(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d",len(ProvinceDict),len(CityDict),len(DistrictDict)))
  98. }
  99. //mgo-配置等
  100. func initMgo() {
  101. mconf := Sysconfig["mongodb"].(map[string]interface{})
  102. log.Println(mconf)
  103. data_mgo = &MongodbSim{
  104. MongodbAddr: mconf["addrName"].(string),
  105. DbName: mconf["dbName"].(string),
  106. Size: qu.IntAllDef(mconf["pool"], 10),
  107. }
  108. data_mgo.InitPool()
  109. qy_mconf := Sysconfig["qy_mongodb"].(map[string]interface{})
  110. qy_mgo = &MongodbSim{
  111. MongodbAddr: qy_mconf["qy_addrName"].(string),
  112. DbName: qy_mconf["qy_dbName"].(string),
  113. Size: qu.IntAllDef(qy_mconf["pool"], 10),
  114. UserName: qy_mconf["qy_username"].(string),
  115. Password: qy_mconf["qy_password"].(string),
  116. }
  117. qy_mgo.InitPool()
  118. coll_name = mconf["collName"].(string)
  119. qy_coll_name = qy_mconf["qy_collName"].(string)
  120. jy_coll_name = Sysconfig["jy_collName"].(string)
  121. nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  122. check_thread = qu.IntAll(Sysconfig["check_thread"])
  123. log.Println("mgo 等配置,加载完毕...")
  124. }
  125. //初始化
  126. func init() {
  127. qu.ReadConfig(&Sysconfig) //加载配置文件
  128. log.Println(Sysconfig)
  129. if len(Sysconfig) == 0 {
  130. log.Fatal("读取配置文件失败", Sysconfig)
  131. }
  132. initMgo() //初始化mgo
  133. initCheckCity() //初始化城市
  134. //更新池
  135. UpdateTask = newUpdatePool()
  136. go UpdateTask.updateData()
  137. }
  138. func mainT() {
  139. updport := Sysconfig["udpport"].(string)
  140. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  141. udpclient.Listen(processUdpMsg)
  142. log.Println("Udp服务监听", updport)
  143. time.Sleep(99999 * time.Hour)
  144. }
  145. //临时校验
  146. func main() {
  147. sid := "618dc3b045a326c6c3f2f230"
  148. eid := "618e137545a326c6c3f44195"
  149. startCheckData(sid,eid)
  150. time.Sleep(99999 * time.Hour)
  151. }
  152. //开始审查数据
  153. func startCheckData(sid, eid string) {
  154. log.Println("开始审查数据...")
  155. defer qu.Catch()
  156. q := map[string]interface{}{
  157. "_id": map[string]interface{}{
  158. "$gt": StringTOBsonId(sid),
  159. "$lte": StringTOBsonId(eid),
  160. },
  161. }
  162. log.Println("查询条件:",q)
  163. check_pool := make(chan bool, check_thread)
  164. check_wg := &sync.WaitGroup{}
  165. sess := data_mgo.GetMgoConn()
  166. defer data_mgo.DestoryMongoConn(sess)
  167. it := sess.DB(data_mgo.DbName).C(coll_name).Find(&q).Iter()
  168. total,isRepair := 0,0
  169. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  170. if total%10000 == 0 {
  171. log.Println("当前数量:", total,isRepair,tmp["_id"])
  172. }
  173. update_id := map[string]interface{}{"_id":tmp["_id"]}
  174. check_pool <- true
  175. check_wg.Add(1)
  176. go func(tmp map[string]interface{},update_id map[string]interface{}) {
  177. defer func() {
  178. <-check_pool
  179. check_wg.Done()
  180. }()
  181. //更新-
  182. update_check := make(map[string]interface{},0)
  183. //审查-城市
  184. getCheckDataCity(tmp,&update_check)
  185. //审查-中标金额
  186. getCheckDataBidamount(tmp,&update_check)
  187. if len(update_check)>0 {
  188. isRepair++
  189. UpdateTask.updatePool <- []map[string]interface{}{
  190. update_id,
  191. map[string]interface{}{
  192. "$set": update_check,
  193. },
  194. }
  195. }
  196. }(tmp,update_id)
  197. tmp = make(map[string]interface{})
  198. }
  199. check_wg.Wait()
  200. log.Println("check is over - 总计数量",total,isRepair)
  201. }
  202. //udp监听
  203. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  204. switch act {
  205. case mu.OP_TYPE_DATA:
  206. var rep map[string]interface{}
  207. err := json.Unmarshal(data, &rep)
  208. if err != nil {
  209. log.Println(err)
  210. } else {
  211. sid, _ := rep["gtid"].(string)
  212. eid, _ := rep["lteid"].(string)
  213. if sid == "" || eid == "" {
  214. log.Println("err", "sid=", sid, ",eid=", eid)
  215. return
  216. } else {
  217. go udpclient.WriteUdp(data, mu.OP_NOOP, ra)
  218. log.Println("udp通知id段-审查数据", sid, "~", eid)
  219. startCheckData(sid, eid)
  220. log.Println("udp通知审查数据完成,下节点响应")
  221. for _, m := range nextNode {
  222. by, _ := json.Marshal(map[string]interface{}{
  223. "gtid": sid,
  224. "lteid": eid,
  225. "stype": qu.ObjToString(m["stype"]),
  226. })
  227. err := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  228. IP: net.ParseIP(m["addr"].(string)),
  229. Port: qu.IntAll(m["port"]),
  230. })
  231. if err != nil {
  232. log.Println(err)
  233. }
  234. }
  235. }
  236. }
  237. case mu.OP_NOOP: //下个节点回应
  238. log.Println("下节点回应",string(data))
  239. }
  240. }