main.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. //"time"
  12. u "util"
  13. )
  14. var (
  15. Sysconfig map[string]interface{} //配置文件
  16. mgo *mongodb.MongodbSim //mongodb操作对象
  17. extractmgo *mongodb.MongodbSim //mongodb操作对象
  18. project2db *mongodb.MongodbSim //mongodb操作对象
  19. mgostandard *mongodb.MongodbSim //mongodb操作对象
  20. qyxydb *mongodb.MongodbSim //mongodb操作对象
  21. udpclient mu.UdpClient //udp对象
  22. updport string
  23. savesizei = 500
  24. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  25. biddingIndexFieldsMap = map[string]string{}
  26. projectinfoFields []string
  27. projectinfoFieldsMap = map[string]string{}
  28. multiIndex []string
  29. purchasinglistFields []string
  30. winnerorderlistFields []string
  31. purchasinglistFieldsMap = map[string]string{}
  32. winnerorderlistFieldsMap = map[string]string{}
  33. BulkSize = 400
  34. detailLength = 50000
  35. fileLength = 50000
  36. //bidding_other连接信息
  37. bidding_other_es *elastic.Elastic
  38. other_index string
  39. other_itype string
  40. winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
  41. )
  42. var UpdataMgoCache = make(chan []map[string]interface{}, 1000)
  43. var SP = make(chan bool, 5)
  44. func init() {
  45. util.ReadConfig(&Sysconfig)
  46. inits()
  47. go checkMapJob()
  48. detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
  49. fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
  50. updport, _ = Sysconfig["updport"].(string)
  51. winner, _ = Sysconfig["winner"].(map[string]interface{})
  52. standard, _ = Sysconfig["standard"].(map[string]interface{})
  53. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  54. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  55. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  56. project, _ = Sysconfig["project"].(map[string]interface{})
  57. project2, _ = Sysconfig["project2"].(map[string]interface{})
  58. qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
  59. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  60. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  61. MongodbAddr: mconf["addr"].(string),
  62. Size: util.IntAllDef(mconf["pool"], 5),
  63. DbName: mconf["db"].(string),
  64. ReplSet: "bidding",
  65. }
  66. mgo.InitPool()
  67. project2db = &mongodb.MongodbSim{
  68. MongodbAddr: project2["addr"].(string),
  69. Size: util.IntAllDef(project2["pool"], 5),
  70. DbName: project2["db"].(string),
  71. }
  72. project2db.InitPool()
  73. //企业信用
  74. qyxydb = &mongodb.MongodbSim{
  75. MongodbAddr: qyxy_ent["addr"].(string),
  76. Size: util.IntAllDef(qyxy_ent["pool"], 5),
  77. DbName: qyxy_ent["db"].(string),
  78. }
  79. qyxydb.InitPool()
  80. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  81. if savedb == nil {
  82. log.Println("未设置保存数据库,默认使用招标库")
  83. extractmgo = mgo
  84. } else { //savedb为抽取连接
  85. addr, _ := savedb["addr"].(string)
  86. size := util.IntAllDef(savedb["size"], 5)
  87. db, _ := savedb["db"].(string)
  88. extractmgo = &mongodb.MongodbSim{
  89. MongodbAddr: addr,
  90. Size: size,
  91. DbName: db,
  92. }
  93. extractmgo.InitPool()
  94. }
  95. mgostandard = &mongodb.MongodbSim{
  96. MongodbAddr: standard["addr"].(string),
  97. ReplSet: "bidding",
  98. Size: util.IntAllDef(standard["pool"], 5),
  99. DbName: standard["db"].(string),
  100. }
  101. mgostandard.InitPool()
  102. //初始化es
  103. //bidding
  104. econf := Sysconfig["elastic"].(map[string]interface{})
  105. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  106. //bidding_other
  107. if Sysconfig["elastic_other"] != nil {
  108. econf_other := Sysconfig["elastic_other"].(map[string]interface{})
  109. other_index = econf_other["index"].(string)
  110. other_itype = econf_other["type"].(string)
  111. bidding_other_es = &elastic.Elastic{
  112. S_esurl: econf_other["addr"].(string),
  113. I_size: util.IntAllDef(econf_other["pool"], 5),
  114. }
  115. bidding_other_es.InitElasticSize()
  116. }
  117. //
  118. if bidding["indexfields"] != nil {
  119. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  120. }
  121. if bidding["projectinfo"] != nil {
  122. pf := util.ObjToString(bidding["projectinfo"])
  123. if pf != "" {
  124. projectinfoFields = strings.Split(pf, ",")
  125. }
  126. }
  127. if bidding["purchasinglist"] != nil {
  128. pcl := util.ObjToString(bidding["purchasinglist"])
  129. if pcl != "" {
  130. purchasinglistFields = strings.Split(pcl, ",")
  131. }
  132. }
  133. if bidding["winnerorder"] != nil {
  134. winnerorder := util.ObjToString(bidding["winnerorder"])
  135. if winnerorder != "" {
  136. winnerorderlistFields = strings.Split(winnerorder, ",")
  137. }
  138. }
  139. if bidding["multiIndex"] != nil {
  140. mi := util.ObjToString(bidding["multiIndex"])
  141. if mi != "" {
  142. multiIndex = strings.Split(mi, ",")
  143. }
  144. }
  145. //
  146. if bidding["indexfieldsmap"] != nil {
  147. for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
  148. biddingIndexFieldsMap[k] = util.ObjToString(v)
  149. }
  150. log.Println(biddingIndexFieldsMap)
  151. }
  152. if bidding["projectinfomap"] != nil {
  153. for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
  154. projectinfoFieldsMap[k] = util.ObjToString(v)
  155. }
  156. log.Println(projectinfoFieldsMap)
  157. }
  158. if bidding["purchasinglistmap"] != nil {
  159. for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
  160. purchasinglistFieldsMap[k] = util.ObjToString(v)
  161. }
  162. log.Println(purchasinglistFieldsMap)
  163. }
  164. if bidding["winnerordermap"] != nil {
  165. for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
  166. winnerorderlistFieldsMap[k] = util.ObjToString(v)
  167. }
  168. log.Println(winnerorderlistFieldsMap)
  169. }
  170. log.Println(projectinfoFields)
  171. log.Println(purchasinglistFields)
  172. //初始化oss
  173. u.InitOss()
  174. }
  175. func main() {
  176. go task_index()
  177. go UpdateExtract() //抽取表中新增entidlist字段
  178. updport := Sysconfig["udpport"].(string)
  179. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  180. udpclient.Listen(processUdpMsg)
  181. log.Println("Udp服务监听", updport)
  182. // time.Sleep(99999 * time.Hour)
  183. ch := make(chan bool, 1)
  184. <-ch
  185. }
  186. var pool = make(chan bool, 20)
  187. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  188. switch act {
  189. case mu.OP_TYPE_DATA: //上个节点的数据
  190. //从表中开始处理生成企业数据
  191. var mapInfo map[string]interface{}
  192. err := json.Unmarshal(data, &mapInfo)
  193. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  194. if err != nil {
  195. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  196. } else if mapInfo != nil {
  197. key, _ := mapInfo["key"].(string)
  198. if key == "" {
  199. key = "udpok"
  200. }
  201. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  202. tasktype, _ := mapInfo["stype"].(string)
  203. log.Println("tasktype:", tasktype)
  204. switch tasktype {
  205. case "winner":
  206. pool <- true
  207. go func() {
  208. defer func() {
  209. <-pool
  210. }()
  211. winnerTask(data, mapInfo)
  212. }()
  213. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  214. pool <- true
  215. go func() {
  216. defer func() {
  217. <-pool
  218. }()
  219. biddingTask(data, mapInfo)
  220. }()
  221. case "project":
  222. pool <- true
  223. go func() {
  224. defer func() {
  225. <-pool
  226. }()
  227. projectTask(data, project, mapInfo)
  228. }()
  229. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  230. pool <- true
  231. go func() {
  232. defer func() {
  233. <-pool
  234. }()
  235. biddingBackTask(data, mapInfo)
  236. }()
  237. case "biddingall": //合并并重新生成索引,不生成关键词
  238. pool <- true
  239. go func() {
  240. defer func() {
  241. <-pool
  242. }()
  243. biddingAllTask(data, mapInfo)
  244. }()
  245. case "biddingdata": //联表生成索引不合并,不生成关键词
  246. pool <- true
  247. go func() {
  248. defer func() {
  249. <-pool
  250. }()
  251. biddingDataTask(data, mapInfo)
  252. }()
  253. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  254. pool <- true
  255. go func() {
  256. defer func() {
  257. <-pool
  258. }()
  259. biddingMergeTask(data, mapInfo)
  260. }()
  261. case "buyer":
  262. pool <- true
  263. go func() {
  264. defer func() {
  265. <-pool
  266. }()
  267. buyerTask(data, mapInfo)
  268. }()
  269. case "winnerent": //标准库
  270. pool <- true
  271. go func() {
  272. defer func() {
  273. <-pool
  274. }()
  275. standardTask("winnerent", mapInfo)
  276. }()
  277. case "buyerent": //标准库
  278. pool <- true
  279. go func() {
  280. defer func() {
  281. <-pool
  282. }()
  283. standardTask("buyerent", mapInfo)
  284. }()
  285. case "agencyent": //标准库
  286. pool <- true
  287. go func() {
  288. defer func() {
  289. <-pool
  290. }()
  291. standardTask("agencyent", mapInfo)
  292. }()
  293. case "biddingdelbyextract": //根据repeat删除es
  294. pool <- true
  295. go func() {
  296. defer func() {
  297. <-pool
  298. }()
  299. biddingDelByExtract(data, mapInfo)
  300. }()
  301. case "biddingdelbyextracttype": //根据extracttype删除es
  302. pool <- true
  303. go func() {
  304. defer func() {
  305. <-pool
  306. }()
  307. biddingDelByExtracttype(data, mapInfo)
  308. }()
  309. default:
  310. pool <- true
  311. go func() {
  312. defer func() {
  313. <-pool
  314. }()
  315. defaultFunc(data, mapInfo)
  316. }()
  317. }
  318. }
  319. case mu.OP_NOOP: //下个节点回应
  320. log.Println("发送成功", string(data))
  321. }
  322. }