main.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. package main
  2. import (
  3. "fmt"
  4. "mongodb"
  5. "qfw/util"
  6. es "qfw/util/elastic"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. MongoTool *mongodb.MongodbSim
  14. Es *es.Elastic
  15. EsSaveCache chan map[string]interface{}
  16. SP chan bool
  17. EsField = []string{"_id", "company_name", "history_name", "updatetime", "legal_person", "legal_person_certno", "credit_no", "org_code", "tax_code",
  18. "company_code", "area_code", "company_area", "company_city", "company_district", "currency", "capital", "company_type", "company_status",
  19. "establish_date", "issue_date", "lastupdatetime", "operation_startdate", "operation_enddate", "cancel_date", "revoke_date", "authority",
  20. "company_address", "business_scope", "partners", "employees", "stock_name", "company_phone", "company_email", "website_url", "search_type",
  21. "employee_name", "company_type_old"}
  22. TypeMap = map[string]string{
  23. "采购单位": "1",
  24. "投标企业": "2",
  25. "代理机构": "3",
  26. "厂商": "4",
  27. }
  28. TypeMap1 = map[string]string{
  29. "固定电话": "1",
  30. "手机号": "2",
  31. "邮箱": "3",
  32. "不存在": "4",
  33. }
  34. )
  35. func init() {
  36. MongoTool = &mongodb.MongodbSim{
  37. MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
  38. Size: 10,
  39. DbName: "mixdata",
  40. UserName: "SJZY_RWESBid_Other",
  41. Password: "SJZY@O17t8herB3B",
  42. //MongodbAddr: "192.168.3.207:27092",
  43. //Size: 10,
  44. //DbName: "wjh",
  45. }
  46. MongoTool.InitPool()
  47. Es = &es.Elastic{
  48. S_esurl: "http://172.17.145.170:9800", // http://172.17.145.170:9800
  49. I_size: 10,
  50. }
  51. Es.InitElasticSize()
  52. EsSaveCache = make(chan map[string]interface{}, 1000)
  53. SP = make(chan bool, 5)
  54. }
  55. func main() {
  56. go SaveEs()
  57. sess := MongoTool.GetMgoConn()
  58. defer MongoTool.DestoryMongoConn(sess)
  59. ch := make(chan bool, 1)
  60. wg := &sync.WaitGroup{}
  61. query := sess.DB("mixdata").C("qyxy_std").Find(nil).Select(nil).Skip(6110000).Iter()
  62. count := 0
  63. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  64. if count%5000 == 0 {
  65. util.Debug("current ---", count)
  66. }
  67. ch <- true
  68. wg.Add(1)
  69. go func(tmp map[string]interface{}) {
  70. defer func() {
  71. <-ch
  72. wg.Done()
  73. }()
  74. if p1, ok := tmp["partners"].([]interface{}); ok {
  75. p2 := p1[0].(map[string]interface{})
  76. if p2["stock_capital"] != "" {
  77. taskinfo(tmp)
  78. }
  79. }
  80. }(tmp)
  81. tmp = make(map[string]interface{})
  82. }
  83. wg.Wait()
  84. util.Debug("over ---", count)
  85. c := make(chan bool, 1)
  86. <-c
  87. }
  88. func taskinfo(tmp map[string]interface{}) {
  89. esMap := make(map[string]interface{})
  90. for _, v := range EsField {
  91. if tmp[v] != nil {
  92. esMap[v] = tmp[v]
  93. }
  94. }
  95. esMap["name"] = tmp["company_name"]
  96. company_type := util.ObjToString(tmp["company_type"])
  97. company_name := util.ObjToString(tmp["company_name"])
  98. if company_type == "个体工商户" {
  99. if len([]rune(company_name)) >= 5 {
  100. esMap["company_type_int"] = 31
  101. } else {
  102. esMap["company_type_int"] = 32
  103. }
  104. } else if company_type == "其他" || company_type == "" {
  105. if len([]rune(company_name)) >= 4 {
  106. esMap["company_type_int"] = 21
  107. } else {
  108. esMap["company_type_int"] = 22
  109. }
  110. } else {
  111. if company_type == "内资分公司" {
  112. esMap["company_type_int"] = 12
  113. } else if len([]rune(company_name)) >= 4 {
  114. esMap["company_type_int"] = 11
  115. } else {
  116. esMap["company_type_int"] = 13
  117. }
  118. }
  119. if pname, ok := tmp["bid_projectname"].([]interface{}); ok {
  120. p1 := util.ObjArrToStringArr(pname)
  121. esMap["bid_projectname"] = strings.Join(p1, ",")
  122. }
  123. if pur, ok := tmp["bid_purchasing"].([]interface{}); ok {
  124. p1 := util.ObjArrToStringArr(pur)
  125. esMap["bid_purchasing"] = strings.Join(p1, ",")
  126. }
  127. if areas, ok := tmp["bid_area"].([]interface{}); ok {
  128. p1 := util.ObjArrToStringArr(areas)
  129. esMap["bid_area"] = strings.Join(p1, ",")
  130. }
  131. if t1, ok := tmp["bid_unittype"].([]interface{}); ok {
  132. var arr []string
  133. for _, v := range util.ObjArrToStringArr(t1) {
  134. arr = append(arr, TypeMap[v])
  135. }
  136. esMap["bid_unittype"] = strings.Join(arr, ",")
  137. }
  138. if t2, ok := tmp["bid_contracttype"].([]interface{}); ok {
  139. var arr []string
  140. for _, v := range util.ObjArrToStringArr(t2) {
  141. arr = append(arr, TypeMap1[v])
  142. }
  143. esMap["bid_contracttype"] = strings.Join(arr, ",")
  144. }
  145. if p1, ok := tmp["partners"].([]interface{}); ok {
  146. for _, v := range p1 {
  147. v1 := v.(map[string]interface{})
  148. if text := util.ObjToString(v1["stock_capital"]); text != "" {
  149. if c1, err := strconv.ParseFloat(text, 64); err == nil {
  150. c1, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", c1), 64) //保留小数点两位
  151. v1["stock_capital"] = c1
  152. } else {
  153. delete(v1, "stock_capital")
  154. }
  155. }
  156. if text := util.ObjToString(v1["stock_realcapital"]); text != "" {
  157. if c2, err := strconv.ParseFloat(text, 64); err == nil {
  158. c2, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", c2), 64) //保留小数点两位
  159. v1["stock_realcapital"] = c2
  160. } else {
  161. delete(v1, "stock_realcapital")
  162. }
  163. }
  164. }
  165. }
  166. EsSaveCache <- esMap
  167. }
  168. func SaveEs() {
  169. util.Debug("Es Save...")
  170. arru := make([]map[string]interface{}, 500)
  171. indexu := 0
  172. for {
  173. select {
  174. case v := <-EsSaveCache:
  175. arru[indexu] = v
  176. indexu++
  177. if indexu == 500 {
  178. SP <- true
  179. go func(arru []map[string]interface{}) {
  180. defer func() {
  181. <-SP
  182. }()
  183. Es.BulkSave("qyxy_v2", "qyxy", &arru, false)
  184. }(arru)
  185. arru = make([]map[string]interface{}, 500)
  186. indexu = 0
  187. }
  188. case <-time.After(1000 * time.Millisecond):
  189. if indexu > 0 {
  190. SP <- true
  191. go func(arru []map[string]interface{}) {
  192. defer func() {
  193. <-SP
  194. }()
  195. Es.BulkSave("qyxy_v2", "qyxy", &arru, false)
  196. }(arru[:indexu])
  197. arru = make([]map[string]interface{}, 500)
  198. indexu = 0
  199. }
  200. }
  201. }
  202. }