timedTask.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. elastic "qfw/common/src/qfw/util/elastic"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. "gopkg.in/mgo.v2/bson"
  10. "log"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. //定时任务
  16. //1.存异常表
  17. //2.合并原始库新增
  18. func TimedTask() {
  19. t2 := time.NewTimer(time.Second * 5)
  20. for range t2.C {
  21. tmpLast := map[string]interface{}{}
  22. if err := FClient.Database(Config["mgodb_extract_kf"]).Collection("tmp_winner_qyk").FindOne(context.TODO(), bson.M{}, options.FindOne().SetSort(bson.M{"_id": -1})).Decode(&tmpLast); err != nil {
  23. //临时表无数据
  24. log.Println("临时表无数据:", err)
  25. t2.Reset(time.Minute * 5)
  26. continue
  27. } else {
  28. //临时表有数据
  29. log.Println("临时表有数据:", tmpLast)
  30. cursor, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("tmp_winner_qyk").Find(context.TODO(), bson.M{
  31. "_id": bson.M{
  32. "$lte": tmpLast["_id"],
  33. },
  34. }, options.Find().SetSort(bson.M{"_id": 1}))
  35. if err != nil {
  36. log.Println(err)
  37. t2.Reset(time.Second * 5)
  38. continue
  39. }
  40. //遍历临时表数据,匹配不到原始库存入异常表
  41. for cursor.Next(context.TODO()) {
  42. tmp := make(map[string]interface{})
  43. if err := cursor.Decode(&tmp); err == nil {
  44. resulttmp := make(map[string]interface{})
  45. r := FClient.Database(Config["mgodb_enterprise"]).Collection(Config["mgodb_enterprise_c"]).FindOne(context.TODO(), bson.M{"company_name": tmp["winner"]}).Decode(&resulttmp)
  46. if r != nil {
  47. //log.Println(r)
  48. //匹配不到原始库,存入异常表删除临时表
  49. FClient.Database(Config["mgodb_extract_kf"]).Collection("err_winner_qyk").InsertOne(context.TODO(), tmp)
  50. FClient.Database(Config["mgodb_extract_kf"]).Collection("tmp_winner_qyk").DeleteOne(context.TODO(), tmp)
  51. continue
  52. } else {
  53. //log.Println(123)
  54. //匹配到原始库,新增 resulttmp
  55. if resulttmp["credit_no"] != nil {
  56. if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
  57. len(strings.TrimSpace(credit_no)) > 8 {
  58. dataNo := strings.TrimSpace(credit_no)[2:8]
  59. if Addrs[dataNo] != nil {
  60. if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
  61. if resulttmp["province"] == nil || resulttmp["province"] == "" {
  62. resulttmp["province"] = v["province"]
  63. }
  64. resulttmp["city"] = v["city"]
  65. resulttmp["district"] = v["district"]
  66. }
  67. }
  68. }
  69. }
  70. contacts := make([]map[string]interface{}, 0)
  71. contact := make(map[string]interface{}, 0)
  72. if resulttmp["legal_person"] != nil {
  73. contact["contact_person"] = resulttmp["legal_person"] //联系人
  74. } else {
  75. contact["contact_person"] = "" //联系人
  76. }
  77. contact["contact_type"] = "法定代表人" //法定代表人
  78. //log.Println(1)
  79. if resulttmp["annual_reports"] != nil {
  80. bytes, err := json.Marshal(resulttmp["annual_reports"])
  81. if err != nil {
  82. log.Println("annual_reports err:", err)
  83. }
  84. //log.Println(2, string(bytes))
  85. phonetmp := make([]map[string]interface{}, 0)
  86. err = json.Unmarshal(bytes, &phonetmp)
  87. if err != nil {
  88. log.Println("Unmarshal err:", err)
  89. }
  90. //log.Println(44, err)
  91. for _, vv := range phonetmp {
  92. if vv["company_phone"] != nil {
  93. if vv["company_phone"] == "" {
  94. continue
  95. } else {
  96. contact["phone"] = vv["company_phone"] //联系电话
  97. break
  98. }
  99. } else {
  100. contact["phone"] = "" //联系电话
  101. }
  102. }
  103. }
  104. //log.Println(k, contact["phone"], resulttmp["_id"])
  105. //time.Sleep(10 * time.Second)
  106. if contact["phone"] == nil {
  107. contact["phone"] = "" //联系电话
  108. }
  109. contact["topscopeclass"] = "企业公示" //项目类型
  110. contact["updatetime"] = time.Now().Unix() //更新时间
  111. contacts = append(contacts, contact)
  112. resulttmp["contact"] = contacts
  113. savetmp := make(map[string]interface{}, 0)
  114. for _, sk := range Fields {
  115. if sk == "establish_date" {
  116. if resulttmp[sk] != nil {
  117. savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
  118. continue
  119. }
  120. } else if sk == "capital" {
  121. //log.Println(sk, resulttmp[sk])
  122. savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
  123. continue
  124. } else if sk == "partners" {
  125. //log.Println(sk, resulttmp[sk], )
  126. //fmt.Println(reflect.TypeOf(resulttmp[sk]))
  127. if resulttmp[sk] != nil {
  128. if ppms, ok := resulttmp[sk].(primitive.A); ok {
  129. for i, _ := range ppms {
  130. if ppms[i].(map[string]interface{})["stock_type"] != nil {
  131. ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
  132. }
  133. delete(ppms[i].(map[string]interface{}), "identify_type")
  134. }
  135. savetmp[sk] = ppms
  136. }
  137. }else {
  138. savetmp[sk] = []interface{}{}
  139. }
  140. continue
  141. } else if sk == "_id" {
  142. savetmp["tmp"+sk] = resulttmp[sk]
  143. continue
  144. } else if sk == "area_code" {
  145. //行政区划代码
  146. savetmp[sk] = fmt.Sprint(resulttmp[sk])
  147. continue
  148. } else if sk == "report_websites" {
  149. //网址
  150. if resulttmp["report_websites"] == nil {
  151. savetmp["website"] = ""
  152. } else {
  153. report_websitesArr := []string{}
  154. if ppms, ok := resulttmp[sk].(primitive.A); ok {
  155. for _, v := range ppms {
  156. if vvv, ok := v.(map[string]interface{}); ok {
  157. if rv, ok := vvv["website_url"].(string); ok {
  158. report_websitesArr = append(report_websitesArr, rv)
  159. }
  160. }
  161. }
  162. }
  163. sort.Strings(report_websitesArr)
  164. savetmp["website"] = strings.Join(report_websitesArr, ";")
  165. }
  166. continue
  167. } else if sk == "wechat_accounts" {
  168. savetmp[sk] = []interface{}{}
  169. continue
  170. }
  171. if resulttmp[sk] == nil && sk != "history_name" &&sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
  172. savetmp[sk] = ""
  173. } else {
  174. savetmp[sk] = resulttmp[sk]
  175. }
  176. }
  177. //tmps = append(tmps, savetmp)
  178. savetmp["updatatime"] =time.Now().Unix()
  179. //保存mongo
  180. result, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
  181. InsertOne(context.TODO(), savetmp)
  182. if err == nil {
  183. //保存redis
  184. rc := RedisPool.Get()
  185. defer rc.Close()
  186. var _id string
  187. if v, ok := result.InsertedID.(primitive.ObjectID); ok {
  188. _id = v.Hex()
  189. }
  190. if _, err := rc.Do("SET", savetmp["company_name"], result.InsertedID.(primitive.ObjectID).Hex()); err != nil {
  191. log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
  192. } else {
  193. //保存es
  194. delete(savetmp, "_id")
  195. esConn := elastic.GetEsConn()
  196. defer elastic.DestoryEsConn(esConn)
  197. if _, err := esConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
  198. log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
  199. } else {
  200. //删除临时表
  201. FClient.Database(Config["mgodb_extract_kf"]).Collection("tmp_winner_qyk").DeleteOne(context.TODO(), tmp)
  202. }
  203. }
  204. } else {
  205. log.Println("save mongo err:", err, tmp["_id"])
  206. }
  207. }
  208. }
  209. }
  210. defer cursor.Close(context.TODO())
  211. }
  212. t2.Reset(time.Minute)
  213. }
  214. }