Przeglądaj źródła

1、udp段落支持
2、合并段落
3、监测上下节点
4、熔断跳过信号支持(udp-stype-stop-start)

zhengkun 1 rok temu
rodzic
commit
8ce1fed17f
12 zmienionych plików z 409 dodań i 23 usunięć
  1. 13 1
      config.json
  2. 11 12
      extract/extract.go
  3. 82 0
      extract/test.go
  4. 10 0
      go.mod
  5. 19 0
      go.sum
  6. 7 3
      main.go
  7. 1 0
      prompt/prompt.go
  8. 40 0
      udp/udpmail.go
  9. 126 0
      udp/udprocess.go
  10. 92 0
      udp/udptask.go
  11. 6 0
      ul/attr.go
  12. 2 7
      ul/init.go

+ 13 - 1
config.json

@@ -1,4 +1,9 @@
 {
+  "udpport": ":1777",
+  "smail": {
+    "to": "zhengkun@topnet.net.cn,xuzhiheng@topnet.net.cn",
+    "api": "http://172.17.145.179:19281/_send/_mail"
+  },
   "s_mgo": {
     "local": true,
     "l_addr": "127.0.0.1:12005",
@@ -14,5 +19,12 @@
     "dbname" : "mixdata",
     "username": "zhengkun",
     "password": "zk@123123"
-  }
+  },
+  "nextNode": [
+    {
+      "addr": "127.0.0.1",
+      "port": 1784,
+      "stype": ""
+    }
+  ]
 }

+ 11 - 12
extract/extract.go

@@ -4,7 +4,6 @@ import (
 	"data_ai/clean"
 	"data_ai/prompt"
 	"data_ai/ul"
-	"fmt"
 	log "github.com/donnie4w/go-logger/logger"
 	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"sync"
@@ -12,10 +11,16 @@ import (
 )
 
 // 抽取结构字段
-func ExtractFieldInfo(name string, s_name string) {
+func ExtractFieldInfo(sid string, eid string, name string) {
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  ul.StringTOBsonId(sid),
+			"$lte": ul.StringTOBsonId(eid),
+		},
+	}
 	pool_mgo := make(chan bool, 50)
 	wg_mgo := &sync.WaitGroup{}
-	dataArr, _ := ul.SourceMgo.Find(name, map[string]interface{}{}, nil, nil)
+	dataArr, _ := ul.SourceMgo.Find(name, q, nil, nil)
 	for k, v := range dataArr {
 		if k%100 == 0 {
 			log.Debug(k, "~", ul.BsonTOStringId(v["_id"]))
@@ -33,18 +38,12 @@ func ExtractFieldInfo(name string, s_name string) {
 			}()
 			tmpid := ul.BsonTOStringId(v["_id"])
 			data := ResolveInfo(v)
-			//最终结果...
-			ul.SourceMgo.Save(s_name, map[string]interface{}{
-				"_id":    v["_id"],
-				"href":   v["href"],
-				"jyhref": fmt.Sprintf(ul.Url, qu.CommonEncodeArticle("content", tmpid)),
-				"zhipu":  data,
-				"num":    v["num"],
-			})
+			if data == nil || tmpid == "" {
+
+			}
 		}(v)
 	}
 	wg_mgo.Wait()
-	log.Debug("is over ...")
 }
 
 // 获取处理数据...

+ 82 - 0
extract/test.go

@@ -1,10 +1,15 @@
 package extract
 
 import (
+	"data_ai/prompt"
 	"data_ai/ul"
 	"fmt"
 	log "github.com/donnie4w/go-logger/logger"
+	new_xlsx "github.com/tealeg/xlsx/v3"
 	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"os"
+	"sync"
+	"unicode/utf8"
 )
 
 func TestSingleFieldInfo(name string, tmpid string) {
@@ -16,6 +21,77 @@ func TestSingleFieldInfo(name string, tmpid string) {
 	}
 }
 
