// main.go 6.8 KB

  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. "time"
  12. u "./util"
  13. )
  14. var (
  15. Sysconfig map[string]interface{} //配置文件
  16. mgo *mongodb.MongodbSim //mongodb操作对象
  17. extractmgo *mongodb.MongodbSim //mongodb操作对象
  18. project2db *mongodb.MongodbSim //mongodb操作对象
  19. mgostandard *mongodb.MongodbSim //mongodb操作对象
  20. qyxydb *mongodb.MongodbSim //mongodb操作对象
  21. udpclient mu.UdpClient //udp对象
  22. updport string
  23. savesizei = 500
  24. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  25. projectinfoFields []string
  26. multiIndex []string
  27. BulkSize = 400
  28. winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
  29. )
  30. func init() {
  31. util.ReadConfig(&Sysconfig)
  32. inits()
  33. go checkMapJob()
  34. updport, _ = Sysconfig["updport"].(string)
  35. winner, _ = Sysconfig["winner"].(map[string]interface{})
  36. standard, _ = Sysconfig["standard"].(map[string]interface{})
  37. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  38. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  39. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  40. project, _ = Sysconfig["project"].(map[string]interface{})
  41. project2, _ = Sysconfig["project2"].(map[string]interface{})
  42. qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
  43. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  44. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  45. MongodbAddr: mconf["addr"].(string),
  46. Size: util.IntAllDef(mconf["pool"], 5),
  47. DbName: mconf["db"].(string),
  48. }
  49. mgo.InitPool()
  50. project2db = &mongodb.MongodbSim{
  51. MongodbAddr: project2["addr"].(string),
  52. Size: util.IntAllDef(project2["pool"], 5),
  53. DbName: project2["db"].(string),
  54. }
  55. project2db.InitPool()
  56. //企业信用
  57. qyxydb = &mongodb.MongodbSim{
  58. MongodbAddr: qyxy_ent["addr"].(string),
  59. Size: util.IntAllDef(qyxy_ent["pool"], 5),
  60. DbName: qyxy_ent["db"].(string),
  61. }
  62. qyxydb.InitPool()
  63. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  64. if savedb == nil {
  65. log.Println("未设置保存数据库,默认使用招标库")
  66. extractmgo = mgo
  67. } else { //savedb为抽取连接
  68. addr, _ := savedb["addr"].(string)
  69. size := util.IntAllDef(savedb["size"], 5)
  70. db, _ := savedb["db"].(string)
  71. extractmgo = &mongodb.MongodbSim{
  72. MongodbAddr: addr,
  73. Size: size,
  74. DbName: db,
  75. }
  76. extractmgo.InitPool()
  77. }
  78. mgostandard = &mongodb.MongodbSim{
  79. MongodbAddr: standard["addr"].(string),
  80. Size: util.IntAllDef(standard["pool"], 5),
  81. DbName: standard["db"].(string),
  82. }
  83. mgostandard.InitPool()
  84. log.Println(standard["addr"].(string))
  85. econf := Sysconfig["elastic"].(map[string]interface{})
  86. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  87. if bidding["indexfields"] != nil {
  88. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  89. }
  90. if bidding["projectinfo"] != nil {
  91. pf := util.ObjToString(bidding["projectinfo"])
  92. if pf != "" {
  93. projectinfoFields = strings.Split(pf, ",")
  94. }
  95. }
  96. if bidding["multiIndex"] != nil {
  97. mi := util.ObjToString(bidding["multiIndex"])
  98. if mi != "" {
  99. multiIndex = strings.Split(mi, ",")
  100. }
  101. }
  102. log.Println(projectinfoFields)
  103. //初始化oss
  104. u.InitOss()
  105. }
  106. func main() {
  107. go task_index()
  108. updport := Sysconfig["udpport"].(string)
  109. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  110. udpclient.Listen(processUdpMsg)
  111. log.Println("Udp服务监听", updport)
  112. time.Sleep(99999 * time.Hour)
  113. }
  114. var pool = make(chan bool, 20)
  115. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  116. switch act {
  117. case mu.OP_TYPE_DATA: //上个节点的数据
  118. //从表中开始处理生成企业数据
  119. var mapInfo map[string]interface{}
  120. err := json.Unmarshal(data, &mapInfo)
  121. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  122. if err != nil {
  123. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  124. } else if mapInfo != nil {
  125. key, _ := mapInfo["key"].(string)
  126. if key == "" {
  127. key = "udpok"
  128. }
  129. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  130. tasktype, _ := mapInfo["stype"].(string)
  131. log.Println("tasktype:", tasktype)
  132. switch tasktype {
  133. case "winner":
  134. pool <- true
  135. go func() {
  136. defer func() {
  137. <-pool
  138. }()
  139. winnerTask(data, mapInfo)
  140. }()
  141. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  142. pool <- true
  143. go func() {
  144. defer func() {
  145. <-pool
  146. }()
  147. biddingTask(data, mapInfo)
  148. }()
  149. case "project":
  150. pool <- true
  151. go func() {
  152. defer func() {
  153. <-pool
  154. }()
  155. projectTask(data, project, mapInfo)
  156. }()
  157. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  158. pool <- true
  159. go func() {
  160. defer func() {
  161. <-pool
  162. }()
  163. biddingBackTask(data, mapInfo)
  164. }()
  165. case "biddingall": //合并并重新生成索引,不生成关键词
  166. pool <- true
  167. go func() {
  168. defer func() {
  169. <-pool
  170. }()
  171. biddingAllTask(data, mapInfo)
  172. }()
  173. case "biddingdata": //联表生成索引不合并,不生成关键词
  174. pool <- true
  175. go func() {
  176. defer func() {
  177. <-pool
  178. }()
  179. biddingDataTask(data, mapInfo)
  180. }()
  181. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  182. pool <- true
  183. go func() {
  184. defer func() {
  185. <-pool
  186. }()
  187. biddingMergeTask(data, mapInfo)
  188. }()
  189. case "buyer":
  190. pool <- true
  191. go func() {
  192. defer func() {
  193. <-pool
  194. }()
  195. buyerTask(data, mapInfo)
  196. }()
  197. case "winnerent": //标准库
  198. pool <- true
  199. go func() {
  200. defer func() {
  201. <-pool
  202. }()
  203. standardTask("winnerent", mapInfo)
  204. }()
  205. case "buyerent": //标准库
  206. pool <- true
  207. go func() {
  208. defer func() {
  209. <-pool
  210. }()
  211. standardTask("buyerent", mapInfo)
  212. }()
  213. case "agencyent": //标准库
  214. pool <- true
  215. go func() {
  216. defer func() {
  217. <-pool
  218. }()
  219. standardTask("agencyent", mapInfo)
  220. }()
  221. default:
  222. pool <- true
  223. go func() {
  224. defer func() {
  225. <-pool
  226. }()
  227. defaultFunc(data, mapInfo)
  228. }()
  229. }
  230. }
  231. case mu.OP_NOOP: //下个节点回应
  232. log.Println("发送成功", string(data))
  233. }
  234. }