Browse Source

抽取-存储 -新增-邮件

zhengkun 3 years ago
parent
commit
4e36eb9ce6

+ 1 - 0
data_monitoring/get_ assign/src/main.go

@@ -0,0 +1 @@
+package main

+ 2 - 0
data_monitoring/get_ assign/src/mark

@@ -0,0 +1,2 @@
+获取指定数据
+上海政府采购网、中国政府采购网   上海   需求字段:省份、公告标题、公告类别、发布时间、公告地址、项目名称、来源网站。

+ 329 - 0
data_monitoring/get_ assign/src/mgo.go

@@ -0,0 +1,329 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type MgoSess struct {
+	Db     string
+	Coll   string
+	Query  interface{}
+	Sorts  []string
+	fields interface{}
+	limit  int64
+	skip   int64
+	M      *MongodbSim
+}
+
+type MgoIter struct {
+	Cursor *mongo.Cursor
+}
+
+func (mt *MgoIter) Next(result interface{}) bool {
+	if mt.Cursor != nil {
+		if mt.Cursor.Next(nil) {
+			err := mt.Cursor.Decode(result)
+			if err != nil {
+				log.Println("mgo cur err", err.Error())
+				mt.Cursor.Close(nil)
+				return false
+			}
+			return true
+		} else {
+			mt.Cursor.Close(nil)
+			return false
+		}
+	} else {
+		return false
+	}
+
+}
+
+func (ms *MgoSess) DB(name string) *MgoSess {
+	ms.Db = name
+	return ms
+}
+
+func (ms *MgoSess) C(name string) *MgoSess {
+	ms.Coll = name
+	return ms
+}
+
+func (ms *MgoSess) Find(q interface{}) *MgoSess {
+	ms.Query = q
+	return ms
+}
+
+func (ms *MgoSess) Select(fields interface{}) *MgoSess {
+	ms.fields = fields
+	return ms
+}
+
+func (ms *MgoSess) Limit(limit int64) *MgoSess {
+	ms.limit = limit
+	return ms
+}
+func (ms *MgoSess) Skip(skip int64) *MgoSess {
+	ms.skip = skip
+	return ms
+}
+
+func (ms *MgoSess) Sort(sorts ...string) *MgoSess {
+	ms.Sorts = sorts
+	return ms
+}
+
+func (ms *MgoSess) Iter() *MgoIter {
+	it := &MgoIter{}
+	find := options.Find()
+	if ms.skip > 0 {
+		find.SetSkip(ms.skip)
+	}
+	if ms.limit > 0 {
+		find.SetLimit(ms.limit)
+	}
+	find.SetBatchSize(100)
+	if len(ms.Sorts) > 0 {
+		sort := bson.M{}
+		for _, k := range ms.Sorts {
+			switch k[:1] {
+			case "-":
+				sort[k[1:]] = -1
+			case "+":
+				sort[k[1:]] = 1
+			default:
+				sort[k] = 1
+			}
+		}
+		find.SetSort(sort)
+	}
+	if ms.fields != nil {
+		find.SetProjection(ms.fields)
+	}
+	cur, err := ms.M.C.Database(ms.Db).Collection(ms.Coll).Find(ms.M.Ctx, ms.Query, find)
+	if err != nil {
+		log.Println("mgo find err", err.Error())
+	} else {
+		it.Cursor = cur
+	}
+	return it
+}
+
+type MongodbSim struct {
+	MongodbAddr string
+	Size        int
+	//	MinSize     int
+	DbName   string
+	C        *mongo.Client
+	Ctx      context.Context
+	ShortCtx context.Context
+	pool     chan bool
+	UserName string
+	Password string
+}
+
+func (m *MongodbSim) GetMgoConn() *MgoSess {
+	//m.Open()
+	ms := &MgoSess{}
+	ms.M = m
+	return ms
+}
+
+func (m *MongodbSim) DestoryMongoConn(ms *MgoSess) {
+	//m.Close()
+	ms.M = nil
+	ms = nil
+}
+
+func (m *MongodbSim) InitPool() {
+	opts := options.Client()
+	opts.SetConnectTimeout(3 * time.Second)
+	opts.ApplyURI("mongodb://" + m.MongodbAddr)
+	opts.SetMaxPoolSize(uint64(m.Size))
+	m.pool = make(chan bool, m.Size)
+
+	if m.UserName !="" && m.Password !="" {
+		cre := options.Credential{
+			Username:m.UserName,
+			Password:m.Password,
+		}
+		opts.SetAuth(cre)
+	}
+
+
+
+	opts.SetMaxConnIdleTime(2 * time.Hour)
+	m.Ctx, _ = context.WithTimeout(context.Background(), 99999*time.Hour)
+	m.ShortCtx, _ = context.WithTimeout(context.Background(), 1*time.Minute)
+	client, err := mongo.Connect(m.ShortCtx, opts)
+	if err != nil {
+		log.Println("mgo init error:", err.Error())
+	} else {
+		m.C = client
+		log.Println("init success")
+	}
+}
+
+func (m *MongodbSim) Open() {
+	m.pool <- true
+}
+func (m *MongodbSim) Close() {
+	<-m.pool
+}
+
+//批量插入
+func (m *MongodbSim) UpSertBulk(c string, doc ...[]map[string]interface{}) (map[int64]interface{}, bool) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewUpdateOneModel()
+		write.SetFilter(d[0])
+		write.SetUpdate(d[1])
+		write.SetUpsert(true)
+		writes = append(writes, write)
+	}
+	r, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo upsert error:", e.Error())
+		return nil, false
+	}
+	//	else {
+	//		if r.UpsertedCount != int64(len(doc)) {
+	//			log.Println("mgo upsert uncomplete:uc/dc", r.UpsertedCount, len(doc))
+	//		}
+	//		return true
+	//	}
+	return r.UpsertedIDs, true
+}
+
+//批量插入
+func (m *MongodbSim) SaveBulk(c string, doc ...map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	var writes []mongo.WriteModel
+	for _, d := range doc {
+		write := mongo.NewInsertOneModel()
+		write.SetDocument(d)
+		writes = append(writes, write)
+	}
+	_, e := coll.BulkWrite(m.Ctx, writes)
+	if e != nil {
+		log.Println("mgo savebulk error:", e.Error())
+		return false
+	}
+	return true
+}
+
+//保存
+func (m *MongodbSim) Save(c string, doc map[string]interface{}) interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.InsertOne(m.Ctx, doc)
+	if err != nil {
+		return nil
+	}
+	return r.InsertedID
+}
+
+//更新by Id
+func (m *MongodbSim) UpdateById(c, id string, doc map[string]interface{}) bool {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	_, err := coll.UpdateOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)}, doc)
+	if err != nil {
+		return false
+	}
+	return true
+}
+
+//删除by id
+func (m *MongodbSim) DeleteById(c, id string) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//通过条件删除
+func (m *MongodbSim) Delete(c string, query map[string]interface{}) int64 {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r, err := coll.DeleteMany(m.Ctx, query)
+	if err != nil {
+		return 0
+	}
+	return r.DeletedCount
+}
+
+//findbyid
+func (m *MongodbSim) FindById(c, id string) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, map[string]interface{}{"_id": StringTOBsonId(id)})
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//findone
+func (m *MongodbSim) FindOne(c string, query map[string]interface{}) map[string]interface{} {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	r := coll.FindOne(m.Ctx, query)
+	v := map[string]interface{}{}
+	r.Decode(&v)
+	return v
+}
+
+//find
+func (m *MongodbSim) Find(c string, query map[string]interface{}, sort, fields interface{}) ([]map[string]interface{}, error) {
+	m.Open()
+	defer m.Close()
+	coll := m.C.Database(m.DbName).Collection(c)
+	op := options.Find()
+	r, err := coll.Find(m.Ctx, query, op.SetSort(sort), op.SetProjection(fields))
+	if err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+
+	var results []map[string]interface{}
+	if err = r.All(m.Ctx, &results); err != nil {
+		log.Fatal(err)
+		return nil, err
+	}
+	return results, nil
+}
+
+//创建_id
+func NewObjectId() primitive.ObjectID {
+	return primitive.NewObjectID()
+}
+
+func StringTOBsonId(id string) primitive.ObjectID {
+	objectId, _ := primitive.ObjectIDFromHex(id)
+	return objectId
+}
+
+func BsonTOStringId(id interface{}) string {
+	return id.(primitive.ObjectID).Hex()
+}

