main.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package main
  2. import (
  3. "encoding/json"
  4. "github.com/nats-io/nats.go"
  5. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  7. "log"
  8. "time"
  9. )
  10. type MsgInfo struct {
  11. Id string //消息唯一id
  12. CurrSetp string //当前步骤
  13. NextSetp string //下个步骤,特殊流程增加
  14. IsEnd int //当前流程后结束 1-结束
  15. Data map[string]interface{} //数据内容
  16. Extend struct { //有需要按照示例增加
  17. Extract struct { //抽取
  18. }
  19. Repeat struct { //判重
  20. SId string //原始id
  21. RId string //被替换id
  22. }
  23. MgoSave struct { //mgo保存更新
  24. SType string //更新u 保存s
  25. col string //表
  26. }
  27. EsSave struct { //es保存更新
  28. SType string //更新u 保存s
  29. Index string //索引
  30. IsHistory bool //是否是历史数据
  31. }
  32. }
  33. Err string //错误信息 有错误会告警并终止流程
  34. Stime int64
  35. Etime int64
  36. }
  37. func main() {
  38. //go receiveA()
  39. //go receiveB()
  40. var addr = "192.168.3.207:4223"
  41. jn := jnats.NewJnats(addr)
  42. //data := MsgInfo{
  43. // Id: "5c191c1fa5cb26b9b7320459",
  44. // CurrSetp: "first",
  45. // Data: map[string]interface{}{
  46. // "title": "日照市土地收储测绘中介机构库建设项目公开招标公告",
  47. // "detail": "五星品牌认证,打倒共产主义,腐败中国,<table><br/><tbody><tr> <td> <br/></td> </tr> <tr> <td></td> </tr> <tr> <td><br/></td></tr><tr><td colspan=\"2\">日照市土地收储测绘中介机构库建设项目公开招标公告<br/></td></tr><tr><td>一、采购人:日照市国土资源局    地址:日照市泰安路59号</td></tr><tr><td>        联系方式:0633-8776932</td></tr><tr><td>        采购代理机构:日照路达招标代理有限公司    地址:日照市山东路1000号华润置地广场1号楼13楼</td></tr><tr><td>        联系方式:0633-8010707</td></tr><tr><td>二、采购项目名称:日照市土地收储测绘中介机构库建设项目</td></tr><tr><td>        采购项目编号(采购计划编号):SDGP371100201802000165</td></tr><tr><td>        采购项目分包情况:</td></tr><tr><td><table border=\"1\"><tbody><tr><td>标包</td><td>货物服务名称</td><td>数量</td><td>投标人资格要求</td><td>本包预算金额(最高限价,单位:万元)</td></tr><tr><td>A</td><td>日照市土地收储测绘中介机构库建设项目 </td><td>1 </td><td>1.供应商须为在中华人民共和国境内注册的独立法人单位或其他组织,能独立承担民事责任和合同义务,能在国内合法提供采购内容及其相应的服务;2.供应商须具备丁级及以上测绘资质;3.供应商参加政府采购活动前3年内在经营活动中没有重大违法记录;4.由采购代理机构通过“信用中国”网站(www.creditchina.gov.cn)、中国政府采购网(www.ccgp.gov.cn)、“信用山东”(www.creditsd.gov.cn)、“全国法院失信被执行人名单信息公布与查询”平台(shixin.court.gov.cn)现场统一查询供应商信用记录,未被列入失信被执行人名单、重大税收违法案件当事人名单、政府采购严重违法失信行为记录名单;5.本次招标不接受供应商以联合体形式投标。供应商应符合《中华人民共和国政府采购法》第二十二条规定。6.单位负责人为同一人或者存在控股、管理关系的不同单位,或同一母公司的子公司,不得参加同一标段投标或者未划分标段的同一招标项目投标。 </td><td>0.000100 </td></tr></tbody></table></td></tr><tr><td>三、需求公示(见附件)</td></tr><tr><td></td></tr><tr><td>四、获取招标文件</td></tr><tr><td>        1.时间:2018年12月19日8时30分至2018年12月25日17时30分(报名截止时间)(北京时间,法定节假日除外)</td></tr><tr><td>        2.地点:供应商登录日照市公共资源交易网(http://www.rzggzyjy.gov.cn/)凭企业数字证书(CA)身份认证密匙下载招标文件(格式为*.word),此为获取招标文件唯一途径。</td></tr><tr><td>        3.方式:自行下载。企业未领取企业数字证书(CA)的,到日照市公共资源交易中心二楼窗口办理。凡有意参加本次政府采购的供应商必须在中国山东政府采购网进行注册并备案。供应商请访问中国山东政府采购网(http://www.ccgp-shandong.gov.cn),在报名截止时间前注册并备案。</td></tr><tr><td>        4.售价:无</td></tr><tr><td>五、公告期限:2018年12月19日 至 2018年12月25日</td></tr><tr><td>六、递交投标文件时间及地点</td></tr><tr><td>        1.时间:2019年1月8日14时0分至2019年1月8日14时30分(北京时间)</td></tr><tr><td>        2.地点:日照市公共资源交易中心(日照市国际金融中心B座)四楼第六开标室。</td></tr><tr><td>七、开标时间及地点</td></tr><tr><td>        1.时间:2019年1月8日14时30分(北京时间)</td></tr><tr><td>        2.地点:日照市公共资源交易中心(日照市国际金融中心B座)四楼第六开标室</td></tr><tr><td>八、采购项目联系方式:</td></tr><tr><td>        联系人:张强        联系方式:0633-8010707</td></tr><tr><td>九、采购项目的用途、数量、简要技术要求等<br/>        详见招标文件</td></tr><tr><td>十、采购项目需要落实的政府采购政策<br/>        本项目需落实的节能环保、中小微型企业扶持、监狱企业扶持、残疾人福利性企业等政府采购政策详见招标文件。</td></tr> <br/><tr> <td></td> </tr> <tr> <td>发布人:张强</td> </tr> <tr> <td>发布时间:2018年12月18日 23时40分04秒</td> </tr> <tr> <td></td> </tr> <tr> <td>   相关信息 <br/></td><br/></tr> <tr><td>1、日照市国土资源局日照市土地收储测绘中介机构库建设项目需求公开</td></tr><tr><td>2、日照市土地收储测绘中介机构库建设项目公开招标公告</td></tr><br/></tbody></table>",
  48. // "channel": "市县采购公告",
  49. // "infoformat": 1,
  50. // "extracttype": 0,
  51. // "buyer": "日照市国土资源局",
  52. // "purchasing": "收储测绘中介机构库",
  53. // "projectname": "日照市土地收储测绘中介机构库建设项目",
  54. // "spidercode": "gx_gxzzzzqzfcg_dzmchtgg",
  55. // "projectscope": "朵力·迎宾大道(F6-3地块)工程C1#楼~C6#楼(办公楼、商业裙房)、车库(部分)的平基土石方、基础、主体、初装修、给排水、强弱电、外墙保温及装饰、综合管网、消防等施工。",
  56. // },
  57. //}
  58. //data := MsgInfo{
  59. // Id: "658b7ce566cf0db42a395756",
  60. // Data: map[string]interface{}{
  61. // "title": "中铁二局青岛工程有限公司烟台市福山区城乡供水一体化项目涉铁段定向钻工程施工专业分包招标",
  62. // "city": "郑州",
  63. // "purchasing": "预埋钢板",
  64. // "detail": " 中铁二局青岛工程有限公司烟台市福山区城乡供水一体化项目涉铁段定向钻工程施工专业分包招标 <br><br><br>采购单位:<br>None <br><br>发布时间:<br>2023-12-27 09:09 <br><br>响应截止时间:<br>2023-12-31 10:00 <table > <col > <col > <col > <col > <col > <col > <col > <col > <col > <thead > <tr > <th nzwidth=\"50px\" >序号</th> <th >标的编号</th> <th >标的名称</th> <th >规格型号</th> <th >数量</th> <th >计量单位</th> <th >技术标准及要求</th> <th >交货期</th> <th >送货地址</th> </tr> </thead> <tbody > <tr > <td >1</td> <td > 3800186285068</td> <td >预埋钢板</td> <td > 700×400×25mm</td> <td >224.0</td> <td >块</td> <td ></td> <td ></td> <td >东北区-辽宁省-沈阳市-于洪区-马三家镇</td> </tr> <tr > <td >2</td> <td > 3800186360465</td> <td >预埋钢板</td> <td > 750×470×25mm</td> <td >200.0</td> <td >块</td> <td ></td> <td ></td> <td >东北区-辽宁省-沈阳市-于洪区-马三家镇</td> </tr> </tbody> </table> ",
  65. // "site": "中铁鲁班商务网",
  66. // "area": "河南",
  67. // "competehref": "#",
  68. // "spidercode": "a_ztlbsww_zbgg",
  69. // "channel": "采购公告-招标采购",
  70. // "extracttype": 1,
  71. // "href": "https://eproport.crecgec.com/#/notice/notice-detail?projectId=1739610805157965826&tenantId=2&indexnumber=0",
  72. // "toptype": "结果",
  73. // "infoformat": 1.0,
  74. // "dataprocess": 8,
  75. // "keywords": "",
  76. // "buyerclass": "其它",
  77. // "basicClass": "工程",
  78. // "description": "中铁二局青岛工程有限公司烟台市福山区城乡供水一体化项目涉铁段定向钻工程施工专业分包招标采购单位:None发布时间:2023-12-2709:09响应截止时间:2023-12-3110:00序号标的编号标的名称规格型号数量计量单位技术标准及要",
  79. // "district": "于洪区",
  80. // "projectname": "中铁二局青岛工程有限公司烟台市福山区城乡供水一体化项目涉铁段定向钻工程施工",
  81. // "purchasing_tag": "工程施工,预埋,钢板",
  82. // //"s_subscopeclass": "建筑工程_工程施工",
  83. // //"s_topscopeclass": "建筑工程",
  84. // "multipackage": 0,
  85. // //"subscopeclass": []string{"建筑工程_工程施工"},
  86. // //"topscopeclass": []string{"建筑工程t"},
  87. // },
  88. // Extend: struct { //有需要按照示例增加
  89. // Extract struct { //抽取
  90. // }
  91. // Repeat struct { //判重
  92. // SId string //原始id
  93. // RId string //被替换id
  94. // }
  95. // MgoSave struct { //mgo保存更新
  96. // SType string //更新u 保存s
  97. // col string //表
  98. // }
  99. // EsSave struct { //es保存更新
  100. // SType string //更新u 保存s
  101. // Index string //索引
  102. // IsHistory bool //是否是历史数据
  103. // }
  104. // }{
  105. // EsSave: struct {
  106. // SType string
  107. // Index string
  108. // IsHistory bool
  109. // }{
  110. // SType: "u",
  111. // Index: "bidding",
  112. // IsHistory: false,
  113. // },
  114. // },
  115. //}
  116. //MgoB := &mongodb.MongodbSim{
  117. // //MongodbAddr: "172.17.189.140:27080",
  118. // MongodbAddr: "127.0.0.1:27083",
  119. // Size: 10,
  120. // DbName: "qfw",
  121. // UserName: "SJZY_RWbid_ES",
  122. // Password: "SJZY@B4i4D5e6S",
  123. // Direct: true,
  124. //}
  125. //MgoB.InitPool()
  126. MgoP := &mongodb.MongodbSim{
  127. //MongodbAddr: "172.17.189.140:27080",
  128. MongodbAddr: "192.168.3.206:27002",
  129. Size: 10,
  130. DbName: "qfw_data",
  131. UserName: "root",
  132. Password: "root",
  133. Direct: true,
  134. }
  135. MgoP.InitPool()
  136. dataP, _ := MgoP.FindById("projectset", "658d30ab66cf0db42a4035f6", nil)
  137. data := MsgInfo{
  138. Id: "658d30ab66cf0db42a4035f6",
  139. Data: *dataP,
  140. Extend: struct { //有需要按照示例增加
  141. Extract struct { //抽取
  142. }
  143. Repeat struct { //判重
  144. SId string //原始id
  145. RId string //被替换id
  146. }
  147. MgoSave struct { //mgo保存更新
  148. SType string //更新u 保存s
  149. col string //表
  150. }
  151. EsSave struct { //es保存更新
  152. SType string //更新u 保存s
  153. Index string //索引
  154. IsHistory bool //是否是历史数据
  155. }
  156. }{
  157. EsSave: struct {
  158. SType string
  159. Index string
  160. IsHistory bool
  161. }{
  162. SType: "u",
  163. Index: "project",
  164. IsHistory: false,
  165. },
  166. },
  167. }
  168. bytes, _ := json.Marshal(&data)
  169. go func() {
  170. for i := 0; i < 10; i++ {
  171. log.Println("iiii", i)
  172. //rep, err := jn.PubReqZip("dataprocess.buyerclass", bytes, time.Minute*10)
  173. rep, err := jn.PubReqZip("dataprocess.essave", bytes, time.Minute*10)
  174. //rep, err := jn.PubReqZip("dataprocess.scopeclass", bytes, time.Minute*10)
  175. //rep, err := jn.PubReqZip("dataprocess.topclass", bytes, time.Minute*10)
  176. if err != nil {
  177. log.Println(err) //消息重试机制...
  178. } else {
  179. log.Println("收到回复消息:", string(rep.Data))
  180. return
  181. }
  182. time.Sleep(500 * time.Millisecond)
  183. }
  184. }()
  185. select {}
  186. }
  187. func receiveA() {
  188. var addr = "192.168.3.207:4223"
  189. jn := jnats.NewJnats(addr)
  190. //先消费,带zip压缩,用于跨网传输节省流量
  191. jn.SubZip("dataprocess.topclass", func(msg *nats.Msg) {
  192. //log.Println(string(msg.Data))
  193. //回执消息
  194. //msg.Respond([]byte("receive msg:" + string(msg.Data)))
  195. var res = make(map[string]interface{}, 0)
  196. json.Unmarshal(msg.Data, &res)
  197. msg.Respond([]byte("ok"))
  198. log.Println("A 收到 test 数据:", res)
  199. })
  200. jn.SubZip("bidding", func(msg *nats.Msg) {
  201. //log.Println(string(msg.Data))
  202. //回执消息
  203. //msg.Respond([]byte("receive msg:" + string(msg.Data)))
  204. var res = make(map[string]interface{}, 0)
  205. json.Unmarshal(msg.Data, &res)
  206. msg.Respond([]byte("ok"))
  207. log.Println("A 收到 bidding 数据:", res)
  208. })
  209. }
  210. func receiveB() {
  211. var addr = "192.168.3.207:4223"
  212. jn := jnats.NewJnats(addr)
  213. //先消费,带zip压缩,用于跨网传输节省流量
  214. jn.SubZip("test", func(msg *nats.Msg) {
  215. //log.Println(string(msg.Data))
  216. //回执消息
  217. //msg.Respond([]byte("receive msg:" + string(msg.Data)))
  218. var res = make(map[string]interface{}, 0)
  219. json.Unmarshal(msg.Data, &res)
  220. msg.Respond([]byte("ok"))
  221. log.Println("B 收到数据:", res)
  222. })
  223. }