main.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. "time"
  12. )
  13. var (
  14. Sysconfig map[string]interface{} //配置文件
  15. mgo *mongodb.MongodbSim //mongodb操作对象
  16. extractmgo *mongodb.MongodbSim //mongodb操作对象
  17. mgostandard *mongodb.MongodbSim //mongodb操作对象
  18. udpclient mu.UdpClient //udp对象
  19. updport string
  20. savesizei = 500
  21. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  22. projectinfoFields []string
  23. multiIndex []string
  24. BulkSize = 400
  25. winner, bidding, biddingback, project, buyer, standard map[string]interface{}
  26. )
  27. func init() {
  28. util.ReadConfig(&Sysconfig)
  29. inits()
  30. go checkMapJob()
  31. updport, _ = Sysconfig["updport"].(string)
  32. winner, _ = Sysconfig["winner"].(map[string]interface{})
  33. standard, _ = Sysconfig["standard"].(map[string]interface{})
  34. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  35. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  36. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  37. project, _ = Sysconfig["project"].(map[string]interface{})
  38. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  39. mgo = &mongodb.MongodbSim{
  40. MongodbAddr: mconf["addr"].(string),
  41. Size: util.IntAllDef(mconf["pool"], 5),
  42. DbName: mconf["db"].(string),
  43. }
  44. mgo.InitPool()
  45. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  46. if savedb == nil {
  47. log.Println("未设置保存数据库,默认使用招标库")
  48. extractmgo = mgo
  49. } else {
  50. addr, _ := savedb["addr"].(string)
  51. size := util.IntAllDef(savedb["size"], 5)
  52. db, _ := savedb["db"].(string)
  53. extractmgo = &mongodb.MongodbSim{
  54. MongodbAddr: addr,
  55. Size: size,
  56. DbName: db,
  57. }
  58. extractmgo.InitPool()
  59. }
  60. mgostandard = &mongodb.MongodbSim{
  61. MongodbAddr: standard["addr"].(string),
  62. Size: util.IntAllDef(standard["pool"], 5),
  63. DbName: standard["db"].(string),
  64. }
  65. mgostandard.InitPool()
  66. log.Println(standard["addr"].(string))
  67. econf := Sysconfig["elastic"].(map[string]interface{})
  68. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  69. if bidding["indexfields"] != nil {
  70. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  71. }
  72. if bidding["projectinfo"] != nil {
  73. pf := util.ObjToString(bidding["projectinfo"])
  74. if pf != "" {
  75. projectinfoFields = strings.Split(pf, ",")
  76. }
  77. }
  78. if bidding["multiIndex"] != nil {
  79. mi := util.ObjToString(bidding["multiIndex"])
  80. if mi != "" {
  81. multiIndex = strings.Split(mi, ",")
  82. }
  83. }
  84. log.Println(projectinfoFields)
  85. }
  86. func main() {
  87. updport := Sysconfig["udpport"].(string)
  88. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  89. udpclient.Listen(processUdpMsg)
  90. log.Println("Udp服务监听", updport)
  91. time.Sleep(99999 * time.Hour)
  92. }
  93. var pool = make(chan bool, 20)
  94. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  95. switch act {
  96. case mu.OP_TYPE_DATA: //上个节点的数据
  97. //从表中开始处理生成企业数据
  98. var mapInfo map[string]interface{}
  99. err := json.Unmarshal(data, &mapInfo)
  100. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  101. if err != nil {
  102. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  103. } else if mapInfo != nil {
  104. key, _ := mapInfo["key"].(string)
  105. if key == "" {
  106. key = "udpok"
  107. }
  108. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  109. tasktype, _ := mapInfo["stype"].(string)
  110. log.Println("tasktype:", tasktype)
  111. switch tasktype {
  112. case "winner":
  113. pool <- true
  114. go func() {
  115. defer func() {
  116. <-pool
  117. }()
  118. winnerTask(data, mapInfo)
  119. }()
  120. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  121. pool <- true
  122. go func() {
  123. defer func() {
  124. <-pool
  125. }()
  126. biddingTask(data, mapInfo)
  127. }()
  128. case "project":
  129. pool <- true
  130. go func() {
  131. defer func() {
  132. <-pool
  133. }()
  134. projectTask(data, mapInfo)
  135. }()
  136. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  137. pool <- true
  138. go func() {
  139. defer func() {
  140. <-pool
  141. }()
  142. biddingBackTask(data, mapInfo)
  143. }()
  144. case "biddingall": //合并并重新生成索引,不生成关键词
  145. pool <- true
  146. go func() {
  147. defer func() {
  148. <-pool
  149. }()
  150. biddingAllTask(data, mapInfo)
  151. }()
  152. case "biddingdata": //联表生成索引不合并,不生成关键词
  153. pool <- true
  154. go func() {
  155. defer func() {
  156. <-pool
  157. }()
  158. biddingDataTask(data, mapInfo)
  159. }()
  160. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  161. pool <- true
  162. go func() {
  163. defer func() {
  164. <-pool
  165. }()
  166. biddingMergeTask(data, mapInfo)
  167. }()
  168. case "buyer":
  169. pool <- true
  170. go func() {
  171. defer func() {
  172. <-pool
  173. }()
  174. buyerTask(data, mapInfo)
  175. }()
  176. case "winnerent": //标准库
  177. pool <- true
  178. go func() {
  179. defer func() {
  180. <-pool
  181. }()
  182. standardTask("winnerent", mapInfo)
  183. }()
  184. case "buyerent": //标准库
  185. pool <- true
  186. go func() {
  187. defer func() {
  188. <-pool
  189. }()
  190. standardTask("buyerent", mapInfo)
  191. }()
  192. case "agencyent": //标准库
  193. pool <- true
  194. go func() {
  195. defer func() {
  196. <-pool
  197. }()
  198. standardTask("agencyent", mapInfo)
  199. }()
  200. default:
  201. pool <- true
  202. go func() {
  203. defer func() {
  204. <-pool
  205. }()
  206. defaultFunc(data, mapInfo)
  207. }()
  208. }
  209. }
  210. case mu.OP_NOOP: //下个节点回应
  211. log.Println("发送成功", string(data))
  212. }
  213. }