main.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/xuri/excelize/v2"
  6. "go.uber.org/zap"
  7. utils "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  12. "net"
  13. "os"
  14. "strings"
  15. "time"
  16. )
  17. var (
  18. Sysconfig map[string]interface{} //配置文件
  19. Mgo *mongodb.MongodbSim
  20. Mgo181 *mongodb.MongodbSim
  21. Dbname string
  22. Dbcoll string
  23. Es *elastic.Elastic
  24. Es2 *elastic.Elastic
  25. Index string
  26. //Itype string
  27. EsFields []string
  28. yangMap = make(map[string]bool) //存储98家央企
  29. yangChildMap = make(map[string]bool) //存储央企 下属子公司
  30. //Updatetime int64
  31. localPort string // 本地监听端口
  32. UdpClient udp.UdpClient
  33. updatePool = make(chan []map[string]interface{}, 20000) // 更新qyxy_std 国标行业分类
  34. )
  35. var EsSaveCache = make(chan map[string]interface{}, 5000)
  36. var SP = make(chan bool, 5)
  37. func init() {
  38. utils.ReadConfig(&Sysconfig)
  39. //utils.ReadConfig("test.json", &Sysconfig)
  40. Dbname = Sysconfig["dbname"].(string) //
  41. Dbcoll = Sysconfig["dbcoll"].(string) //qyxy_std
  42. //qyxy_std
  43. Mgo = &mongodb.MongodbSim{
  44. MongodbAddr: Sysconfig["mgodb"].(string),
  45. Size: utils.IntAllDef(Sysconfig["dbsize"], 5),
  46. DbName: Dbname,
  47. UserName: Sysconfig["uname"].(string),
  48. Password: Sysconfig["upwd"].(string),
  49. }
  50. Mgo.InitPool()
  51. //181
  52. if utils.ObjToString(Sysconfig["mgo181"]) != "" {
  53. Mgo181 = &mongodb.MongodbSim{
  54. MongodbAddr: utils.ObjToString(Sysconfig["mgo181"]),
  55. //MongodbAddr: "127.0.0.1:27001",
  56. DbName: "mixdata",
  57. Size: 10,
  58. UserName: "",
  59. Password: "",
  60. //Direct: true,
  61. }
  62. Mgo181.InitPool()
  63. }
  64. //es
  65. econf := Sysconfig["elastic"].(map[string]interface{})
  66. Index = econf["index"].(string)
  67. //Itype = econf["itype"].(string)
  68. Es = &elastic.Elastic{
  69. S_esurl: econf["addr"].(string),
  70. I_size: utils.IntAllDef(econf["pool"], 12),
  71. Username: econf["username"].(string),
  72. Password: econf["password"].(string),
  73. }
  74. Es.InitElasticSize()
  75. //集群2
  76. if utils.ObjToString(econf["addr2"]) != "" {
  77. Es2 = &elastic.Elastic{
  78. S_esurl: econf["addr2"].(string),
  79. I_size: utils.IntAllDef(econf["pool"], 12),
  80. Username: econf["username2"].(string),
  81. Password: econf["password2"].(string),
  82. }
  83. Es2.InitElasticSize()
  84. }
  85. EsFields = utils.ObjArrToStringArr(econf["esfields"].([]interface{}))
  86. //Updatetime = utils.Int64All(Sysconfig["updatetime"])
  87. localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
  88. UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
  89. InitLog()
  90. readXlsx()
  91. }
  92. func InitLog() {
  93. err := log.InitLog(
  94. log.Path("./logs/log.out"),
  95. log.Level("info"),
  96. log.Compress(true),
  97. log.MaxSize(10),
  98. log.MaxBackups(10),
  99. log.MaxAge(7),
  100. log.Format("json"),
  101. )
  102. if err != nil {
  103. fmt.Printf("InitLog failed: %v\n", err)
  104. os.Exit(1)
  105. }
  106. }
  107. func main() {
  108. UdpClient.Listen(processUdpMsg)
  109. log.Info("main", zap.String("Udp服务监听", localPort))
  110. //go StdAll()
  111. go SaveEs()
  112. go updateMethod()
  113. ch := make(chan bool, 1)
  114. <-ch
  115. }
  116. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  117. switch act {
  118. case udp.OP_TYPE_DATA:
  119. var mapInfo map[string]interface{}
  120. err := json.Unmarshal(data, &mapInfo)
  121. if err != nil {
  122. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  123. }
  124. log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
  125. if mapInfo != nil {
  126. //相应UDP回答
  127. key := utils.ObjToString(mapInfo["key"])
  128. if key == "" {
  129. key = "udpok"
  130. }
  131. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  132. }
  133. if tasktype, ok := mapInfo["stype"].(string); ok {
  134. switch tasktype {
  135. case "stdall":
  136. go StdAll()
  137. default:
  138. fmt.Println("tasktype", tasktype)
  139. }
  140. } else {
  141. //拿到同步信号,开始同步数据
  142. if _, ok := mapInfo["start"]; ok {
  143. var start_time, end_time int64
  144. if _, ok2 := mapInfo["start_time"]; ok2 {
  145. start_time = utils.Int64All(mapInfo["start_time"])
  146. end_time = utils.Int64All(mapInfo["end_time"])
  147. }
  148. var q map[string]interface{}
  149. if start_time > 0 {
  150. if end_time > 0 {
  151. q = map[string]interface{}{
  152. "updatetime": map[string]interface{}{
  153. "$gte": start_time,
  154. "$lte": end_time,
  155. },
  156. }
  157. } else {
  158. q = map[string]interface{}{
  159. "updatetime": map[string]interface{}{
  160. "$gte": start_time,
  161. },
  162. }
  163. }
  164. go StdAdd(q) //读取qyxy_std 数据,放入es 数组
  165. } else {
  166. fmt.Println("参数 start_time 为0")
  167. }
  168. }
  169. }
  170. default:
  171. log.Info("processUdpMsg", zap.String("mapinfo", string(data)))
  172. }
  173. }
  174. // updateMethod 更新MongoDB
  175. func updateMethod() {
  176. updateSp := make(chan bool, 10)
  177. arru := make([][]map[string]interface{}, 500)
  178. indexu := 0
  179. for {
  180. select {
  181. case v := <-updatePool:
  182. arru[indexu] = v
  183. indexu++
  184. if indexu == 500 {
  185. updateSp <- true
  186. go func(arru [][]map[string]interface{}) {
  187. defer func() {
  188. <-updateSp
  189. }()
  190. Mgo.UpdateBulk("qyxy_std", arru...)
  191. }(arru)
  192. arru = make([][]map[string]interface{}, 500)
  193. indexu = 0
  194. }
  195. case <-time.After(100 * time.Millisecond):
  196. if indexu > 0 {
  197. updateSp <- true
  198. go func(arru [][]map[string]interface{}) {
  199. defer func() {
  200. <-updateSp
  201. }()
  202. Mgo.UpdateBulk("qyxy_std", arru...)
  203. }(arru[:indexu])
  204. arru = make([][]map[string]interface{}, 500)
  205. indexu = 0
  206. }
  207. }
  208. }
  209. }
  210. // readXlsx 读取央企
  211. func readXlsx() {
  212. filePath := "央企.xlsx"
  213. // 1. 读取 Excel(获取 A 列数据)
  214. f, err := excelize.OpenFile(filePath)
  215. if err != nil {
  216. log.Fatal("❌ 无法打开 Excel 文件:", zap.Error(err))
  217. }
  218. defer f.Close()
  219. //读取央企
  220. rows, err := f.GetRows("Sheet1")
  221. if err != nil {
  222. log.Fatal("❌ 无法读取 Sheet1:", zap.Error(err))
  223. }
  224. for i := 1; i < len(rows); i++ {
  225. name := rows[i][0]
  226. if name != "" {
  227. yangMap[name] = true
  228. }
  229. }
  230. // 央企下属
  231. rows2, err := f.GetRows("Sheet2")
  232. if err != nil {
  233. log.Fatal("❌ 无法读取 Sheet2:", zap.Error(err))
  234. }
  235. for i := 1; i < len(rows2); i++ {
  236. name := rows2[i][1]
  237. if name != "" {
  238. yangChildMap[name] = true
  239. }
  240. }
  241. }
  242. // getCompanyType 获取公司类型;央企、国企、央企下属、事业单位、民企
  243. func getCompanyType(name, ctype string) (company_type string) {
  244. if name == "" {
  245. return
  246. }
  247. if yangMap[name] {
  248. company_type = "央企"
  249. return
  250. }
  251. if yangChildMap[name] {
  252. company_type = "央企"
  253. return
  254. }
  255. if strings.Contains(ctype, "国有独资") || strings.Contains(ctype, "国有控股") ||
  256. ctype == "全民所有制" || ctype == "集体所有制" || ctype == "全民所有制分支机构(非法人)" ||
  257. ctype == "集体分支机构(非法人)" {
  258. company_type = "国企"
  259. return
  260. }
  261. company_type = "其他"
  262. return
  263. }