+ 9 - 9
data_monitoring/listen_data/src/main.go

@@ -78,14 +78,14 @@ func init() {
 
 func main()  {
 
-	//python_mgo = &MongodbSim{
-	//	MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
-	//	DbName:      "qfw",
-	//	Size:        20,
-	//	UserName: "zhengkun",
-	//	Password: "zk@123123",
-	//}
-	//python_mgo.InitPool()
+	python_mgo = &MongodbSim{
+		MongodbAddr: "172.17.145.163:27083,172.17.4.187:27082",
+		DbName:      "mixdata",
+		Size:        20,
+		UserName: "zhengkun",
+		Password: "zk@123123",
+	}
+	python_mgo.InitPool()
 
 	save_mgo = &MongodbSim{
 		MongodbAddr: "172.17.4.85:27080",
@@ -114,7 +114,7 @@ func main()  {
 	//}
 	//save_mgo.InitPool()
 
-	filterDataBuyerWinnerLength()
+	exportRepairNewStandData()
 
 	return
 

+ 183 - 2
data_monitoring/listen_data/src/zkmethod.go

@@ -65,7 +65,6 @@ func addChromPluginTaskData()  {
 	}
 	log.Debug("结束:",len(taskArr))
 }
-
 //解密
 func decodeJyUrl()  {
 	//zk@123123   zhengkun
@@ -98,7 +97,6 @@ func decodeJyUrl()  {
 	log.Debug(Decode[0])
 	return
 }
-
 //加密
 func encodeJyUrl()  {
 	var Url = "https://www.jianyu360.com/article/content/%s.html"
@@ -106,6 +104,188 @@ func encodeJyUrl()  {
 	log.Debug(Encode)
 }
 
+//刷数据
+func exportRepairNewStandData()  {
+	sess := save_mgo.GetMgoConn()
+	defer save_mgo.DestoryMongoConn(sess)
+	q,total,isok:=map[string]interface{}{},0,0
+	it := sess.DB(save_mgo.DbName).C("zktest_new_sssttt_data").Find(&q).Iter()
+	for tmp := make(map[string]interface{}); it.Next(&tmp);total++{
+		if total%1000==0 {
+			log.Debug("current index",total,isok,tmp["_id"])
+		}
+		buyer_name := qu.ObjToString(tmp["buyer"])
+		buyerclass := qu.ObjToString(tmp["buyerclass"])
+		data := python_mgo.FindOne("buyer_enterprise", map[string]interface{}{
+			"buyer_name":buyer_name,
+		})
+		if data!=nil {
+			isok++
+			python_mgo.UpdateById("buyer_enterprise",BsonTOStringId(data["_id"]), map[string]interface{}{
+				"$set": map[string]interface{}{
+					"buyerclass":buyerclass,
+				},
+			})
+
+		}
+
+		tmp = make(map[string]interface{})
+	}
+	log.Debug("is over",total,isok)
+}
+
+//导出数据
+func exportNewStandData()  {
+	//zktest_new_buyerstand_data
+	//elastic.InitElasticSize("http://127.0.0.1:12003",10)
+	elastic.InitElasticSize("http://172.17.145.170:9800", 10)
+	sess := python_mgo.GetMgoConn()
+	defer python_mgo.DestoryMongoConn(sess)
+	q,total,isok:=map[string]interface{}{},0,0
+	it := sess.DB(python_mgo.DbName).C("zktest_new_buyerstand_data").Find(&q).Iter()
+	for tmp := make(map[string]interface{}); it.Next(&tmp);total++{
+		if total%1000==0 {
+			log.Debug("current index",total,isok,tmp["_id"])
+		}
+		buyer_name := qu.ObjToString(tmp["buyer_name"])
+		query := `{"query":{"bool":{"must":[{"term":{"bidding.buyer":`+`"`+buyer_name+`"`+`}}],"must_not":[],"should":[]}},"from":"0","size":"1","sort":[],"facets":{}}`
+		arr := *elastic.Get("bidding","bidding",query)
+		if arr!=nil && len(arr)>0 {
+			source_id := qu.ObjToString(arr[0]["_id"])
+			if source_id!="" {
+				data := save_mgo.FindById("result_20210108",source_id)
+				if data!=nil && len(data)>2 {
+					isok++
+					save_mgo.Save("zktest_new_stand_data",data)
+				}else {
+					isok++
+					save_mgo.Save("zktest_new_stand_data", map[string]interface{}{
+						"buyer":buyer_name,
+					})
+				}
+			}
+		}else {
+			isok++
+			save_mgo.Save("zktest_new_stand_data", map[string]interface{}{
+				"buyer":buyer_name,
+			})
+		}
+		tmp = make(map[string]interface{})
+	}
+	log.Debug("is over",total,isok)
+}
+
+
+//导出-err标准数据
+func exportOtherErrStandardataTwo()  {
+	sess := python_mgo.GetMgoConn()
+	defer python_mgo.DestoryMongoConn(sess)
+	q,total,isok:=map[string]interface{}{},0,0
+	it := sess.DB(python_mgo.DbName).C("buyer_enterprise_new").Find(&q).Select(map[string]interface{}{
+		"buyer_name":1,
+	}).Iter()
+	pool_data := make(chan bool, 8)
+	wg_data := &sync.WaitGroup{}
+	for tmp := make(map[string]interface{}); it.Next(&tmp);total++{
+		if total%10000==0 {
+			log.Debug("current index",total,isok,tmp["_id"])
+		}
+		buyer_name := qu.ObjToString(tmp["buyer_name"])
+		if utf8.RuneCountInString(buyer_name)<4 {
+			continue
+			tmp = make(map[string]interface{})
+		}
+		tmpid := BsonTOStringId(tmp["_id"])
+		pool_data <- true
+		wg_data.Add(1)
+		go func(buyer_name string ,tmpid string) {
+			defer func() {
+				<-pool_data
+				wg_data.Done()
+			}()
+			stand_data := python_mgo.FindOne("buyer_enterprise", map[string]interface{}{
+				"buyer_name":buyer_name,
+			})
+			if len(stand_data)>2 && stand_data!=nil {
+
+			}else {
+				//查询企业库是否存在
+				qy_data := python_mgo.FindOne("qyxy_std", map[string]interface{}{
+					"company_name":buyer_name,
+				})
+				if len(qy_data)>2 && qy_data!=nil {
+					//企业库存在-真实有效-看标准库是否存在
+					isok++
+					dict := map[string]interface{}{
+						"buyer_name":buyer_name,
+					}
+					python_mgo.Save("zktest_new_buyerstand_data",dict)
+				}
+			}
+		}(buyer_name,tmpid)
+		tmp = make(map[string]interface{})
+	}
+	wg_data.Wait()
+
+	log.Debug("is over",total,isok)
+}
+func exportOtherErrStandardataOne()  {
+	sess := python_mgo.GetMgoConn()
+	defer python_mgo.DestoryMongoConn(sess)
+	q,total,isok:=map[string]interface{}{},0,0
+	it := sess.DB(python_mgo.DbName).C("buyer_err").Find(&q).Select(map[string]interface{}{
+		"name":1,
+	}).Iter()
+	pool_data := make(chan bool, 8)
+	wg_data := &sync.WaitGroup{}
+	for tmp := make(map[string]interface{}); it.Next(&tmp);total++{
+		if total%10000==0 {
+			log.Debug("current index",total,isok,tmp["_id"])
+		}
+		buyer_name := qu.ObjToString(tmp["name"])
+		if utf8.RuneCountInString(buyer_name)<4 {
+			continue
+			tmp = make(map[string]interface{})
+		}
+		tmpid := BsonTOStringId(tmp["_id"])
+		pool_data <- true
+		wg_data.Add(1)
+		go func(buyer_name string ,tmpid string) {
+			defer func() {
+				<-pool_data
+				wg_data.Done()
+			}()
+
+			//查询企业库是否存在
+			qy_data := python_mgo.FindOne("qyxy_std", map[string]interface{}{
+				"company_name":buyer_name,
+			})
+			if len(qy_data)>2 && qy_data!=nil {
+				//企业库存在-真实有效-看标准库是否存在
+				stand_data := python_mgo.FindOne("buyer_enterprise", map[string]interface{}{
+					"buyer_name":buyer_name,
+				})
+				if len(stand_data)>2 && stand_data!=nil {
+					python_mgo.DeleteById("buyer_err",BsonTOStringId(tmp["_id"]))
+				}else {
+					isok++
+					dict := map[string]interface{}{
+						"buyer_name":buyer_name,
+					}
+					python_mgo.Save("zktest_new_buyerstand_data",dict)
+					python_mgo.DeleteById("buyer_err",BsonTOStringId(tmp["_id"]))
+				}
+			}
+		}(buyer_name,tmpid)
+		tmp = make(map[string]interface{})
+	}
+	wg_data.Wait()
+
+	log.Debug("is over",total,isok)
+}
+
+
+//导出数据
 func filterDataBuyerWinnerLength()  {
 	reg := regexp.MustCompile(`^.{2}([大|小|中|学][学|院]|公司)$`)
 	sess := save_mgo.GetMgoConn()
@@ -2402,6 +2582,7 @@ func analyTagData()  {
 			log.Debug("当前:",k)
 		}
 		query:=`{"query":{"filtered":{"filter":{"bool":{"must":[{}]}},"query":{"bool":{"should":[{"multi_match":{"query":`+`"`+value+`"`+`,"type":"phrase","fields":["detail","filetext"]}}]}}}},"from":0,"size":10}`
+
 		dataArr := *elastic.Get("bidding","bidding",query)
 		if len(dataArr)>0 {
 			//for _,data :=range dataArr {

+ 1 - 0
src/jy/extract/extract.go

@@ -2299,6 +2299,7 @@ func AnalysisSaveResult(j, jf *ju.Job, e *ExtractTask) {
 		}
 		if e.TaskInfo.TestColl == "" {
 			if len(tmp) > 0 { //保存抽取结果
+				delete(tmp, "_id")
 				tmparr := []map[string]interface{}{
 					map[string]interface{}{
 						"_id": qu.StringTOBsonId(_id),

+ 20 - 17
src/jy/extract/extractInit.go

@@ -1397,12 +1397,13 @@ func (e *ExtractTask) BidSave(init bool) {
 					arr := e.BidArr[:saveLimit]
 					e.BidArr = e.BidArr[saveLimit:]
 					e.RWMutex.Unlock()
-					arr, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr)
+					//arr, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr)
+					arr, _, _, _ = getFieldAllAndBlocks(arr)
 					qu.Try(func() {
 						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
-						e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
-						e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
-						e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
+						//e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
+						//e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
+						//e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
@@ -1410,12 +1411,12 @@ func (e *ExtractTask) BidSave(init bool) {
 					arr := e.BidArr
 					e.BidArr = [][]map[string]interface{}{}
 					e.RWMutex.Unlock()
-					arr, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr)
+					arr, _, _, _ = getFieldAllAndBlocks(arr)
 					qu.Try(func() {
 						e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
-						e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
-						e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
-						e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
+						//e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
+						//e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
+						//e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
 					}, func(err interface{}) {
 						log.Debug(err)
 					})
@@ -1435,17 +1436,17 @@ func (e *ExtractTask) BidSave(init bool) {
 					arr2 := arr[:saveLimit]
 					arr = arr[saveLimit:]
 					lenarr = len(arr)
-					arr2, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr2)
+					arr2, _, _, _ = getFieldAllAndBlocks(arr2)
 					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr2...)
-					e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
-					e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
-					e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
+					//e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
+					//e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
+					//e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
 				} else {
-					arr, blocks, fieldalls, fieldallsf := getFieldAllAndBlocks(arr)
+					arr, _, _, _ := getFieldAllAndBlocks(arr)
 					e.TaskInfo.TDB.UpSertBulk(e.TaskInfo.ToColl, arr...)
-					e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
-					e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
-					e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
+					//e.TaskInfo.TDB.SaveBulk("ext_blocks", blocks...)
+					//e.TaskInfo.TDB.SaveBulk("ext_fieldall", fieldalls...)
+					//e.TaskInfo.TDB.SaveBulk("ext_fieldallf", fieldallsf...)
 					break
 				}
 			}
@@ -1495,7 +1496,9 @@ func getFieldAllAndBlocks(a [][]map[string]interface{}) (arr [][]map[string]inte
 			}
 			delete(tmp, "fieldall")
 			delete(tmp, "fieldallf")
-			v[1] = tmp
+
+			v[1] = tmp //全部更新
+			//v[1]["$set"] = tmp //指定更新
 		}
 		arr = append(arr, v)
 	}

+ 2 - 2
standardata/src/standarwinner.go

@@ -377,8 +377,8 @@ func comHisMegerNewData(name, datatype string, ps []map[string]interface{}) map[
 		"company_email":   "",
 		"area_code":       "",
 		"province":        qu.ObjToString(tmp["company_area"]),
-		"city":            "",
-		"district":        "",
+		"city":            qu.ObjToString(tmp["company_city"]),
+		"district":        qu.ObjToString(tmp["company_district"]),
 		"company_type":    qu.ObjToString(tmp["company_type"]),
 		"legal_person":    qu.ObjToString(tmp["legal_person"]),
 		"company_phone":   "",

+ 7 - 7
standardata/src/task.go

@@ -30,7 +30,7 @@ func task_standarData() {
 	})
 	c.Start()
 	//立即执行一次
-	runOnce()
+	//runOnce()
 }
 
 func runOnce()  {
@@ -44,12 +44,12 @@ func runOnce()  {
 			"$lte": pici,
 		},
 	}
-	//query  = map[string]interface{}{}
+	query  = map[string]interface{}{}
 	log.Println(mgofromdb, query)
-	go buyerStandarData(mgofromdb, query)
-	time.Sleep(10 * time.Second)
-	go winnerStandarData(mgofromdb, query)
-	time.Sleep(10 * time.Second)
-	go agencyStandarData(mgofromdb, query)
+	buyerStandarData(mgofromdb, query)
+	//time.Sleep(10 * time.Second)
+	//go winnerStandarData(mgofromdb, query)
+	//time.Sleep(10 * time.Second)
+	//go agencyStandarData(mgofromdb, query)
 }