main.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. mu "mfw/util"
  8. "mongodb"
  9. "net"
  10. "net/http"
  11. _ "net/http/pprof"
  12. "qfw/util"
  13. elastic "qfw/util/elastic"
  14. "qfw/util/redis"
  15. "strings"
  16. "time"
  17. u "util"
  18. )
  // Province is a province-level entry of the area dictionary.
type Province struct {
	P_Name string // province name
}

// City is a city-level entry of the area dictionary together with the
// province it belongs to.
type City struct {
	P_Name, C_Name string // province name, city name
}

// District is a district-level entry of the area dictionary together with
// its parent province and city.
type District struct {
	P_Name, C_Name, D_Name string // province name, city name, district name
}
  31. var (
  32. Sysconfig map[string]interface{} //配置文件
  33. mgo *mongodb.MongodbSim //mongodb操作对象
  34. extractmgo *mongodb.MongodbSim //mongodb操作对象
  35. project2db *mongodb.MongodbSim //mongodb操作对象
  36. mgostandard *mongodb.MongodbSim //mongodb操作对象
  37. qyxydb *mongodb.MongodbSim //mongodb操作对象
  38. udpclient mu.UdpClient //udp对象
  39. updport string
  40. savesizei = 500
  41. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  42. biddingIndexFieldsMap = map[string]string{}
  43. projectinfoFields []string
  44. projectinfoFieldsMap = map[string]string{}
  45. multiIndex []string
  46. purchasinglistFields []string
  47. winnerorderlistFields []string
  48. purchasinglistFieldsMap = map[string]string{}
  49. winnerorderlistFieldsMap = map[string]string{}
  50. BulkSize = 400
  51. detailLength = 50000
  52. fileLength = 50000
  53. //bidding_other连接信息
  54. bidding_other_es *elastic.Elastic
  55. other_index string
  56. other_itype string
  57. esAddr string
  58. esNode string
  59. FilterKeyword []string //正文竟品关键词过滤
  60. ProvinceDict map[string][]Province //省份-map
  61. CityDict map[string][]City //城市-map
  62. DistrictDict map[string][]District //区县-map
  63. winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
  64. )
  // Work queues, their concurrency-limiting semaphores, and the stop flag.
