task.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/log"
  5. "app.yhyue.com/data_processing/common_utils/mongodb"
  6. "app.yhyue.com/data_processing/common_utils/redis"
  7. "fieldproject_common/config"
  8. "fmt"
  9. uuid "github.com/satori/go.uuid"
  10. "github.com/spf13/cobra"
  11. "go.uber.org/zap"
  12. "strings"
  13. "sync"
  14. "time"
  15. "unicode/utf8"
  16. )
  17. // @Description bidding数据 bid_field
  18. // @Author J 2022/8/30 09:09
  19. func bidding() *cobra.Command {
  20. cmdClient := &cobra.Command{
  21. Use: "bidding",
  22. Short: "Start processing bidding data",
  23. Run: func(cmd *cobra.Command, args []string) {
  24. go updateEsMethod()
  25. taskBidding()
  26. },
  27. }
  28. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  29. return cmdClient
  30. }
  31. // @Description 医疗机构数据
  32. // @Author J 2022/8/11 16:50
  33. func institution() *cobra.Command {
  34. cmdClient := &cobra.Command{
  35. Use: "medical_institution",
  36. Short: "Start processing medical_institutional data",
  37. Run: func(cmd *cobra.Command, args []string) {
  38. go SaveEs(config.Conf.DB.Es.IndexM, config.Conf.DB.Es.TypeM)
  39. taskIterateSql()
  40. },
  41. }
  42. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  43. return cmdClient
  44. }
  45. // @Description 产品数据
  46. // @Author J 2022/8/11 16:49
  47. func product() *cobra.Command {
  48. cmdClient := &cobra.Command{
  49. Use: "product",
  50. Short: "Start processing product data",
  51. Run: func(cmd *cobra.Command, args []string) {
  52. go SaveFunc("dws_f_product_baseinfo", ProductField)
  53. //taskIterateSql1()
  54. taskProduct()
  55. },
  56. }
  57. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  58. return cmdClient
  59. }
  60. func dealer() *cobra.Command {
  61. cmdClient := &cobra.Command{
  62. Use: "dealer",
  63. Short: "Start processing dealer data",
  64. Run: func(cmd *cobra.Command, args []string) {
  65. go SaveFunc("dwd_f_yl_dealer_baseinfo_new", DealerField)
  66. //go SaveFuncRc()
  67. taskDealer()
  68. },
  69. }
  70. return cmdClient
  71. }
  72. func ent() *cobra.Command {
  73. cmdClient := &cobra.Command{
  74. Use: "ent",
  75. Short: "Start processing ent data",
  76. Run: func(cmd *cobra.Command, args []string) {
  77. go SaveFunc("dws_f_ent_pa_baseinfo", EntField)
  78. taskEnt()
  79. },
  80. }
  81. return cmdClient
  82. }
  83. func register() *cobra.Command {
  84. cmdClient := &cobra.Command{
  85. Use: "register",
  86. Short: "Start processing register data",
  87. Run: func(cmd *cobra.Command, args []string) {
  88. go SaveFunc("dws_f_register_baseinfo", RegField)
  89. taskRegister()
  90. },
  91. }
  92. return cmdClient
  93. }
  94. func project() *cobra.Command {
  95. cmdClient := &cobra.Command{
  96. Use: "project",
  97. Short: "Start processing project data",
  98. Run: func(cmd *cobra.Command, args []string) {
  99. go SaveFunc("dwd_f_yl_purchasing_baseinfo_new", ProjectField)
  100. //go SaveFunc1("dwd_f_yl_purchasing_win_baseinfo_new", WinerField)
  101. InitPoCode()
  102. InitLvCode()
  103. redis.InitRedis1("ent_id=172.17.4.189:8379", 6) // name_id
  104. redis.InitRedis1("bid_class=172.17.4.189:8379", 7) // class
  105. taskProject()
  106. },
  107. }
  108. //cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
  109. return cmdClient
  110. }
  111. func taskBidding() {
  112. sess := MongoTool.GetMgoConn()
  113. defer MongoTool.DestoryMongoConn(sess)
  114. ch := make(chan bool, 3)
  115. wg := &sync.WaitGroup{}
  116. //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5a8d7f4840d2d9bbe8962002")}
  117. query := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(nil).Iter()
  118. count := 0
  119. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  120. if count%20000 == 0 {
  121. log.Info(fmt.Sprintf("current --- %d", count))
  122. }
  123. ch <- true
  124. wg.Add(1)
  125. go func(tmp map[string]interface{}) {
  126. defer func() {
  127. <-ch
  128. wg.Done()
  129. }()
  130. if b := util.ObjToString(tmp["bid_field"]); b != "" {
  131. updateEsPool <- []map[string]interface{}{{
  132. "_id": mongodb.BsonIdToSId(tmp["_id"]),
  133. },
  134. {"bid_field": b},
  135. }
  136. }
  137. }(tmp)
  138. tmp = make(map[string]interface{})
  139. }
  140. wg.Wait()
  141. log.Info(fmt.Sprintf("over --- %d", count))
  142. }
  143. func taskIterateSql() {
  144. pool := make(chan bool, 10) //控制线程数
  145. wg := &sync.WaitGroup{}
  146. finalId := 0
  147. lastInfo := MysqlM.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", "institution_baseinfo"))
  148. if len(*lastInfo) > 0 {
  149. finalId = util.IntAll((*lastInfo)[0]["id"])
  150. }
  151. log.Info("taskIterateSql---", zap.Int("finally id: ", finalId))
  152. lastid, count := 0, 0
  153. for {
  154. util.Debug("重新查询,lastid---", lastid)
  155. q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "institution_baseinfo", lastid)
  156. rows, err := MysqlM.DB.Query(q)
  157. if err != nil {
  158. log.Error("taskIterateSql---", zap.Error(err))
  159. }
  160. columns, err := rows.Columns()
  161. if finalId == lastid {
  162. util.Debug("----finish----------", count)
  163. break
  164. }
  165. for rows.Next() {
  166. scanArgs := make([]interface{}, len(columns))
  167. values := make([]interface{}, len(columns))
  168. ret := make(map[string]interface{})
  169. for k := range values {
  170. scanArgs[k] = &values[k]
  171. }
  172. err = rows.Scan(scanArgs...)
  173. if err != nil {
  174. log.Error("taskIterateSql---", zap.Error(err))
  175. break
  176. }
  177. for i, col := range values {
  178. if v, ok := col.([]uint8); ok {
  179. ret[columns[i]] = string(v)
  180. } else {
  181. ret[columns[i]] = col
  182. }
  183. }
  184. lastid = util.IntAll(ret["id"])
  185. count++
  186. if count%500 == 0 {
  187. util.Debug("current-------", count, lastid)
  188. }
  189. pool <- true
  190. wg.Add(1)
  191. go func(tmp map[string]interface{}) {
  192. defer func() {
  193. <-pool
  194. wg.Done()
  195. }()
  196. EsSaveCache <- method(tmp)
  197. }(ret)
  198. ret = make(map[string]interface{})
  199. }
  200. _ = rows.Close()
  201. wg.Wait()
  202. }
  203. }
  204. func taskDealer() {
  205. sess := MongoTool2.GetMgoConn()
  206. defer MongoTool2.DestoryMongoConn(sess)
  207. ch := make(chan bool, 3)
  208. wg := &sync.WaitGroup{}
  209. query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Iter()
  210. count := 0
  211. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  212. if count%20000 == 0 {
  213. log.Info(fmt.Sprintf("current --- %d", count))
  214. }
  215. ch <- true
  216. wg.Add(1)
  217. go func(tmp map[string]interface{}) {
  218. defer func() {
  219. <-ch
  220. wg.Done()
  221. }()
  222. saveM := make(map[string]interface{})
  223. record := make(map[string]interface{})
  224. for _, f := range DealerField {
  225. if f == "name_id" && util.ObjToString(tmp["name_id"]) == "" {
  226. name_id := strings.ReplaceAll(uuid.NewV4().String(), "-", "")
  227. saveM[f] = name_id
  228. saveM["exists_id"] = 0
  229. record = map[string]interface{}{"name_id": name_id, "name": tmp["company_name"], "type": 2, "createtime": time.Now().Format(util.Date_Full_Layout)}
  230. } else if f == "dealer_name" {
  231. saveM[f] = tmp["company_name"]
  232. } else if f == "area_code" {
  233. if tmp["area"] != nil {
  234. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  235. }
  236. } else if f == "city_code" {
  237. if tmp["area"] != nil && tmp["city"] != nil {
  238. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  239. saveM[f] = AreaCode[c]
  240. }
  241. } else if f == "district_code" {
  242. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  243. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  244. saveM[f] = AreaCode[c]
  245. }
  246. } else if f == "business_model" {
  247. saveM[f] = tmp["business_type"]
  248. } else if f == "capital" {
  249. text := util.ObjToString(tmp["capital"])
  250. capital := ObjToMoney(text)
  251. capital = capital / 10000
  252. if capital != 0 {
  253. capital, _ = util.FormatFloat(capital, 2)
  254. } else {
  255. capital = 0
  256. }
  257. saveM[f] = capital
  258. if capital < 100 {
  259. saveM["capital_code"] = 1
  260. } else if capital >= 100 && capital < 500 {
  261. saveM["capital_code"] = 2
  262. } else if capital >= 500 && capital < 1000 {
  263. saveM["capital_code"] = 3
  264. } else if capital >= 1000 {
  265. saveM["capital_code"] = 4
  266. }
  267. } else if f == "createtime" {
  268. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  269. } else if f == "website" {
  270. tid := util.ObjToString(tmp["company_id"])
  271. std, _ := MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": tid}, map[string]interface{}{"website_url": 1})
  272. if std != nil && len(*std) > 0 && len(util.ObjToString((*std)["website_url"])) <= 255 {
  273. saveM[f] = util.ObjToString((*std)["website_url"])
  274. }
  275. } else {
  276. if tmp[f] != nil {
  277. saveM[f] = tmp[f]
  278. }
  279. }
  280. }
  281. if len(record) > 0 {
  282. saveRcPool <- record
  283. }
  284. saveBasePool <- saveM
  285. }(tmp)
  286. tmp = make(map[string]interface{})
  287. }
  288. wg.Wait()
  289. log.Info(fmt.Sprintf("over --- %d", count))
  290. }
  291. func taskEnt() {
  292. sess := MongoTool2.GetMgoConn()
  293. defer MongoTool2.DestoryMongoConn(sess)
  294. ch := make(chan bool, 3)
  295. wg := &sync.WaitGroup{}
  296. query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_company_info").Find(nil).Iter()
  297. count := 0
  298. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  299. if count%20000 == 0 {
  300. log.Info(fmt.Sprintf("current --- %d", count))
  301. }
  302. ch <- true
  303. wg.Add(1)
  304. go func(tmp map[string]interface{}) {
  305. defer func() {
  306. <-ch
  307. wg.Done()
  308. }()
  309. saveM := make(map[string]interface{})
  310. for _, f := range EntField {
  311. if f == "area_code" {
  312. if tmp["area"] != nil {
  313. saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
  314. }
  315. } else if f == "issue_date" {
  316. if util.ObjToString(tmp[f]) != "" {
  317. saveM[f] = tmp[f]
  318. }
  319. } else if f == "sourcetype" {
  320. saveM[f] = 1
  321. } else if f == "createtime" || f == "updatetime" {
  322. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  323. } else if f == "website" {
  324. tid := util.ObjToString(tmp["company_id"])
  325. std, _ := MongoTool.FindOneByField("qyxy_std", map[string]interface{}{"_id": tid}, map[string]interface{}{"website_url": 1})
  326. if std != nil && len(*std) > 0 && len(util.ObjToString((*std)["website_url"])) <= 255 {
  327. saveM[f] = util.ObjToString((*std)["website_url"])
  328. }
  329. } else {
  330. if tmp[f] != nil {
  331. saveM[f] = tmp[f]
  332. }
  333. }
  334. }
  335. saveBasePool <- saveM
  336. }(tmp)
  337. tmp = make(map[string]interface{})
  338. }
  339. wg.Wait()
  340. log.Info(fmt.Sprintf("over --- %d", count))
  341. }
  342. func taskRegister() {
  343. sess := MongoTool2.GetMgoConn()
  344. defer MongoTool2.DestoryMongoConn(sess)
  345. ch := make(chan bool, 3)
  346. wg := &sync.WaitGroup{}
  347. query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("nmpa_company").Find(nil).Iter()
  348. count := 0
  349. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  350. if count%20000 == 0 {
  351. log.Info(fmt.Sprintf("current --- %d", count))
  352. }
  353. ch <- true
  354. wg.Add(1)
  355. go func(tmp map[string]interface{}) {
  356. defer func() {
  357. <-ch
  358. wg.Done()
  359. }()
  360. saveM := make(map[string]interface{})
  361. for _, f := range RegField {
  362. if f == "dealer_id" {
  363. name := util.ObjToString(tmp["company"])
  364. info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"dealer_name": name}, "name_id", "")
  365. if info == nil || len(*info) == 0 {
  366. return
  367. }
  368. saveM[f] = (*info)["name_id"]
  369. saveM["website"] = (*info)["website"]
  370. } else if f == "company_name" {
  371. saveM[f] = tmp["company"]
  372. } else if f == "regnum" {
  373. if tmp["reg_no"] != nil {
  374. saveM[f] = tmp["reg_no"]
  375. }
  376. } else if f == "scope" {
  377. if util.ObjToString(tmp["class"]) == "生产型" {
  378. saveM[f] = tmp["product_range"]
  379. } else if util.ObjToString(tmp["class"]) == "经营型" {
  380. saveM[f] = tmp["business_range"]
  381. }
  382. } else if f == "type" {
  383. if util.ObjToString(tmp[f]) == "备案企业" {
  384. saveM[f] = 2
  385. } else if util.ObjToString(tmp[f]) == "许可企业" {
  386. saveM[f] = 1
  387. }
  388. } else if f == "approve_depart" {
  389. saveM[f] = tmp["badw"]
  390. } else if f == "approve_date" {
  391. if util.ObjToString(tmp["barq"]) != "" && util.ObjToString(tmp["barq"]) != "null" {
  392. saveM[f] = tmp["barq"]
  393. }
  394. } else if f == "validity_date" {
  395. if util.ObjToString(tmp["yxqx"]) != "" && util.ObjToString(tmp["yxqx"]) != "null" {
  396. saveM[f] = tmp["yxqx"]
  397. }
  398. } else if f == "type_address" {
  399. if util.ObjToString(tmp["class"]) == "生产型" {
  400. saveM[f] = tmp["product_address"]
  401. } else if util.ObjToString(tmp["class"]) == "经营型" {
  402. saveM[f] = tmp["business_address"]
  403. }
  404. } else if f == "createtime" {
  405. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  406. } else {
  407. if tmp[f] != nil {
  408. saveM[f] = tmp[f]
  409. }
  410. }
  411. }
  412. saveBasePool <- saveM
  413. }(tmp)
  414. tmp = make(map[string]interface{})
  415. }
  416. wg.Wait()
  417. log.Info(fmt.Sprintf("over --- %d", count))
  418. }
  419. func taskProduct() {
  420. sess := MongoTool2.GetMgoConn()
  421. defer MongoTool2.DestoryMongoConn(sess)
  422. ch := make(chan bool, 3)
  423. wg := &sync.WaitGroup{}
  424. query := sess.DB(config.Conf.DB.Mongo2.Dbname).C("zktest_mysql_product_info").Find(nil).Iter()
  425. count := 0
  426. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  427. if count%20000 == 0 {
  428. log.Info(fmt.Sprintf("current --- %d", count))
  429. }
  430. ch <- true
  431. wg.Add(1)
  432. go func(tmp map[string]interface{}) {
  433. defer func() {
  434. <-ch
  435. wg.Done()
  436. }()
  437. saveM := make(map[string]interface{})
  438. for _, f := range ProductField {
  439. if f == "dealer_id" {
  440. name := util.ObjToString(tmp["company_name"])
  441. info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"dealer_name": name}, "name_id", "")
  442. if info == nil || len(*info) == 0 {
  443. MongoTool2.Save("product_err_record", map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"])})
  444. return
  445. }
  446. saveM[f] = (*info)["name_id"]
  447. } else if f == "make_country" || f == "regist_type" {
  448. saveM[f] = util.IntAll(tmp[f])
  449. } else if f == "createtime" {
  450. saveM[f] = time.Now().Format(util.Date_Full_Layout)
  451. } else if f == "medical_equipment_class1" {
  452. saveM[f] = tmp["product_class1"]
  453. } else if f == "medical_equipment_class2" {
  454. saveM[f] = tmp["product_class2"]
  455. } else if f == "medical_equipment_class3" {
  456. saveM[f] = tmp["product_class3"]
  457. } else if f == "sdproduct_name" {
  458. saveM[f] = tmp["product_class4"]
  459. } else if f == "sdequipment_code" {
  460. if len(util.ObjToString(tmp["product_code"])) > 7 {
  461. saveM[f] = util.ObjToString(tmp["product_code"])[:7]
  462. } else {
  463. saveM[f] = tmp["product_code"]
  464. }
  465. } else {
  466. if tmp[f] != nil {
  467. saveM[f] = tmp[f]
  468. }
  469. }
  470. }
  471. saveBasePool <- saveM
  472. }(tmp)
  473. tmp = make(map[string]interface{})
  474. }
  475. wg.Wait()
  476. log.Info(fmt.Sprintf("over --- %d", count))
  477. }
  478. func taskProject() {
  479. sess := MongoTool1.GetMgoConn()
  480. defer MongoTool1.DestoryMongoConn(sess)
  481. ch := make(chan bool, 3)
  482. wg := &sync.WaitGroup{}
  483. query := sess.DB(config.Conf.DB.Mongo1.Dbname).C("projectset_medical").Find(nil).Iter()
  484. count := 0
  485. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  486. if count%20000 == 0 {
  487. log.Info(fmt.Sprintf("current --- %d", count))
  488. }
  489. ch <- true
  490. wg.Add(1)
  491. go func(tmp map[string]interface{}) {
  492. defer func() {
  493. <-ch
  494. wg.Done()
  495. }()
  496. if tmp["jg_plist"] == nil {
  497. return
  498. }
  499. saveM := make(map[string]interface{})
  500. infoid := util.ObjToString(tmp["sourceinfoid"])
  501. saveM["projectid"] = mongodb.BsonIdToSId(tmp["_id"])
  502. saveM["infoid"] = infoid
  503. saveM["jyhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", infoid))
  504. saveM["bidstatus"] = Bidstatus[util.ObjToString(tmp["bidstatus"])]
  505. saveM["bidstype"] = Bidtype[util.ObjToString(tmp["bidstype"])]
  506. saveM["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  507. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  508. if tmp["budget"] != nil {
  509. saveM["budget"], _ = util.FormatFloat(util.Float64All(tmp["budget"]), 4)
  510. }
  511. if tmp["area"] != nil {
  512. saveM["area_code"] = AreaCode[util.ObjToString(tmp["area"])]
  513. }
  514. if tmp["area"] != nil && tmp["city"] != nil {
  515. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
  516. saveM["city_code"] = AreaCode[c]
  517. }
  518. if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
  519. c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
  520. saveM["district_code"] = AreaCode[c]
  521. }
  522. for _, f := range []string{"title", "projectname", "projectcode", "purchasing", "agency", "buyer"} {
  523. if f == "purchasing" {
  524. if utf8.RuneCountInString(util.ObjToString(tmp[f])) < 20000 {
  525. saveM[f] = tmp[f]
  526. }
  527. } else {
  528. if util.ObjToString(tmp[f]) != "" {
  529. saveM[f] = tmp[f]
  530. }
  531. }
  532. }
  533. if b := util.ObjToString(tmp["buyer"]); b != "" {
  534. if eid := redis.GetStr("ent_id", b); eid != "" {
  535. saveM["buyer_id"] = strings.Split(eid, "_")[0]
  536. saveM["mi_area_code"] = strings.Split(eid, "_")[1]
  537. if cd := LvCode[strings.Split(eid, "_")[0]]; cd != "" {
  538. saveM["mi_level_code"] = cd
  539. }
  540. }
  541. }
  542. if a := util.ObjToString(tmp["agency"]); a != "" {
  543. if eid := redis.GetStr("ent_id", a); eid != "" {
  544. saveM["agency_id"] = strings.Split(eid, "_")[0]
  545. }
  546. }
  547. for _, f := range []string{"bidopentime", "zbtime", "jgtime"} {
  548. if util.IntAll(tmp[f]) > 0 {
  549. t := util.Int64All(tmp[f])
  550. saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  551. }
  552. }
  553. if t := util.Int64All(tmp["jgtime"]); t > 0 {
  554. y := time.Unix(t, 0).Year()
  555. m := time.Unix(t, 0).Month().String()
  556. saveM["year_tags"] = y
  557. saveM["month_tags"] = Month[m]
  558. saveM["quarter_tags"] = Quarter[m]
  559. }
  560. pname := make(map[string]bool) // 标的物名称 去重
  561. for _, p := range tmp["jg_plist"].([]interface{}) {
  562. p1 := p.(map[string]interface{})
  563. if name := util.ObjToString(p1["itemname"]); name != "" && !pname[name] {
  564. if utf8.RuneCountInString(name) > 255 {
  565. continue
  566. }
  567. pname[name] = true
  568. saveM1 := util.DeepCopy(saveM).(map[string]interface{})
  569. saveM1["itemname"] = name
  570. if code := redis.GetStr("bid_class", fmt.Sprintf("%s_%s", infoid, name)); code != "" {
  571. if len(code) > 7 {
  572. saveM1["sdequipment_code"] = code[:7]
  573. } else {
  574. saveM1["sdequipment_code"] = code
  575. }
  576. for k, v := range PclassCode[code] {
  577. if k == "class_1" {
  578. saveM1["medical_equipment_class1"] = v
  579. } else if k == "class_2" {
  580. saveM1["medical_equipment_class2"] = v
  581. } else if k == "class_3" {
  582. saveM1["medical_equipment_class3"] = v
  583. } else if k == "class_4" {
  584. saveM1["sdproduct_name"] = v
  585. }
  586. }
  587. }
  588. if p1["brandname"] != nil {
  589. saveM1["brandname"] = p1["brandname"]
  590. }
  591. if p1["specs"] != nil {
  592. saveM1["specs"] = p1["specs"]
  593. }
  594. if p1["model"] != nil && utf8.RuneCountInString(util.ObjToString(p1["model"])) < 200 {
  595. saveM1["model"] = p1["model"]
  596. }
  597. if p1["unitname"] != nil {
  598. saveM1["unitname"] = p1["unitname"]
  599. }
  600. if p1["number"] != nil && util.IntAll(p1["number"]) < 100000 {
  601. saveM1["number"] = util.IntAll(p1["number"])
  602. }
  603. if p1["unitprice"] != nil && util.Float64All(p1["unitprice"]) < 100000000 {
  604. saveM1["unitprice"], _ = util.FormatFloat(util.Float64All(p1["unitprice"]), 2)
  605. }
  606. if p1["totalprice"] != nil {
  607. saveM1["totalprice"], _ = util.FormatFloat(util.Float64All(p1["totalprice"]), 2)
  608. }
  609. saveBasePool <- saveM1
  610. }
  611. }
  612. // 中标信息
  613. saveW := make(map[string]interface{})
  614. if w := util.ObjToString(tmp["winner"]); w != "" {
  615. saveW["winner"] = w
  616. if eid := redis.GetStr("ent_id", w); eid != "" {
  617. saveW["winner_id"] = strings.Split(eid, "_")[0]
  618. saveW["winner_area_code"] = strings.Split(eid, "_")[1]
  619. if ccode := strings.Split(eid, "_")[2]; ccode != "" {
  620. saveW["winner_city_code"] = ccode
  621. }
  622. }
  623. if util.ObjToString(tmp["winnertel"]) != "" {
  624. saveW["contact_tel"] = tmp["winnertel"]
  625. }
  626. if util.ObjToString(tmp["winnerperson"]) != "" {
  627. saveW["contact_name"] = tmp["winnerperson"]
  628. }
  629. saveW["projectid"] = mongodb.BsonIdToSId(tmp["_id"])
  630. saveW["infoid"] = util.ObjToString(tmp["sourceinfoid"])
  631. if tmp["bidamount"] != nil {
  632. saveW["bidamount"], _ = util.FormatFloat(util.Float64All(tmp["bidamount"]), 4)
  633. }
  634. if util.IntAll(tmp["jgtime"]) > 0 {
  635. t := util.Int64All(tmp["jgtime"])
  636. saveW["jgtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  637. }
  638. saveW["is_winner"] = 1
  639. saveW["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  640. saveW["createtime"] = time.Now().Format(util.Date_Full_Layout)
  641. saveBasePool1 <- saveW
  642. }
  643. //中标候选
  644. if tmp["winnerorder"] != nil {
  645. saveW1 := make(map[string]interface{})
  646. for _, w := range tmp["winnerorder"].([]interface{}) {
  647. if util.ObjToString(w) == "" {
  648. continue
  649. }
  650. saveW1["winner"] = w
  651. if eid := redis.GetStr("ent_id", util.ObjToString(w)); eid != "" {
  652. saveW1["winner_id"] = strings.Split(eid, "_")[0]
  653. saveW1["winner_area_code"] = strings.Split(eid, "_")[1]
  654. if ccode := strings.Split(eid, "_")[2]; ccode != "" {
  655. saveW1["winner_city_code"] = ccode
  656. }
  657. }
  658. saveW1["projectid"] = mongodb.BsonIdToSId(tmp["_id"])
  659. saveW1["infoid"] = util.ObjToString(tmp["sourceinfoid"])
  660. if tmp["bidamount"] != nil {
  661. saveW1["bidamount"], _ = util.FormatFloat(util.Float64All(tmp["bidamount"]), 4)
  662. }
  663. if util.IntAll(tmp["jgtime"]) > 0 {
  664. t := util.Int64All(tmp["jgtime"])
  665. saveW["jgtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
  666. }
  667. saveW["is_winner"] = 2
  668. saveW["updatetime"] = time.Now().Format(util.Date_Full_Layout)
  669. saveW["createtime"] = time.Now().Format(util.Date_Full_Layout)
  670. saveBasePool1 <- saveW
  671. }
  672. }
  673. }(tmp)
  674. tmp = make(map[string]interface{})
  675. }
  676. wg.Wait()
  677. log.Info(fmt.Sprintf("over --- %d", count))
  678. }
  679. func method(tmp map[string]interface{}) map[string]interface{} {
  680. m := make(map[string]interface{})
  681. for k, v := range config.Conf.DB.Es.FieldM {
  682. if k == "alias" {
  683. var arr []string
  684. info := MysqlM.Find("institution_alias", map[string]interface{}{"company_id": tmp["company_id"]}, "", "", -1, -1)
  685. for _, m2 := range *info {
  686. arr = append(arr, util.ObjToString(m2["alias"]))
  687. }
  688. if len(arr) > 0 {
  689. m[k] = strings.Join(arr, ",")
  690. }
  691. } else if k == "sdequipment" {
  692. if util.ObjToString(tmp["mi_type_code"]) == "0208" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  693. m[k] = "多功能电离子手术治疗机,CO2激光治疗仪,半导体激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针、电刀、电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机"
  694. } else if util.ObjToString(tmp["mi_type_code"]) == "0201" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  695. m[k] = "牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机"
  696. } else if util.ObjToString(tmp["mi_type_code"]) == "0203" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  697. m[k] = "小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,胆红素测定仪,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,经皮给药治疗仪,儿童智能测量仪"
  698. } else if util.ObjToString(tmp["mi_type_code"]) == "12" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  699. m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏起搏器,心脏除颤器,心脏复苏机,呼吸机,儿童用呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,血压计,体温计,体重计,空气消毒机"
  700. } else if util.ObjToString(tmp["mi_type_code"]) == "01" && strings.Contains(util.ObjToString(tmp["level_code"]), "03") {
  701. m[k] = "中心负压吸引设备,中心供氧设备,多参数监护设备,心脏除颤器,心脏复苏机,呼吸机,简易呼吸器,自动洗胃机,心电图机,多功能抢救床,气管插管设备,转运车,快速血糖仪,亚低温治疗仪,冰帽,电子冰毯,微量泵,输液泵,营养输注泵,医用冰箱,空气消毒机,动态心电监测系统,有创呼吸机,动态血压监护仪,便携式血氧饱和度监护仪,多导睡眠呼吸监测仪,床边肺功能仪,便携式血气分析仪,心肺功能监测仪,胃动力检测仪,胃电治疗仪,腹水超滤仪,腹水浓缩机,人工肝,肝病治疗仪,床单位臭氧消毒机,结肠灌洗治疗仪,经皮肾镜,动态血糖监测仪,胰岛素泵,糖尿病足病诊断箱,眼底镜,空气波压力治疗仪,动态脑电监护仪,颅内压监测仪,脑水肿监测仪,半导体激光治疗仪,钴60放射治疗机,超声聚焦刀,氩氦刀,射频肿瘤治疗仪,微波热疗仪,双筒显微镜,相差显微镜,荧光显微镜,倒置显微镜,骨髓活检装置,流式细胞分析仪,细胞分离机,脑细胞介质分析仪,双光能骨密度仪,脑电超慢涨落分析仪,动脉硬化测试仪,移动式负压吸引器,换药床,乳腺微创真空旋切系统,外碎石设备,骨科牵引床,脊柱牵引床,推拿手法床,石膏床,石膏剪,石膏锯,水温箱,足底静脉泵,激光治疗仪,骨科康复设备,吸引设备,供氧设备,监护设备,呼叫系统,心脏除颤仪,抢救车,换药车,转运床,营养输注泵,防褥疮气垫,血压计,体温计,体重计,移动紫外线灯,负压病房设施,层流病房设施,心脏起搏器,便携式呼吸机,雾化器,床边支气管镜,血液动力学检测仪,电冰毯,电子冰帽,脑电图监测仪,脑功能监测仪,振动排痰器,床单元臭氧消毒机,层流净化系统,holter,主动脉球囊反搏泵,便携式超声诊断仪,食道电生理仪,诊察床,肛管直肠压力测定设备,肛门镜,肛门坐浴熏洗设备,结肠灌洗设备,肛肠综合治疗仪,痔科套扎器,肛肠内腔治疗仪,肛门肌电图,多功能电离子手术治疗机,CO2激光治疗仪,准分子激光治疗仪,微波治疗仪,生物共振检测治疗仪,过敏原检测仪,紫外线治疗仪,蓝红光痤疮治疗仪,多功能手术仪,显微镜,手术器材,高频电针,电刀,电灼器,病理切片机,红宝石激光美容仪,光子嫩肤仪,半导体激光脱毛机,中药熏洗机,母婴监护仪,妇科检查台,计划生育手术床,冲洗车,阴道镜,人流吸引器,超声诊断仪,超高频电波刀,超声聚焦治疗仪,盆腔炎治疗设备,产后康复综合治疗仪,胎心监护仪,妇科检查床,综合产床,新生儿抢救台,婴儿辐射保暖台,婴儿培养箱,电动羊水吸引器,经皮给药治疗仪,胆红素测定仪,产妇电脑综合治疗仪,消毒隔离器械柜,小儿多参数心电监护仪,小儿呼吸机,小儿吸痰器,小儿脉氧仪,小儿雾化治疗仪,复合脉冲磁性治疗仪,儿童智能测量仪,眼科治疗床,裂隙灯,眼压计,角膜曲率计,视力灯箱,客观视力仪,电脑验光仪,全自动电脑视野仪,手术显微镜,眼科AB超声仪,超声乳化治疗仪,眼底荧光造影仪,自动焦度仪,沙眼治疗仪,睫毛电解器,视觉诱发电位仪,耳鼻喉综合治疗台,耳科旋转椅,鼓气电窥耳镜,耳钻,动态喉镜,纤维喉镜,间接喉镜,直接喉镜,支撑喉镜,电子喉镜,鼻咽喉镜,间接鼻咽喉镜,前鼻镜,鼻内镜及手术系统,电测听器,前庭检查仪,眼震电图仪,声阻抗仪,五官科多用显微镜,牙科综合治疗台,石膏模拟切边机,抛光机,氦氖激光器,光敏固化灯,种植机,喷砂机,铸造机,石膏振荡器,干燥箱,全瓷/铸造烤瓷设备,超声波洁牙机,手术器械及器械车,牙科技工装置,包装机,纸塑包装封口机,清洗机,普通X光机,洗片机,透视机,移动式X光机,数字X线摄影,CR,DR,干式激光相机,X线电子计算机断层扫描装置,CT,数字减影血管造影X光机,DSA,数字化胃肠X光机,数据传输系统,LIS,全自动血细胞分析仪,尿液分析仪,尿沉渣工作站,冰点渗透压计,凝血检测仪,血糖测定仪,微量血糖测定仪,血气分析仪,干式生化分析仪,生化分析仪,发光免疫分析仪,全自动酶免免疫分析系统,酶标仪,电泳分析仪,血小板聚集仪,全自动细菌培养系统,生物培养箱,微生物鉴定药敏分析仪,血培养仪,菌落计数器,厌氧菌培养箱,幽门螺旋杆菌检测仪,氨基酸分析系统,荧光定量PCR检测系统,TCT液基细胞学检测仪,HPV-DNA检测系统,心梗三项检测仪,脑钠肽检测仪,二氧化碳培养箱,高温灭菌器,生物安全柜,血液流变仪,普通显微镜,生物显微镜,血沉仪,蛋白电泳仪,特种蛋白仪,电解质分析仪,精子分析系统,血栓弹力分析仪,血型鉴定及配血设备,纯水系统,自动洗板机,分析天平,超声清洗器,振荡器,电热培养箱,恒温水浴箱,医用冷库,医用冷藏柜,超低温冰柜,普通离心机,低速冷冻离心机,高速冷冻离心机,快速血糖仪微量泵,输液泵营养,输注泵,负压病房设施,层流病房设施,有创呼吸机便携式呼吸机,心电图机动态心电监测系统,微量泵输液泵,食道电生理仪床,单元臭氧消毒机,肛管直肠,压力测定设备,超声聚焦治疗仪,盆腔炎治疗设备,裂隙灯眼压计,眼底镜角膜曲率计视力,灯箱客观视力仪,睫毛电解器,鼓气电窥耳镜耳钻,动态喉镜纤维喉镜,间接喉镜直接喉镜,前鼻镜鼻,内镜及手术系统,电测听器前庭检查仪,抛光机氦氖激光器,光敏固化灯种植机,喷砂机铸造机石膏振荡器,干燥箱全瓷/铸造烤瓷设备,普通X光机,透视机移动式X光机,X线电子计算机,数字化胃肠X光机,数据传输系统(LIS)全自动血细胞分析仪,尿沉渣工作站冰点渗透压计,全自动细菌培养系统,生物培养箱,血沉仪蛋白电泳仪,分析天平超声清洗器,干燥箱医用冰箱,心脏复苏机,营养输注泵,血透机,自动腹透机,外碎石设备,核磁共振仪,核磁共振成像系统"
  702. }
  703. info1, _ := MongoTool.FindOneByField("20220906shebei_buchong", map[string]interface{}{"company_id": tmp["company_id"]}, map[string]interface{}{"itemname_all": 1})
  704. if info1 != nil && len(*info1) > 0 {
  705. if m[k] != nil {
  706. m[k] = fmt.Sprintf("%s,%s", m[k], (*info1)["itemname_all"])
  707. } else {
  708. m[k] = (*info1)["itemname_all"]
  709. }
  710. }
  711. info := MysqlM.FindOne("code_sdleveltypeequip", map[string]interface{}{"code": tmp["sdequipment_code"]}, "", "")
  712. if info != nil && len(*info) > 0 {
  713. if m[k] != nil {
  714. m[k] = fmt.Sprintf("%s,%s", m[k], (*info)["equipment"])
  715. } else {
  716. m[k] = (*info)["equipment"]
  717. }
  718. }
  719. } else if k == "area_code" {
  720. m[k] = tmp[k]
  721. info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
  722. if info != nil && len(*info) > 0 {
  723. m["area"] = (*info)["area"]
  724. if (*info)["city"] != nil {
  725. m["city"] = (*info)["city"]
  726. }
  727. if (*info)["district"] != nil {
  728. m["district"] = (*info)["district"]
  729. }
  730. }
  731. } else if k == "mi_type_code" {
  732. m[k] = string([]byte(util.ObjToString(tmp[k]))[:2])
  733. } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
  734. continue
  735. } else {
  736. if v == "string" {
  737. m[k] = tmp[k]
  738. } else {
  739. m[k] = util.IntAll(tmp[k])
  740. }
  741. }
  742. }
  743. return m
  744. }
  745. func taskIterateSql1() {
  746. pool := make(chan bool, 10) //控制线程数
  747. wg := &sync.WaitGroup{}
  748. finalId := 0
  749. lastInfo := MysqlB.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY product_id DESC LIMIT 1", "product_baseinfo"))
  750. if len(*lastInfo) > 0 {
  751. finalId = util.IntAll((*lastInfo)[0]["product_id"])
  752. }
  753. log.Info("taskIterateSql1---", zap.Int("finally id", finalId))
  754. lastid, count := 0, 0
  755. for {
  756. util.Debug("重新查询,lastid---", lastid)
  757. q := fmt.Sprintf("SELECT * FROM %s WHERE product_id > %d ORDER BY product_id ASC limit 100000", "product_baseinfo", lastid)
  758. rows, err := MysqlB.DB.Query(q)
  759. if err != nil {
  760. log.Error("taskIterateSql1---", zap.Error(err))
  761. }
  762. columns, err := rows.Columns()
  763. if finalId == lastid {
  764. util.Debug("----finish----------", count)
  765. break
  766. }
  767. for rows.Next() {
  768. scanArgs := make([]interface{}, len(columns))
  769. values := make([]interface{}, len(columns))
  770. ret := make(map[string]interface{})
  771. for k := range values {
  772. scanArgs[k] = &values[k]
  773. }
  774. err = rows.Scan(scanArgs...)
  775. if err != nil {
  776. log.Error("taskIterateSql1---", zap.Error(err))
  777. break
  778. }
  779. for i, col := range values {
  780. if v, ok := col.([]uint8); ok {
  781. ret[columns[i]] = string(v)
  782. } else {
  783. ret[columns[i]] = col
  784. }
  785. }
  786. lastid = util.IntAll(ret["product_id"])
  787. count++
  788. if count%500 == 0 {
  789. util.Debug("current-------", count, lastid)
  790. }
  791. pool <- true
  792. wg.Add(1)
  793. go func(tmp map[string]interface{}) {
  794. defer func() {
  795. <-pool
  796. wg.Done()
  797. }()
  798. cid := util.ObjToString(tmp["company_id"])
  799. info := MysqlM.FindOne("dws_f_dealer_baseinfo", map[string]interface{}{"company_id": cid}, "name_id, dealer_name", "")
  800. if info == nil || len(*info) == 0 {
  801. MongoTool2.Save("product_err_record", map[string]interface{}{"product_id": tmp["product_id"]})
  802. return
  803. }
  804. saveM := make(map[string]interface{})
  805. saveM["dealer_id"] = (*info)["name_id"]
  806. saveM["company_name"] = (*info)["dealer_name"]
  807. saveM["sdequipment_code"] = tmp["medical_equipment_code"]
  808. saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
  809. delete(tmp, "medical_equipment_code")
  810. delete(tmp, "product_id")
  811. delete(tmp, "comeintime")
  812. saveBasePool <- tmp
  813. }(ret)
  814. ret = make(map[string]interface{})
  815. }
  816. _ = rows.Close()
  817. wg.Wait()
  818. }
  819. }
  820. func method1(tmp map[string]interface{}) map[string]interface{} {
  821. m := make(map[string]interface{})
  822. for k, v := range config.Conf.DB.Es.FieldS {
  823. if k == "business_model" {
  824. info := MysqlB.FindOne("company_business_model", map[string]interface{}{"company_id": tmp["company_id"], "company_field_code": "0101"}, "", "")
  825. if info != nil && len(*info) > 0 {
  826. m[k] = util.IntAll((*info)["business_model"])
  827. } else {
  828. m[k] = 2
  829. }
  830. } else if k == "supplier" {
  831. m[k] = tmp["company_name"]
  832. } else if k == "productlist" {
  833. var p = method2(util.ObjToString(tmp["company_id"]))
  834. if p != nil {
  835. m[k] = p
  836. }
  837. } else if k == "area_code" {
  838. if tmp[k] != nil {
  839. m[k] = tmp[k]
  840. info := MysqlB.FindOne("code_area", map[string]interface{}{"code": tmp["area_code"]}, "", "")
  841. if info != nil && len(*info) > 0 {
  842. m["area"] = (*info)["area"]
  843. if (*info)["city"] != nil {
  844. m["city"] = (*info)["city"]
  845. }
  846. if (*info)["district"] != nil {
  847. m["district"] = (*info)["district"]
  848. }
  849. }
  850. } else {
  851. log.Error("area_code", zap.Any("id", tmp["company_id"]))
  852. }
  853. } else if tmp[k] == nil || util.ObjToString(tmp[k]) == "" {
  854. continue
  855. } else {
  856. if v == "string" {
  857. m[k] = tmp[k]
  858. } else {
  859. m[k] = util.IntAll(tmp[k])
  860. }
  861. }
  862. }
  863. return m
  864. }
  865. func method2(cid string) []map[string]interface{} {
  866. var pmap []map[string]interface{}
  867. mc := make(map[string]bool) // 记录name 去重
  868. pinfo1 := MysqlM.Find("product_baseinfo", map[string]interface{}{"company_id": cid}, "", "", -1, -1)
  869. for _, m2 := range *pinfo1 {
  870. m := make(map[string]interface{})
  871. m["name"] = m2["product_name"]
  872. pmap = append(pmap, m)
  873. mc[util.ObjToString(m2["product_name"])] = true
  874. }
  875. pinfo2, _ := MongoTool.Find("bidding_p_list_0907", map[string]interface{}{"company_id": cid}, nil, nil, false, -1, -1)
  876. if len(*pinfo2) > 0 {
  877. for _, m2 := range *pinfo2 {
  878. key := util.ObjToString(m2["itemname"]) + util.ObjToString(m2["brand"]) + util.ObjToString(m2["model"])
  879. if mc[key] {
  880. continue
  881. } else {
  882. m := make(map[string]interface{})
  883. m["name"] = util.ObjToString(m2["itemname"])
  884. if m2["model"] != nil {
  885. m["model"] = m2["model"]
  886. }
  887. if m2["brand"] != nil {
  888. m["brand"] = m2["brand"]
  889. }
  890. pmap = append(pmap, m)
  891. mc[key] = true
  892. }
  893. }
  894. }
  895. return pmap
  896. }
  897. func SaveEs(i, t string) {
  898. log.Info("SaveEs---", zap.String("i", i), zap.String("t", t))
  899. arru := make([]map[string]interface{}, 500)
  900. indexu := 0
  901. for {
  902. select {
  903. case v := <-EsSaveCache:
  904. arru[indexu] = v
  905. indexu++
  906. if indexu == 500 {
  907. SP <- true
  908. go func(arru []map[string]interface{}) {
  909. defer func() {
  910. <-SP
  911. }()
  912. Es.BulkSave(i, t, &arru, false)
  913. }(arru)
  914. arru = make([]map[string]interface{}, 500)
  915. indexu = 0
  916. }
  917. case <-time.After(1000 * time.Millisecond):
  918. if indexu > 0 {
  919. SP <- true
  920. go func(arru []map[string]interface{}) {
  921. defer func() {
  922. <-SP
  923. }()
  924. Es.BulkSave(i, t, &arru, false)
  925. }(arru[:indexu])
  926. arru = make([]map[string]interface{}, 500)
  927. indexu = 0
  928. }
  929. }
  930. }
  931. }
  932. func updateEsMethod() {
  933. arru := make([][]map[string]interface{}, 200)
  934. indexu := 0
  935. for {
  936. select {
  937. case v := <-updateEsPool:
  938. arru[indexu] = v
  939. indexu++
  940. if indexu == 200 {
  941. updateEsSp <- true
  942. go func(arru [][]map[string]interface{}) {
  943. defer func() {
  944. <-updateEsSp
  945. }()
  946. Es.UpdateBulk("bidding", "bidding", arru...)
  947. }(arru)
  948. arru = make([][]map[string]interface{}, 200)
  949. indexu = 0
  950. }
  951. case <-time.After(1000 * time.Millisecond):
  952. if indexu > 0 {
  953. updateEsSp <- true
  954. go func(arru [][]map[string]interface{}) {
  955. defer func() {
  956. <-updateEsSp
  957. }()
  958. Es.UpdateBulk("bidding", "bidding", arru...)
  959. }(arru[:indexu])
  960. arru = make([][]map[string]interface{}, 200)
  961. indexu = 0
  962. }
  963. }
  964. }
  965. }