task.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package main
import (
	"fmt"
	"log"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/cron"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"mongodb"
	"qfw/util"
)
  14. func TimeTask() {
  15. //GetPaData()
  16. c := cron.New()
  17. cronstrBd := "0 0 */" + fmt.Sprint(BdTaskTime) + " * * ?" //每TaskTime小时执行一次
  18. //cronstr := "0 0 " + fmt.Sprint(TaskTime) + " * * ?" //每天TaskTime跑一次
  19. cronstrPa := "0 0 15 ? * " + fmt.Sprint(PaTaskTime) //凭安增量数据每周二跑一次
  20. _ = c.AddFunc(cronstrBd, func() { GetBdData() })
  21. _ = c.AddFunc(cronstrPa, func() { GetPaData() })
  22. c.Start()
  23. }
  24. func GetBdData() {
  25. count := 0
  26. lastid := ""
  27. sess := MgoBd.GetMgoConn()
  28. defer MgoBd.DestoryMongoConn(sess)
  29. fields := map[string]interface{}{"data": 1, "down_time": 1}
  30. q := bson.M{"down_time": bson.M{"$gt": LastTime}}
  31. query := sess.DB(Dbname_bd).C(CollBd).Find(q).Select(fields).Iter()
  32. tmp := make(map[string]interface{})
  33. for query.Next(&tmp) {
  34. lastid = mongodb.BsonIdToSId(tmp["_id"])
  35. if count%1000 == 0 {
  36. util.Debug("baidu ----current----", count, lastid)
  37. }
  38. findEnt(tmp)
  39. count++
  40. }
  41. util.Debug("baidu 处理", count, "条数据", ",lasttime---", LastTime)
  42. }
  43. func GetPaData() {
  44. count := 0
  45. lastid := ""
  46. sess := MgoMix.GetMgoConn()
  47. defer MgoMix.DestoryMongoConn(sess)
  48. fields := map[string]interface{}{"changes": 1, "company_id": 1, "company_name": 1, "company_type": 1, "establish_date": 1, "create_time": 1}
  49. query := sess.DB(Dbname_pa).C(CollPa).Find(nil).Select(fields).Iter()
  50. c := MgoMix.Count(CollPa, nil)
  51. util.Debug("ping an count ------", c)
  52. tmp := make(map[string]interface{})
  53. for query.Next(&tmp) {
  54. lastid = mongodb.BsonIdToSId(tmp["company_id"])
  55. if count%1000 == 0 {
  56. util.Debug("ping an ----current-----", count, lastid)
  57. }
  58. if strings.Contains(util.ObjToString(tmp["company_type"]), "个体") {
  59. continue
  60. }
  61. currentTime := time.Now().Unix()
  62. if tmp["changes"] != nil && len(tmp["changes"].([]interface{})) > 0 {
  63. delete(tmp, "establish_date")
  64. q := bson.M{"company_name": tmp["company_name"]}
  65. changeEnt, _ := MgoMix.FindOne(CollSave, q)
  66. if changeEnt != nil && len(*changeEnt) > 0 {
  67. tmpList := tmp["changes"].([]interface{})
  68. changeList := clearRepeat((*changeEnt)["changes"].([]interface{}))
  69. if len(tmpList) > len(changeList) {
  70. infoList := clearRepeat(tmp["changes"].([]interface{}))
  71. for _, item := range infoList {
  72. item1 := item.(map[string]interface{})
  73. setMark(item1)
  74. }
  75. tmp["changes"] = infoList
  76. tmp["updatetime"] = currentTime
  77. }
  78. }else {
  79. infoList := clearRepeat(tmp["changes"].([]interface{}))
  80. for _, item := range infoList {
  81. item1 := item.(map[string]interface{})
  82. setMark(item1)
  83. }
  84. tmp["_id"] = primitive.NewObjectID()
  85. tmp["createtime"] = currentTime
  86. tmp["updatetime"] = currentTime
  87. }
  88. update := make(map[string]interface{})
  89. tmp["datasource"] = "pingan"
  90. update["$set"] = tmp
  91. updateInfo := []map[string]interface{}{
  92. {
  93. "_id": tmp["_id"],
  94. },
  95. update,
  96. }
  97. MgoSaveCache <- updateInfo
  98. count++
  99. }else {
  100. //{
  101. // "change_code": "100000",
  102. // "change_name": "新设立公司",
  103. // "change_push": true,
  104. // "change_info": "新设立公司",
  105. // "change_keyword": ["新设立"]
  106. //},
  107. setupData := ""
  108. if tmp["establish_date"] != nil {
  109. if timeTmp, ok := tmp["establish_date"].(primitive.DateTime); ok {
  110. t := timeTmp.Time()
  111. setupData = util.FormatDate(&t, util.Date_Short_Layout)
  112. } else if timeTmp, ok := tmp["establish_date"].(string); ok && timeTmp != "" {
  113. t := timeReg.FindString(timeTmp)
  114. if t != "" {
  115. setupData = t
  116. }
  117. }
  118. }
  119. createData := ""
  120. if tmp["create_time"] != nil {
  121. if timeTmp, ok := tmp["create_time"].(primitive.DateTime); ok {
  122. t := timeTmp.Time()
  123. createData = util.FormatDate(&t, util.Date_Short_Layout)
  124. } else if timeTmp, ok := tmp["create_time"].(string); ok && timeTmp != "" {
  125. t := timeReg.FindString(timeTmp)
  126. if t != "" {
  127. createData = t
  128. }
  129. }
  130. }
  131. tm2, _ := time.Parse("2006-01-02", createData)
  132. //当前时间17天内
  133. if tm2.Unix() < (time.Now().Unix() - 17 * 60 * 60 * 24) {
  134. continue
  135. }
  136. delete(tmp, "establish_date")
  137. delete(tmp, "create_time")
  138. changeInfo := make(map[string]interface{})
  139. changeInfo["change_field"] = "新设立公司"
  140. changeInfo["change_name_new"] = "新设立公司"
  141. changeInfo["content_before"] = ""
  142. changeInfo["content_after"] = "新设立公司"
  143. changeInfo["change_date"] = setupData
  144. tmp["changes"] = []map[string]interface{}{changeInfo}
  145. tmp["_id"] = primitive.NewObjectID()
  146. tmp["createtime"] = currentTime
  147. tmp["updatetime"] = currentTime
  148. tmp["datasource"] = "pingan"
  149. update := make(map[string]interface{})
  150. update["$set"] = tmp
  151. updateInfo := []map[string]interface{}{
  152. {
  153. "_id": tmp["_id"],
  154. },
  155. update,
  156. }
  157. MgoSaveCache <- updateInfo
  158. count++
  159. }
  160. }
  161. util.Debug("pingan 处理", count, "条数据")
  162. }
  163. func findEnt(tmp map[string]interface{}) {
  164. if LastTime < util.Int64All(tmp["down_time"]) {
  165. LastTime = util.Int64All(tmp["down_time"])
  166. }
  167. data := util.ObjToMap(tmp["data"])
  168. ent := util.ObjToMap((*data)["basicData"])
  169. changeData := util.ObjToMap((*data)["changeRecordData"])
  170. infoList := (*changeData)["list"].([]interface{})
  171. currentTime := time.Now().Unix()
  172. m := util.ObjToString((*ent)["entName"])
  173. m = strings.ReplaceAll(m, "(", "(")
  174. m = strings.ReplaceAll(m, ")", ")")
  175. q := bson.M{"company_name": m}
  176. changeEnt, _ := MgoMix.FindOne(CollSave, q)
  177. update := map[string]interface{}{}
  178. if changeEnt != nil && len(*changeEnt) > 0 {
  179. //1、企业变更库有该企业信息
  180. if (*changeEnt)["changes"] != nil{
  181. (*changeEnt)["updatetime"] = currentTime
  182. if len(infoList) > len((*changeEnt)["changes"].([]interface{})) {
  183. mapArr := setChangeInfo(infoList)
  184. for _, v := range mapArr{
  185. setMark(v)
  186. }
  187. (*changeEnt)["changes"] = mapArr
  188. }
  189. update["$set"] = *changeEnt
  190. updateInfo := []map[string]interface{}{
  191. {
  192. "_id": (*changeEnt)["_id"],
  193. },
  194. update,
  195. }
  196. MgoSaveCache <- updateInfo
  197. }
  198. } else {
  199. //2、企业变更库没有该企业信息
  200. paEnt, _ := MgoMix.FindOne(CollQy, q)
  201. saveEnt := map[string]interface{}{}
  202. if saveEnt != nil && len(*paEnt) > 0 {
  203. //3、企业库有该企业信息
  204. saveEnt["datasource"] = "baidu"
  205. saveEnt["_id"] = primitive.NewObjectID()
  206. saveEnt["company_id"] = (*paEnt)["company_id"]
  207. saveEnt["company_name"] = (*ent)["entName"]
  208. saveEnt["createtime"] = currentTime
  209. saveEnt["updatetime"] = currentTime
  210. if (*paEnt)["changes"] != nil{
  211. changeArr := (*paEnt)["changes"].([]interface{})
  212. mapArr := setChangeInfo(infoList)
  213. for _, v := range util.ObjArrToMapArr(changeArr){
  214. setMark(v)
  215. mapArr = append(mapArr, v)
  216. }
  217. saveEnt["changes"] = mapArr
  218. }else {
  219. saveEnt["changes"] = setChangeInfo(infoList)
  220. }
  221. update["$set"] = saveEnt
  222. updateInfo := []map[string]interface{}{
  223. {
  224. "_id": saveEnt["_id"],
  225. },
  226. update,
  227. }
  228. MgoSaveCache <- updateInfo
  229. } else {
  230. //4、企业库没有该企业信息
  231. saveEnt["company_name"] = (*ent)["entName"]
  232. saveEnt["createtime"] = currentTime
  233. saveEnt["changes"] = setChangeInfo(infoList)
  234. MgoMix.Save(CollBack, saveEnt)
  235. }
  236. }
  237. }
  238. func setChangeInfo(list []interface{}) []map[string]interface{} {
  239. var arr []map[string]interface{}
  240. for _, item := range list {
  241. tmp := map[string]interface{}{}
  242. item1 := item.(map[string]interface{})
  243. tmp["change_date"] = item1["date"]
  244. tmp["content_before"] = item1["oldValue"]
  245. tmp["content_after"] = item1["newValue"]
  246. tmp["change_field"] = item1["fieldName"]
  247. setMark(tmp)
  248. arr = append(arr, tmp)
  249. }
  250. return arr
  251. }
  252. func setMark(tmp map[string]interface{}) {
  253. for _, v := range ChangeMap {
  254. str := util.ObjToString(tmp["change_field"])
  255. regArr := v["change_key_reg"].([]string)
  256. for _, v1 := range regArr {
  257. matched, _ := regexp.MatchString(v1, str)
  258. if matched {
  259. tmp["change_name_new"] = v["change_name"]
  260. return
  261. }
  262. }
  263. }
  264. }
  265. func clearRepeat(list []interface{}) []interface{} {
  266. var tmp []interface{}
  267. if len(list) > 1 {
  268. for k, v := range list{
  269. if k < len(list)-1 {
  270. if fmt.Sprint(list[k]) != fmt.Sprint(list[k+1]) {
  271. tmp = append(tmp, v)
  272. }
  273. }else {
  274. tmp = append(tmp, v)
  275. }
  276. }
  277. return tmp
  278. }else {
  279. return list
  280. }
  281. }
  282. var MgoSaveCache = make(chan []map[string]interface{}, 2000)
  283. var SP = make(chan bool, 5)
  284. func SaveData() {
  285. log.Println("Mgo Save...")
  286. arru := make([][]map[string]interface{}, 200)
  287. indexu := 0
  288. for {
  289. select {
  290. case v := <-MgoSaveCache:
  291. arru[indexu] = v
  292. indexu++
  293. if indexu == 200 {
  294. SP <- true
  295. go func(arru [][]map[string]interface{}) {
  296. defer func() {
  297. <-SP
  298. }()
  299. MgoMix.UpSertBulk(CollSave, arru...)
  300. }(arru)
  301. arru = make([][]map[string]interface{}, 200)
  302. indexu = 0
  303. }
  304. case <-time.After(1000 * time.Millisecond):
  305. if indexu > 0 {
  306. SP <- true
  307. go func(arru [][]map[string]interface{}) {
  308. defer func() {
  309. <-SP
  310. }()
  311. MgoMix.UpSertBulk(CollSave, arru...)
  312. }(arru[:indexu])
  313. arru = make([][]map[string]interface{}, 200)
  314. indexu = 0
  315. }
  316. }
  317. }
  318. }