main.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/util"
  8. elastic "qfw/util/elastic"
  9. "qfw/util/mongodb"
  10. "strings"
  11. "time"
  12. u "util"
  13. )
  14. var (
  15. Sysconfig map[string]interface{} //配置文件
  16. mgo *mongodb.MongodbSim //mongodb操作对象
  17. extractmgo *mongodb.MongodbSim //mongodb操作对象
  18. project2db *mongodb.MongodbSim //mongodb操作对象
  19. mgostandard *mongodb.MongodbSim //mongodb操作对象
  20. qyxydb *mongodb.MongodbSim //mongodb操作对象
  21. udpclient mu.UdpClient //udp对象
  22. updport string
  23. savesizei = 500
  24. biddingIndexFields = []string{"_id", "buyerclass", "s_winner", "title", "detail", "detail_bak", "area", "areaval", "site", "type", "amount", "bidopendate", "bidopentime", "buyer", "channel", "city", "comeintime", "contenthtml", "descript", "description", "extracttype", "href", "infoformat", "keywords", "projectcode", "projectname", "publishtime", "s_sha", "spidercode", "subtype", "summary", "toptype", "urltop", "winner", "agency", "budget", "bidamount", "s_subscopeclass", "projectscope", "bidstatus"}
  25. projectinfoFields []string
  26. multiIndex []string
  27. purchasinglistFields []string
  28. BulkSize = 400
  29. detailLength = 50000
  30. fileLength = 50000
  31. //bidding_other连接信息
  32. bidding_other_es *elastic.Elastic
  33. other_index string
  34. other_itype string
  35. winner, bidding, biddingback, project, project2, buyer, standard, qyxy_ent map[string]interface{}
  36. )
  37. func init() {
  38. util.ReadConfig(&Sysconfig)
  39. inits()
  40. go checkMapJob()
  41. detailLength = util.IntAllDef(Sysconfig["detaillength"], 50000)
  42. fileLength = util.IntAllDef(Sysconfig["filelength"], 50000)
  43. updport, _ = Sysconfig["updport"].(string)
  44. winner, _ = Sysconfig["winner"].(map[string]interface{})
  45. standard, _ = Sysconfig["standard"].(map[string]interface{})
  46. buyer, _ = Sysconfig["buyer"].(map[string]interface{})
  47. bidding, _ = Sysconfig["bidding"].(map[string]interface{})
  48. biddingback, _ = Sysconfig["biddingback"].(map[string]interface{})
  49. project, _ = Sysconfig["project"].(map[string]interface{})
  50. project2, _ = Sysconfig["project2"].(map[string]interface{})
  51. qyxy_ent, _ = Sysconfig["qyxy_ent"].(map[string]interface{})
  52. mconf, _ := Sysconfig["mongodb"].(map[string]interface{})
  53. mgo = &mongodb.MongodbSim{ //mongodb为binding连接
  54. MongodbAddr: mconf["addr"].(string),
  55. Size: util.IntAllDef(mconf["pool"], 5),
  56. DbName: mconf["db"].(string),
  57. }
  58. mgo.InitPool()
  59. project2db = &mongodb.MongodbSim{
  60. MongodbAddr: project2["addr"].(string),
  61. Size: util.IntAllDef(project2["pool"], 5),
  62. DbName: project2["db"].(string),
  63. }
  64. project2db.InitPool()
  65. //企业信用
  66. qyxydb = &mongodb.MongodbSim{
  67. MongodbAddr: qyxy_ent["addr"].(string),
  68. Size: util.IntAllDef(qyxy_ent["pool"], 5),
  69. DbName: qyxy_ent["db"].(string),
  70. }
  71. qyxydb.InitPool()
  72. savedb, _ := Sysconfig["savedb"].(map[string]interface{})
  73. if savedb == nil {
  74. log.Println("未设置保存数据库,默认使用招标库")
  75. extractmgo = mgo
  76. } else { //savedb为抽取连接
  77. addr, _ := savedb["addr"].(string)
  78. size := util.IntAllDef(savedb["size"], 5)
  79. db, _ := savedb["db"].(string)
  80. extractmgo = &mongodb.MongodbSim{
  81. MongodbAddr: addr,
  82. Size: size,
  83. DbName: db,
  84. }
  85. extractmgo.InitPool()
  86. }
  87. mgostandard = &mongodb.MongodbSim{
  88. MongodbAddr: standard["addr"].(string),
  89. Size: util.IntAllDef(standard["pool"], 5),
  90. DbName: standard["db"].(string),
  91. }
  92. mgostandard.InitPool()
  93. //初始化es
  94. //bidding
  95. econf := Sysconfig["elastic"].(map[string]interface{})
  96. elastic.InitElasticSize(econf["addr"].(string), util.IntAllDef(econf["pool"], 5))
  97. //bidding_other
  98. if Sysconfig["elastic_other"] != nil {
  99. econf_other := Sysconfig["elastic_other"].(map[string]interface{})
  100. other_index = econf_other["index"].(string)
  101. other_itype = econf_other["type"].(string)
  102. bidding_other_es = &elastic.Elastic{
  103. S_esurl: econf_other["addr"].(string),
  104. I_size: util.IntAllDef(econf_other["pool"], 5),
  105. }
  106. bidding_other_es.InitElasticSize()
  107. }
  108. //
  109. if bidding["indexfields"] != nil {
  110. biddingIndexFields = util.ObjArrToStringArr(bidding["indexfields"].([]interface{}))
  111. }
  112. if bidding["projectinfo"] != nil {
  113. pf := util.ObjToString(bidding["projectinfo"])
  114. if pf != "" {
  115. projectinfoFields = strings.Split(pf, ",")
  116. }
  117. }
  118. if bidding["purchasinglist"] != nil {
  119. pcl := util.ObjToString(bidding["purchasinglist"])
  120. if pcl != "" {
  121. purchasinglistFields = strings.Split(pcl, ",")
  122. }
  123. }
  124. if bidding["multiIndex"] != nil {
  125. mi := util.ObjToString(bidding["multiIndex"])
  126. if mi != "" {
  127. multiIndex = strings.Split(mi, ",")
  128. }
  129. }
  130. log.Println(projectinfoFields)
  131. log.Println(purchasinglistFields)
  132. //初始化oss
  133. u.InitOss()
  134. }
  135. func main() {
  136. go task_index()
  137. updport := Sysconfig["udpport"].(string)
  138. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  139. udpclient.Listen(processUdpMsg)
  140. log.Println("Udp服务监听", updport)
  141. time.Sleep(99999 * time.Hour)
  142. }
  143. var pool = make(chan bool, 20)
  144. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  145. switch act {
  146. case mu.OP_TYPE_DATA: //上个节点的数据
  147. //从表中开始处理生成企业数据
  148. var mapInfo map[string]interface{}
  149. err := json.Unmarshal(data, &mapInfo)
  150. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  151. if err != nil {
  152. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  153. } else if mapInfo != nil {
  154. key, _ := mapInfo["key"].(string)
  155. if key == "" {
  156. key = "udpok"
  157. }
  158. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  159. tasktype, _ := mapInfo["stype"].(string)
  160. log.Println("tasktype:", tasktype)
  161. switch tasktype {
  162. case "winner":
  163. pool <- true
  164. go func() {
  165. defer func() {
  166. <-pool
  167. }()
  168. winnerTask(data, mapInfo)
  169. }()
  170. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  171. pool <- true
  172. go func() {
  173. defer func() {
  174. <-pool
  175. }()
  176. biddingTask(data, mapInfo)
  177. }()
  178. case "project":
  179. pool <- true
  180. go func() {
  181. defer func() {
  182. <-pool
  183. }()
  184. projectTask(data, project, mapInfo)
  185. }()
  186. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  187. pool <- true
  188. go func() {
  189. defer func() {
  190. <-pool
  191. }()
  192. biddingBackTask(data, mapInfo)
  193. }()
  194. case "biddingall": //合并并重新生成索引,不生成关键词
  195. pool <- true
  196. go func() {
  197. defer func() {
  198. <-pool
  199. }()
  200. biddingAllTask(data, mapInfo)
  201. }()
  202. case "biddingdata": //联表生成索引不合并,不生成关键词
  203. pool <- true
  204. go func() {
  205. defer func() {
  206. <-pool
  207. }()
  208. biddingDataTask(data, mapInfo)
  209. }()
  210. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  211. pool <- true
  212. go func() {
  213. defer func() {
  214. <-pool
  215. }()
  216. biddingMergeTask(data, mapInfo)
  217. }()
  218. case "buyer":
  219. pool <- true
  220. go func() {
  221. defer func() {
  222. <-pool
  223. }()
  224. buyerTask(data, mapInfo)
  225. }()
  226. case "winnerent": //标准库
  227. pool <- true
  228. go func() {
  229. defer func() {
  230. <-pool
  231. }()
  232. standardTask("winnerent", mapInfo)
  233. }()
  234. case "buyerent": //标准库
  235. pool <- true
  236. go func() {
  237. defer func() {
  238. <-pool
  239. }()
  240. standardTask("buyerent", mapInfo)
  241. }()
  242. case "agencyent": //标准库
  243. pool <- true
  244. go func() {
  245. defer func() {
  246. <-pool
  247. }()
  248. standardTask("agencyent", mapInfo)
  249. }()
  250. case "biddingdelbyextract": //根据repeat删除es
  251. pool <- true
  252. go func() {
  253. defer func() {
  254. <-pool
  255. }()
  256. biddingDelByExtract(data, mapInfo)
  257. }()
  258. case "biddingdelbyextracttype": //根据extracttype删除es
  259. pool <- true
  260. go func() {
  261. defer func() {
  262. <-pool
  263. }()
  264. biddingDelByExtracttype(data, mapInfo)
  265. }()
  266. default:
  267. pool <- true
  268. go func() {
  269. defer func() {
  270. <-pool
  271. }()
  272. defaultFunc(data, mapInfo)
  273. }()
  274. }
  275. }
  276. case mu.OP_NOOP: //下个节点回应
  277. log.Println("发送成功", string(data))
  278. }
  279. }