Browse Source

更新dataprocess 值

wcc 1 year ago
parent
commit
c3f597791d
2 changed files with 37 additions and 31 deletions
  1. 36 30
      src/task/task.go
  2. 1 1
      src/web/task.go

+ 36 - 30
src/task/task.go

@@ -617,38 +617,44 @@ func NewTaskRunAll(tt *TTask, budp bool, mapInfo map[string]interface{}) int {
 					}
 					}
 				}
 				}
 				if tt.S_querycon == "1" { //id查询
 				if tt.S_querycon == "1" { //id查询
-					//临时修改查询id区间段
-					comeintime := time.Now().Unix() - 5*60
-					query := map[string]interface{}{
-						"comeintime": map[string]interface{}{
-							"$gt": comeintime,
-						},
-					}
-					qId := tt.MgoTask.GetMgoConn()
-					defer tt.MgoTask.DestoryMongoConn(qId)
-					tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter()
-					eId := ""
-					for tmp := make(map[string]interface{}); tmpData.Next(tmp); {
-						eId = u.BsonIdToSId(tmp["_id"])
-					}
-					if tt.LastId != "" && q["_id"] == nil {
-						sid := tt.LastId
-						if eId <= sid || eId == "" {
-							return
+					if tt.S_query != "" {
+						//页面上配置了查询条件,直接使用,不再单独查询上次任务结束ID
+
+					} else {
+						//临时修改查询id区间段
+						comeintime := time.Now().Unix() - 5*60
+						query := map[string]interface{}{
+							"comeintime": map[string]interface{}{
+								"$gt": comeintime,
+							},
 						}
 						}
-						q["_id"] = map[string]interface{}{
-							"$gt":  u.StringTOBsonId(sid),
-							"$lte": u.StringTOBsonId(eId),
+						qId := tt.MgoTask.GetMgoConn()
+						defer tt.MgoTask.DestoryMongoConn(qId)
+						tmpData := qId.DB(tt.S_mgodb).C(tt.S_coll).Find(&query).Limit(1).Sort("-_id").Iter()
+						eId := ""
+						for tmp := make(map[string]interface{}); tmpData.Next(tmp); {
+							eId = u.BsonIdToSId(tmp["_id"])
+						}
+						if tt.LastId != "" && q["_id"] == nil {
+							sid := tt.LastId
+							if eId <= sid || eId == "" {
+								return
+							}
+							q["_id"] = map[string]interface{}{
+								"$gt":  u.StringTOBsonId(sid),
+								"$lte": u.StringTOBsonId(eId),
+							}
+						}
+						//按id查询,为了保证有新数据入库,每次休息2分钟
+						time.Sleep(time.Second * 60)
+						//测试环境q的赋值执行下述代码
+						if tt.LastId != "" && q["_id"] == nil {
+							q["_id"] = map[string]interface{}{
+								"$gt": u.StringTOBsonId(tt.LastId),
+							}
 						}
 						}
 					}
 					}
-					//按id查询,为了保证有新数据入库,每次休息2分钟
-					time.Sleep(time.Minute * 2)
-					//测试环境q的赋值执行下述代码
-					//if tt.LastId != "" && q["_id"] == nil {
-					//	q["_id"] = map[string]interface{}{
-					//		"$gt": u.StringTOBsonId(tt.LastId),
-					//	}
-					//}
+
 				} else { //时间查询
 				} else { //时间查询
 					name := tt.S_timefieldname
 					name := tt.S_timefieldname
 					q[name] = map[string]interface{}{
 					q[name] = map[string]interface{}{
@@ -1302,7 +1308,7 @@ func FindId(coll string) (gtid, lteid string) {
 		lteid = d["lteid"].(string)
 		lteid = d["lteid"].(string)
 		set := map[string]interface{}{
 		set := map[string]interface{}{
 			"$set": map[string]interface{}{
 			"$set": map[string]interface{}{
-				"dataprocess": 1,
+				"dataprocess": 2,
 				"updatetime":  time.Now().Unix(),
 				"updatetime":  time.Now().Unix(),
 			},
 			},
 		}
 		}

+ 1 - 1
src/web/task.go

@@ -17,7 +17,7 @@ func Main(w http.ResponseWriter, r *http.Request) {
 		ss, _ := Store.Get(r, SN)
 		ss, _ := Store.Get(r, SN)
 		data := map[string]interface{}{"s_name": ss.Values["s_name"]}
 		data := map[string]interface{}{"s_name": ss.Values["s_name"]}
 		//查询配置规则
 		//查询配置规则
-		res, _ := MgoClass.Find(COLL_TASK, nil, `{"_id":-1}`, nil, false, 0, 200)
+		res, _ := MgoClass.Find(COLL_TASK, nil, `{"l_lasttime":-1}`, nil, false, 0, 200)
 		for _, r := range *res {
 		for _, r := range *res {
 			if r["s_starttime"] == nil || r["s_starttime"] == int64(0) {
 			if r["s_starttime"] == nil || r["s_starttime"] == int64(0) {
 				r["s_starttime"] = ""
 				r["s_starttime"] = ""