main.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. "time"
  12. )
  13. var (
  14. Sysconfig map[string]interface{} //配置文件
  15. mgo *mongodb.MongodbSim //mongodb操作对象
  16. extractmgo *mongodb.MongodbSim //mongodb操作对象
  17. project2db *mongodb.MongodbSim //mongodb操作对象
  18. mgostandard *mongodb.MongodbSim //mongodb操作对象
  19. udpclient mu.UdpClient //udp对象
  20. updport string
  21. savesizei = 500
  22. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  23. projectinfoFields []string
  24. multiIndex []string
  25. BulkSize = 400
  26. winner, bidding, biddingback, project, project2, buyer, standard map[string]interface{}
  27. )
  28. func init() {
  29. util.ReadConfig(&Sysconfig)
  30. inits()
  31. go checkMapJob()
  32. updport, _ = Sysconfig["updport"].(string)
  33. winner, _ = Sysconfig["winner"].(map[string]interface{})
  34. standard, _ = Sysconfig["standard"].(map[string]interface{})
  35. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  36. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  37. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  38. project, _ = Sysconfig["project"].(map[string]interface{})
  39. project2, _ = Sysconfig["project2"].(map[string]interface{})
  40. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  41. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  42. MongodbAddr: mconf["addr"].(string),
  43. Size: util.IntAllDef(mconf["pool"], 5),
  44. DbName: mconf["db"].(string),
  45. }
  46. mgo.InitPool()
  47. project2db = &mongodb.MongodbSim{
  48. MongodbAddr: project2["addr"].(string),
  49. Size: util.IntAllDef(project2["pool"], 5),
  50. DbName: project2["db"].(string),
  51. }
  52. project2db.InitPool()
  53. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  54. if savedb == nil {
  55. log.Println("未设置保存数据库,默认使用招标库")
  56. extractmgo = mgo
  57. } else { //savedb为抽取连接
  58. addr, _ := savedb["addr"].(string)
  59. size := util.IntAllDef(savedb["size"], 5)
  60. db, _ := savedb["db"].(string)
  61. extractmgo = &mongodb.MongodbSim{
  62. MongodbAddr: addr,
  63. Size: size,
  64. DbName: db,
  65. }
  66. extractmgo.InitPool()
  67. }
  68. mgostandard = &mongodb.MongodbSim{
  69. MongodbAddr: standard["addr"].(string),
  70. Size: util.IntAllDef(standard["pool"], 5),
  71. DbName: standard["db"].(string),
  72. }
  73. mgostandard.InitPool()
  74. log.Println(standard["addr"].(string))
  75. econf := Sysconfig["elastic"].(map[string]interface{})
  76. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  77. if bidding["indexfields"] != nil {
  78. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  79. }
  80. if bidding["projectinfo"] != nil {
  81. pf := util.ObjToString(bidding["projectinfo"])
  82. if pf != "" {
  83. projectinfoFields = strings.Split(pf, ",")
  84. }
  85. }
  86. if bidding["multiIndex"] != nil {
  87. mi := util.ObjToString(bidding["multiIndex"])
  88. if mi != "" {
  89. multiIndex = strings.Split(mi, ",")
  90. }
  91. }
  92. log.Println(projectinfoFields)
  93. }
  94. func main() {
  95. go task_index()
  96. updport := Sysconfig["udpport"].(string)
  97. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  98. udpclient.Listen(processUdpMsg)
  99. log.Println("Udp服务监听", updport)
  100. time.Sleep(99999 * time.Hour)
  101. }
  102. var pool = make(chan bool, 20)
  103. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  104. switch act {
  105. case mu.OP_TYPE_DATA: //上个节点的数据
  106. //从表中开始处理生成企业数据
  107. var mapInfo map[string]interface{}
  108. err := json.Unmarshal(data, &mapInfo)
  109. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  110. if err != nil {
  111. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  112. } else if mapInfo != nil {
  113. key, _ := mapInfo["key"].(string)
  114. if key == "" {
  115. key = "udpok"
  116. }
  117. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  118. tasktype, _ := mapInfo["stype"].(string)
  119. log.Println("tasktype:", tasktype)
  120. switch tasktype {
  121. case "winner":
  122. pool <- true
  123. go func() {
  124. defer func() {
  125. <-pool
  126. }()
  127. winnerTask(data, mapInfo)
  128. }()
  129. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  130. pool <- true
  131. go func() {
  132. defer func() {
  133. <-pool
  134. }()
  135. biddingTask(data, mapInfo)
  136. }()
  137. case "project":
  138. pool <- true
  139. go func() {
  140. defer func() {
  141. <-pool
  142. }()
  143. projectTask(data, project, mapInfo)
  144. }()
  145. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  146. pool <- true
  147. go func() {
  148. defer func() {
  149. <-pool
  150. }()
  151. biddingBackTask(data, mapInfo)
  152. }()
  153. case "biddingall": //合并并重新生成索引,不生成关键词
  154. pool <- true
  155. go func() {
  156. defer func() {
  157. <-pool
  158. }()
  159. biddingAllTask(data, mapInfo)
  160. }()
  161. case "biddingdata": //联表生成索引不合并,不生成关键词
  162. pool <- true
  163. go func() {
  164. defer func() {
  165. <-pool
  166. }()
  167. biddingDataTask(data, mapInfo)
  168. }()
  169. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  170. pool <- true
  171. go func() {
  172. defer func() {
  173. <-pool
  174. }()
  175. biddingMergeTask(data, mapInfo)
  176. }()
  177. case "buyer":
  178. pool <- true
  179. go func() {
  180. defer func() {
  181. <-pool
  182. }()
  183. buyerTask(data, mapInfo)
  184. }()
  185. case "winnerent": //标准库
  186. pool <- true
  187. go func() {
  188. defer func() {
  189. <-pool
  190. }()
  191. standardTask("winnerent", mapInfo)
  192. }()
  193. case "buyerent": //标准库
  194. pool <- true
  195. go func() {
  196. defer func() {
  197. <-pool
  198. }()
  199. standardTask("buyerent", mapInfo)
  200. }()
  201. case "agencyent": //标准库
  202. pool <- true
  203. go func() {
  204. defer func() {
  205. <-pool
  206. }()
  207. standardTask("agencyent", mapInfo)
  208. }()
  209. default:
  210. pool <- true
  211. go func() {
  212. defer func() {
  213. <-pool
  214. }()
  215. defaultFunc(data, mapInfo)
  216. }()
  217. }
  218. }
  219. case mu.OP_NOOP: //下个节点回应
  220. log.Println("发送成功", string(data))
  221. }
  222. }