main.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/tealeg/xlsx"
  6. "go.mongodb.org/mongo-driver/bson"
  7. mgo "mongodb"
  8. "os"
  9. qu "qfw/util"
  10. "sync"
  11. "time"
  12. "util"
  13. )
  14. var (
  15. Mgo *mgo.MongodbSim
  16. )
  17. func init() {
  18. Mgo = &mgo.MongodbSim{
  19. MongodbAddr: "127.0.0.1:27084", // 127.0.0.1:27084
  20. Size: 5,
  21. DbName: "jyqykhfw",
  22. }
  23. Mgo.InitPool()
  24. }
  25. func ShowTable() {
  26. fmt.Println("================================")
  27. fmt.Println("小工具-数据处理")
  28. fmt.Println("1、导入清洗数据")
  29. fmt.Println("2、推送数据")
  30. fmt.Println("3、联通数据判重")
  31. fmt.Println("4、业务数据判重")
  32. fmt.Println("0、EXIT")
  33. fmt.Println("================================")
  34. }
  35. func main() {
  36. for {
  37. ShowTable()
  38. var flag int
  39. fmt.Print("请输入你的操作:")
  40. fmt.Scan(&flag)
  41. if flag == 0 {
  42. fmt.Println("退出成功")
  43. os.Exit(0)
  44. } else if flag == 1 {
  45. fmt.Println("请输入文档地址.")
  46. var p string
  47. fmt.Scan(&p)
  48. if p != "" {
  49. task1(p)
  50. }
  51. } else if flag == 2 {
  52. fmt.Println("推送数据...")
  53. task2()
  54. } else if flag == 3 {
  55. fmt.Println("请输入文档地址,进行数据判重.")
  56. var p string
  57. fmt.Scan(&p)
  58. if p != "" {
  59. task3()
  60. }
  61. } else if flag == 4 {
  62. fmt.Println("请输入表名,进行数据判重.")
  63. var coll, stime, etime, sType string
  64. fmt.Scan(&coll)
  65. if coll != "" {
  66. fmt.Println("请输入开始时间, 格式:2006-01-02")
  67. fmt.Scan(&stime)
  68. if stime != "" {
  69. fmt.Println("请输入结束时间, 格式:2006-01-02")
  70. fmt.Scan(&etime)
  71. if etime != "" {
  72. fmt.Println("请输入排序方式,1正序、-1倒序")
  73. fmt.Scan(&sType)
  74. if sType != "" {
  75. task4(coll, stime, etime, sType)
  76. }
  77. } else {
  78. fmt.Println("请输入结束时间, 格式:2006-01-02")
  79. }
  80. } else {
  81. fmt.Println("请输入开始时间, 格式:2006-01-02")
  82. }
  83. }
  84. }
  85. }
  86. }
  87. var FieldsMap = map[string]string{
  88. "是否优选": "is_push",
  89. "运营商中标标签": "tagname",
  90. "主体公司": "tagname2",
  91. "中标人": "s_winner",
  92. "招标人": "buyer",
  93. "中标金额": "bidamount",
  94. "是否为多标多包数据": "multipackage",
  95. }
  96. func task1(path string) {
  97. if path != "" {
  98. Mgo.Update("f_sourceinfo_chinaunicom_zb_data", nil, bson.M{"$unset": bson.M{"tag": ""}}, false, true)
  99. file, err := xlsx.OpenFile(path)
  100. if err != nil {
  101. panic(err)
  102. }
  103. sheet := file.Sheets[0]
  104. count := 0
  105. idcolnum := -1
  106. cellFieldName := map[int]string{}
  107. lastid := ""
  108. c1 := 0 // 多包第n条数据
  109. for rn, row := range sheet.Rows {
  110. update := make(map[string]interface{})
  111. del := make(map[string]interface{})
  112. if rn == 0 {
  113. for index, cell := range row.Cells {
  114. if cell.Value == "唯一标识" || cell.Value == "标讯编码(infoID)" { //id所在列
  115. idcolnum = index
  116. }
  117. if v := FieldsMap[cell.Value]; v != "" {
  118. cellFieldName[index] = v
  119. }
  120. }
  121. if idcolnum == -1 {
  122. break
  123. }
  124. continue
  125. } else {
  126. id := row.Cells[idcolnum].String()
  127. id = util.SE.DecodeString(id)
  128. for i, f := range cellFieldName {
  129. if val := row.Cells[i].Value; val != "" {
  130. if f == "is_push" {
  131. update[f] = qu.IntAll(val)
  132. } else if f == "multipackage" {
  133. update[fmt.Sprintf("v_baseinfo.%s", f)] = qu.IntAll(val)
  134. } else if f == "bidamount" {
  135. update[fmt.Sprintf("v_baseinfo.%s", f)] = qu.Float64All(val)
  136. } else {
  137. update[fmt.Sprintf("v_baseinfo.%s", f)] = val
  138. }
  139. } else {
  140. if f != "is_push" {
  141. del[fmt.Sprintf("v_baseinfo.%s", f)] = "1"
  142. }
  143. }
  144. }
  145. if qu.IntAll(update["v_baseinfo.multipackage"]) == 1 {
  146. if c1 == 0 {
  147. count++
  148. }
  149. if c1 > 0 && lastid == id {
  150. info, _ := Mgo.FindById("f_sourceinfo_chinaunicom_zb_data", id, bson.M{"v_baseinfo.tagname": 1, "v_baseinfo.tagname2": 1, "v_baseinfo.s_winner": 1,
  151. "v_baseinfo.bidamount": 1, "v_baseinfo.package": 1})
  152. if len(*info) > 0 {
  153. baseinfo := (*info)["v_baseinfo"].(map[string]interface{})
  154. if baseinfo["package"] != nil {
  155. packageM := baseinfo["package"].(map[string]interface{})
  156. m := make(map[string]interface{})
  157. if update["v_baseinfo.bidamount"] != nil {
  158. m["bidamount"] = qu.Float64All(update["v_baseinfo.bidamount"])
  159. }
  160. if update["v_baseinfo.s_winner"] != nil {
  161. m["s_winner"] = qu.ObjToString(update["v_baseinfo.s_winner"])
  162. }
  163. packageM[fmt.Sprint(c1)] = map[string]interface{}{"winner_all": append([]interface{}{}, m)}
  164. update["v_baseinfo.package"] = packageM
  165. }
  166. if s := qu.ObjToString(baseinfo["tagname"]); s != "" {
  167. update["v_baseinfo.tagname"] = s + "," + qu.ObjToString(update["v_baseinfo.tagname"])
  168. }
  169. if s := qu.ObjToString(baseinfo["tagname2"]); s != "" {
  170. update["v_baseinfo.tagname2"] = s + "," + qu.ObjToString(update["v_baseinfo.tagname2"])
  171. }
  172. if s := qu.ObjToString(baseinfo["s_winner"]); s != "" {
  173. update["v_baseinfo.s_winner"] = s + "," + qu.ObjToString(update["v_baseinfo.s_winner"])
  174. }
  175. update["v_baseinfo.bidamount"] = qu.Float64All(update["v_baseinfo.bidamount"]) + qu.Float64All(baseinfo["bidamount"])
  176. c1++
  177. }
  178. } else {
  179. c1 = 0
  180. packageM := make(map[string]interface{})
  181. m := make(map[string]interface{})
  182. if update["v_baseinfo.bidamount"] != nil {
  183. m["bidamount"] = qu.Float64All(update["v_baseinfo.bidamount"])
  184. }
  185. if update["v_baseinfo.s_winner"] != nil {
  186. m["s_winner"] = qu.ObjToString(update["v_baseinfo.s_winner"])
  187. }
  188. packageM[fmt.Sprint(c1)] = map[string]interface{}{"winner_all": append([]interface{}{}, m)}
  189. update["v_baseinfo.package"] = packageM
  190. c1++
  191. }
  192. } else {
  193. count++
  194. c1 = 0
  195. }
  196. // 临时
  197. update["tag"] = "临时"
  198. lastid = id
  199. qu.Debug(update)
  200. if len(del) > 0 {
  201. Mgo.UpdateById("f_sourceinfo_chinaunicom_zb_data", id, bson.M{"$set": update, "$unset": del})
  202. } else {
  203. Mgo.UpdateById("f_sourceinfo_chinaunicom_zb_data", id, bson.M{"$set": update})
  204. }
  205. }
  206. }
  207. qu.Debug(fmt.Sprintf("更新数据成功,更新: %d条", count))
  208. os.Exit(0)
  209. }
  210. }
  211. func task2() {
  212. sess := Mgo.GetMgoConn()
  213. defer Mgo.DestoryMongoConn(sess)
  214. ch := make(chan bool, 5)
  215. wg := &sync.WaitGroup{}
  216. q := bson.M{"tag": "临时"}
  217. query := sess.DB(Mgo.DbName).C("f_sourceinfo_chinaunicom_zb_data").Find(q).Select(nil).Iter()
  218. count := 0
  219. for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
  220. ch <- true
  221. wg.Add(1)
  222. go func(tmp map[string]interface{}) {
  223. defer func() {
  224. <-ch
  225. wg.Done()
  226. }()
  227. info := tmp["v_baseinfo"].(map[string]interface{})
  228. info["createtime"] = time.Now().Unix()
  229. if qu.ObjToString(info["id"]) == "" {
  230. info["id"] = tmp["id"]
  231. }
  232. info["isOptimization"] = 1
  233. delete(info, "field_source")
  234. delete(info, "regions_log")
  235. Mgo.Save("tmp_usermail", info)
  236. }(tmp)
  237. tmp = make(map[string]interface{})
  238. }
  239. wg.Wait()
  240. qu.Debug(fmt.Sprintf("推送数据成功,推送成功: %d条", count))
  241. os.Exit(0)
  242. }
  243. func task3() {
  244. var path string
  245. flag.StringVar(&path, "f", "", "文件路径")
  246. flag.Parse()
  247. if path != "" {
  248. file, err := xlsx.OpenFile(path)
  249. if err != nil {
  250. panic(err)
  251. }
  252. sheet := file.Sheets[0]
  253. cellFieldName := map[int]string{}
  254. for rn, row := range sheet.Rows {
  255. if rn == 0 {
  256. for index, cell := range row.Cells {
  257. //if cell.Value == "招标人" || cell.Value == "中标人" || cell.Value == "中标金额" {
  258. // qu.Debug(cell.Value, index)
  259. //}
  260. if v := FieldsMap[cell.Value]; v != "" {
  261. cellFieldName[index] = v
  262. }
  263. }
  264. } else {
  265. q := bson.M{}
  266. for i, f := range cellFieldName {
  267. if val := row.Cells[i].Value; val != "" && (f == "s_winner" || f == "buyer") {
  268. q[f] = val
  269. }
  270. if val := row.Cells[i].Value; val != "" && f == "bidamount" {
  271. if qu.Float64All(val) != 0 {
  272. q[f] = qu.Float64All(val)
  273. }
  274. }
  275. }
  276. info, _ := Mgo.FindOne("zglt_history", q)
  277. if len(*info) > 0 {
  278. row.Cells[12].SetValue(-1)
  279. } else {
  280. row.Cells[12].SetValue(1)
  281. }
  282. }
  283. }
  284. err = file.Save(path)
  285. } else {
  286. flag.PrintDefaults()
  287. }
  288. }