timedTask.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/garyburd/redigo/redis"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "go.mongodb.org/mongo-driver/mongo/options"
  9. "gopkg.in/mgo.v2/bson"
  10. "log"
  11. "net"
  12. "sort"
  13. "strings"
  14. "time"
  15. mu "mfw/util"
  16. )
  17. //定时任务
  18. //1.存异常表
  19. //2.合并原始库新增
  20. func TimedTask() {
  21. t2 := time.NewTimer(time.Second * 5)
  22. for range t2.C {
  23. tmpLast := map[string]interface{}{}
  24. if err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").FindOne(context.TODO(), bson.M{}, options.FindOne().SetSort(bson.M{"_id": -1})).Decode(&tmpLast); err != nil {
  25. //临时表无数据
  26. log.Println("临时表无数据:", err)
  27. t2.Reset(time.Minute * 5)
  28. continue
  29. } else {
  30. //临时表有数据
  31. log.Println("临时表有数据:", tmpLast)
  32. cursor, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").Find(context.TODO(), bson.M{
  33. "_id": bson.M{
  34. "$lte": tmpLast["_id"],
  35. },
  36. }, options.Find().SetSort(bson.M{"_id": 1}))
  37. if err != nil {
  38. log.Println(err)
  39. t2.Reset(time.Second * 5)
  40. continue
  41. }
  42. //遍历临时表数据,匹配不到原始库存入异常表
  43. for cursor.Next(context.TODO()) {
  44. if err := cursor.Err(); err != nil {
  45. log.Println("cursor.Err();", err)
  46. }
  47. tmp := make(map[string]interface{})
  48. if err := cursor.Decode(&tmp); err == nil {
  49. //再重新查找redis,存在发udp处理,不存在走新增合并
  50. rdb := RedisPool.Get()
  51. if reply, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
  52. //{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
  53. //redis存在发送udp进行处理
  54. go func(reply string) {
  55. by, _ := json.Marshal(map[string]interface{}{
  56. "gtid": reply,
  57. "lteid": reply,
  58. "stype": "",
  59. })
  60. if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  61. IP: net.ParseIP("127.0.0.1"),
  62. Port: Updport,
  63. });e != nil{
  64. log.Println(e)
  65. }
  66. }(reply)
  67. //存在的话删除tmp mongo表
  68. if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp);err != nil{
  69. log.Println("删除临时表err:",err)
  70. }
  71. if err := rdb.Close(); err != nil {
  72. log.Println(err)
  73. }
  74. continue
  75. } else {
  76. if err = rdb.Close(); err != nil {
  77. log.Println(err)
  78. }
  79. }
  80. //查询redis不存在新增
  81. resulttmp := make(map[string]interface{})
  82. r := FClient.Database(Config["mgodb_enterprise"]).Collection(Config["mgodb_enterprise_c"]).FindOne(context.TODO(), bson.M{"company_name": tmp["winner"]}).Decode(&resulttmp)
  83. if r != nil {
  84. //log.Println(r)
  85. //匹配不到原始库,存入异常表删除临时表
  86. if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_err").InsertOne(context.TODO(), tmp); err != nil {
  87. log.Println(err)
  88. }
  89. if _, err = FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
  90. log.Println(err)
  91. }
  92. continue
  93. } else {
  94. //log.Println(123)
  95. //匹配到原始库,新增 resulttmp
  96. if resulttmp["credit_no"] != nil {
  97. if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
  98. len(strings.TrimSpace(credit_no)) > 8 {
  99. dataNo := strings.TrimSpace(credit_no)[2:8]
  100. if Addrs[dataNo] != nil {
  101. if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
  102. if resulttmp["province"] == nil || resulttmp["province"] == "" {
  103. resulttmp["province"] = v["province"]
  104. }
  105. resulttmp["city"] = v["city"]
  106. resulttmp["district"] = v["district"]
  107. }
  108. }
  109. }
  110. }
  111. contacts := make([]map[string]interface{}, 0)
  112. contact := make(map[string]interface{}, 0)
  113. if resulttmp["legal_person"] != nil {
  114. contact["contact_person"] = resulttmp["legal_person"] //联系人
  115. } else {
  116. contact["contact_person"] = "" //联系人
  117. }
  118. contact["contact_type"] = "法定代表人" //法定代表人
  119. //log.Println(1)
  120. if resulttmp["annual_reports"] != nil {
  121. bytes, err := json.Marshal(resulttmp["annual_reports"])
  122. if err != nil {
  123. log.Println("annual_reports err:", err)
  124. }
  125. phonetmp := make([]map[string]interface{}, 0)
  126. err = json.Unmarshal(bytes, &phonetmp)
  127. if err != nil {
  128. log.Println("Unmarshal err:", err)
  129. }
  130. for _, vv := range phonetmp {
  131. if vv["company_phone"] != nil {
  132. if vv["company_phone"] == "" {
  133. continue
  134. } else {
  135. contact["phone"] = vv["company_phone"] //联系电话
  136. break
  137. }
  138. } else {
  139. contact["phone"] = "" //联系电话
  140. }
  141. }
  142. }
  143. //log.Println(k, contact["phone"], resulttmp["_id"])
  144. //time.Sleep(10 * time.Second)
  145. if contact["phone"] == nil {
  146. contact["phone"] = "" //联系电话
  147. }
  148. contact["topscopeclass"] = "企业公示" //项目类型
  149. contact["updatetime"] = time.Now().Unix() //更新时间
  150. contacts = append(contacts, contact)
  151. resulttmp["contact"] = contacts
  152. savetmp := make(map[string]interface{}, 0)
  153. for _, sk := range Fields {
  154. if sk == "establish_date" {
  155. if resulttmp[sk] != nil {
  156. savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
  157. continue
  158. }
  159. } else if sk == "capital" {
  160. //log.Println(sk, resulttmp[sk])
  161. savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
  162. continue
  163. } else if sk == "partners" {
  164. //log.Println(sk, resulttmp[sk], )
  165. //fmt.Println(reflect.TypeOf(resulttmp[sk]))
  166. if resulttmp[sk] != nil {
  167. if ppms, ok := resulttmp[sk].(primitive.A); ok {
  168. for i, _ := range ppms {
  169. if ppms[i].(map[string]interface{})["stock_type"] != nil {
  170. ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
  171. }
  172. delete(ppms[i].(map[string]interface{}), "identify_type")
  173. }
  174. savetmp[sk] = ppms
  175. }
  176. } else {
  177. savetmp[sk] = []interface{}{}
  178. }
  179. continue
  180. } else if sk == "_id" {
  181. savetmp["tmp"+sk] = resulttmp[sk]
  182. continue
  183. } else if sk == "area_code" {
  184. //行政区划代码
  185. savetmp[sk] = fmt.Sprint(resulttmp[sk])
  186. continue
  187. } else if sk == "report_websites" {
  188. //网址
  189. if resulttmp["report_websites"] == nil {
  190. savetmp["website"] = ""
  191. } else {
  192. report_websitesArr := []string{}
  193. if ppms, ok := resulttmp[sk].(primitive.A); ok {
  194. for _, v := range ppms {
  195. if vvv, ok := v.(map[string]interface{}); ok {
  196. if rv, ok := vvv["website_url"].(string); ok {
  197. report_websitesArr = append(report_websitesArr, rv)
  198. }
  199. }
  200. }
  201. }
  202. sort.Strings(report_websitesArr)
  203. savetmp["website"] = strings.Join(report_websitesArr, ";")
  204. }
  205. continue
  206. } else if sk == "wechat_accounts" {
  207. savetmp[sk] = []interface{}{}
  208. continue
  209. }
  210. if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
  211. savetmp[sk] = ""
  212. } else {
  213. savetmp[sk] = resulttmp[sk]
  214. }
  215. }
  216. //tmps = append(tmps, savetmp)
  217. savetmp["updatatime"] = time.Now().Unix()
  218. //保存mongo
  219. result, err := FClient.Database(Config["mgodb_extract_kf"]).Collection(Config["mgo_qyk_c"]).
  220. InsertOne(context.TODO(), savetmp)
  221. if err == nil {
  222. //保存redis
  223. rc := RedisPool.Get()
  224. var _id string
  225. if v, ok := result.InsertedID.(primitive.ObjectID); ok {
  226. _id = v.Hex()
  227. }
  228. if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
  229. log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
  230. if err := rc.Close(); err != nil {
  231. log.Println(err)
  232. }
  233. } else {
  234. //保存es
  235. delete(savetmp, "_id")
  236. if err := rc.Close(); err != nil {
  237. log.Println(err)
  238. }
  239. //esConn := elastic.GetEsConn()
  240. //defer elastic.DestoryEsConn(esConn)
  241. if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
  242. log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
  243. } else {
  244. //删除临时表
  245. if _, err := FClient.Database(Config["mgodb_extract_kf"]).Collection("winner_new").DeleteOne(context.TODO(), tmp); err != nil {
  246. log.Println(err)
  247. }
  248. }
  249. }
  250. } else {
  251. log.Println("save mongo err:", err, tmp["_id"])
  252. }
  253. }
  254. }
  255. }
  256. defer cursor.Close(context.TODO())
  257. }
  258. t2.Reset(time.Minute)
  259. }
  260. }