main.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. "time"
  12. u "util"
  13. )
  14. var (
  15. Sysconfig map[string]interface{} //配置文件
  16. mgo *mongodb.MongodbSim //mongodb操作对象
  17. extractmgo *mongodb.MongodbSim //mongodb操作对象
  18. project2db *mongodb.MongodbSim //mongodb操作对象
  19. mgostandard *mongodb.MongodbSim //mongodb操作对象
  20. qyxydb *mongodb.MongodbSim //mongodb操作对象
  21. udpclient mu.UdpClient //udp对象
  22. updport string
  23. savesizei = 500
  24. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  25. projectinfoFields []string
  26. multiIndex []string
  27. purchasinglistFields []string
  28. BulkSize = 400
  29. //bidding_other连接信息
  30. bidding_other_es *elastic.Elastic
  31. other_index string
  32. other_itype string
  33. winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
  34. )
  35. func init() {
  36. util.ReadConfig(&Sysconfig)
  37. inits()
  38. go checkMapJob()
  39. updport, _ = Sysconfig["updport"].(string)
  40. winner, _ = Sysconfig["winner"].(map[string]interface{})
  41. standard, _ = Sysconfig["standard"].(map[string]interface{})
  42. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  43. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  44. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  45. project, _ = Sysconfig["project"].(map[string]interface{})
  46. project2, _ = Sysconfig["project2"].(map[string]interface{})
  47. qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
  48. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  49. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  50. MongodbAddr: mconf["addr"].(string),
  51. Size: util.IntAllDef(mconf["pool"], 5),
  52. DbName: mconf["db"].(string),
  53. }
  54. mgo.InitPool()
  55. project2db = &mongodb.MongodbSim{
  56. MongodbAddr: project2["addr"].(string),
  57. Size: util.IntAllDef(project2["pool"], 5),
  58. DbName: project2["db"].(string),
  59. }
  60. project2db.InitPool()
  61. //企业信用
  62. qyxydb = &mongodb.MongodbSim{
  63. MongodbAddr: qyxy_ent["addr"].(string),
  64. Size: util.IntAllDef(qyxy_ent["pool"], 5),
  65. DbName: qyxy_ent["db"].(string),
  66. }
  67. qyxydb.InitPool()
  68. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  69. if savedb == nil {
  70. log.Println("未设置保存数据库,默认使用招标库")
  71. extractmgo = mgo
  72. } else { //savedb为抽取连接
  73. addr, _ := savedb["addr"].(string)
  74. size := util.IntAllDef(savedb["size"], 5)
  75. db, _ := savedb["db"].(string)
  76. extractmgo = &mongodb.MongodbSim{
  77. MongodbAddr: addr,
  78. Size: size,
  79. DbName: db,
  80. }
  81. extractmgo.InitPool()
  82. }
  83. mgostandard = &mongodb.MongodbSim{
  84. MongodbAddr: standard["addr"].(string),
  85. Size: util.IntAllDef(standard["pool"], 5),
  86. DbName: standard["db"].(string),
  87. }
  88. mgostandard.InitPool()
  89. //初始化es
  90. //bidding
  91. econf := Sysconfig["elastic"].(map[string]interface{})
  92. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  93. //bidding_other
  94. if Sysconfig["elastic_other"] != nil {
  95. econf_other := Sysconfig["elastic_other"].(map[string]interface{})
  96. other_index = econf_other["index"].(string)
  97. other_itype = econf_other["type"].(string)
  98. bidding_other_es = &elastic.Elastic{
  99. S_esurl: econf_other["addr"].(string),
  100. I_size: util.IntAllDef(econf_other["pool"], 5),
  101. }
  102. bidding_other_es.InitElasticSize()
  103. }
  104. //
  105. if bidding["indexfields"] != nil {
  106. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  107. }
  108. if bidding["projectinfo"] != nil {
  109. pf := util.ObjToString(bidding["projectinfo"])
  110. if pf != "" {
  111. projectinfoFields = strings.Split(pf, ",")
  112. }
  113. }
  114. if bidding["purchasinglist"] != nil {
  115. pcl := util.ObjToString(bidding["purchasinglist"])
  116. if pcl != "" {
  117. purchasinglistFields = strings.Split(pcl, ",")
  118. }
  119. }
  120. if bidding["multiIndex"] != nil {
  121. mi := util.ObjToString(bidding["multiIndex"])
  122. if mi != "" {
  123. multiIndex = strings.Split(mi, ",")
  124. }
  125. }
  126. log.Println(projectinfoFields)
  127. log.Println(purchasinglistFields)
  128. //初始化oss
  129. u.InitOss()
  130. }
  131. func main() {
  132. go task_index()
  133. //task_qyxyindex()
  134. updport := Sysconfig["udpport"].(string)
  135. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  136. udpclient.Listen(processUdpMsg)
  137. log.Println("Udp服务监听", updport)
  138. time.Sleep(99999 * time.Hour)
  139. }
  140. var pool = make(chan bool, 20)
  141. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  142. switch act {
  143. case mu.OP_TYPE_DATA: //上个节点的数据
  144. //从表中开始处理生成企业数据
  145. var mapInfo map[string]interface{}
  146. err := json.Unmarshal(data, &mapInfo)
  147. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  148. if err != nil {
  149. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  150. } else if mapInfo != nil {
  151. key, _ := mapInfo["key"].(string)
  152. if key == "" {
  153. key = "udpok"
  154. }
  155. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  156. tasktype, _ := mapInfo["stype"].(string)
  157. log.Println("tasktype:", tasktype)
  158. switch tasktype {
  159. case "winner":
  160. pool <- true
  161. go func() {
  162. defer func() {
  163. <-pool
  164. }()
  165. winnerTask(data, mapInfo)
  166. }()
  167. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  168. pool <- true
  169. go func() {
  170. defer func() {
  171. <-pool
  172. }()
  173. biddingTask(data, mapInfo)
  174. }()
  175. case "project":
  176. pool <- true
  177. go func() {
  178. defer func() {
  179. <-pool
  180. }()
  181. projectTask(data, project, mapInfo)
  182. }()
  183. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  184. pool <- true
  185. go func() {
  186. defer func() {
  187. <-pool
  188. }()
  189. biddingBackTask(data, mapInfo)
  190. }()
  191. case "biddingall": //合并并重新生成索引,不生成关键词
  192. pool <- true
  193. go func() {
  194. defer func() {
  195. <-pool
  196. }()
  197. biddingAllTask(data, mapInfo)
  198. }()
  199. case "biddingdata": //联表生成索引不合并,不生成关键词
  200. pool <- true
  201. go func() {
  202. defer func() {
  203. <-pool
  204. }()
  205. biddingDataTask(data, mapInfo)
  206. }()
  207. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  208. pool <- true
  209. go func() {
  210. defer func() {
  211. <-pool
  212. }()
  213. biddingMergeTask(data, mapInfo)
  214. }()
  215. case "buyer":
  216. pool <- true
  217. go func() {
  218. defer func() {
  219. <-pool
  220. }()
  221. buyerTask(data, mapInfo)
  222. }()
  223. case "winnerent": //标准库
  224. pool <- true
  225. go func() {
  226. defer func() {
  227. <-pool
  228. }()
  229. standardTask("winnerent", mapInfo)
  230. }()
  231. case "buyerent": //标准库
  232. pool <- true
  233. go func() {
  234. defer func() {
  235. <-pool
  236. }()
  237. standardTask("buyerent", mapInfo)
  238. }()
  239. case "agencyent": //标准库
  240. pool <- true
  241. go func() {
  242. defer func() {
  243. <-pool
  244. }()
  245. standardTask("agencyent", mapInfo)
  246. }()
  247. case "biddingdel": //标准库
  248. pool <- true
  249. go func() {
  250. defer func() {
  251. <-pool
  252. }()
  253. biddingDel(data, mapInfo)
  254. }()
  255. default:
  256. pool <- true
  257. go func() {
  258. defer func() {
  259. <-pool
  260. }()
  261. defaultFunc(data, mapInfo)
  262. }()
  263. }
  264. }
  265. case mu.OP_NOOP: //下个节点回应
  266. log.Println("发送成功", string(data))
  267. }
  268. }