+// TestPackageInfo exports the sub-package results stored under "new_pkg" in
+// the ai_41411_zhipu collection to test.xlsx, one spreadsheet row per entry
+// of the "分包信息" list.
+//
+// Documents whose "new_pkg" value is not a map are skipped, and a failure to
+// create the sheet aborts the export instead of using a nil sheet.
+//
+// The statements after the early return are a disabled second stage that
+// re-queries the model for each document and writes the result back to
+// "new_pkg"; they are kept so the stage can be re-enabled by removing the
+// return.
+func TestPackageInfo() {
+	query := map[string]interface{}{
+		"new_pkg": map[string]interface{}{
+			"$exists": 1,
+		},
+	}
+	dataArr, _ := ul.SourceMgo.Find("ai_41411_zhipu", query, nil, map[string]interface{}{})
+	log.Debug("查询数量...", len(dataArr))
+
+	// Best effort: the file may not exist yet.
+	os.Remove("test.xlsx")
+	f := new_xlsx.NewFile()
+	sheet, err := f.AddSheet("数据信息")
+	if err != nil {
+		fmt.Println("创建sheet失败:", err)
+		return
+	}
+	row := sheet.AddRow()
+	writeRow(row, []string{"唯一标识", "站点", "剑鱼链接", "子包名称", "子包单位", "子包金额"})
+	for _, v := range dataArr {
+		// "$exists" also matches null or non-map values, for which ObjToMap
+		// yields nil; skip those rather than dereferencing a nil pointer.
+		pkgPtr := qu.ObjToMap(v["new_pkg"])
+		if pkgPtr == nil {
+			continue
+		}
+		p_info := *pkgPtr
+		tmpid := ul.BsonTOStringId(v["_id"])
+		ttt := ul.SourceMgo.FindById("ai_41411", tmpid)
+		site := qu.ObjToString(ttt["site"])
+		jyhref := fmt.Sprintf(ul.Url, qu.CommonEncodeArticle("content", tmpid))
+		p_arr := ul.IsMarkInterfaceMap(p_info["分包信息"])
+		for _, v1 := range p_arr {
+			row = sheet.AddRow()
+			arr := []string{}
+			arr = append(arr, tmpid)
+			arr = append(arr, site)
+			arr = append(arr, jyhref)
+			arr = append(arr, qu.ObjToString(v1["包项目名称"]))
+			arr = append(arr, qu.ObjToString(v1["中标单位"]))
+			arr = append(arr, qu.ObjToString(v1["中标金额"]))
+			writeRow(row, arr)
+		}
+	}
+	if err := f.Save("test.xlsx"); err != nil {
+		fmt.Println("保存xlsx失败:", err)
+	} else {
+		fmt.Println("保存xlsx成功:", err)
+	}
+	log.Debug("is over ...")
+
+	// Disabled stage: everything below is intentionally unreachable.
+	return
+	// Determine sub-packages and fetch their information.
+	pool_mgo := make(chan bool, 80)
+	wg_mgo := &sync.WaitGroup{}
+	for k, v := range dataArr {
+		if k%10 == 0 {
+			log.Debug(k, "~", v["_id"])
+		}
+		pool_mgo <- true
+		wg_mgo.Add(1)
+		go func(v map[string]interface{}) {
+			defer func() {
+				<-pool_mgo
+				wg_mgo.Done()
+			}()
+			tmpid := ul.BsonTOStringId(v["_id"])
+			data := ul.SourceMgo.FindById("ai_41411", tmpid)
+			if detail := qu.ObjToString(data["detail"]); utf8.RuneCountInString(detail) > 100 {
+				pkg := prompt.AcquireMultiplePackageInfo(detail)
+				// Final result.
+				ul.SourceMgo.UpdateById("ai_41411_zhipu", tmpid, map[string]interface{}{
+					"$set": map[string]interface{}{
+						"new_pkg": pkg,
+					},
+				})
+			}
+		}(v)
+	}
+	wg_mgo.Wait()
+}
+
 // 更新链接
 func TestUpdateJyhref(name string) {
 	dataArr, _ := ul.SourceMgo.Find(name, map[string]interface{}{}, nil, map[string]interface{}{"_id": 1})
@@ -30,3 +106,9 @@ func TestUpdateJyhref(name string) {
 	}
 	log.Debug("is over ...")
 }
