import json import requests from hytest.common import * from cfg import cfg from bs4 import BeautifulSoup from requests.packages import urllib3 #存放公用方法 # 存储 全局共享 数据 GSTORE = {} class APIMgr(): #打印https请求与消息 def __init__(self): self.ui = None self.token = None def printRequest(self,req): if req.body==None: msgBody='' else: msgBody=req.body self.ui.outputWindow.append( '{}\n{}\n{}\n\n{}'.format( '\n\n-------发送请求--------', req.method+''+req.url, '\n'.join('{}:{}'.format(k,v) for k,v in req.headers.items()), msgBody, )) # 打印http相应消息的函数 def printResponse(self, response): print('\n\n----- https response begin -----') print(response.status_code) # print(response.headers) for k, v in response.headers.items(): print(f'{k}:{v}') print(response.content.decode('utf8')) print('----- https response end-----\n\n') headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67" } #headers设置为全局变量 GSTORE['headers'] = headers # session对象设置为全局变量 s = requests.Session() GSTORE['s'] = s #pc登录接口 def mgr_login(self, phone='18211989987', password='123456',useproxies=False): headers=GSTORE['headers'] s = GSTORE['s'] if useproxies: self.s.proxies.update({'http':'127.0.0.1:8888'}) response = self.s.post(f"{cfg.target_host}/phone/login",headers=headers,data= { 'reqType': 'phoneLogin', 'isAutoLogin': 'false', 'phone':phone, 'password':password }) self.printResponse(response) # 把response对象返回出去 return response # app登录接口 def mgr_login_app(self, phone='18211989987', password='123456',useproxies=False): headers=GSTORE['headers'] s = GSTORE['s'] if useproxies: self.s.proxies.update({'http':'127.0.0.1:8888'}) response = self.s.post(f"{cfg.target_host_app}/jyapp/free/login",headers=headers,params= { 'reqType': 'phoneLogin', 'phone':phone, 'password':password, 'rid':'', 'oid': '', 'phoneType': '', 'channel': '', 'deviceId': '' }) self.printResponse(response) # 把response对象返回出去 return response """退出登录pc""" def mgr_logout(self): url = f"{cfg.target_host}/front/signOut" headers = GSTORE['headers'] s = GSTORE['s'] s.post(url=url, headers=headers) # self.printResponse(res) # return res """退出登录app""" def mgr_logout_app(self): url = f"{cfg.target_host_app}/jyapp/free/signOut" headers = GSTORE['headers'] s = GSTORE['s'] s.post(url=url, headers=headers) #招标搜索 def bidsearch(self, keywords="建筑", publishtime="fiveyear", selectType="content"): #使用全局变量 headers = GSTORE['headers'] params={"keywords":keywords,"publishtime":publishtime,"timeslot":"","area":"","subtype":"","minprice":"","maxprice":"","industry":"","buyerclass":"","buyertel":"","winnertel":"","selectType":selectType,"notkey":"","fileExists":"0","city":"","searchGroup":"0","searchMode":"0","wordsMode":"0","additionalWords":""} #保存session session = self.s response = session.post(f"{cfg.target_host}/jylab/supsearch/index.html", headers=headers, params=params) self.printResponse(response) return response # 企业搜索 def enterpriseSearch(self, match="北京剑鱼信息技术有限公司河南分公司", matchType="A", pageSize="10", pageNum="0"): # 使用全局变量 headers = GSTORE['headers'] # 保存session session = self.s response = session.post(f"{cfg.target_host}/publicapply/enterpriseSearch/doQuery", headers=headers, data={ 'match': match, 'matchType': matchType, 'pageSize': pageSize, 'pageNum': pageNum }) self.printResponse(response) return response # 供应搜索 def supplySearch(self, keywords="PH计", searchType="title", province="", city="", time="", status="0", pageSize=50, pageIndex=1): headers = GSTORE['headers'] headers['Content-Type'] = 'application/json' # 添加Content-Type头部 url = f"{cfg.target_host}/jyinfo/supplySearch" data = { "keywords": keywords, "searchType": searchType, "province": province, "city": city, "time": time, "status": status, "pageSize": pageSize, "pageIndex": pageIndex } session=self.s response = session.post(url=url, headers=headers, data=json.dumps(data)) self.printResponse(response) return response #采购单位搜索 def buyer_search(self, buyerName, province=None, city=None, buyerClass=None, isCheckFollow=True, isCheckReceive=True, isContact=0, pageSize=10, pageNum=1): if buyerClass is None: buyerClass = [] if city is None: city = [] if province is None: province = [] url = f"{cfg.target_host}/jyapi/jybx/buyer/eType/buyerList" data = { "buyerName": buyerName, "province": province, "city": city, "buyerClass": buyerClass, "isCheckFollow": isCheckFollow, "isCheckReceive": isCheckReceive, "isContact": isContact, "pageSize": pageSize, "pageNum":pageNum } response = requests.post(url=url, json=data, headers=self.headers) self.printResponse(response) return response #接口数据传值常用三种方式:urlencoded---params,键值对---data,json格式---json #获取推送记录接口 def push_list(self): headers=GSTORE['headers'] s = GSTORE['s'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/fType/list",headers=headers,json= {"pageNum": 1, "pageSize": 50, "format": "table", "area": "", "selectTime": "all", "city": "", "buyerClass": "", "subtype": "", "industry": "", "keyWords": "", "fileExists": "", "price": "", "source": "", "exportNum": "", "vt": ""}) return response # 不登录招标搜索 def notloggedin_search(self, keyword='科技', publishtime='thisyear', selectType='content,title'): headers = GSTORE['headers'] params={"keywords": keyword , "publishtime": publishtime, "timeslot": "", "area": "", "subtype": "", "minprice": "", "maxprice": "", "industry": "", "buyerclass": "", "buyertel": "", "winnertel": "", "selectType": selectType, "notkey": "", "fileExists": "0", "city": "", "searchGroup": "0", "searchMode": "0", "wordsMode": "0", "additionalWords": ""} response = requests.post(f"{cfg.target_host}/jylab/supsearch/index.html", headers=headers, params=params) response.raise_for_status() # 如果请求失败,会抛出异常 return response #不登录采购单位搜索 def notloggedin_buysearch(self,keyword): headers = GSTORE['headers'] s = GSTORE['s'] data = {"buyerName":keyword,"province":[],"city":[],"buyerClass":[],"isCheckFollow":False,"isCheckReceive":False,"isContact":0,"pageSize":10,"pageNum":1} response =s.post(f"{cfg.target_host}/jyapi/jybx/buyer/eType/buyerList", headers=headers, json=data) # response.raise_for_status() # 如果请求失败,会抛出异常 return response #消息中心列表 def get_messagelist(self): headers = GSTORE['headers'] s = GSTORE['s'] response=s.post(f"{cfg.target_host}/jyapi/messageCenter/MessageList",headers=headers,json= {"msgType":-1,"isRead":-1,"offset":1,"size":20}) return response """用户中台""" def get_userCenter(self): hearders = GSTORE['headers'] s = GSTORE['s'] response = s.post(f"{cfg.target_host}/userCenter/workDesktop/menuInfo", headers=hearders) return response """我的订单""" def get_myOrder(self): hearders = GSTORE['headers'] s = GSTORE['s'] params = { "type": 0, "pageNum": 1, "fromPage": "pc", "page_size": 10 } response = s.post(f"{cfg.target_host}/subscribepay/orderListDetails/myOrder", headers=hearders, params=params) return response """优惠卷""" def get_coupon(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "mold": 4, "currentPage": 1, "pageSize": 8, "platform": "P" } response= s.post(f"{cfg.target_host}/jyCoupon/getInfoByUser", headers=hearders,params=params) return response """数据自动导出""" def get_dataExport(self,publishtime=1672502400_1688951529,keyword='数据',selectType='title'): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "publishtime":publishtime, "area":"", "city":"", "region":"", "industry":"", "buyerclass":"", "keyword": [{"keyword":keyword,"appended":[],"exclude":[]}], "selectType":selectType, "minprice":"", "maxprice":"", "subtype":"", "buyer":"", "winner":"", "dataType": 2 } response = s.post(f"{cfg.target_host}/front/dataExport/sieveData", headers=hearders, params=params) return response """数据导出记录""" def Export_recordList(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "pageNum": 0, "pageSize": 10 } response = s.post(f"{cfg.target_host}/subscribepay/dataExportPack/recordList", headers=hearders, params=params) return response """剑鱼文库搜索""" def Library_search(self,keyWord='数据'): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "keyWord":keyWord, "tag":"", "sort": "tSort", "num": 1, "size": 10 } response = s.post(f"{cfg.target_host}/jydocs/search", headers=hearders, params=params) return response """剑鱼文库收藏""" def Library_collection(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "sign": 1, "num": 1, "size": 10 } response = s.post(f"{cfg.target_host}/jydocs/user/list", headers=hearders, params=params) return response """剑鱼文库我的文库""" def My_library(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "sign": 0, "num": 1, "size": 10 } response = s.post(f"{cfg.target_host}/jydocs/user/list", headers=hearders, params=params) return response """项目进度监控""" def Project_monitoring(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "pageNum": 0, "pageSize": 500 } response = s.post(f"{cfg.target_host}/bigmember/follow/project/list", headers=hearders, params=params) return response """企业情报监控""" def Enterprise_monitoring(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "pageNum": 0, "pageSize": 10, "match":"", "group":"", } response = s.post(f"{cfg.target_host}/bigmember/follow/ent/list", headers=hearders, params=params) return response """客户监控""" def Customer_monitoring(self,): hearders = GSTORE['headers'] s = GSTORE['s'] params = { "pagesize": 10, "pageno": 0, "keyword":"" } response = s.post(f"{cfg.target_host}/publicapply/customer/list", headers=hearders, params=params) return response """标讯收藏""" def Message_Collection(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "buyerPhone":0, "buyerclass":"", "label":"", "pagenum":1, "pagesize":50, "selectTime":"", "winnerPhone":0 } response = s.post(f"{cfg.target_host}/publicapply/bidcoll/list", headers=hearders, params=params) return response """标讯收藏""" #信息获取 def Getuser(self): headers = GSTORE['headers'] s = GSTORE['s'] response = s.get(f"{cfg.target_host}/jypay/user/getAccountInfo", headers=headers) return response #密码校验 def Check_password(self): headers = GSTORE['headers'] s = GSTORE['s'] params = { "password": "123456" } response = s.post(f"{cfg.target_host}/publicapply/password/check", headers=headers, params=params) return response #身份获取 def Identity_list(self): headers = GSTORE['headers'] s = GSTORE['s'] params = { } response = s.post(f"{cfg.target_host}/publicapply/identity/list",headers=headers, params=params) # 解析响应内容为JSON response_json = response.json() # 从JSON响应中提取token self.token = response_json['data'][0]['token'] return response def Identity_switch(self): headers = GSTORE["headers"] s = GSTORE["s"] params = { "token":self.token } response =s.post(f"{cfg.target_host}/publicapply/identity/switch", headers=self.headers,params=params) return response def User_info(self): headers = { 'content-Type': 'application/json', 'appId': '10000', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67' } s = GSTORE["s"] params = {} response =s.post(f"{cfg.target_host}/userCenter/ent/userInfo",headers=headers,params=params) return response def Whether_buy(self): headers = GSTORE["headers"] s = GSTORE["s"] response =s.get(f"{cfg.target_host}/entnicheNew/buy/whetherbuy",headers=headers) return response def Commonly_List(self): headers = { 'content-Type': 'application/json', 'appId': '10000', 'userid': '63a41aa5cd7ea10389b2a8f3' } s = GSTORE['s'] response =s.post(f"{cfg.target_host}/userCenter/workDesktop/renew/commonlyList",headers=headers) return response def Authorised_info(self): headers = { "Content-Type":"application/json", "functionCode":"znsj_kf_use" } s = GSTORE['s'] response =s.post(f"{cfg.target_host}/resourceCenter/waitEmpowerDetail",headers=headers) return response def nologin_buyer_list(self): headers = GSTORE['headers'] s = GSTORE['s'] data = {"buyerName":"","province":[],"city":[],"buyerClass":[],"isCheckFollow":False,"isCheckReceive":False,"isContact":0,"pageSize":10,"pageNum":1} response =s.post(f"{cfg.target_host}/jyapi/jybx/buyer/eType/buyerList",headers=headers,json=data) return response def nologin_supplySearch(self): headers = GSTORE['headers'] s = GSTORE['s'] data = {"keywords":"信息","searchType":"title","province":"","city":"","time":"","status":"0","pageSize":50,"pageIndex":1} response =s.post(f"{cfg.target_host}/jyinfo/supplySearch",headers=headers,json=data) return response def nologin_enterpriseSearch(self): headers = GSTORE['headers'] s = GSTORE['s'] params = {"match":"科技"} response =s.post(f"{cfg.target_host}/publicapply/enterpriseSearch/doQuery",headers=headers, params=params) return response def nologin_Tender_search(self ): headers = GSTORE['headers'] s = GSTORE['s'] params = {"match":"科技"} response =s.post(f"{cfg.target_host}/jyapi/jybx/core/fType/searchList",headers=headers, params=params) return response def nologin_Proposed_construction_search(self ): headers = GSTORE['headers'] s = GSTORE['s'] params = {"match":"科技"} response =s.post(f"{cfg.target_host}/front/project/nzj/search",headers=headers, params=params) return response apimgr = APIMgr()