main.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. "time"
  12. u "util"
  13. )
  14. var (
  15. Sysconfig map[string]interface{} //配置文件
  16. mgo *mongodb.MongodbSim //mongodb操作对象
  17. extractmgo *mongodb.MongodbSim //mongodb操作对象
  18. project2db *mongodb.MongodbSim //mongodb操作对象
  19. mgostandard *mongodb.MongodbSim //mongodb操作对象
  20. qyxydb *mongodb.MongodbSim //mongodb操作对象
  21. udpclient mu.UdpClient //udp对象
  22. updport string
  23. savesizei = 500
  24. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  25. biddingIndexFieldsMap = map[string]string{}
  26. projectinfoFields []string
  27. projectinfoFieldsMap = map[string]string{}
  28. multiIndex []string
  29. purchasinglistFields []string
  30. winnerorderlistFields []string
  31. purchasinglistFieldsMap = map[string]string{}
  32. winnerorderlistFieldsMap = map[string]string{}
  33. BulkSize = 400
  34. detailLength = 50000
  35. fileLength = 50000
  36. //bidding_other连接信息
  37. bidding_other_es *elastic.Elastic
  38. other_index string
  39. other_itype string
  40. winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
  41. )
  42. func init() {
  43. util.ReadConfig(&Sysconfig)
  44. inits()
  45. go checkMapJob()
  46. detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
  47. fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
  48. updport, _ = Sysconfig["updport"].(string)
  49. winner, _ = Sysconfig["winner"].(map[string]interface{})
  50. standard, _ = Sysconfig["standard"].(map[string]interface{})
  51. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  52. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  53. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  54. project, _ = Sysconfig["project"].(map[string]interface{})
  55. project2, _ = Sysconfig["project2"].(map[string]interface{})
  56. qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
  57. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  58. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  59. MongodbAddr: mconf["addr"].(string),
  60. Size: util.IntAllDef(mconf["pool"], 5),
  61. DbName: mconf["db"].(string),
  62. }
  63. mgo.InitPool()
  64. project2db = &mongodb.MongodbSim{
  65. MongodbAddr: project2["addr"].(string),
  66. Size: util.IntAllDef(project2["pool"], 5),
  67. DbName: project2["db"].(string),
  68. }
  69. project2db.InitPool()
  70. //企业信用
  71. qyxydb = &mongodb.MongodbSim{
  72. MongodbAddr: qyxy_ent["addr"].(string),
  73. Size: util.IntAllDef(qyxy_ent["pool"], 5),
  74. DbName: qyxy_ent["db"].(string),
  75. }
  76. qyxydb.InitPool()
  77. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  78. if savedb == nil {
  79. log.Println("未设置保存数据库,默认使用招标库")
  80. extractmgo = mgo
  81. } else { //savedb为抽取连接
  82. addr, _ := savedb["addr"].(string)
  83. size := util.IntAllDef(savedb["size"], 5)
  84. db, _ := savedb["db"].(string)
  85. extractmgo = &mongodb.MongodbSim{
  86. MongodbAddr: addr,
  87. Size: size,
  88. DbName: db,
  89. }
  90. extractmgo.InitPool()
  91. }
  92. mgostandard = &mongodb.MongodbSim{
  93. MongodbAddr: standard["addr"].(string),
  94. Size: util.IntAllDef(standard["pool"], 5),
  95. DbName: standard["db"].(string),
  96. }
  97. mgostandard.InitPool()
  98. //初始化es
  99. //bidding
  100. econf := Sysconfig["elastic"].(map[string]interface{})
  101. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  102. //bidding_other
  103. if Sysconfig["elastic_other"] != nil {
  104. econf_other := Sysconfig["elastic_other"].(map[string]interface{})
  105. other_index = econf_other["index"].(string)
  106. other_itype = econf_other["type"].(string)
  107. bidding_other_es = &elastic.Elastic{
  108. S_esurl: econf_other["addr"].(string),
  109. I_size: util.IntAllDef(econf_other["pool"], 5),
  110. }
  111. bidding_other_es.InitElasticSize()
  112. }
  113. //
  114. if bidding["indexfields"] != nil {
  115. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  116. }
  117. if bidding["projectinfo"] != nil {
  118. pf := util.ObjToString(bidding["projectinfo"])
  119. if pf != "" {
  120. projectinfoFields = strings.Split(pf, ",")
  121. }
  122. }
  123. if bidding["purchasinglist"] != nil {
  124. pcl := util.ObjToString(bidding["purchasinglist"])
  125. if pcl != "" {
  126. purchasinglistFields = strings.Split(pcl, ",")
  127. }
  128. }
  129. if bidding["winnerorder"] != nil {
  130. winnerorder := util.ObjToString(bidding["winnerorder"])
  131. if winnerorder != "" {
  132. winnerorderlistFields = strings.Split(winnerorder, ",")
  133. }
  134. }
  135. if bidding["multiIndex"] != nil {
  136. mi := util.ObjToString(bidding["multiIndex"])
  137. if mi != "" {
  138. multiIndex = strings.Split(mi, ",")
  139. }
  140. }
  141. //
  142. if bidding["indexfieldsmap"] != nil {
  143. for k, v := range bidding["indexfieldsmap"].(map[string]interface{}) {
  144. biddingIndexFieldsMap[k] = util.ObjToString(v)
  145. }
  146. log.Println(biddingIndexFieldsMap)
  147. }
  148. if bidding["projectinfomap"] != nil {
  149. for k, v := range bidding["projectinfomap"].(map[string]interface{}) {
  150. projectinfoFieldsMap[k] = util.ObjToString(v)
  151. }
  152. log.Println(projectinfoFieldsMap)
  153. }
  154. if bidding["purchasinglistmap"] != nil {
  155. for k, v := range bidding["purchasinglistmap"].(map[string]interface{}) {
  156. purchasinglistFieldsMap[k] = util.ObjToString(v)
  157. }
  158. log.Println(purchasinglistFieldsMap)
  159. }
  160. if bidding["winnerordermap"] != nil {
  161. for k, v := range bidding["winnerordermap"].(map[string]interface{}) {
  162. winnerorderlistFieldsMap[k] = util.ObjToString(v)
  163. }
  164. log.Println(winnerorderlistFieldsMap)
  165. }
  166. log.Println(projectinfoFields)
  167. log.Println(purchasinglistFields)
  168. //初始化oss
  169. u.InitOss()
  170. }
  171. func main() {
  172. go task_index()
  173. updport := Sysconfig["udpport"].(string)
  174. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  175. udpclient.Listen(processUdpMsg)
  176. log.Println("Udp服务监听", updport)
  177. // time.Sleep(99999 * time.Hour)
  178. ch := make(chan bool, 1)
  179. <-ch
  180. }
  181. var pool = make(chan bool, 20)
  182. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  183. switch act {
  184. case mu.OP_TYPE_DATA: //上个节点的数据
  185. //从表中开始处理生成企业数据
  186. var mapInfo map[string]interface{}
  187. err := json.Unmarshal(data, &mapInfo)
  188. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  189. if err != nil {
  190. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  191. } else if mapInfo != nil {
  192. key, _ := mapInfo["key"].(string)
  193. if key == "" {
  194. key = "udpok"
  195. }
  196. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  197. tasktype, _ := mapInfo["stype"].(string)
  198. log.Println("tasktype:", tasktype)
  199. switch tasktype {
  200. case "winner":
  201. pool <- true
  202. go func() {
  203. defer func() {
  204. <-pool
  205. }()
  206. winnerTask(data, mapInfo)
  207. }()
  208. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  209. pool <- true
  210. go func() {
  211. defer func() {
  212. <-pool
  213. }()
  214. biddingTask(data, mapInfo)
  215. }()
  216. case "project":
  217. pool <- true
  218. go func() {
  219. defer func() {
  220. <-pool
  221. }()
  222. projectTask(data, project, mapInfo)
  223. }()
  224. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  225. pool <- true
  226. go func() {
  227. defer func() {
  228. <-pool
  229. }()
  230. biddingBackTask(data, mapInfo)
  231. }()
  232. case "biddingall": //合并并重新生成索引,不生成关键词
  233. pool <- true
  234. go func() {
  235. defer func() {
  236. <-pool
  237. }()
  238. biddingAllTask(data, mapInfo)
  239. }()
  240. case "biddingdata": //联表生成索引不合并,不生成关键词
  241. pool <- true
  242. go func() {
  243. defer func() {
  244. <-pool
  245. }()
  246. biddingDataTask(data, mapInfo)
  247. }()
  248. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  249. pool <- true
  250. go func() {
  251. defer func() {
  252. <-pool
  253. }()
  254. biddingMergeTask(data, mapInfo)
  255. }()
  256. case "buyer":
  257. pool <- true
  258. go func() {
  259. defer func() {
  260. <-pool
  261. }()
  262. buyerTask(data, mapInfo)
  263. }()
  264. case "winnerent": //标准库
  265. pool <- true
  266. go func() {
  267. defer func() {
  268. <-pool
  269. }()
  270. standardTask("winnerent", mapInfo)
  271. }()
  272. case "buyerent": //标准库
  273. pool <- true
  274. go func() {
  275. defer func() {
  276. <-pool
  277. }()
  278. standardTask("buyerent", mapInfo)
  279. }()
  280. case "agencyent": //标准库
  281. pool <- true
  282. go func() {
  283. defer func() {
  284. <-pool
  285. }()
  286. standardTask("agencyent", mapInfo)
  287. }()
  288. case "biddingdelbyextract": //根据repeat删除es
  289. pool <- true
  290. go func() {
  291. defer func() {
  292. <-pool
  293. }()
  294. biddingDelByExtract(data, mapInfo)
  295. }()
  296. case "biddingdelbyextracttype": //根据extracttype删除es
  297. pool <- true
  298. go func() {
  299. defer func() {
  300. <-pool
  301. }()
  302. biddingDelByExtracttype(data, mapInfo)
  303. }()
  304. default:
  305. pool <- true
  306. go func() {
  307. defer func() {
  308. <-pool
  309. }()
  310. defaultFunc(data, mapInfo)
  311. }()
  312. }
  313. }
  314. case mu.OP_NOOP: //下个节点回应
  315. log.Println("发送成功", string(data))
  316. }
  317. }