+
+// writeRow appends one cell to row for every element of arr, in order, and
+// sets each cell's value to the corresponding string.
+func writeRow(row *new_xlsx.Row, arr []string) {
+	for i := range arr {
+		cell := row.AddCell()
+		cell.Value = arr[i]
+	}
+}

+ 10 - 0
go.mod

@@ -16,21 +16,30 @@ require (
 	github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
 	github.com/dchest/captcha v1.0.0 // indirect
 	github.com/donnie4w/gofer v0.0.0-20240219061552-aad2cd80fd6e // indirect
+	github.com/frankban/quicktest v1.14.6 // indirect
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
+	github.com/google/btree v1.0.0 // indirect
+	github.com/google/go-cmp v0.6.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/klauspost/compress v1.17.0 // indirect
+	github.com/kr/pretty v0.3.1 // indirect
+	github.com/kr/text v0.2.0 // indirect
 	github.com/magiconair/properties v1.8.7 // indirect
 	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/mitchellh/mapstructure v1.5.0 // indirect
 	github.com/montanaflynn/stats v0.7.1 // indirect
 	github.com/olivere/elastic/v7 v7.0.32 // indirect
 	github.com/pelletier/go-toml/v2 v2.1.0 // indirect
+	github.com/peterbourgon/diskv/v3 v3.0.1 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
+	github.com/rogpeppe/fastuuid v1.2.0 // indirect
+	github.com/rogpeppe/go-internal v1.9.0 // indirect
 	github.com/sagikazarmark/locafero v0.4.0 // indirect
 	github.com/sagikazarmark/slog-shim v0.1.0 // indirect
+	github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
 	github.com/sourcegraph/conc v0.3.0 // indirect
 	github.com/spf13/afero v1.11.0 // indirect
@@ -39,6 +48,7 @@ require (
 	github.com/spf13/viper v1.18.2 // indirect
 	github.com/stretchr/testify v1.8.4 // indirect
 	github.com/subosito/gotenv v1.6.0 // indirect
+	github.com/tealeg/xlsx/v3 v3.3.7 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
 	github.com/xdg-go/stringprep v1.0.4 // indirect

+ 19 - 0
go.sum

@@ -18,6 +18,7 @@ github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZx
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -57,6 +58,8 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
+github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
+github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -95,25 +98,36 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
 github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
 github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
 github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
 github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
 github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
 github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
 github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
+github.com/peterbourgon/diskv/v3 v3.0.1 h1:x06SQA46+PKIUftmEujdwSEpIx8kR+M9eLYsUxeYveU=
+github.com/peterbourgon/diskv/v3 v3.0.1/go.mod h1:kJ5Ny7vLdARGU3WUuy6uzO6T0nb/2gWcT1JiBvRmb5o=
+github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/profile v1.5.0 h1:042Buzk+NhDI+DeSAA62RwJL8VAuZUMQZUjCsRz1Mug=
+github.com/pkg/profile v1.5.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s=
+github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
 github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
 github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ=
 github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4=
 github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE=
 github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
+github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa h1:2cO3RojjYl3hVTbEvJVqrMaFmORhL6O06qdW42toftk=
+github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa/go.mod h1:Yjr3bdWaVWyME1kha7X0jsz3k2DgXNa1Pj3XGyUAbx8=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
 github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
 github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
@@ -141,6 +155,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
+github.com/tealeg/xlsx/v3 v3.3.7 h1:clTKeQJORkctZNvQp+94juO19nj1b12snyX6PFT8iZY=
+github.com/tealeg/xlsx/v3 v3.3.7/go.mod h1:KV4FTFtvGy0TBlOivJLZu/YNZk6e0Qtk7eOSglWksuA=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
@@ -239,6 +255,7 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX
 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
 golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
@@ -283,6 +300,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
+gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
 gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=

+ 7 - 3
main.go

@@ -1,19 +1,23 @@
 package main
 
 import (
+	"data_ai/udp"
 	"data_ai/ul"
 )
 
 func init() {
 	ul.InitGlobalVar()
-	ul.InitPCD()
+	udp.InitProcessVar()
+
 }
 
 func main() {
 
+	//extract.TestPackageInfo()
 	//extract.ExtractFieldInfo("ai_294", "ai_294_zhipu_test")
-
 	//extract.ExtractFieldInfo("ai_41411", "ai_41411_zhipu")
-
 	//extract.TestSingleFieldInfo("zktest_ai_107", "6699292566cf0db42a555c57")
+
+	lock := make(chan bool)
+	<-lock
 }

+ 1 - 0
prompt/prompt.go

@@ -83,6 +83,7 @@ func AcquireIsPackageInfo(detail string) bool {
 func AcquireMultiplePackageInfo(detail string) map[string]interface{} {
 	content := PromptMultiplePackageText(detail)
 	zp := PostZhiPuInfo(content)
+	return zp
 	//后续在转格式...暂时先输出两个值
 	pkg := map[string]interface{}{}
 	s_winner, s_bidamount := "", 0.0

+ 40 - 0
udp/udpmail.go

@@ -0,0 +1,40 @@
+package udp
+
+import (
+	"data_ai/ul"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"sync"
+)
+
+// udptaskmap holds the notifications sent to next nodes that are still
+// waiting for an acknowledgement. Entries are stored by sendNextNode under
+// the "sid-eid-stype" key with a *udpNode value, deleted by ProcessUdpMsg
+// when an OP_NOOP reply carrying that key arrives, and expired (with an alert
+// mail) by nextUdpMonitoring.
+var udptaskmap = &sync.Map{}
+
+// udpNode records one notification sent to a next node so that a missing
+// acknowledgement can be detected later.
+type udpNode struct {
+	// data is the JSON payload that was sent.
+	data      []byte
+	// addr is the UDP address the payload was sent to.
+	addr      *net.UDPAddr
+	// timestamp is the send time in Unix seconds.
+	timestamp int64
+}
+
+// tomail is the recipient list for alert mails; sendErrMailApi refreshes it
+// from the "smail.to" configuration entry on every call.
+var tomail string
+// api is the URL of the HTTP mail-sending endpoint; sendErrMailApi refreshes
+// it from the "smail.api" configuration entry on every call.
+var api string
+
+// sendErrMailApi sends an alert mail with the given title and body through
+// the HTTP mail API configured under "smail" (keys "to" and "api").
+//
+// The recipient list, title and body are passed as URL-encoded query
+// parameters, so characters such as '&', '#', '%', spaces or non-ASCII text
+// cannot corrupt the request. A non-2xx response is logged as a failure
+// rather than a success. Errors are only logged; nothing is returned.
+//
+// NOTE(review): the request uses http.DefaultClient, which has no timeout, so
+// a hanging mail endpoint blocks the caller — consider a client with a
+// timeout.
+func sendErrMailApi(title, body string) {
+	jkmail, _ := ul.SysConfig["smail"].(map[string]interface{})
+	if jkmail != nil {
+		tomail, _ = jkmail["to"].(string)
+		api, _ = jkmail["api"].(string)
+	}
+	log.Println(tomail, api)
+
+	req, err := http.NewRequest(http.MethodGet, api, nil)
+	if err != nil {
+		log.Println("邮件发送失败:", err)
+		return
+	}
+	// Build the query through url.Values so every parameter is escaped.
+	query := req.URL.Query()
+	query.Set("to", tomail)
+	query.Set("title", title)
+	query.Set("body", body)
+	req.URL.RawQuery = query.Encode()
+
+	res, err := http.DefaultClient.Do(req)
+	if err != nil {
+		log.Println("邮件发送失败:", err)
+		return
+	}
+	defer res.Body.Close()
+	read, err := ioutil.ReadAll(res.Body)
+	if res.StatusCode < 200 || res.StatusCode > 299 {
+		log.Println("邮件发送失败:", fmt.Errorf("mail api returned status %s", res.Status), string(read))
+		return
+	}
+	log.Println("邮件发送成功:", string(read), err)
+}

+ 126 - 0
udp/udprocess.go

@@ -0,0 +1,126 @@
+package udp
+
+import (
+	"data_ai/ul"
+	"encoding/json"
+	log "github.com/donnie4w/go-logger/logger"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
+	"net"
+	"sync"
+	"time"
+)
+
+// Package-level state shared by the UDP handler and the background loops.
+var (
+	// nextNode lists the downstream nodes read from the "nextNode" config entry.
+	nextNode                   []map[string]interface{}
+	// udpclient is the UDP endpoint used both to listen and to send.
+	udpclient                  mu.UdpClient
+	// udplock is held by ProcessUdpMsg while it handles an id segment;
+	// nextlock is held while an acknowledgement is removed from udptaskmap.
+	udplock, nextlock          sync.Mutex
+	// responselock is taken by lastUdpMonitoring; getasklock by getRepeatTask.
+	responselock, getasklock   sync.Mutex
+	// lastNodeResponse is the Unix time (seconds) of the last accepted segment.
+	lastNodeResponse           int64
+	// taskList is the queue of pending id segments, each with "sid" and "eid".
+	taskList                   []map[string]interface{}
+	// isGetask is true while getRepeatTask is working on a segment; isStop is
+	// toggled by the "stop"/"start" signals. isAction is not referenced by the
+	// code visible here.
+	isGetask, isAction, isStop bool
+)
+
+// InitProcessVar resets the package state, loads the next-node list from the
+// "nextNode" config entry, starts the three background loops (previous-node
+// monitor, next-node monitor, task dispatcher) and finally starts listening
+// for UDP messages on the address configured under "udpport".
+//
+// It panics if "nextNode" is not a list or "udpport" is not a string.
+func InitProcessVar() {
+	// Reset state.
+	isGetask, isStop = false, false
+	rawNodes := ul.SysConfig["nextNode"].([]interface{})
+	nextNode = qu.ObjArrToMapArr(rawNodes)
+	lastNodeResponse = time.Now().Unix()
+	taskList = []map[string]interface{}{}
+
+	// Background monitors and the dispatcher.
+	go lastUdpMonitoring()
+	go nextUdpMonitoring()
+	go getRepeatTask()
+
+	// Start listening.
+	listenAddr := ul.SysConfig["udpport"].(string)
+	udpclient = mu.UdpClient{Local: listenAddr, BufSize: 1024}
+	udpclient.Listen(ProcessUdpMsg)
+	log.Debug("Udp服务监听", listenAddr)
+}
+
+// ProcessUdpMsg is the UDP receive callback registered in InitProcessVar.
+//
+// For OP_TYPE_DATA it decodes data as a JSON object and reads "gtid", "lteid"
+// and "stype":
+//   - stype "monitor": replies with the received "key" and returns;
+//   - stype "stop" / "start": sets / clears isStop, then continues below;
+//   - otherwise, when both ids are present, acknowledges with the key
+//     "gtid-lteid-stype" and either queues the segment in taskList or, while
+//     isStop is set, waits for taskList to drain and forwards the segment to
+//     the next nodes without processing it.
+//
+// For OP_NOOP it treats data as the key of an acknowledged notification and
+// removes it from udptaskmap.
+func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	switch act {
+	case mu.OP_TYPE_DATA:
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		if err != nil {
+			// Report the decode error back to the sender.
+			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
+		} else if mapInfo != nil {
+			sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
+			stype := qu.ObjToString(mapInfo["stype"])
+			if stype == "monitor" {
+				// Health probe: echo the key back and do nothing else.
+				log.Debug("收到监测......")
+				key := qu.ObjToString(mapInfo["key"])
+				udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+				return
+			}
+			// Stop/start only flip the flag; execution continues with the id
+			// check below, so such a message without ids is logged as an
+			// id-segment error.
+			if stype == "stop" {
+				log.Debug("收到停止信号...")
+				isStop = true
+			}
+			if stype == "start" {
+				log.Debug("收到开始信号...")
+				isStop = false
+			}
+			if sid == "" || eid == "" {
+				log.Debug("接收id段异常-err ", "sid=", sid, ",eid=", eid)
+			} else {
+				lastNodeResponse = time.Now().Unix()
+				key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
+				// Acknowledge receipt asynchronously.
+				go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
+				// NOTE(review): udplock stays held for the whole branch below,
+				// including the 30s sleeps of the stop path — confirm that
+				// blocking other callers for that long is intended.
+				// NOTE(review): getRepeatTask modifies taskList under
+				// getasklock, not udplock — verify the two cannot race.
+				udplock.Lock()
+				if isStop { // stop signal active
+					for {
+						if len(taskList) == 0 {
+							break
+						}
+						log.Debug("检测到停止信号...等待任务结束...")
+						time.Sleep(time.Second * 30)
+					}
+					// Queue drained: pass the segment straight to the next nodes.
+					log.Debug("检测到停止信号...通知下节点...")
+					sendNextNode(sid, eid)
+				} else { // enqueue the task
+					taskList = append(taskList, map[string]interface{}{
+						"sid": sid,
+						"eid": eid,
+					})
+					log.Debug("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
+				}
+				udplock.Unlock()
+			}
+		}
+	case mu.OP_NOOP: // reply from a next node
+		nextlock.Lock()
+		str := string(data)
+		udptaskmap.Delete(str)
+		log.Debug("其他节点回应:", str)
+		nextlock.Unlock()
+	}
+}
+}
+
+// sendNextNode notifies every configured next node that the id segment
+// (sid, eid] is finished, then clears isGetask so the dispatcher can pick up
+// the next task.
+//
+// Each notification is a JSON object with "gtid", "lteid", "stype" and "key"
+// ("sid-eid-stype"). The pending entry is registered in udptaskmap BEFORE the
+// packet is sent: if it were stored afterwards, a fast acknowledgement could
+// be handled first, its Delete would find nothing, and the entry stored later
+// would trigger a false "next node not responding" alert.
+//
+// It panics if a node's "addr" is not a string.
+func sendNextNode(sid string, eid string) {
+	for _, node := range nextNode {
+		stype := qu.ObjToString(node["stype"])
+		key := sid + "-" + eid + "-" + stype
+		// Marshalling a map of plain strings cannot fail, so the error is ignored.
+		by, _ := json.Marshal(map[string]interface{}{
+			"gtid":  sid,
+			"lteid": eid,
+			"stype": stype,
+			"key":   key,
+		})
+		addr := &net.UDPAddr{
+			IP:   net.ParseIP(node["addr"].(string)),
+			Port: qu.IntAll(node["port"]),
+		}
+
+		// Track the notification first, then send it to the next node.
+		udptaskmap.Store(key, &udpNode{by, addr, time.Now().Unix()})
+		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+	}
+	log.Debug("udp通知抽取完成...通知下阶段udp-敏感词,补城市", sid, "~", eid)
+	isGetask = false // segment fully done; resume fetching tasks
+}

+ 92 - 0
udp/udptask.go

@@ -0,0 +1,92 @@
+package udp
+
+import (
+	"data_ai/extract"
+	"fmt"
+	log "github.com/donnie4w/go-logger/logger"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"time"
+)
+
+// getRepeatTask is the dispatcher loop: whenever taskList is non-empty and no
+// task is in progress it marks isGetask, processes the pending work while
+// holding getasklock, and otherwise sleeps for ten seconds.
+func getRepeatTask() {
+	for {
+		if len(taskList) == 0 || isGetask {
+			time.Sleep(10 * time.Second)
+			continue
+		}
+		getasklock.Lock()
+		isGetask = true
+		dispatchPendingTasks()
+		getasklock.Unlock()
+	}
+}
+
+// dispatchPendingTasks handles the current queue. With more than one pending
+// segment it merges them into a single range (first "sid" to last "eid") and
+// empties the portion of the queue it counted; with one segment, or when the
+// merged bounds are empty, it processes only the first segment.
+func dispatchPendingTasks() {
+	pending := len(taskList)
+	if pending <= 1 {
+		runFirstTask("AI识别获取段落错误")
+		return
+	}
+	first_id := qu.ObjToString(taskList[0]["sid"])
+	end_id := qu.ObjToString(taskList[pending-1]["eid"])
+	if first_id == "" || end_id == "" {
+		log.Debug("合并段落~错误~正常取段落~~~")
+		runFirstTask("AI识别接收段落错误")
+		return
+	}
+	taskList = taskList[pending:]
+	log.Debug("合并段落~正常~", first_id, "~", end_id, "~剩余任务池~", len(taskList), taskList)
+	extract.ExtractFieldInfo(first_id, end_id, "bidding")
+	log.Debug("AI识别数据完成...发送下节点udp...")
+	sendNextNode(first_id, end_id)
+}
+
+// runFirstTask pops the first queued segment, runs the extraction for it and
+// notifies the next nodes. If the first entry is a nil map it sends an alert
+// mail titled errTitle and clears isGetask instead.
+func runFirstTask(errTitle string) {
+	mapInfo := taskList[0]
+	if mapInfo == nil {
+		sendErrMailApi(errTitle, "获取任务段落异常...跳过段落...")
+		isGetask = false
+		return
+	}
+	taskList = taskList[1:]
+	log.Debug("获取任务段处理中~~~剩余任务池~~~", len(taskList), taskList)
+	sid := qu.ObjToString(mapInfo["sid"])
+	eid := qu.ObjToString(mapInfo["eid"])
+	extract.ExtractFieldInfo(sid, eid, "bidding")
+	log.Debug("AI识别数据完成...发送下节点udp...")
+	sendNextNode(sid, eid)
+}
+
+// 监控~上节点~长时间未响应
+func lastUdpMonitoring() {
+	for {
+		responselock.Lock()
+		if !isGetask && time.Now().Unix()-lastNodeResponse >= 1800 {
+			sendErrMailApi("AI识别程序~流程超时~告警", fmt.Sprintf("半小时~没有新段落数据进入流程...请检查..."))
+			lastNodeResponse = time.Now().Unix() //重置时间
+		}
+		responselock.Unlock()
+		time.Sleep(600 * time.Second)
+	}
+}
+
+// nextUdpMonitoring watches the next nodes: every ten seconds it scans
+// udptaskmap and, for each notification that has been waiting more than 120
+// seconds for an acknowledgement, removes the entry and sends an alert mail.
+//
+// Values that are not a non-nil *udpNode are removed without being
+// dereferenced, so an unexpected entry cannot panic the monitor goroutine.
+func nextUdpMonitoring() {
+	for {
+		udptaskmap.Range(func(k, v interface{}) bool {
+			node, ok := v.(*udpNode)
+			if !ok || node == nil {
+				// Not a tracked notification; drop it and keep scanning.
+				udptaskmap.Delete(k)
+				return true
+			}
+			if time.Now().Unix()-node.timestamp > 120 {
+				udptaskmap.Delete(k)
+				sendErrMailApi("AI识别程序~下节点未响应~警告", fmt.Sprintf("下节点~数据清洗~未及时响应...请检查..."))
+			}
+			return true
+		})
+		time.Sleep(10 * time.Second)
+	}
+}

+ 6 - 0
ul/attr.go

@@ -11,6 +11,12 @@ var (
 	MaxLen             = 3000
 	RulesPname         = []*ExtReg{}
 )
+
+// ExtReg pairs a compiled regular expression with a replacement string.
+// NOTE(review): presumably Replace is substituted for matches of Reg — the
+// code applying these rules is not visible here; confirm against the callers.
+type ExtReg struct {
+	Reg     *regexp.Regexp
+	Replace string
+}
+
 var (
 	//标准化地域信息
 	S_ProvinceDict map[string][]S_Province //标准省份-map

+ 2 - 7
ul/init.go

@@ -5,17 +5,12 @@ import (
 	log "github.com/donnie4w/go-logger/logger"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
-	"regexp"
 )
 
 func InitGlobalVar() {
 	qu.ReadConfig(&SysConfig) //加载配置文件
 	initMgo()
-}
-
-type ExtReg struct {
-	Reg     *regexp.Regexp
-	Replace string
+	initPCD()
 }
 
 // 初始化mgo
@@ -62,7 +57,7 @@ func initMgo() {
 }
 
 // 标准化省市区···
-func InitPCD() {
+func initPCD() {
 	S_ProvinceDict = make(map[string][]S_Province, 0)
 	S_CityDict = make(map[string][]S_City, 0)
 	S_DistrictDict = make(map[string][]S_District, 0)