main.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package main
  2. import (
  3. "mongodb"
  4. "qfw/util"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. var (
  10. MongoTool *mongodb.MongodbSim
  11. StatusMap map[string]string
  12. WordsArr []string
  13. updatePool chan []map[string]interface{}
  14. updateSp chan bool
  15. )
  16. func init() {
  17. MongoTool = &mongodb.MongodbSim{
  18. MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
  19. Size: 10,
  20. DbName: "mixdata",
  21. UserName: "SJZY_RWESBid_Other",
  22. Password: "SJZY@O17t8herB3B",
  23. }
  24. MongoTool.InitPool()
  25. updatePool = make(chan []map[string]interface{}, 2000)
  26. updateSp = make(chan bool, 4)
  27. StatusMap = map[string]string{
  28. "正常": "存续",
  29. "其他": "存续",
  30. "未注销": "吊销",
  31. "个体转企业": "存续",
  32. }
  33. WordsArr = []string{"研发", "研制", "开发", "生产", "制造", "制作", "加工", "种植"}
  34. }
  35. func main() {
  36. go updateMethod()
  37. sess := MongoTool.GetMgoConn()
  38. defer MongoTool.DestoryMongoConn(sess)
  39. ch := make(chan bool, 5)
  40. wg := &sync.WaitGroup{}
  41. field := map[string]interface{}{"company_status": 1, "company_type": 1, "company_phone": 1, "company_email": 1, "business_scope": 1, "employees": 1,
  42. "annual_reports": 1, "company_name": 1, "company_type_old": 1}
  43. query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(field).Limit(3000).Sort("-updatetime").Iter()
  44. count := 0
  45. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  46. if count%5000 == 0 {
  47. util.Debug("current ---", count)
  48. }
  49. ch <- true
  50. wg.Add(1)
  51. go func(tmp map[string]interface{}) {
  52. defer func() {
  53. <-ch
  54. wg.Done()
  55. }()
  56. updata := make(map[string]interface{})
  57. // company_status
  58. text := util.ObjToString(tmp["company_status"])
  59. if text != "" {
  60. updata["company_status"] = StatusMap[text]
  61. }
  62. // employee_name
  63. var ename []string
  64. if emp, ok := tmp["employees"].([]interface{}); ok {
  65. for _, v := range emp {
  66. v1 := v.(map[string]interface{})
  67. if util.ObjToString(v1["employee_name"]) != "" {
  68. ename = append(ename, util.ObjToString(v1["employee_name"]))
  69. }
  70. }
  71. }
  72. if len(ename) > 0 {
  73. updata["employee_name"] = strings.Join(ename, ",")
  74. }
  75. // bid_unittype 厂商
  76. flag := false
  77. for _, v := range WordsArr {
  78. if strings.Contains(util.ObjToString(tmp["business_scope"]), v) {
  79. flag = true
  80. break
  81. }
  82. }
  83. if flag {
  84. updata["bid_unittype"] = []string{"厂商"}
  85. }
  86. // bid_contracttype
  87. var types []string
  88. if phone := util.ObjToString(tmp["company_phone"]); phone != "" {
  89. if len(phone) == 11 {
  90. types = append(types, "手机号")
  91. } else {
  92. types = append(types, "固定电话")
  93. }
  94. }
  95. if util.ObjToString(tmp["company_email"]) != "" {
  96. types = append(types, "邮箱")
  97. }
  98. if len(types) == 0 {
  99. types = append(types, "不存在")
  100. }
  101. updata["bid_contracttype"] = types
  102. // website_url
  103. if annualReports, ok := tmp["annual_reports"].([]interface{}); ok && len(annualReports) > 0 {
  104. L:
  105. for _, v := range annualReports {
  106. v1 := v.(map[string]interface{})
  107. if v1["report_websites"] != nil {
  108. if reportWebsites, o := tmp["report_websites"].([]interface{}); o && len(reportWebsites) > 0 {
  109. for _, m := range reportWebsites {
  110. m1 := m.(map[string]interface{})
  111. if util.ObjToString(m1["website_url"]) != "" {
  112. updata["website_url"] = util.ObjToString(m1["website_url"])
  113. break L
  114. }
  115. }
  116. }
  117. }
  118. }
  119. }
  120. // search_type
  121. if t := util.ObjToString(tmp["company_type"]); t != "" {
  122. if t != "个体工商户" && t != "其他" {
  123. t1 := util.ObjToString(tmp["company_type_old"])
  124. name := util.ObjToString(tmp["company_name"])
  125. if strings.Contains(t1, "有限合伙") {
  126. updata["search_type"] = "有限合伙"
  127. } else if strings.Contains(t1, "合伙") {
  128. updata["search_type"] = "普通合伙"
  129. } else if strings.Contains(name, "股份") ||
  130. (strings.Contains(t1, "上市") && !strings.Contains(t1, "非上市")) {
  131. updata["search_type"] = "股份有限公司"
  132. } else {
  133. updata["search_type"] = "有限责任公司"
  134. }
  135. }
  136. }
  137. save := []map[string]interface{}{{
  138. "_id": tmp["_id"],
  139. },
  140. {"$set": updata},
  141. }
  142. updatePool <- save
  143. }(tmp)
  144. tmp = make(map[string]interface{})
  145. }
  146. wg.Wait()
  147. util.Debug("over ---", count)
  148. c := make(chan bool, 1)
  149. <-c
  150. }
  151. func updateMethod() {
  152. arru := make([][]map[string]interface{}, 1000)
  153. indexu := 0
  154. for {
  155. select {
  156. case v := <-updatePool:
  157. arru[indexu] = v
  158. indexu++
  159. if indexu == 1000 {
  160. updateSp <- true
  161. go func(arru [][]map[string]interface{}) {
  162. defer func() {
  163. <-updateSp
  164. }()
  165. MongoTool.UpSertBulk("qyxy_std", arru...)
  166. }(arru)
  167. arru = make([][]map[string]interface{}, 1000)
  168. indexu = 0
  169. }
  170. case <-time.After(1000 * time.Millisecond):
  171. if indexu > 0 {
  172. updateSp <- true
  173. go func(arru [][]map[string]interface{}) {
  174. defer func() {
  175. <-updateSp
  176. }()
  177. MongoTool.UpSertBulk("qyxy_std", arru...)
  178. }(arru[:indexu])
  179. arru = make([][]map[string]interface{}, 1000)
  180. indexu = 0
  181. }
  182. }
  183. }
  184. }