init.go 8.0 KB


  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/udp"
  7. "createindex/oss"
  8. "fmt"
  9. "net"
  10. "strings"
  11. "time"
  12. )
  13. var (
  14. Sysconfig map[string]interface{}
  15. biddingMgo *mongodb.MongodbSim
  16. extractMgo *mongodb.MongodbSim
  17. qyxyMgo *mongodb.MongodbSim
  18. projectMgo *mongodb.MongodbSim
  19. standardMgo *mongodb.MongodbSim
  20. Es1 *elastic.Elastic
  21. Es2 *elastic.Elastic
  22. currentColl string
  23. udpclient udp.UdpClient //udp对象
  24. updport string
  25. multiIndex []string
  26. biddingMgoFields []string
  27. biddingEsFields map[string]interface{}
  28. projectinfoFields map[string]interface{}
  29. purchasinglistFields map[string]interface{}
  30. procurementlisFields map[string]interface{}
  31. winnerorderlistFields map[string]interface{}
  32. updateBiddingPool chan []map[string]interface{}
  33. updateExtractPool chan []map[string]interface{}
  34. saveEsPool chan map[string]interface{}
  35. saveEsAllPool chan map[string]interface{}
  36. saveEsElsePool chan map[string]interface{}
  37. saveProjectEsPool chan map[string]interface{}
  38. updateBiddingSp chan bool
  39. updateExtractSp chan bool
  40. saveEsSp chan bool
  41. saveEsAllSp chan bool
  42. saveEsElseSp chan bool
  43. saveProjectSp chan bool
  44. JyUdpAddr *net.UDPAddr
  45. esAddr, esNode string
  46. FilterKeyword []string //正文竟品关键词过滤
  47. ProvinceDict map[string][]Province //省份-map
  48. CityDict map[string][]City //城市-map
  49. DistrictDict map[string][]District //区县-map
  50. bidding, extract, qyxy, project, standard, buyer, winner, biddingIndex map[string]interface{}
  51. MgoBulkSize = 200 // mgo批量保存大小
  52. EsBulkSize = 200 // es批量保存大小
  53. detailLength = 50000 // es保存detail长度
  54. fileLength = 50000 // es保存附件文本长度
  55. pscopeLength = 32766 // projectscope长度
  56. )
  57. var StopFlag = false // 程序生索引停止标志
  58. func init() {
  59. util.ReadConfig(&Sysconfig)
  60. updport, _ = Sysconfig["updport"].(string)
  61. bidding, _ = Sysconfig["mgo_bidding"].(map[string]interface{})
  62. extract, _ = Sysconfig["mgo_extract"].(map[string]interface{})
  63. qyxy, _ = Sysconfig["mgo_qyxy"].(map[string]interface{})
  64. project, _ = Sysconfig["mgo_project"].(map[string]interface{})
  65. standard, _ = Sysconfig["standard"].(map[string]interface{})
  66. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  67. winner, _ = Sysconfig["winner"].(map[string]interface{})
  68. currentColl = util.ObjToString(bidding["collect"])
  69. biddingIndex, _ = Sysconfig["bidding_index"].(map[string]interface{})
  70. biddingMgo = &mongodb.MongodbSim{
  71. MongodbAddr: bidding["addr"].(string),
  72. Size: util.IntAllDef(bidding["pool"], 5),
  73. DbName: bidding["db"].(string),
  74. UserName: bidding["uname"].(string),
  75. Password: bidding["upwd"].(string),
  76. }
  77. biddingMgo.InitPool()
  78. extractMgo = &mongodb.MongodbSim{
  79. MongodbAddr: extract["addr"].(string),
  80. Size: util.IntAllDef(extract["pool"], 5),
  81. DbName: extract["db"].(string),
  82. }
  83. extractMgo.InitPool()
  84. qyxyMgo = &mongodb.MongodbSim{
  85. MongodbAddr: qyxy["addr"].(string),
  86. Size: util.IntAllDef(qyxy["pool"], 5),
  87. DbName: qyxy["db"].(string),
  88. }
  89. qyxyMgo.InitPool()
  90. projectMgo = &mongodb.MongodbSim{
  91. MongodbAddr: project["addr"].(string),
  92. Size: util.IntAllDef(project["pool"], 5),
  93. DbName: project["db"].(string),
  94. //UserName: bidding["uname"].(string),
  95. //Password: bidding["upwd"].(string),
  96. }
  97. projectMgo.InitPool()
  98. standardMgo = &mongodb.MongodbSim{
  99. MongodbAddr: standard["addr"].(string),
  100. Size: util.IntAllDef(standard["pool"], 5),
  101. DbName: standard["db"].(string),
  102. UserName: bidding["uname"].(string),
  103. Password: bidding["upwd"].(string),
  104. }
  105. standardMgo.InitPool()
  106. econf1 := Sysconfig["elastic_1"].(map[string]interface{})
  107. Es1 = &elastic.Elastic{
  108. S_esurl: econf1["addr"].(string),
  109. I_size: util.IntAllDef(econf1["pool"], 5),
  110. }
  111. Es1.InitElasticSize()
  112. econf2 := Sysconfig["elastic_2"].(map[string]interface{})
  113. Es2 = &elastic.Elastic{
  114. S_esurl: econf2["addr"].(string),
  115. I_size: util.IntAllDef(econf2["pool"], 5),
  116. }
  117. Es2.InitElasticSize()
  118. if mi := util.ObjToString(biddingIndex["multiIndex"]); mi != "" {
  119. multiIndex = strings.Split(mi, ",")
  120. }
  121. util.Debug(multiIndex)
  122. biddingMgoFields = strings.Split(biddingIndex["mgofields"].(string), ",")
  123. biddingEsFields = biddingIndex["esfieldsmap"].(map[string]interface{}) // bidding es字段
  124. projectinfoFields = biddingIndex["projectinfomap"].(map[string]interface{}) // projectinfo
  125. purchasinglistFields = biddingIndex["purchasinglistmap"].(map[string]interface{}) //采购清单
  126. procurementlisFields = biddingIndex["procurementlistmap"].(map[string]interface{}) //采购意向
  127. winnerorderlistFields = biddingIndex["winnerordermap"].(map[string]interface{}) //中标候选
  128. FilterKeyword = util.ObjArrToStringArr(Sysconfig["filter-keyword"].([]interface{}))
  129. initCheckCity()
  130. //初始化oss
  131. oss.InitOss()
  132. InitFileInfo()
  133. m := Sysconfig["jyfb_udp"].(map[string]interface{})
  134. JyUdpAddr = &net.UDPAddr{
  135. IP: net.ParseIP(m["addr"].(string)),
  136. Port: util.IntAll(m["port"]),
  137. }
  138. updateBiddingPool = make(chan []map[string]interface{}, 5000)
  139. updateBiddingSp = make(chan bool, 5)
  140. updateExtractPool = make(chan []map[string]interface{}, 5000)
  141. updateExtractSp = make(chan bool, 5)
  142. saveEsPool = make(chan map[string]interface{}, 5000)
  143. saveEsSp = make(chan bool, 5)
  144. saveEsAllPool = make(chan map[string]interface{}, 5000)
  145. saveEsAllSp = make(chan bool, 5)
  146. saveEsElsePool = make(chan map[string]interface{}, 5000)
  147. saveEsElseSp = make(chan bool, 5)
  148. saveProjectEsPool = make(chan map[string]interface{}, 5000)
  149. saveProjectSp = make(chan bool, 5)
  150. }
  151. func NewTk(m map[string]interface{}) *TaskInfo {
  152. p := &TaskInfo{
  153. stype: m["stype"].(string),
  154. currentTime: time.Now().Unix(),
  155. }
  156. return p
  157. }
  158. type TaskInfo struct {
  159. stype string
  160. thread int
  161. currentTime int64
  162. bidding_count int
  163. extract_count int
  164. es_count int
  165. }
  166. type Province struct {
  167. P_Name string
  168. }
  169. type City struct {
  170. P_Name string
  171. C_Name string
  172. }
  173. type District struct {
  174. P_Name string
  175. C_Name string
  176. D_Name string
  177. }
  178. //初始化城市
  179. func initCheckCity() {
  180. //初始化-城市配置
  181. ProvinceDict = make(map[string][]Province, 0)
  182. CityDict = make(map[string][]City, 0)
  183. DistrictDict = make(map[string][]District, 0)
  184. q := map[string]interface{}{
  185. "town_code": map[string]interface{}{
  186. "$exists": 0,
  187. },
  188. }
  189. sess := standardMgo.GetMgoConn()
  190. defer standardMgo.DestoryMongoConn(sess)
  191. it := sess.DB(standardMgo.DbName).C(util.ObjToString(standard["coll_area"])).Find(&q).Iter()
  192. total := 0
  193. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  194. district_code := util.IntAll(tmp["district_code"])
  195. city_code := util.IntAll(tmp["city_code"])
  196. if district_code > 0 {
  197. province := util.ObjToString(tmp["province"])
  198. city := util.ObjToString(tmp["city"])
  199. district := util.ObjToString(tmp["district"])
  200. data := District{province, city, district}
  201. if DistrictDict[district] == nil {
  202. DistrictDict[district] = []District{data}
  203. } else {
  204. arr := DistrictDict[district]
  205. arr = append(arr, data)
  206. DistrictDict[district] = arr
  207. }
  208. } else {
  209. if city_code > 0 {
  210. province := util.ObjToString(tmp["province"])
  211. city := util.ObjToString(tmp["city"])
  212. data := City{province, city}
  213. if CityDict[city] == nil {
  214. CityDict[city] = []City{data}
  215. } else {
  216. arr := CityDict[city]
  217. arr = append(arr, data)
  218. CityDict[city] = arr
  219. }
  220. } else {
  221. province := util.ObjToString(tmp["province"])
  222. data := Province{province}
  223. if ProvinceDict[province] == nil {
  224. ProvinceDict[province] = []Province{data}
  225. } else {
  226. arr := ProvinceDict[province]
  227. arr = append(arr, data)
  228. ProvinceDict[province] = arr
  229. }
  230. }
  231. }
  232. tmp = make(map[string]interface{})
  233. }
  234. util.Debug(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
  235. }