main.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. //"time"
  12. u "util"
  13. )
  14. var (
  15. Sysconfig map[string]interface{} //配置文件
  16. mgo *mongodb.MongodbSim //mongodb操作对象
  17. extractmgo *mongodb.MongodbSim //mongodb操作对象
  18. project2db *mongodb.MongodbSim //mongodb操作对象
  19. mgostandard *mongodb.MongodbSim //mongodb操作对象
  20. qyxydb *mongodb.MongodbSim //mongodb操作对象
  21. udpclient mu.UdpClient //udp对象
  22. updport string
  23. savesizei = 500
  24. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  25. biddingIndexFieldsMap = map[string]string{}
  26. projectinfoFields []string
  27. projectinfoFieldsMap = map[string]string{}
  28. multiIndex []string
  29. purchasinglistFields []string
  30. winnerorderlistFields []string
  31. purchasinglistFieldsMap = map[string]string{}
  32. winnerorderlistFieldsMap = map[string]string{}
  33. BulkSize = 400
  34. detailLength = 50000
  35. fileLength = 50000
  36. //bidding_other连接信息
  37. bidding_other_es *elastic.Elastic
  38. other_index string
  39. other_itype string
  40. winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
  41. )
  42. func init() {
  43. util.ReadConfig(&Sysconfig)
  44. //inits()
  45. //go checkMapJob()
  46. detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
  47. fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
  48. updport, _ = Sysconfig["updport"].(string)
  49. winner, _ = Sysconfig["winner"].(map[string]interface{})
  50. standard, _ = Sysconfig["standard"].(map[string]interface{})
  51. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  52. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  53. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  54. project, _ = Sysconfig["project"].(map[string]interface{})
  55. project2, _ = Sysconfig["project2"].(map[string]interface{})
  56. qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
  57. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  58. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  59. MongodbAddr: mconf["addr"].(string),
  60. Size: util.IntAllDef(mconf["pool"], 5),
  61. DbName: mconf["db"].(string),
  62. //ReplSet: "bidding",
  63. }
  64. mgo.InitPool()
  65. project2db = &mongodb.MongodbSim{
  66. MongodbAddr: project2["addr"].(string),
  67. Size: util.IntAllDef(project2["pool"], 5),
  68. DbName: project2["db"].(string),
  69. }
  70. project2db.InitPool()
  71. //企业信用
  72. qyxydb = &mongodb.MongodbSim{
  73. MongodbAddr: qyxy_ent["addr"].(string),
  74. Size: util.IntAllDef(qyxy_ent["pool"], 5),
  75. DbName: qyxy_ent["db"].(string),
  76. }
  77. qyxydb.InitPool()
  78. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  79. if savedb == nil {
  80. log.Println("未设置保存数据库,默认使用招标库")
  81. extractmgo = mgo
  82. } else { //savedb为抽取连接
  83. addr, _ := savedb["addr"].(string)
  84. size := util.IntAllDef(savedb["size"], 5)
  85. db, _ := savedb["db"].(string)
  86. extractmgo = &mongodb.MongodbSim{
  87. MongodbAddr: addr,
  88. Size: size,
  89. DbName: db,
  90. }
  91. extractmgo.InitPool()
  92. }
  93. mgostandard = &mongodb.MongodbSim{
  94. MongodbAddr: standard["addr"].(string),
  95. //ReplSet: "bidding",
  96. Size: util.IntAllDef(standard["pool"], 5),
  97. DbName: standard["db"].(string),
  98. }
  99. mgostandard.InitPool()
  100. //初始化es
  101. //bidding
  102. econf := Sysconfig["elastic"].(map[string]interface{})
  103. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  104. //bidding_other
  105. if Sysconfig["elastic_other"] != nil {
  106. econf_other := Sysconfig["elastic_other"].(map[string]interface{})
  107. other_index = econf_other["index"].(string)
  108. other_itype = econf_other["type"].(string)
  109. bidding_other_es = &elastic.Elastic{
  110. S_esurl: econf_other["addr"].(string),
  111. I_size: util.IntAllDef(econf_other["pool"], 5),
  112. }
  113. bidding_other_es.InitElasticSize()
  114. }
  115. //
  116. if bidding["indexfields"] != nil {
  117. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  118. }
  119. if bidding["projectinfo"] != nil {
  120. pf := util.ObjToString(bidding["projectinfo"])
  121. if pf != "" {
  122. projectinfoFields = strings.Split(pf, ",")
  123. }
  124. }
  125. if bidding["purchasinglist"] != nil {
  126. pcl := util.ObjToString(bidding["purchasinglist"])
  127. if pcl != "" {
  128. purchasinglistFields = strings.Split(pcl, ",")
  129. }
  130. }
  131. if bidding["winnerorder"] != nil {
  132. winnerorder := util.ObjToString(bidding["winnerorder"])
  133. if winnerorder != "" {
  134. winnerorderlistFields = strings.Split(winnerorder, ",")
  135. }
  136. }
  137. if bidding["multiIndex"] != nil {
  138. mi := util.ObjToString(bidding["multiIndex"])
  139. if mi != "" {
  140. multiIndex = strings.Split(mi, ",")
  141. }
  142. }
  143. //
  144. if bidding["indexfieldsmap"] != nil {
  145. for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
  146. biddingIndexFieldsMap[k] = util.ObjToString(v)
  147. }
  148. log.Println(biddingIndexFieldsMap)
  149. }
  150. if bidding["projectinfomap"] != nil {
  151. for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
  152. projectinfoFieldsMap[k] = util.ObjToString(v)
  153. }
  154. log.Println(projectinfoFieldsMap)
  155. }
  156. if bidding["purchasinglistmap"] != nil {
  157. for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
  158. purchasinglistFieldsMap[k] = util.ObjToString(v)
  159. }
  160. log.Println(purchasinglistFieldsMap)
  161. }
  162. if bidding["winnerordermap"] != nil {
  163. for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
  164. winnerorderlistFieldsMap[k] = util.ObjToString(v)
  165. }
  166. log.Println(winnerorderlistFieldsMap)
  167. }
  168. log.Println(projectinfoFields)
  169. log.Println(purchasinglistFields)
  170. //初始化oss
  171. u.InitOss()
  172. }
  173. func main() {
  174. //go task_index()
  175. updport := Sysconfig["udpport"].(string)
  176. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  177. udpclient.Listen(processUdpMsg)
  178. log.Println("Udp服务监听", updport)
  179. // time.Sleep(99999 * time.Hour)
  180. ch := make(chan bool, 1)
  181. <-ch
  182. }
  183. var pool = make(chan bool, 20)
  184. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  185. switch act {
  186. case mu.OP_TYPE_DATA: //上个节点的数据
  187. //从表中开始处理生成企业数据
  188. var mapInfo map[string]interface{}
  189. err := json.Unmarshal(data, &mapInfo)
  190. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  191. if err != nil {
  192. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  193. } else if mapInfo != nil {
  194. key, _ := mapInfo["key"].(string)
  195. if key == "" {
  196. key = "udpok"
  197. }
  198. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  199. tasktype, _ := mapInfo["stype"].(string)
  200. log.Println("tasktype:", tasktype)
  201. switch tasktype {
  202. case "winner":
  203. pool <- true
  204. go func() {
  205. defer func() {
  206. <-pool
  207. }()
  208. winnerTask(data, mapInfo)
  209. }()
  210. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  211. pool <- true
  212. go func() {
  213. defer func() {
  214. <-pool
  215. }()
  216. biddingTask(data, mapInfo)
  217. }()
  218. case "project":
  219. pool <- true
  220. go func() {
  221. defer func() {
  222. <-pool
  223. }()
  224. projectTask(data, project, mapInfo)
  225. }()
  226. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  227. pool <- true
  228. go func() {
  229. defer func() {
  230. <-pool
  231. }()
  232. biddingBackTask(data, mapInfo)
  233. }()
  234. case "biddingall": //合并并重新生成索引,不生成关键词
  235. pool <- true
  236. go func() {
  237. defer func() {
  238. <-pool
  239. }()
  240. biddingAllTask(data, mapInfo)
  241. }()
  242. case "biddingdata": //联表生成索引不合并,不生成关键词
  243. pool <- true
  244. go func() {
  245. defer func() {
  246. <-pool
  247. }()
  248. biddingDataTask(data, mapInfo)
  249. }()
  250. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  251. pool <- true
  252. go func() {
  253. defer func() {
  254. <-pool
  255. }()
  256. biddingMergeTask(data, mapInfo)
  257. }()
  258. case "buyer":
  259. pool <- true
  260. go func() {
  261. defer func() {
  262. <-pool
  263. }()
  264. buyerTask(data, mapInfo)
  265. }()
  266. case "winnerent": //标准库
  267. pool <- true
  268. go func() {
  269. defer func() {
  270. <-pool
  271. }()
  272. standardTask("winnerent", mapInfo)
  273. }()
  274. case "buyerent": //标准库
  275. pool <- true
  276. go func() {
  277. defer func() {
  278. <-pool
  279. }()
  280. standardTask("buyerent", mapInfo)
  281. }()
  282. case "agencyent": //标准库
  283. pool <- true
  284. go func() {
  285. defer func() {
  286. <-pool
  287. }()
  288. standardTask("agencyent", mapInfo)
  289. }()
  290. case "biddingdelbyextract": //根据repeat删除es
  291. pool <- true
  292. go func() {
  293. defer func() {
  294. <-pool
  295. }()
  296. biddingDelByExtract(data, mapInfo)
  297. }()
  298. case "biddingdelbyextracttype": //根据extracttype删除es
  299. pool <- true
  300. go func() {
  301. defer func() {
  302. <-pool
  303. }()
  304. biddingDelByExtracttype(data, mapInfo)
  305. }()
  306. default:
  307. pool <- true
  308. go func() {
  309. defer func() {
  310. <-pool
  311. }()
  312. defaultFunc(data, mapInfo)
  313. }()
  314. }
  315. }
  316. case mu.OP_NOOP: //下个节点回应
  317. log.Println("发送成功", string(data))
  318. }
  319. }