main.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/robfig/cron"
  6. "github.com/spf13/cobra"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  15. "math/rand"
  16. "net"
  17. "os"
  18. "proposed_project/config"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. "unicode/utf8"
  24. )
  25. var (
  26. UdpClient udp.UdpClient
  27. Es *elastic.Elastic
  28. EsBulkSize = 200
  29. saveEsPool = make(chan map[string]interface{}, 5000)
  30. saveEsSp = make(chan bool, 3)
  31. )
  32. func main() {
  33. rootCmd := &cobra.Command{Use: "my cmd"}
  34. rootCmd.AddCommand(tags())
  35. rootCmd.AddCommand(project())
  36. rootCmd.AddCommand(nzjData())
  37. rootCmd.AddCommand(projectAdd())
  38. rootCmd.AddCommand(tidbSave())
  39. rootCmd.AddCommand(tidbSave1())
  40. rootCmd.AddCommand(tidbAddSave())
  41. rootCmd.AddCommand(redisSave())
  42. rootCmd.AddCommand(esSave())
  43. rootCmd.AddCommand(projectComb())
  44. rootCmd.AddCommand(projectCombTidb())
  45. rootCmd.AddCommand(projectCombAdd())
  46. rootCmd.AddCommand(newId())
  47. if err := rootCmd.Execute(); err != nil {
  48. fmt.Println("rootCmd.Execute failed", err.Error())
  49. }
  50. }
  51. func tags() *cobra.Command {
  52. cmdClient := &cobra.Command{
  53. Use: "tags",
  54. Short: "Start processing tags data",
  55. Run: func(cmd *cobra.Command, args []string) {
  56. InitRule()
  57. go UpdateMethod()
  58. taskRun()
  59. c := make(chan bool, 1)
  60. <-c
  61. },
  62. }
  63. return cmdClient
  64. }
  65. func project() *cobra.Command {
  66. cmdClient := &cobra.Command{
  67. Use: "project",
  68. Short: "Start processing project data",
  69. Run: func(cmd *cobra.Command, args []string) {
  70. InitRule()
  71. loadProject()
  72. go updateAllQueue()
  73. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  74. UdpClient.Listen(processUdpMsg)
  75. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  76. c := make(chan bool, 1)
  77. <-c
  78. },
  79. }
  80. return cmdClient
  81. }
  82. func nzjData() *cobra.Command {
  83. cmdClient := &cobra.Command{
  84. Use: "nzj-data",
  85. Short: "Start processing project data",
  86. Run: func(cmd *cobra.Command, args []string) {
  87. if LastId == "" {
  88. _ = cmd.Help()
  89. os.Exit(0)
  90. }
  91. go saveMethod()
  92. sid, eid := taskQ()
  93. if sid != "" && eid != "" {
  94. doTask1(sid, eid)
  95. LastId = eid
  96. }
  97. crn := cron.New()
  98. cronstr := "0 */10 * * * ?" // 每10min执行一次
  99. _ = crn.AddFunc(cronstr, func() {
  100. if TaskSingle {
  101. sid, eid := taskQ()
  102. if sid != "" && eid != "" {
  103. TaskSingle = false
  104. doTask1(sid, eid)
  105. LastId = eid
  106. }
  107. } else {
  108. log.Info("上次任务未执行完成")
  109. }
  110. })
  111. crn.Start()
  112. c := make(chan bool, 1)
  113. <-c
  114. },
  115. }
  116. cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id")
  117. return cmdClient
  118. }
  119. func projectAdd() *cobra.Command {
  120. cmdClient := &cobra.Command{
  121. Use: "project-add",
  122. Short: "Start processing project data",
  123. Run: func(cmd *cobra.Command, args []string) {
  124. if LastId == "" {
  125. _ = cmd.Help()
  126. os.Exit(0)
  127. }
  128. InitRule()
  129. loadProject()
  130. go updateAllQueue()
  131. info, _ := MgoBid.Find("nzj_bidding", nil, `{"_id": -1}`, nil, true, -1, 1)
  132. doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"]))
  133. LastId = mongodb.BsonIdToSId((*info)[0]["_id"])
  134. crn := cron.New()
  135. cronstr := "0 */30 * * * ?" // 每30min执行一次
  136. _ = crn.AddFunc(cronstr, func() {
  137. if TaskSingle {
  138. info, _ := MgoBid.Find("nzj_bidding", nil, `{"_id": -1}`, nil, true, -1, 1)
  139. TaskSingle = false
  140. doTask(LastId, mongodb.BsonIdToSId((*info)[0]["_id"]))
  141. LastId = mongodb.BsonIdToSId((*info)[0]["_id"])
  142. } else {
  143. log.Info("上次任务未执行完成")
  144. }
  145. })
  146. crn.Start()
  147. c := make(chan bool, 1)
  148. <-c
  149. },
  150. }
  151. cmdClient.Flags().StringVarP(&LastId, "lastid", "s", "", "data start id")
  152. return cmdClient
  153. }
  154. func tidbSave() *cobra.Command {
  155. cmdClient := &cobra.Command{
  156. Use: "tidb",
  157. Short: "Start processing project save to tidb",
  158. Run: func(cmd *cobra.Command, args []string) {
  159. InitMysql()
  160. InitArea()
  161. go SaveFunc("dwd_f_nzj_baseinfo", BaseField)
  162. go SaveRFunc("dwd_f_nzj_follw_record", RecordField)
  163. go SaveCFunc("dwd_f_nzj_contact", ContactField)
  164. go SaveCyFunc("dwd_f_nzj_category_tags", CategoryField)
  165. go SaveEntFunc("dwd_f_nzj_ent", EntField)
  166. //redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  167. taskTidb(nil)
  168. c := make(chan bool, 1)
  169. <-c
  170. },
  171. }
  172. return cmdClient
  173. }
  174. func tidbSave1() *cobra.Command {
  175. cmdClient := &cobra.Command{
  176. Use: "tidb1",
  177. Short: "Start processing project save to tidb",
  178. Run: func(cmd *cobra.Command, args []string) {
  179. InitMysql()
  180. InitArea()
  181. go SaveFunc("dwd_f_nzj_baseinfo", BaseField)
  182. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  183. taskTidb1()
  184. c := make(chan bool, 1)
  185. <-c
  186. },
  187. }
  188. cmdClient.Flags().StringVarP(&id, "pid", "p", "", "pid")
  189. return cmdClient
  190. }
  191. func tidbAddSave() *cobra.Command {
  192. cmdClient := &cobra.Command{
  193. Use: "tidb-add",
  194. Short: "Start processing project save to tidb",
  195. Run: func(cmd *cobra.Command, args []string) {
  196. InitMysql()
  197. InitArea()
  198. //go SaveFunc("dwd_f_nzj_baseinfo", BaseField)
  199. //go SaveRFunc("dwd_f_nzj_follw_record", RecordField)
  200. //go SaveCFunc("dwd_f_nzj_contact", ContactField)
  201. //go SaveCyFunc("dwd_f_nzj_category_tags", CategoryField)
  202. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  203. taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}})
  204. crn := cron.New()
  205. cronstr := "0 */30 * * * ?" // 每30min执行一次
  206. _ = crn.AddFunc(cronstr, func() {
  207. if TaskSingle {
  208. TaskSingle = false
  209. taskTidb_add(bson.M{"pici": bson.M{"$gt": pici}})
  210. } else {
  211. log.Info("上次任务未执行完成")
  212. }
  213. })
  214. crn.Start()
  215. c := make(chan bool, 1)
  216. <-c
  217. },
  218. }
  219. cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time")
  220. return cmdClient
  221. }
  222. func redisSave() *cobra.Command {
  223. cmdClient := &cobra.Command{
  224. Use: "redis",
  225. Short: "Start processing project save to tidb",
  226. Run: func(cmd *cobra.Command, args []string) {
  227. InitMysql()
  228. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  229. redisDisp()
  230. },
  231. }
  232. return cmdClient
  233. }
  234. func esSave() *cobra.Command {
  235. cmdClient := &cobra.Command{
  236. Use: "es-save",
  237. Short: "Start processing project save to es",
  238. Run: func(cmd *cobra.Command, args []string) {
  239. InitMysql()
  240. InitArea()
  241. InitTagCode()
  242. InitEs()
  243. go SaveEs()
  244. esDisp()
  245. crn := cron.New()
  246. cronstr := "0 */30 * * * ?" // 每30min执行一次
  247. _ = crn.AddFunc(cronstr, func() {
  248. if TaskSingle {
  249. TaskSingle = false
  250. esDisp()
  251. } else {
  252. log.Info("上次任务未执行完成")
  253. }
  254. })
  255. crn.Start()
  256. c := make(chan bool, 1)
  257. <-c
  258. },
  259. }
  260. cmdClient.Flags().Int64VarP(&pici, "pici", "c", 0, "pici time")
  261. return cmdClient
  262. }
  263. // @Description 拟在建项目关联
  264. // @Author J 2023/4/17 09:05
  265. func projectComb() *cobra.Command {
  266. cmdClient := &cobra.Command{
  267. Use: "project-comb",
  268. Short: "Start processing combined project",
  269. Run: func(cmd *cobra.Command, args []string) {
  270. initSeg()
  271. InitEs()
  272. redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Project, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbt)
  273. redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Proposed, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbd)
  274. go SavePpMethod()
  275. taskComb()
  276. c := make(chan bool, 1)
  277. <-c
  278. },
  279. }
  280. return cmdClient
  281. }
  282. // @Description 拟在建项目关联后入tidb处理
  283. // @Author J 2023/4/17 09:06
  284. func projectCombTidb() *cobra.Command {
  285. cmdClient := &cobra.Command{
  286. Use: "project-comb-tidb",
  287. Short: "Start processing combined project",
  288. Run: func(cmd *cobra.Command, args []string) {
  289. InitMysql()
  290. initStage()
  291. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  292. redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Project, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbt)
  293. redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Proposed, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbd)
  294. go SaveEntFunc1("dwd_f_nzj_ent", EntField)
  295. taskD()
  296. c := make(chan bool, 1)
  297. <-c
  298. },
  299. }
  300. return cmdClient
  301. }
  302. // @Description 拟在建关联数据 增量数据处理
  303. // @Author J 2023/4/24 13:51
  304. func projectCombAdd() *cobra.Command {
  305. cmdClient := &cobra.Command{
  306. Use: "project-comb-add",
  307. Short: "Start processing combined project add",
  308. Run: func(cmd *cobra.Command, args []string) {
  309. initSeg()
  310. InitEs()
  311. InitMysql()
  312. initStage()
  313. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  314. redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Project, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbt)
  315. redis.InitRedis1(fmt.Sprintf("%s=%s", config.Conf.DB.Redis.Proposed, config.Conf.DB.Redis.Addr), config.Conf.DB.Redis.Dbd)
  316. go SaveEntFunc1("dwd_f_nzj_ent", EntField)
  317. //go taskAA()
  318. //go taskBB()
  319. crn := cron.New()
  320. cronstr := "0 0 22 * * ?" // 每天10点执行一次
  321. _ = crn.AddFunc(cronstr, func() {
  322. if TaskSingle {
  323. //go taskAA()
  324. go taskBB()
  325. } else {
  326. log.Info("上次任务未执行完成")
  327. }
  328. })
  329. crn.Start()
  330. c := make(chan bool, 1)
  331. <-c
  332. },
  333. }
  334. return cmdClient
  335. }
  336. func newId() *cobra.Command {
  337. cmdClient := &cobra.Command{
  338. Use: "newid",
  339. Short: "Start processing data",
  340. Run: func(cmd *cobra.Command, args []string) {
  341. InitMysql()
  342. info := MysqlTool.Find("dwd_f_user_claim", nil, "id, project_id", "", -1, -1)
  343. for _, m := range *info {
  344. update := make(map[string]interface{})
  345. pid := util.ObjToString(m["project_id"])
  346. info := MysqlTool.FindOne("dwd_f_nzj_baseinfo", bson.M{"proposed_id": pid}, "", "")
  347. if info != nil && len(*info) > 0 {
  348. update["new_id"] = pid
  349. } else {
  350. info = MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": pid}, "proposed_id", "")
  351. if info != nil && len(*info) > 0 {
  352. update["new_id"] = util.ObjToString((*info)["proposed_id"])
  353. } else {
  354. util.Debug("err ---", pid)
  355. }
  356. }
  357. if len(update) > 0 {
  358. util.Debug(update)
  359. MysqlTool.Update("dwd_f_user_claim", bson.M{"id": util.ObjToString(m["id"])}, update)
  360. }
  361. }
  362. },
  363. }
  364. return cmdClient
  365. }
  366. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  367. switch act {
  368. case udp.OP_TYPE_DATA:
  369. var mapInfo map[string]interface{}
  370. err := json.Unmarshal(data, &mapInfo)
  371. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  372. gtid, _ := mapInfo["gtid"].(string)
  373. lteid, _ := mapInfo["lteid"].(string)
  374. if err != nil || gtid == "" || lteid == "" {
  375. UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra)
  376. } else {
  377. //udp成功回写
  378. if k := util.ObjToString(mapInfo["key"]); k != "" {
  379. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  380. } else {
  381. k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
  382. UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
  383. }
  384. log.Info("start merge ...")
  385. doTask(gtid, lteid)
  386. }
  387. case udp.OP_NOOP: //下个节点回应
  388. }
  389. }
  390. func taskQ() (string, string) {
  391. log.Info("taskQ", zap.String("lastid", LastId))
  392. query := bson.M{"gtid": bson.M{"$gt": LastId}}
  393. info, _ := MgoBid.Find("field_data_record", query, `{"_id": -1}`, nil, false, -1, 1)
  394. if len(*info) > 0 {
  395. newid := util.ObjToString((*info)[0]["lteid"])
  396. log.Info("taskQ", zap.String("新lastid", newid))
  397. return LastId, newid
  398. } else {
  399. log.Info("taskQ 未发现新数据")
  400. return "", ""
  401. }
  402. }
  403. func redisDisp() {
  404. pool := make(chan bool, 5) //控制线程数
  405. wg := &sync.WaitGroup{}
  406. finalId := 0
  407. lastInfo := MysqlTool1.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
  408. if len(*lastInfo) > 0 {
  409. finalId = util.IntAll((*lastInfo)[0]["id"])
  410. }
  411. util.Debug("taskIterateSql---", "finally id", finalId)
  412. lastid, count := 0, 0
  413. for {
  414. util.Debug("重新查询,lastid---", lastid)
  415. q := fmt.Sprintf("SELECT id, name, name_id, area_code, city_code FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_ent_baseinfo", lastid)
  416. rows, err := MysqlTool1.DB.Query(q)
  417. if err != nil {
  418. util.Debug("taskIterateSql---", err)
  419. }
  420. columns, err := rows.Columns()
  421. if finalId == lastid {
  422. util.Debug("----finish----------", count)
  423. break
  424. }
  425. for rows.Next() {
  426. scanArgs := make([]interface{}, len(columns))
  427. values := make([]interface{}, len(columns))
  428. ret := make(map[string]interface{})
  429. for k := range values {
  430. scanArgs[k] = &values[k]
  431. }
  432. err = rows.Scan(scanArgs...)
  433. if err != nil {
  434. util.Debug("taskIterateSql---", err)
  435. break
  436. }
  437. for i, col := range values {
  438. if v, ok := col.([]uint8); ok {
  439. ret[columns[i]] = string(v)
  440. } else {
  441. ret[columns[i]] = col
  442. }
  443. }
  444. lastid = util.IntAll(ret["id"])
  445. count++
  446. if count%20000 == 0 {
  447. util.Debug("current-------", count, lastid)
  448. }
  449. pool <- true
  450. wg.Add(1)
  451. go func(tmp map[string]interface{}) {
  452. defer func() {
  453. <-pool
  454. wg.Done()
  455. }()
  456. redis.PutCKV("ent_id", util.ObjToString(tmp["name"]),
  457. fmt.Sprintf("%s_%s_%s", util.ObjToString(tmp["name_id"]), util.ObjToString(tmp["area_code"]), util.ObjToString(tmp["city_code"])))
  458. }(ret)
  459. ret = make(map[string]interface{})
  460. }
  461. _ = rows.Close()
  462. wg.Wait()
  463. }
  464. }
  465. var EsField = []string{"_id", "projectname", "owner", "area", "city", "district", "nature_code", "ownerclass_code", "project_stage_code", "category_code", "total_investment", "lasttime", "proposed_id"}
  466. func esDisp() {
  467. sess := MgoPro.GetMgoConn()
  468. defer MgoPro.DestoryMongoConn(sess)
  469. client := Es.GetEsConn()
  470. defer Es.DestoryEsConn(client)
  471. ch := make(chan bool, config.Conf.Serve.Thread)
  472. wg := &sync.WaitGroup{}
  473. var q bson.M
  474. if pici > 0 {
  475. q = bson.M{"pici": bson.M{"$gt": pici}}
  476. }
  477. query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Select(SelectF).Iter()
  478. count := 0
  479. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  480. if count%200 == 0 {
  481. log.Info(fmt.Sprintf("current --- %d", count))
  482. }
  483. if t := util.Int64All(tmp["pici"]); t > pici {
  484. pici = t
  485. }
  486. ch <- true
  487. wg.Add(1)
  488. go func(tmp map[string]interface{}) {
  489. defer func() {
  490. <-ch
  491. wg.Done()
  492. }()
  493. save := make(map[string]interface{})
  494. for _, f := range EsField {
  495. if tmp[f] == nil {
  496. continue
  497. }
  498. if f == "_id" {
  499. save["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"])
  500. save["_id"] = mongodb.BsonIdToSId(tmp["_id"])
  501. } else if f == "area" {
  502. save[f] = tmp[f]
  503. save["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  504. if util.ObjToString(tmp["city"]) != "" {
  505. save["area_city"] = tmp["city"]
  506. } else {
  507. save["area_city"] = tmp["area"]
  508. }
  509. } else if f == "lasttime" {
  510. save[f] = util.Int64All(tmp[f]) * 1000
  511. } else if f == "nature_code" {
  512. save[f] = tmp[f]
  513. save["nature"] = TagCode["nature"].(map[string]interface{})[util.ObjToString(tmp[f])]
  514. } else if f == "ownerclass_code" {
  515. save[f] = tmp[f]
  516. save["ownerclass"] = TagCode["owner"].(map[string]interface{})[util.ObjToString(tmp[f])]
  517. } else if f == "project_stage_code" {
  518. save[f] = tmp[f]
  519. save["project_stage"] = TagCode["project_stage"].(map[string]interface{})[util.ObjToString(tmp[f])]
  520. } else if f == "category_code" {
  521. save[f] = tmp[f]
  522. save["category"] = TagCode["category"].(map[string]interface{})[util.ObjToString(tmp[f])]
  523. } else if f == "total_investment" {
  524. text := util.ObjToString(tmp[f])
  525. c := ObjToMoney(text)
  526. c = c / 10000
  527. if c != 0 {
  528. c, _ = util.FormatFloat(c, 4)
  529. save[f] = c
  530. }
  531. } else {
  532. save[f] = tmp[f]
  533. }
  534. }
  535. saveEsPool <- save
  536. }(tmp)
  537. tmp = make(map[string]interface{})
  538. }
  539. wg.Wait()
  540. log.Info(fmt.Sprintf("over --- %d", count))
  541. TaskSingle = true
  542. }
  543. func saveMethod() {
  544. arru := make([]map[string]interface{}, saveSize)
  545. indexu := 0
  546. for {
  547. select {
  548. case v := <-savePool:
  549. arru[indexu] = v
  550. indexu++
  551. if indexu == saveSize {
  552. saveSp <- true
  553. go func(arru []map[string]interface{}) {
  554. defer func() {
  555. <-saveSp
  556. }()
  557. MgoBid.SaveBulk("nzj_bidding", arru...)
  558. }(arru)
  559. arru = make([]map[string]interface{}, saveSize)
  560. indexu = 0
  561. }
  562. case <-time.After(1000 * time.Millisecond):
  563. if indexu > 0 {
  564. saveSp <- true
  565. go func(arru []map[string]interface{}) {
  566. defer func() {
  567. <-saveSp
  568. }()
  569. MgoBid.SaveBulk("nzj_bidding", arru...)
  570. }(arru[:indexu])
  571. arru = make([]map[string]interface{}, saveSize)
  572. indexu = 0
  573. }
  574. }
  575. }
  576. }
  577. func SaveEs() {
  578. arru := make([]map[string]interface{}, EsBulkSize)
  579. indexu := 0
  580. for {
  581. select {
  582. case v := <-saveEsPool:
  583. arru[indexu] = v
  584. indexu++
  585. if indexu == EsBulkSize {
  586. saveEsSp <- true
  587. go func(arru []map[string]interface{}) {
  588. defer func() {
  589. <-saveEsSp
  590. }()
  591. Es.BulkSave(config.Conf.DB.Es.IndexP, arru)
  592. }(arru)
  593. arru = make([]map[string]interface{}, EsBulkSize)
  594. indexu = 0
  595. }
  596. case <-time.After(1000 * time.Millisecond):
  597. if indexu > 0 {
  598. saveEsSp <- true
  599. go func(arru []map[string]interface{}) {
  600. defer func() {
  601. <-saveEsSp
  602. }()
  603. Es.BulkSave(config.Conf.DB.Es.IndexP, arru)
  604. }(arru[:indexu])
  605. arru = make([]map[string]interface{}, EsBulkSize)
  606. indexu = 0
  607. }
  608. }
  609. }
  610. }
  611. func main1() {
  612. InitMysql()
  613. InitArea()
  614. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  615. //sql := fmt.Sprintf("SELECT id, new_id FROM %s", "dwd_f_user_claim")
  616. info := MysqlTool.SelectBySql("SELECT * FROM dwd_f_user_claim where project_id not in (SELECT proposed_id FROM dwd_f_nzj_baseinfo dfnb)")
  617. for _, m := range *info {
  618. fid := util.ObjToString(m["new_id"])
  619. //m1 := MysqlTool.Find("dwd_f_nzj_baseinfo", map[string]interface{}{"proposed_id": fid}, "id", "", -1, -1)
  620. pinfo, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProposedColl, fid, nil)
  621. if len(*pinfo) > 0 {
  622. } else {
  623. pinfo, _ = MgoPro.FindOne(config.Conf.DB.MongoP.ProposedColl, bson.M{"ids": fid})
  624. MysqlTool.Update("dwd_f_user_claim", bson.M{"project_id": util.ObjToString(m["project_id"])}, bson.M{"new_id": mongodb.BsonIdToSId((*pinfo)["_id"])})
  625. }
  626. saveM := make(map[string]interface{})
  627. for _, f := range BaseField {
  628. if f == "lasttime" || f == "firsttime" {
  629. if t := util.Int64All((*pinfo)[f]); t > 0 {
  630. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  631. }
  632. } else if f == "proposed_id" {
  633. saveM[f] = mongodb.BsonIdToSId((*pinfo)["_id"])
  634. } else if f == "area_code" {
  635. if (*pinfo)["area"] != nil {
  636. saveM[f] = AreaCode[util.ObjToString((*pinfo)["area"])]
  637. }
  638. } else if f == "city_code" {
  639. if (*pinfo)["area"] != nil && (*pinfo)["city"] != nil {
  640. c := util.ObjToString((*pinfo)["area"]) + "," + util.ObjToString((*pinfo)["city"])
  641. saveM[f] = AreaCode[c]
  642. }
  643. } else if f == "owner" {
  644. if v := util.ObjToString((*pinfo)[f]); v != "" {
  645. if utf8.RuneCountInString(v) < 100 {
  646. saveM[f] = v
  647. }
  648. }
  649. } else if f == "name_id" {
  650. if b := util.ObjToString((*pinfo)["owner"]); b != "" {
  651. if eid := redis.GetStr("ent_id", b); eid != "" {
  652. saveM["name_id"] = strings.Split(eid, "_")[0]
  653. }
  654. }
  655. } else if f == "lasttime" || f == "firsttime" || f == "project_startdate" || f == "project_completedate" {
  656. if (*pinfo)[f] != nil && util.IntAll((*pinfo)[f]) > 0 {
  657. t := util.Int64All((*pinfo)[f])
  658. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  659. }
  660. } else if f == "createtime" {
  661. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  662. } else if f == "total_investment" {
  663. text := util.ObjToString((*pinfo)[f])
  664. capital := ObjToMoney(text)
  665. capital = capital / 10000
  666. if capital != 0 {
  667. capital, _ = util.FormatFloat(capital, 6)
  668. saveM[f] = capital
  669. }
  670. } else if f == "approvestatus" {
  671. if util.ObjToString((*pinfo)[f]) != "" && utf8.RuneCountInString(util.ObjToString((*pinfo)[f])) < 8 {
  672. saveM[f] = (*pinfo)[f]
  673. }
  674. } else if f == "proposed_number" {
  675. if (*pinfo)[f] == nil {
  676. now := time.Now().Unix()
  677. st := util.FormatDateByInt64(&now, util.Date_yyyyMMdd)
  678. parseSt := strconv.FormatInt(util.Int64All(st), 8) // 转8进制
  679. rd := fmt.Sprintf("%04v", rand.New(rand.NewSource(time.Now().UnixNano())).Int63n(10000)) // 4位随机数
  680. saveM[f] = fmt.Sprintf("NZJ%s%s", parseSt, rd)
  681. } else {
  682. saveM[f] = (*pinfo)[f]
  683. }
  684. } else if f == "approvecode" {
  685. if util.ObjToString((*pinfo)[f]) != "" && utf8.RuneCountInString(util.ObjToString((*pinfo)[f])) < 200 {
  686. saveM[f] = (*pinfo)[f]
  687. }
  688. } else if f == "floor_area" {
  689. if util.ObjToString((*pinfo)[f]) != "" && utf8.RuneCountInString(util.ObjToString((*pinfo)[f])) < 255 {
  690. saveM[f] = (*pinfo)[f]
  691. }
  692. } else {
  693. if (*pinfo)[f] != nil {
  694. saveM[f] = (*pinfo)[f]
  695. }
  696. }
  697. }
  698. MysqlTool.Insert("dwd_f_nzj_baseinfo", saveM)
  699. }
  700. }