var (
	// UpdataMgoCache is a buffered queue of document batches.
	UpdataMgoCache = make(chan []map[string]interface{}, 1000)
	// SP is a semaphore channel with five slots.
	SP = make(chan bool, 5)
	// SaveLogChan queues log document batches consumed by saveLog.
	SaveLogChan = make(chan []map[string]interface{}, 1000)
	// SaveSp limits the number of concurrent bulk log writes to five.
	SaveSp = make(chan bool, 5)
	// StopFlag signals that index generation should stop.
	StopFlag = false
)
  70. func init() {
  71. util.ReadConfig(&Sysconfig)
  72. // company_id
  73. redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
  74. inits()
  75. //go checkMapJob()
  76. detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
  77. fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
  78. updport, _ = Sysconfig["updport"].(string)
  79. winner, _ = Sysconfig["winner"].(map[string]interface{})
  80. standard, _ = Sysconfig["standard"].(map[string]interface{})
  81. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  82. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  83. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  84. project, _ = Sysconfig["project"].(map[string]interface{})
  85. project2, _ = Sysconfig["project2"].(map[string]interface{})
  86. qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
  87. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  88. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  89. MongodbAddr: mconf["addr"].(string),
  90. Size: util.IntAllDef(mconf["pool"], 5),
  91. DbName: mconf["db"].(string),
  92. UserName: Sysconfig["uname"].(string),
  93. Password: Sysconfig["upwd"].(string),
  94. }
  95. mgo.InitPool()
  96. project2db = &mongodb.MongodbSim{
  97. MongodbAddr: project2["addr"].(string),
  98. Size: util.IntAllDef(project2["pool"], 5),
  99. DbName: project2["db"].(string),
  100. }
  101. project2db.InitPool()
  102. //企业数据
  103. qyxydb = &mongodb.MongodbSim{
  104. MongodbAddr: qyxy_ent["addr"].(string),
  105. Size: util.IntAllDef(qyxy_ent["pool"], 5),
  106. DbName: qyxy_ent["db"].(string),
  107. }
  108. qyxydb.InitPool()
  109. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  110. if savedb == nil {
  111. log.Println("未设置保存数据库,默认使用招标库")
  112. extractmgo = mgo
  113. } else { //savedb为抽取连接
  114. addr, _ := savedb["addr"].(string)
  115. size := util.IntAllDef(savedb["size"], 5)
  116. db, _ := savedb["db"].(string)
  117. extractmgo = &mongodb.MongodbSim{
  118. MongodbAddr: addr,
  119. Size: size,
  120. DbName: db,
  121. }
  122. extractmgo.InitPool()
  123. }
  124. mgostandard = &mongodb.MongodbSim{
  125. MongodbAddr: standard["addr"].(string),
  126. Size: util.IntAllDef(standard["pool"], 5),
  127. DbName: standard["db"].(string),
  128. UserName: Sysconfig["uname"].(string),
  129. Password: Sysconfig["upwd"].(string),
  130. }
  131. mgostandard.InitPool()
  132. //初始化es
  133. //bidding
  134. econf := Sysconfig["elastic"].(map[string]interface{})
  135. esAddr = econf["addr"].(string)
  136. esNode = econf["node"].(string)
  137. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  138. //bidding_other
  139. if Sysconfig["elastic_other"] != nil {
  140. econf_other := Sysconfig["elastic_other"].(map[string]interface{})
  141. other_index = econf_other["index"].(string)
  142. other_itype = econf_other["type"].(string)
  143. bidding_other_es = &elastic.Elastic{
  144. S_esurl: econf_other["addr"].(string),
  145. I_size: util.IntAllDef(econf_other["pool"], 5),
  146. }
  147. bidding_other_es.InitElasticSize()
  148. }
  149. //
  150. if bidding["indexfields"] != nil {
  151. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  152. }
  153. if bidding["projectinfo"] != nil {
  154. pf := util.ObjToString(bidding["projectinfo"])
  155. if pf != "" {
  156. projectinfoFields = strings.Split(pf, ",")
  157. }
  158. }
  159. if bidding["purchasinglist"] != nil {
  160. pcl := util.ObjToString(bidding["purchasinglist"])
  161. if pcl != "" {
  162. purchasinglistFields = strings.Split(pcl, ",")
  163. }
  164. }
  165. if bidding["winnerorder"] != nil {
  166. winnerorder := util.ObjToString(bidding["winnerorder"])
  167. if winnerorder != "" {
  168. winnerorderlistFields = strings.Split(winnerorder, ",")
  169. }
  170. }
  171. if bidding["multiIndex"] != nil {
  172. mi := util.ObjToString(bidding["multiIndex"])
  173. if mi != "" {
  174. multiIndex = strings.Split(mi, ",")
  175. }
  176. }
  177. //
  178. if bidding["indexfieldsmap"] != nil {
  179. for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
  180. biddingIndexFieldsMap[k] = util.ObjToString(v)
  181. }
  182. log.Println(biddingIndexFieldsMap)
  183. }
  184. if bidding["projectinfomap"] != nil {
  185. for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
  186. projectinfoFieldsMap[k] = util.ObjToString(v)
  187. }
  188. log.Println(projectinfoFieldsMap)
  189. }
  190. if bidding["purchasinglistmap"] != nil {
  191. for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
  192. purchasinglistFieldsMap[k] = util.ObjToString(v)
  193. }
  194. log.Println(purchasinglistFieldsMap)
  195. }
  196. if bidding["winnerordermap"] != nil {
  197. for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
  198. winnerorderlistFieldsMap[k] = util.ObjToString(v)
  199. }
  200. log.Println(winnerorderlistFieldsMap)
  201. }
  202. log.Println(projectinfoFields)
  203. log.Println(purchasinglistFields)
  204. initCheckCity()
  205. FilterKeyword = util.ObjArrToStringArr(Sysconfig["filter-keyword"].([]interface{}))
  206. //初始化oss
  207. u.InitOss()
  208. }
  209. func main() {
  210. //go inspectQuery()
  211. //go task_index()
  212. go UpdateExtract() //抽取表中新增entidlist字段
  213. updport := Sysconfig["udpport"].(string)
  214. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  215. udpclient.Listen(processUdpMsg)
  216. log.Println("Udp服务监听", updport)
  217. ch := make(chan bool, 1)
  218. <-ch
  219. }
  220. /**
  221. 检查es查询队列 10s查询一次
  222. */
  223. func inspectQuery() {
  224. ticker := time.NewTicker(time.Second * 10)
  225. url := esAddr + "/_nodes/stats/thread_pool"
  226. for range ticker.C {
  227. resp, _ := http.Get(url)
  228. if resp != nil && resp.Body != nil {
  229. defer resp.Body.Close()
  230. }
  231. body, _ := ioutil.ReadAll(resp.Body)
  232. respMap := make(map[string]interface{})
  233. err := json.Unmarshal(body, &respMap)
  234. if err == nil {
  235. if data, o1 := respMap["nodes"].(map[string]interface{}); o1 {
  236. if nodes, o2 := data[esNode].(map[string]interface{}); o2 {
  237. if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
  238. index, _ := pool["index"].(map[string]interface{})
  239. search, _ := pool["search"].(map[string]interface{})
  240. bulk, _ := pool["bulk"].(map[string]interface{})
  241. if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 {
  242. util.Debug("es thread_pool index queue---", index["queue"])
  243. util.Debug("es thread_pool search queue---", search["queue"])
  244. util.Debug("es thread_pool bulk queue---", bulk["queue"])
  245. StopFlag = true
  246. } else {
  247. StopFlag = false
  248. }
  249. }
  250. }
  251. }
  252. }
  253. }
  254. }
  255. var pool = make(chan bool, 20)
  256. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  257. switch act {
  258. case mu.OP_TYPE_DATA: //上个节点的数据
  259. //从表中开始处理生成企业数据
  260. var mapInfo map[string]interface{}
  261. err := json.Unmarshal(data, &mapInfo)
  262. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  263. if err != nil {
  264. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  265. } else if mapInfo != nil {
  266. key, _ := mapInfo["key"].(string)
  267. if key == "" {
  268. key = "udpok"
  269. }
  270. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  271. tasktype, _ := mapInfo["stype"].(string)
  272. log.Println("tasktype:", tasktype)
  273. switch tasktype {
  274. case "winner":
  275. pool <- true
  276. go func() {
  277. defer func() {
  278. <-pool
  279. }()
  280. winnerTask(data, mapInfo)
  281. }()
  282. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  283. pool <- true
  284. go func() {
  285. defer func() {
  286. <-pool
  287. }()
  288. biddingTask(data, mapInfo, tasktype)
  289. }()
  290. case "bidding_history": //增量id段历史数据
  291. pool <- true
  292. go func() {
  293. defer func() {
  294. <-pool
  295. }()
  296. biddingTask(data, mapInfo, tasktype)
  297. }()
  298. case "project":
  299. pool <- true
  300. go func() {
  301. defer func() {
  302. <-pool
  303. }()
  304. projectTask(data, project, mapInfo)
  305. }()
  306. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  307. pool <- true
  308. go func() {
  309. defer func() {
  310. <-pool
  311. }()
  312. biddingBackTask(data, mapInfo)
  313. }()
  314. case "biddingall": //合并并重新生成索引,不生成关键词
  315. pool <- true
  316. go func() {
  317. defer func() {
  318. <-pool
  319. }()
  320. biddingAllTask(data, mapInfo)
  321. }()
  322. case "biddingdata": //bidding全量数据
  323. pool <- true
  324. go func() {
  325. defer func() {
  326. <-pool
  327. }()
  328. biddingDataTask(data, mapInfo)
  329. }()
  330. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  331. pool <- true
  332. go func() {
  333. defer func() {
  334. <-pool
  335. }()
  336. biddingMergeTask(data, mapInfo)
  337. }()
  338. case "buyer":
  339. pool <- true
  340. go func() {
  341. defer func() {
  342. <-pool
  343. }()
  344. buyerTask(data, mapInfo)
  345. }()
  346. case "winnerent": //标准库
  347. pool <- true
  348. go func() {
  349. defer func() {
  350. <-pool
  351. }()
  352. standardTask("winnerent", mapInfo)
  353. }()
  354. case "buyerent": //标准库
  355. pool <- true
  356. go func() {
  357. defer func() {
  358. <-pool
  359. }()
  360. standardTask("buyerent", mapInfo)
  361. }()
  362. case "agencyent": //标准库
  363. pool <- true
  364. go func() {
  365. defer func() {
  366. <-pool
  367. }()
  368. standardTask("agencyent", mapInfo)
  369. }()
  370. case "biddingdelbyextract": //根据repeat删除es
  371. pool <- true
  372. go func() {
  373. defer func() {
  374. <-pool
  375. }()
  376. biddingDelByExtract(data, mapInfo)
  377. }()
  378. case "biddingdelbyextracttype": //根据extracttype删除es
  379. pool <- true
  380. go func() {
  381. defer func() {
  382. <-pool
  383. }()
  384. biddingDelByExtracttype(data, mapInfo)
  385. }()
  386. default:
  387. pool <- true
  388. go func() {
  389. defer func() {
  390. <-pool
  391. }()
  392. defaultFunc(data, mapInfo)
  393. }()
  394. }
  395. }
  396. case mu.OP_NOOP: //下个节点回应
  397. log.Println("发送成功", string(data))
  398. }
  399. }
  400. //初始化城市
  401. func initCheckCity() {
  402. //初始化-城市配置
  403. ProvinceDict = make(map[string][]Province, 0)
  404. CityDict = make(map[string][]City, 0)
  405. DistrictDict = make(map[string][]District, 0)
  406. q := map[string]interface{}{
  407. "town_code": map[string]interface{}{
  408. "$exists": 0,
  409. },
  410. }
  411. sess := mgostandard.GetMgoConn()
  412. defer mgostandard.DestoryMongoConn(sess)
  413. it := sess.DB("mixdata").C(util.ObjToString(standard["coll_area"])).Find(&q).Iter()
  414. total := 0
  415. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  416. if total%1000 == 0 {
  417. log.Println("当前数量:", total)
  418. }
  419. district_code := util.IntAll(tmp["district_code"])
  420. city_code := util.IntAll(tmp["city_code"])
  421. if district_code > 0 {
  422. province := util.ObjToString(tmp["province"])
  423. city := util.ObjToString(tmp["city"])
  424. district := util.ObjToString(tmp["district"])
  425. data := District{province, city, district}
  426. if DistrictDict[district] == nil {
  427. DistrictDict[district] = []District{data}
  428. } else {
  429. arr := DistrictDict[district]
  430. arr = append(arr, data)
  431. DistrictDict[district] = arr
  432. }
  433. } else {
  434. if city_code > 0 {
  435. province := util.ObjToString(tmp["province"])
  436. city := util.ObjToString(tmp["city"])
  437. data := City{province, city}
  438. if CityDict[city] == nil {
  439. CityDict[city] = []City{data}
  440. } else {
  441. arr := CityDict[city]
  442. arr = append(arr, data)
  443. CityDict[city] = arr
  444. }
  445. } else {
  446. province := util.ObjToString(tmp["province"])
  447. data := Province{province}
  448. if ProvinceDict[province] == nil {
  449. ProvinceDict[province] = []Province{data}
  450. } else {
  451. arr := ProvinceDict[province]
  452. arr = append(arr, data)
  453. ProvinceDict[province] = arr
  454. }
  455. }
  456. }
  457. tmp = make(map[string]interface{})
  458. }
  459. util.Debug(fmt.Sprintf("城市配置加载完毕...省~%d 市~%d 区~%d", len(ProvinceDict), len(CityDict), len(DistrictDict)))
  460. }
  461. func saveLog() {
  462. arru := make([][]map[string]interface{}, 200)
  463. indexu := 0
  464. for {
  465. select {
  466. case v := <-SaveLogChan:
  467. arru[indexu] = v
  468. indexu++
  469. if indexu == 200 {
  470. SaveSp <- true
  471. go func(arru [][]map[string]interface{}) {
  472. defer func() {
  473. <-SaveSp
  474. }()
  475. extractmgo.UpSertBulk("createIndex_log", arru...)
  476. }(arru)
  477. arru = make([][]map[string]interface{}, 200)
  478. indexu = 0
  479. }
  480. case <-time.After(1000 * time.Millisecond):
  481. if indexu > 0 {
  482. SaveSp <- true
  483. go func(arru [][]map[string]interface{}) {
  484. defer func() {
  485. <-SaveSp
  486. }()
  487. extractmgo.UpSertBulk("createIndex_log", arru...)
  488. }(arru[:indexu])
  489. arru = make([][]map[string]interface{}, 200)
  490. indexu = 0
  491. }
  492. }
  493. }
  494. }