timedTask.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/garyburd/redigo/redis"
  6. "go.mongodb.org/mongo-driver/bson/primitive"
  7. "gopkg.in/mgo.v2/bson"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. //定时任务
  16. //1.存异常表
  17. //2.合并原始库新增
  18. func TimedTask() {
  19. //time.Sleep(time.Hour*70)
  20. t2 := time.NewTimer(time.Second * 5)
  21. for range t2.C {
  22. Fcconn := FClient.GetMgoConn()
  23. defer FClient.DestoryMongoConn(Fcconn)
  24. tmpLast := map[string]interface{}{}
  25. if iter := Fcconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{}).Sort("-_id").Limit(1).Iter(); iter != nil {
  26. if !iter.Next(&tmpLast) {
  27. //临时表无数据
  28. log.Println("临时表无数据:")
  29. t2.Reset(time.Minute * 5)
  30. continue
  31. } else {
  32. log.Println("临时表有数据:", tmpLast)
  33. fconn := FClient.GetMgoConn()
  34. defer FClient.DestoryMongoConn(fconn)
  35. cursor := fconn.DB(Config["mgodb_extract_kf"]).C("winner_new").Find(bson.M{
  36. "_id": bson.M{
  37. "$lte": tmpLast["_id"],
  38. },
  39. }).Sort("_id").Iter()
  40. if cursor == nil {
  41. log.Println("查询失败")
  42. t2.Reset(time.Second * 5)
  43. continue
  44. }
  45. //遍历临时表数据,匹配不到原始库存入异常表
  46. tmp := make(map[string]interface{})
  47. for cursor.Next(&tmp) {
  48. tmpId := tmp["_id"].(primitive.ObjectID).Hex()
  49. //再重新查找redis,存在发udp处理,不存在走新增合并
  50. rdb := RedisPool.Get()
  51. if _, err := redis.String(rdb.Do("GET", tmp["winner"])); err == nil {
  52. //{"gtid":"57d7ad2f61a0721f152d2ad5","lteid":"5e20968d85a9271abf0ad6c2","stype":""}
  53. //redis存在发送udp进行处理
  54. by, _ := json.Marshal(map[string]interface{}{
  55. "gtid": tmpId,
  56. "lteid": tmpId,
  57. "stype": "",
  58. })
  59. if e := udpclient.WriteUdp(by, mu.OP_TYPE_DATA, &net.UDPAddr{
  60. IP: net.ParseIP("127.0.0.1"),
  61. Port: Updport,
  62. }); e != nil {
  63. log.Println(e)
  64. }
  65. //存在的话删除tmp mongo表
  66. FClient.DbName = Config["mgodb_extract_kf"]
  67. if DeletedCount := FClient.DeleteById("winner_new", tmpId); DeletedCount == 0 {
  68. log.Println("删除临时表err:", DeletedCount)
  69. }
  70. if err := rdb.Close(); err != nil {
  71. log.Println(err)
  72. }
  73. continue
  74. } else {
  75. if err = rdb.Close(); err != nil {
  76. log.Println(err)
  77. }
  78. }
  79. //查询redis不存在新增
  80. FClient.DbName = Config["mgodb_enterprise"]
  81. resulttmp := FClient.FindOne(Config["mgodb_enterprise_c"], bson.M{"company_name": tmp["winner"]})
  82. if resulttmp != nil {
  83. //log.Println(r)
  84. //匹配不到原始库,存入异常表删除临时表
  85. FClient.DbName = Config["mgodb_extract_kf"]
  86. if saveid := FClient.Save("winner_err", tmp); saveid == nil {
  87. log.Println("存入异常表错误", tmp)
  88. }
  89. FClient.DbName = Config["mgodb_extract_kf"]
  90. if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
  91. log.Println("删除临时表错误", deleteNum)
  92. }
  93. continue
  94. } else {
  95. //log.Println(123)
  96. //匹配到原始库,新增 resulttmp
  97. if resulttmp["credit_no"] != nil {
  98. if credit_no, ok := resulttmp["credit_no"].(string); ok && strings.TrimSpace(credit_no) != "" &&
  99. len(strings.TrimSpace(credit_no)) > 8 {
  100. dataNo := strings.TrimSpace(credit_no)[2:8]
  101. if Addrs[dataNo] != nil {
  102. if v, ok := Addrs[dataNo].(map[string]interface{}); ok {
  103. if resulttmp["province"] == nil || resulttmp["province"] == "" {
  104. resulttmp["province"] = v["province"]
  105. }
  106. resulttmp["city"] = v["city"]
  107. resulttmp["district"] = v["district"]
  108. }
  109. }
  110. }
  111. }
  112. contacts := make([]map[string]interface{}, 0)
  113. contact := make(map[string]interface{}, 0)
  114. if resulttmp["legal_person"] != nil {
  115. contact["contact_person"] = resulttmp["legal_person"] //联系人
  116. } else {
  117. contact["contact_person"] = "" //联系人
  118. }
  119. contact["contact_type"] = "法定代表人" //法定代表人
  120. //log.Println(1)
  121. if resulttmp["annual_reports"] != nil {
  122. bytes, err := json.Marshal(resulttmp["annual_reports"])
  123. if err != nil {
  124. log.Println("annual_reports err:", err)
  125. }
  126. phonetmp := make([]map[string]interface{}, 0)
  127. err = json.Unmarshal(bytes, &phonetmp)
  128. if err != nil {
  129. log.Println("Unmarshal err:", err)
  130. }
  131. for _, vv := range phonetmp {
  132. if vv["company_phone"] != nil {
  133. if vv["company_phone"] == "" {
  134. continue
  135. } else {
  136. contact["phone"] = vv["company_phone"] //联系电话
  137. break
  138. }
  139. } else {
  140. contact["phone"] = "" //联系电话
  141. }
  142. }
  143. }
  144. //log.Println(k, contact["phone"], resulttmp["_id"])
  145. //time.Sleep(10 * time.Second)
  146. if contact["phone"] == nil {
  147. contact["phone"] = "" //联系电话
  148. }
  149. contact["topscopeclass"] = "企业公示" //项目类型
  150. contact["updatetime"] = time.Now().Unix() //更新时间
  151. contacts = append(contacts, contact)
  152. resulttmp["contact"] = contacts
  153. savetmp := make(map[string]interface{}, 0)
  154. for _, sk := range Fields {
  155. if sk == "establish_date" {
  156. if resulttmp[sk] != nil {
  157. savetmp[sk] = resulttmp[sk].(primitive.DateTime).Time().UTC().Unix()
  158. continue
  159. }
  160. } else if sk == "capital" {
  161. //log.Println(sk, resulttmp[sk])
  162. savetmp[sk] = ObjToMoney([]interface{}{resulttmp[sk], ""})[0]
  163. continue
  164. } else if sk == "partners" {
  165. //log.Println(sk, resulttmp[sk], )
  166. //fmt.Println(reflect.TypeOf(resulttmp[sk]))
  167. if resulttmp[sk] != nil {
  168. if ppms, ok := resulttmp[sk].(primitive.A); ok {
  169. for i, _ := range ppms {
  170. if ppms[i].(map[string]interface{})["stock_type"] != nil {
  171. ppms[i].(map[string]interface{})["stock_type"] = "企业公示"
  172. }
  173. delete(ppms[i].(map[string]interface{}), "identify_type")
  174. }
  175. savetmp[sk] = ppms
  176. }
  177. } else {
  178. savetmp[sk] = []interface{}{}
  179. }
  180. continue
  181. } else if sk == "_id" {
  182. savetmp["tmp"+sk] = resulttmp[sk]
  183. continue
  184. } else if sk == "area_code" {
  185. //行政区划代码
  186. savetmp[sk] = fmt.Sprint(resulttmp[sk])
  187. continue
  188. } else if sk == "report_websites" {
  189. //网址
  190. if resulttmp["report_websites"] == nil {
  191. savetmp["website"] = ""
  192. } else {
  193. report_websitesArr := []string{}
  194. if ppms, ok := resulttmp[sk].(primitive.A); ok {
  195. for _, v := range ppms {
  196. if vvv, ok := v.(map[string]interface{}); ok {
  197. if rv, ok := vvv["website_url"].(string); ok {
  198. report_websitesArr = append(report_websitesArr, rv)
  199. }
  200. }
  201. }
  202. }
  203. sort.Strings(report_websitesArr)
  204. savetmp["website"] = strings.Join(report_websitesArr, ";")
  205. }
  206. continue
  207. } else if sk == "wechat_accounts" {
  208. savetmp[sk] = []interface{}{}
  209. continue
  210. }
  211. if resulttmp[sk] == nil && sk != "history_name" && sk != "wechat_accounts" && sk != "establish_date" && sk != "capital" && sk != "partners" && sk != "contact" && sk != "report_websites" {
  212. savetmp[sk] = ""
  213. } else {
  214. savetmp[sk] = resulttmp[sk]
  215. }
  216. }
  217. //tmps = append(tmps, savetmp)
  218. savetmp["updatatime"] = time.Now().Unix()
  219. //保存mongo
  220. FClient.DbName = Config["mgodb_extract_kf"]
  221. saveid := FClient.Save(Config["mgo_qyk_c"], savetmp)
  222. if saveid != nil {
  223. //保存redis
  224. rc := RedisPool.Get()
  225. var _id string
  226. if v, ok := saveid.(primitive.ObjectID); ok {
  227. _id = v.Hex()
  228. }
  229. if _, err := rc.Do("SET", savetmp["company_name"], _id); err != nil {
  230. log.Println("save redis err:", tmp["_id"], savetmp["_id"], savetmp["company_name"], err)
  231. if err := rc.Close(); err != nil {
  232. log.Println(err)
  233. }
  234. } else {
  235. //保存es
  236. delete(savetmp, "_id")
  237. if err := rc.Close(); err != nil {
  238. log.Println(err)
  239. }
  240. //esConn := elastic.GetEsConn()
  241. //defer elastic.DestoryEsConn(esConn)
  242. if _, err := EsConn.Index().Index(Config["elasticsearch_index"]).Type(Config["elasticsearch_type"]).Id(_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
  243. log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
  244. } else {
  245. //删除临时表
  246. FClient.DbName = Config["mgodb_extract_kf"]
  247. if deleteNum := FClient.DeleteById("winner_new", tmpId); deleteNum == 0 {
  248. log.Println("删除临时表失败", deleteNum)
  249. }
  250. }
  251. }
  252. } else {
  253. log.Println("save mongo err:", saveid, tmp["_id"])
  254. }
  255. }
  256. }
  257. }
  258. }
  259. t2.Reset(time.Minute)
  260. }
  261. }