import json import requests from hytest.common import * from cfg import cfg from bs4 import BeautifulSoup from requests.packages import urllib3 #存放公用方法 # 存储 全局共享 数据 GSTORE = {} class APIMgr(): #打印https请求与消息 def __init__(self): self.ui = None self.token = None def printRequest(self,req): if req.body==None: msgBody='' else: msgBody=req.body self.ui.outputWindow.append( '{}\n{}\n{}\n\n{}'.format( '\n\n-------发送请求--------', req.method+''+req.url, '\n'.join('{}:{}'.format(k,v) for k,v in req.headers.items()), msgBody, )) # 打印http相应消息的函数 def printResponse(self, response): print('\n\n----- https response begin -----') print(response.status_code) # print(response.headers) for k, v in response.headers.items(): print(f'{k}:{v}') print(response.content.decode('utf8')) print('----- https response end-----\n\n') headers = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67;jy-test" } # 定义第二个 headers headers_1 = { "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67;jy-test", "Content-Type":"application/json", "MiniprogramCode":"wy_zbxm" } #headers设置为全局变量 GSTORE['headers'] = headers GSTORE['headers_1'] = headers_1 # session对象设置为全局变量 s = requests.Session() GSTORE['s'] = s #pc登录接口 def mgr_login(self, phone='18211989987', password='#80Z!RVv52',useproxies=False): headers=GSTORE['headers'] s = GSTORE['s'] if useproxies: self.s.proxies.update({'http':'127.0.0.1:8888'}) response = self.s.post(f"{cfg.target_host}/phone/login",headers=headers,data= { 'reqType': 'phoneLogin', 'isAutoLogin': 'false', 'phone':phone, 'password':password }) # 把response对象返回出去 return response #登录后获取session def get_session(self): response=self.mgr_login() # 检查登录是否成功 if response.status_code == 200: # 通过 response.cookies 获取 session 信息 cookies = response.cookies session_id = cookies.get('SESSIONID') # 假设 session ID 存储在 cookie 中 print(f"Session ID: {session_id}") return session_id else: print("Login failed!") return None # app登录接口 def mgr_login_app(self, phone='18211989987', password='#80Z!RVv52',useproxies=False): headers=GSTORE['headers'] s = GSTORE['s'] if useproxies: self.s.proxies.update({'http':'127.0.0.1:8888'}) response = self.s.post(f"{cfg.target_host_app}/jyapp/free/login",headers=headers,params= { 'reqType': 'phoneLogin', 'phone':phone, 'password':password, 'rid':'', 'oid': '', 'phoneType': '', 'channel': '', 'deviceId': '' }) # 把response对象返回出去 return response """退出登录pc""" def mgr_logout(self): url = f"{cfg.target_host}/front/signOut" headers = GSTORE['headers'] s = GSTORE['s'] s.post(url=url, headers=headers) # return res """退出登录app""" def mgr_logout_app(self): url = f"{cfg.target_host_app}/jyapp/free/signOut" headers = GSTORE['headers'] s = GSTORE['s'] s.post(url=url, headers=headers) #招标搜索 def bidsearch(self, keywords="建筑", publishtime="fiveyear", selectType="content"): #使用全局变量 headers = GSTORE['headers'] params={"keywords":keywords,"publishtime":publishtime,"timeslot":"","area":"","subtype":"","minprice":"","maxprice":"","industry":"","buyerclass":"","buyertel":"","winnertel":"","selectType":selectType,"notkey":"","fileExists":"0","city":"","searchGroup":"0","searchMode":"0","wordsMode":"0","additionalWords":""} #保存session session = self.s response = session.post(f"{cfg.target_host}/jylab/supsearch/index.html", headers=headers, params=params) return response #超前项目搜索 def advanced_search(self): #使用全局变量 headers = GSTORE['headers'] #保存session session = self.s data={ "searchGroup": 2, "reqType": "lastNews", "pageNum": 1, "pageSize": 50, "keyWords": "科技", "searchMode": 0, "bidField": "", "publishTime": "1710142712-1741678712", "selectType": "title,content", "subtype": "", "exclusionWords": "", "buyer": "", "winner": "", "agency": "", "industry": "", "province": "", "city": "", "district": "", "buyerClass": "", "fileExists": "", "price": "", "buyerTel": "", "winnerTel": "" } response = session.post(f"{cfg.target_host}/jyapi/jybx/core/mType/searchList", headers=headers, params=data,timeout=10) return response # 企业搜索 def enterpriseSearch(self, match="北京剑鱼信息技术有限公司河南分公司", matchType="A", pageSize="10", pageNum="0"): # 使用全局变量 headers = GSTORE['headers'] # 保存session session = self.s response = session.post(f"{cfg.target_host}/publicapply/enterpriseSearch/doQuery", headers=headers, data={ 'match': match, 'matchType': matchType, 'pageSize': pageSize, 'pageNum': pageNum }) return response # 供应搜索 def supplySearch(self, keywords="PH计", searchType="title", province="", city="", time="", status="0", pageSize=50, pageIndex=1): headers = GSTORE['headers'] headers['Content-Type'] = 'application/json' # 添加Content-Type头部 url = f"{cfg.target_host}/jyinfo/supplySearch" data = { "keywords": keywords, "searchType": searchType, "province": province, "city": city, "time": time, "status": status, "pageSize": pageSize, "pageIndex": pageIndex } session=self.s response = session.post(url=url, headers=headers, data=json.dumps(data)) return response #采购单位搜索 def buyer_search(self, buyerName, province=None, city=None, buyerClass=None, isCheckFollow=True, isCheckReceive=True, isContact=0, pageSize=10, pageNum=1): if buyerClass is None: buyerClass = [] if city is None: city = [] if province is None: province = [] url = f"{cfg.target_host}/jyapi/jybx/buyer/mType/buyerList" data = { "buyerName": buyerName, "province": province, "city": city, "buyerClass": buyerClass, "isCheckFollow": isCheckFollow, "isCheckReceive": isCheckReceive, "isContact": isContact, "pageSize": pageSize, "pageNum":pageNum } response = requests.post(url=url, json=data, headers=self.headers) return response #融创用户搜索 def rc_search(self): headers = GSTORE['headers'] headers['Content-Type'] = 'application/json' # 添加Content-Type头部 url = f"{cfg.target_host}/jyapi/jybx/core/mType/searchList" params={ "searchGroup": 0, "reqType": "lastNews", "pageNum": 1, "pageSize": 50, "keyWords": "医疗设备", "searchMode": 0, "bidField": "", "publishTime": "1654704000-1657900799", "selectType": "title,content", "subtype": "", "exclusionWords": "", "buyer": "", "winner": "", "agency": "", "industry": "", "province": "", "city": "", "district": "", "buyerClass": "", "fileExists": "", "price": "", "buyerTel": "", "winnerTel": "", "mobileTag": [ "军队类", "武警类", "融通类", "退役类", "融办类", "某某类", "all" ] } session = self.s response = session.post(url=url, headers=headers, params=params) return response """三级页公告摘要""" #两个接口, def preagent(self,url='https://www.jianyu360.cn/nologin/content/ApGY1xdfTI4LyMsM3d4cE8JIzAvFj1jcXNlKwUkPT0dY2BwDidUCZM%3D.html'): headers = { 'Referer': url } s = GSTORE["s"] response = s.get(f"{cfg.target_host}/publicapply/detail/preAgent", headers=headers) response_data=json.loads(response.text) token=response_data["data"]["token"] return token def detail_baseinfo(self,url='https://www.jianyu360.cn/nologin/content/ApGY1xdfTI4LyMsM3d4cE8JIzAvFj1jcXNlKwUkPT0dY2BwDidUCZM%3D.html'): headers = GSTORE['headers'] s = GSTORE["s"] token=self.preagent(url) params = { "token":token } response = s.post(f"{cfg.target_host}/publicapply/detail/baseInfo", headers=headers, params=params) return response """三级页商机推荐""" def detail_advancedinfo(self): headers = GSTORE['headers'] s = GSTORE["s"] res = self.detail_baseinfo() response_data = json.loads(res.text) token = response_data["data"]["token"] params = { "token": token } response = s.post(f"{cfg.target_host}/publicapply/detail/advancedInfo", headers=headers, params=params) return response """三级页监控项目""" def detail_monitor_project(self): headers = GSTORE['headers'] s = GSTORE["s"] params = { "sid": "ABCY1xdfTI4LyMsM3d4cE8JIzAvFj1jcXNlKwUkPT0dY2BwDidUCZM=" } response = s.post(f"{cfg.target_host}/bigmember/follow/project/add", headers=headers, params=params) return response """取消项目监控""" def cacel_project(self): headers = GSTORE['headers'] s = GSTORE["s"] params = { "sid": "ABCY1xdfTI4LyMsM3d4cE8JIzAvFj1jcXNlKwUkPT0dY2BwDidUCZM=", "fid[followId]":"ABCY3ZdfT0FUCw4AnpUA3k%3D", "fid[limit_count]": 10, "fid[msg_open]": True, "fid[status]": True, } response = s.post(f"{cfg.target_host}/bigmember/follow/project/cancel", headers=headers, params=params) return response """用户信息获取isadd接口""" def isadd(self): headers = GSTORE['headers'] s = GSTORE["s"] response = s.get(f"{cfg.target_host}/bigmember/use/isAdd", headers=headers) return response #接口数据传值常用三种方式:urlencoded---params,键值对---data,json格式---json #获取推送记录接口 def push_list(self): headers=GSTORE['headers'] s = GSTORE['s'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/fType/list",headers=headers,json= {"pageNum": 1, "pageSize": 50, "format": "table", "area": "", "selectTime": "all", "city": "", "buyerClass": "", "subtype": "", "industry": "", "keyWords": "", "fileExists": "", "price": "", "source": "", "exportNum": "", "vt": ""}) return response # 不登录招标搜索 def notloggedin_search(self, keyword='科技', publishtime='thisyear', selectType='content,title'): headers = GSTORE['headers'] params={"keywords": keyword , "publishtime": publishtime, "timeslot": "", "area": "", "subtype": "", "minprice": "", "maxprice": "", "industry": "", "buyerclass": "", "buyertel": "", "winnertel": "", "selectType": selectType, "notkey": "", "fileExists": "0", "city": "", "searchGroup": "0", "searchMode": "0", "wordsMode": "0", "additionalWords": ""} response = requests.post(f"{cfg.target_host}/jylab/supsearch/index.html", headers=headers, params=params) response.raise_for_status() # 如果请求失败,会抛出异常 return response #不登录采购单位搜索 def notloggedin_buysearch(self,keyword): headers = GSTORE['headers'] s = GSTORE['s'] data = {"buyerName":keyword,"province":[],"city":[],"buyerClass":[],"isCheckFollow":False,"isCheckReceive":False,"isContact":0,"pageSize":10,"pageNum":1} response =s.post(f"{cfg.target_host}/jyapi/jybx/buyer/eType/buyerList", headers=headers, json=data) # response.raise_for_status() # 如果请求失败,会抛出异常 return response #消息中心列表 def get_messagelist(self): headers = GSTORE['headers'] s = GSTORE['s'] response=s.post(f"{cfg.target_host}/jyapi/messageCenter/MessageList",headers=headers,json= { "isColumn": True, "isColumnNewMsg": False, "isMsgList": True, "msgType": -1, "isRead": -1, "offset": 1, "size": 20 }) return response #点击查看消息消息状态修改成已读 def get_markread(self): hearders = GSTORE['headers'] s = GSTORE['s'] json={ "msgId":285266 } response = s.post(f"{cfg.target_host}/jyapi/messageCenter/markRead", headers=hearders,json=json) return response """用户中台""" def get_userCenter(self): hearders = GSTORE['headers'] s = GSTORE['s'] response = s.post(f"{cfg.target_host}/userCenter/workDesktop/menuInfo", headers=hearders) return response """我的订单""" def get_myOrder(self): hearders = GSTORE['headers'] s = GSTORE['s'] params = { "type": 0, "pageNum": 1, "fromPage": "pc", "page_size": 10 } response = s.post(f"{cfg.target_host}/subscribepay/orderListDetails/myOrder", headers=hearders, params=params) return response """优惠卷""" def get_coupon(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "mold": 4, "currentPage": 1, "pageSize": 8, "platform": "P" } response= s.post(f"{cfg.target_host}/jyCoupon/getInfoByUser", headers=hearders,params=params) return response """数据自动导出""" def get_dataExport(self,publishtime=1672502400_1688951529,keyword='数据',selectType='title'): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "publishtime":publishtime, "area":"", "city":"", "region":"", "industry":"", "buyerclass":"", "keyword": [{"keyword":keyword,"appended":[],"exclude":[]}], "selectType":selectType, "minprice":"", "maxprice":"", "subtype":"", "buyer":"", "winner":"", "dataType": 2 } response = s.post(f"{cfg.target_host}/front/dataExport/sieveData", headers=hearders, params=params) return response """数据导出记录""" def Export_recordList(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "pageNum": 0, "pageSize": 10 } response = s.post(f"{cfg.target_host}/subscribepay/dataExportPack/recordList", headers=hearders, params=params) return response """数据导出-超出2w条,点击不在提示""" def export_prompt(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "status": 1 } response = s.post(f"{cfg.target_host}/front/dataExport/setDontPromptAgain", headers=hearders, params=params) return response """数据导出-判断是否展示弹框""" def export_frame(self): hearders = GSTORE['headers'] s = GSTORE['s'] response = s.post(f"{cfg.target_host}/front/dataExport/getDontPromptAgain", headers=hearders) return response """数据导出筛选条件列表""" def export_list(self): hearders = GSTORE['headers'] s = GSTORE['s'] response = s.get(f"{cfg.target_host}/subscribepay/dataExportPack/screenList", headers=hearders) return response """剑鱼文库搜索""" def Library_search(self,keyWord='数据'): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "keyWord":keyWord, "tag":"", "sort": "tSort", "num": 1, "size": 10 } response = s.post(f"{cfg.target_host}/jydocs/search", headers=hearders, params=params) return response """剑鱼文库收藏""" def Library_collection(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "sign": 1, "num": 1, "size": 10 } response = s.post(f"{cfg.target_host}/jydocs/user/list", headers=hearders, params=params) return response """剑鱼文库我的文库""" def My_library(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "sign": 0, "num": 1, "size": 10 } response = s.post(f"{cfg.target_host}/jydocs/user/list", headers=hearders, params=params) return response """项目进度监控""" def Project_monitoring(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "pageNum": 0, "pageSize": 500 } response = s.post(f"{cfg.target_host}/bigmember/follow/project/list", headers=hearders, params=params) return response """企业情报监控""" def Enterprise_monitoring(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "pageNum": 0, "pageSize": 10, "match":"", "group":"", } response = s.post(f"{cfg.target_host}/bigmember/follow/ent/list", headers=hearders, params=params) return response """客户监控""" def Customer_monitoring(self,): hearders = GSTORE['headers'] s = GSTORE['s'] params = { "pagesize": 10, "pageno": 0, "keyword":"" } response = s.post(f"{cfg.target_host}/publicapply/customer/list", headers=hearders, params=params) return response """标讯收藏""" def Message_Collection(self): hearders = GSTORE['headers'] s = GSTORE['s'] params={ "buyerPhone":0, "buyerclass":"", "label":"", "pagenum":1, "pagesize":50, "selectTime":"", "winnerPhone":0 } response = s.post(f"{cfg.target_host}/publicapply/bidcoll/list", headers=hearders, params=params) return response """标讯收藏""" #信息获取 def Getuser(self): headers = GSTORE['headers'] s = GSTORE['s'] response = s.get(f"{cfg.target_host}/jypay/user/getAccountInfo", headers=headers) return response #密码校验 def Check_password(self): headers = GSTORE['headers'] s = GSTORE['s'] params = { "password": "#80Z!RVv52" } response = s.post(f"{cfg.target_host}/publicapply/password/check", headers=headers, params=params) return response #身份获取 def Identity_list(self,n=0): headers = GSTORE['headers'] s = GSTORE['s'] params = { } response = s.post(f"{cfg.target_host}/publicapply/identity/list",headers=headers, params=params) # 解析响应内容为JSON response_json = response.json() # 从JSON响应中提取token self.token = response_json['data'][n]['token'] return response def Identity_switch(self): headers = GSTORE['headers'] s = GSTORE["s"] params = { "token":self.token } response =s.post(f"{cfg.target_host}/publicapply/identity/switch", headers=headers, params=params) return response """获取用户信息""" def User_info(self): headers = { 'content-Type': 'application/json', 'appId': '10000', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67' } s = GSTORE["s"] params = {} response =s.post(f"{cfg.target_host}/userCenter/ent/userInfo",headers=headers,params=params) return response def Whether_buy(self): headers = GSTORE["headers"] s = GSTORE["s"] response =s.get(f"{cfg.target_host}/entnicheNew/buy/whetherbuy",headers=headers) return response def Commonly_List(self): headers = { 'content-Type': 'application/json', 'appId': '10000', 'userid': '63a41aa5cd7ea10389b2a8f3' } s = GSTORE['s'] response =s.post(f"{cfg.target_host}/userCenter/workDesktop/renew/commonlyList",headers=headers) return response """权限校验""" def Authorised_info(self): headers = { "Content-Type":"application/json", "functionCode":"znsj_kf_use" } s = GSTORE['s'] response =s.post(f"{cfg.target_host}/resourceCenter/waitEmpowerDetail",headers=headers) return response """未登录采购单位列表""" def nologin_buyer_list(self): headers = GSTORE['headers'] s = GSTORE['s'] data = {"buyerName":"","province":[],"city":[],"buyerClass":[],"isCheckFollow":False,"isCheckReceive":False,"isContact":0,"pageSize":10,"pageNum":1} response =s.post(f"{cfg.target_host}/jyapi/jybx/buyer/eType/buyerList",headers=headers,json=data) return response """未登录供应商列表""" def nologin_supplySearch(self): headers = GSTORE['headers'] s = GSTORE['s'] data = {"keywords":"信息","searchType":"title","province":"","city":"","time":"","status":"0","pageSize":50,"pageIndex":1} response =s.post(f"{cfg.target_host}/jyinfo/supplySearch",headers=headers,json=data) return response """未登录企业搜索""" def nologin_enterpriseSearch(self): headers = GSTORE['headers'] s = GSTORE['s'] params = {"match":"科技"} response =s.post(f"{cfg.target_host}/publicapply/enterpriseSearch/doQuery",headers=headers, params=params) return response """未登录招标信息搜索""" def nologin_Tender_search(self ): headers = GSTORE['headers'] s = GSTORE['s'] params = {"match":"科技"} response =s.post(f"{cfg.target_host}/jyapi/jybx/core/fType/searchList",headers=headers, params=params) return response """未登录拟在建搜索""" def nologin_Proposed_construction_search(self ): headers = GSTORE['headers'] s = GSTORE['s'] params = {"match":"科技"} response =s.post(f"{cfg.target_host}/front/project/nzj/search",headers=headers, params=params) return response """未登录拟在建搜索详情页""" def nologin_proposed_construction_content(self): headers = GSTORE['headers'] s = GSTORE['s'] params = {"pid":"040600055c0f00594f4351560153085d530f4e4200035201"} response =s.post(f"{cfg.target_host}/front/project/nzj/details",headers=headers, params=params) return response """工作桌面首页行业报告""" def industry_report(self): headers = GSTORE['headers'] s = GSTORE['s'] response = s.get(f"{cfg.target_host}/front/project/deskAnalysisReport", headers=headers) return response """工作桌面首页商机情报详情页""" def business_opportunity(self): headers = GSTORE['headers'] s = GSTORE['s'] # id='ABCY1xaYD0FJD0sNHN6cAcSMTI4DRZjV3diPB43LyEgfGpzbQFUCV8=' response = s.get(f"{cfg.target_host}/bigmember/project/businessDetails?id=ABCY1xaYD0FJD0sNHN6cAcSMTI4DRZjV3diPB43LyEgfGpzbQFUCV8=", headers=headers,data={}) return response """大会员推送记录列表""" def bigmember_push(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/mType/list", headers=headers, json= { "pageNum": 1, "pageSize": 50, "format": "table", "area": "", "selectTime": "all", "city": "", "buyerClass": "", "subtype": "", "industry": "", "keyWords": "", "fileExists": "", "price": "", "source": "", "exportNum": "", "district": "", "isRead": "", "vt": "m" }) return response """商机管理推送记录""" def entniche_push(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/eType/list", headers=headers, json= { "pageNum": 1, "pageSize": 50, "format": "table", "area": "", "selectTime": "all", "city": "", "buyerClass": "", "subtype": "", "industry": "", "keyWords": "", "fileExists": "", "price": "", "source": "", "exportNum": "", "district": "", "isRead": "", "vt": "s" }) return response """订阅搜索""" #免费用户订阅搜索 def free_subscription_search(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/fType/list", headers=headers, json= { "pageNum": 1, "pageSize": 50, "format": "table", "area": "", "selectTime": "1718534447_1721126447", "city": "", "buyerClass": "", "subtype": "", "industry": "", "keyWords": "", "fileExists": "", "price": "", "source": "", "exportNum": "", "district": "", "isRead": "", "vt": "" }) return response #超级订阅用户订阅搜索 def svip_subscription_search(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/vType/list", headers=headers, json= { "pageNum": 1, "pageSize": 50, "format": "table", "area": "安徽", "selectTime": "1675094400_1719763199", "city": "", "buyerClass": "传媒,采矿业,电信行业,金融业,建筑业,能源化工,农林牧渔,批发零售,信息技术,运输物流,制造业,住宿餐饮", "subtype": "招标公告,招标,邀标,询价,竞谈,单一,竞价,变更", "industry": "", "keyWords": "计算机", "fileExists": "", "price": "", "source": "", "exportNum": "", "district": "", "isRead": "", "vt": "v" }) return response #大会员用户订阅搜索 def bigmember_subscription_search(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/mType/list", headers=headers, json= { "pageNum": 1, "pageSize": 50, "format": "table", "area": "安徽,北京,甘肃", "selectTime": "1672502400_1719763199", "city": "", "buyerClass": "人大,政协,党委办,组织,宣传,统战,纪委,政府办,发改,财政,教育,科技,工信,民政,民宗,人社,公安,检察院,法院,司法,应急管理,军队,自然资源,生态环境,住建,市政,城管,交通,水利,农业,气象,文旅,卫健委,医疗,学校,档案,体育,政务中心,机关事务,国资委,海关,税务,市场监管,商务,人行,银保监,证监,审计,出版广电,统计,公共资源交易,社会团体", "subtype": "招标预告,预告,预审,预审结果,论证意见,需求公示,招标结果,中标,成交,废标,流标", "industry": "", "keyWords": "信息,科技,能源", "fileExists": "", "price": "", "source": "", "exportNum": "", "isRead": "1", "vt": "m" }) return response #新增项目关注 def new_project_attention(self): headers = GSTORE['headers'] response =self.s.post(f"{cfg.target_host}/bigmember/follow/project/add?sid=ABCY1xaYD0vLD86RHxkcwcvJDAZDSB3dlVkKDgzIDodeGpzfQlUCdA=",headers=headers) return response #关注项目地区筛选 def project_screenarea(self): headers = GSTORE['headers'] response =self.s.get(f"{cfg.target_host}/bigmember/follow/project/screenArea",headers=headers, params={}) return response #商机管理订阅搜索 def entname_subscription_search(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyapi/jybx/subscribe/eType/list", headers=headers, json= { "pageNum": 1, "pageSize": 50, "format": "table", "area": "安徽,重庆,河南,江苏", "selectTime": "1672502400_1719763199", "city": "", "buyerClass": "", "subtype": "招标公告,招标,邀标,询价,竞谈,单一,竞价,变更,招标结果,中标,成交,废标,流标", "industry": "农林牧渔_生产物资,农林牧渔_生产设备,农林牧渔_相关服务", "keyWords": "农业,园林,森林", "fileExists": "", "price": "", "source": "1", "exportNum": "", "isRead": "", "vt": "s" }) return response #招标采购搜索信息对外接口 def bid_search_api(self): headers = { 'Accept-Charset': 'utf-8', 'timestamp': '1726207205', 'sign': '1058A958B8D562EA9F2234AC42716778', 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Content-Type': 'application/json' } response = self.s.post(f"{cfg.target_host_api}/thirdpartyapi/standard/bid/search?appid=jianyuDev", headers=headers, json={ "releaseTimeStart": 1695456870,"keyWord": ["市政粮食"],"searchMode":1}) return response #招标采购搜索信息普通字段包对外接口 def bid_ordinarypkg(self): headers = { 'Accept-Charset': 'utf-8', 'timestamp': '1726207205', 'sign': '1058A958B8D562EA9F2234AC42716778', 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Content-Type': 'application/json' } response = self.s.post(f"{cfg.target_host_api}/thirdpartyapi/standard/bid/ordinarypkg?appid=jianyuDev", headers=headers, json={"id": "040654015b5f500a1b4774135a57015651085a585c4f4c7605dc"}) return response # 招标采购搜索信息高级字段包对外接口 def bid_seniorpkg(self): headers = { 'Accept-Charset': 'utf-8', 'timestamp': '1726207205', 'sign': '1058A958B8D562EA9F2234AC42716778', 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Content-Type': 'application/json' } response = self.s.post(f"{cfg.target_host_api}/thirdpartyapi/standard/bid/seniorpkg?appid=jianyuDev", headers=headers, json={"id": "040654015b5f500a1b4774135a57015651085a585c4f4c7605dc"}) return response #招标采购信息附件下载接口 def bid_annex(self): headers = { 'Accept-Charset': 'utf-8', 'timestamp': '1726207205', 'sign': '1058A958B8D562EA9F2234AC42716778', 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Content-Type': 'application/json' } response = self.s.post(f"{cfg.target_host_api}/thirdpartyapi/standard/bid/annex?appid=jianyuDev", headers=headers, json={"fid":"040507510f0a075618137614500208530c095c500f1814785d44030451"}) return response #客户标签类型获取 def customer_getLabel(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/entnicheNew/customer/getLabel", headers=headers, json={}) return response #商机管理 def customer_attention(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/publicapply/customer/attention", headers=headers, json={"name":"石嘴山市交通工程建设管理中心","mold":1,"D":True}) return response #app登录后获取新用户留资报表 def new_uer_sales(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host_app}/salesLeads/appIsNewUerSales", headers=headers, json={}) return response #定制化分析报告_超前项目 def reports_advanced_projects(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/leadGeneration/getDate", headers=headers, json={}) return response #行为采集 def behavior_acquisition(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/leadGeneration/clickRecord", headers=headers, json={"type":"B"}) return response #获取样例报告信息 def sample_report_information(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/salesLeads/sampleReport", headers=headers, json={}) return response #项目名称联想 def project_name_lenovo(self): headers = GSTORE['headers'] params = {"pName":[ "科技信息"]} response = self.s.post(f"{cfg.target_host}/bigmember/analysis/projectName", headers=headers, params=params) return response #采购单位名称联想 def buyer_name_lenovo(self): headers = GSTORE['headers'] params = { "pName":[ "科技信息"], "sType":1 } response = self.s.post(f"{cfg.target_host}/bigmember/analysis/projectName", headers=headers, params=params) return response #投标决策分析内容 def bidding_decision_analysis_content(self): headers = GSTORE['headers'] json = { "area": {"北京": []}, "buyer": "北京大学", "buyerContent": [ { "key": ["设备"], "matchway": 1 }, { "key": ["多功能投影仪"], "matchway": 1 }], "pname": "", "limitTime": "fiveYear", "searchItem": 1, "searchType": 0, "page": 1, "pageSize": 5 } response = self.s.post(f"{cfg.target_host}/bigmember/decision/decInfo", headers=headers, json=json) return response #app新用户报表提交/跳过 def app_new_uer_sales(self): headers = GSTORE['headers'] json = { "area##business":"广州,北京", "company":"剑鱼", "department":"其他", "model##business":"投标方", "position":"职员", "product##engineering":"交通设施,外工程", "product##product":"化工用品包装材料,机电设备", "product##seryice":"广告传媒,餐饮服务" } response = self.s.post(f"{cfg.target_host_app}/salesLeads/appNewUerSales", headers=headers, json=json) return response # 商机情报详情页 def business_intelligence(self): headers = GSTORE['headers'] = GSTORE['headers'] response = self.s.post( f"{cfg.target_host}/bigmember/project/businessDetails?id=ABCY1xZdSlYKD0sAnd6cAcSMTI4DRZjV3diPB40OS8NfGpzZQFUCYI%3D", headers=headers) return response def me_Monitoring(self): headers = GSTORE['headers'] response = self.s.get(f"{cfg.target_host}/bigmember/project/meMonitoring", headers=headers) return response # 要闻 def important_News(self): heards = GSTORE['headers'] response = self.s.get(f"{cfg.target_host}/front/project/importantNews", headers=heards) return response # 文库分类 def jydocs_indexTag(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jydocs/indexTag", headers=headers, json={}) return response # 文库搜索 def jydocs_search(self): headers = GSTORE["headers"] response = self.s.post(f"{cfg.target_host}/jydocs/search?keyWord=法律", headers=headers) return response # 文库详情 def jydocs_detail(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jydocs/detail?docId=docin-4756450426", headers=headers) return response # 留资提交2.0 def salesLeads_collectInfo(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/salesLeads/collectInfo", headers=headers, json={"source": "article_proposed_project", "name": "质量部测试", "phone": "18211989987", "mail": "1550987547@qq.com", "industry": "", "company": "质量部测试企业", "companyType": "投标企业", "position": "总裁", "workScope": "", "partnerNeeds": "", "agree": True, "branch": ""}) return response # 留资提交 def salesLeads_retainedCapital(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/salesLeads/retainedCapital", headers=headers, params={"source": "article_proposed_project"}) return response #供应信息附件下载 def supplyInfo_attachment(self): headers = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/publicapply/attachment/supplyInfo?msgId=RV9AWV1F&fileName=%E7%8E%8B%E7%BF%94%E5%90%8D%E7%89%87.png", headers=headers,) return response #采购联系人 def buyer_contact(self): header = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/bigmember/search/buyer/association?name=河南科技大学", headers=header) return response #消息查看记录 def msgopenlog(self): header = GSTORE['headers'] data={ "msgLogId":287204, "platform":1} response = self.s.post(f"{cfg.target_host}/jyapi/messageCenter/msgOpenLog",headers=header,json=data) return response #查看消息详情 def messagedetail(self): header = GSTORE['headers'] data={ "msgLogId":240818 } response = self.s.post(f"{cfg.target_host}/jyapi/messageCenter/messageDetail",headers=header,json=data) return response #拟在建搜索条件接口 def conditions(self): header = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/front/project/nzj/conditions",headers=header) return response # 工作桌面商机情报列表 def workdesk_intelligence(self): header = GSTORE['headers'] data = { "msgType": 5, "needDealtWithCount": True, "offset": 1, "size": 5 } response =self.s.post(f"{cfg.target_host}/jyapi/messageCenter/WorkDeskList", headers=header, json=data) return response #商品积分查询 def goods_points(self): header = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyintegral/productPoints/introduction",headers=header) return response #工作桌面首页-我的消息-未读消息数量 def workdesk_unread_messages(self): header = GSTORE['headers'] json ={ "size": 20, "msgType": -1, "offset": 1, "isRead": 0, "isColumn": False, "isColumnNewMsg": False, "isContainLetter": False, "isMsgList": False } response =self.s.post(f"{cfg.target_host}/jyapi/messageCenter/MessageList",headers=header,json=json) return response #一键清空未读消息 def clear_unread_msg(self): header = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/jyapi/messageCenter/ClearUnreadMsg",headers = header) return response #新商机管理-企业订阅-修改关键词设置 def update_keyword_entnichenew(self): header = GSTORE['headers'] json = { "a_items": [ { "a_key": [ { "key": [ "计算机" ], "notkey": [], "matchway": 1 } ], "groupIndex": 0, "opened": True, "s_item": "测试分组", "showForm": False, "updatetime": 0 } ], "identity": "ent" } response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/key/update", headers=header,json=json) return response #获取区域设置 def entnichenew_area(self): headers = GSTORE['headers'] s = GSTORE['s'] response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/area/get",headers = headers) return response #商机管理-企业订阅-获取关键词设置 def keywords_get(self): headers = GSTORE['headers'] data ={ "identity": "ent" } response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/key/get", headers=headers, data =data) return response #商机管理-企业订阅-获取推送设置 def pushset_get(self): headers = GSTORE['headers'] data={ "identity": "ent" } response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/pushset/get", headers=headers, data=data) return response #设置登录保持期 def set_keeplogin(self): headers = GSTORE['headers'] data = { "isAutoLogin" : "true" , "loginER" : "NSg7BAFFBQMHBkZXRFpXRAsDBQJHBzwN___NjcAJDRFBQMHBkZXRFpXRAsDBQJHGTE8" } response = self.s.post(f"{cfg.target_host}/free/setKeepLogin",headers = headers ,params = data) return response #发送邮箱验证码 def send_mailverify(self): headers = GSTORE["headers"] data = { "email" : "luwenna@topnet.net.cn" } response = self.s.post(f"{cfg.target_host}/front/dataExport/sendMailVerify",headers = headers , params = data) return response #验证数据导出邮箱验证码 def check_mailverify(self): headers = GSTORE["headers"] data = { "emailVerity" : "YQ09Z1", "email" : "luwenna@topnet.net.cn" } response = self.s.post(f"{cfg.target_host}/front/dataExport/checkMailVerify", headers = headers ,data = data) return response #数据导出筛选预览数据 def preview_sievedata(self): headers = GSTORE["headers"] data = { "publishtime": "1566214955 _1660909355", "area": "北京", "city": "成都市", "region": "成都市,北京", "industry": "建筑工程_勘察设计, 建筑工程_工程施工, 建筑工程_监理咨询, 建筑工程_材料设备, 建筑工程_机电安装", "buyerclass": "党委办, 财政", "keyword": [{ "keyword": "软件", "appended": [], "exclude": [] }], "selectType": "filetext, title, detail, title", "minprice": "0.1", "maxprice": "1000000", "subtype": "中标, 成交, 废标, 流标" } response =self.s.post(f"{cfg.target_host}/front/dataExport/sieveData",headers = headers ,data =data) return response # 修改采购单位类型设置 def entnichenew_buyesclass_update(self): headers = GSTORE['headers'] s = GSTORE['s'] json = { "buyer": [ "法院", "发改" ], "identity": "ent" } response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/area/get", headers=headers,json=json) return response # 获取信息类型 def entnichenew_infotype(self): headers = GSTORE['headers'] s = GSTORE['s'] json = { "identity": "ent" } response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/index", headers=headers,json=json) return response # 修改信息类型 def entnichenew_infotype_update(self): headers = GSTORE['headers'] s = GSTORE['s'] json = { "msg": [ "预告", "预审", "预审结果", "论证意见", "需求公示" ], "identity": "ent" } response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/infotype/update", headers=headers,json=json) return response #商机管理-企业订阅-修改推送设置 def entnichenew_pushset_update(self): headers = GSTORE['headers'] data = { "ratemode": 2, "apppush": 1, "mailpush": 1, "matchway": 2, "wxpush": 1, "matchmode": "title,detail", "switch": 1, "identity": "ent" } response = self.s.post(f"{cfg.target_host}/entnicheNew/subscribe/pushset/update", headers=headers,data=data) return response #未登录“拟在建项目”详情页 def nzj_details(self): headers = GSTORE['headers'] params = { "pid":"040709005b58070d4a46020952075350590c1d4302530802" } response = self.s.get(f"{cfg.target_host}/front/project/nzj/details", headers=headers,params=params) return response #获取小程序广告位 def debrisproduct_getAdList(self): headers_1 = GSTORE['headers_1'] response = self.s.post(f"{cfg.target_host_wxapi}/debrisproduct/free/getAdList", headers=headers_1,data={"Code":"jy-appsearch-home-top"}) return response #获取地理位置 def debrisproduct_getLocation(self): headers_1 = GSTORE['headers_1'] response = self.s.get(f"{cfg.target_host_wxapi}/debrisproduct/free/location", headers=headers_1) return response #所有商机获取 def debrisproduct_getAllBusiness(self): headers_1 = GSTORE['headers_1'] response = self.s.post(f"{cfg.target_host_wxapi}/debrisproduct/free/getAllBusiness", headers=headers_1) return response #获取小程序配置 def debrisproduct_getConfig(self): headers_1 = GSTORE['headers_1'] response = self.s.post(f"{cfg.target_host_wxapi}/debrisproduct/free/getSetting", headers=headers_1) return response #业主单位名称联想 def debrisproduct_getAssName(self): headers_1 = GSTORE['headers_1'] response = self.s.post(f"{cfg.target_host_wxapi}/debrisproduct/buyer/getAssName", headers=headers_1,data={"buyerName":"科技"}) return response #业主单位列表 def debrisproduct_getList(self): headers_1 = GSTORE['headers_1'] response = self.s.post(f"{cfg.target_host_wxapi}/debrisproduct/buyer/getList", headers=headers_1,data={"page_num":10,"pagesize":1}) return response #业主搜索历史 def debrisproduct_buyer_getSearchList(self): headers_1 = GSTORE['headers_1'] response = self.s.post(f"{cfg.target_host_wxapi}/debrisproduct/buyer/getSearchList", headers=headers_1) return response #获取价格列表 def debrisproduct_getPriceList(self): headers_1 = GSTORE['headers_1'] response = self.s.post(f"{cfg.target_host_wxapi}/debrisproduct/getPriceList", headers=headers_1) return response """子部门""" def subdepartment(self): hearders = GSTORE['headers'] s = GSTORE['s'] data = { "id":26741 } response = s.post(f"{cfg.target_host}/entbase/department/childrens", headers=hearders,params=data) return response """添加人员""" def add_personnel(self): hearders = GSTORE['headers'] s = GSTORE['s'] data = { "name":"贾路瑶", "phone":"19588533063", "id":26741 } response = s.post(f"{cfg.target_host}/entbase/person/add", headers=hearders, params=data) return response """企业信息""" def enterprise_information(self): hearders = GSTORE['headers'] s = GSTORE['s'] params= { "t":"1736561573400" } response = s.get(f"{cfg.target_host}/entbase/ent/entinfo", headers=hearders, params=params) return response """分发设置列表接口""" def distribution_list(self): hearders = GSTORE['headers'] s = GSTORE['s'] data= { "identity":"ent" } response = s.get(f"{cfg.target_host}/entnicheNew/distribute/list", headers=hearders, data=data) return response """分发规则修改接口""" def distribution_update(self): hearders = GSTORE['headers'] s = GSTORE['s'] json= { "id": "67820b98e7364a57c3b7eaf5", "buyerclass": ["财政"], "items": ["测试分组"], "persons": [96208], "area": {"安徽": []}, "district": {}, "identity": "ent" } response = s.get(f"{cfg.target_host}/entnicheNew/distribute/update", headers=hearders, data=json) return response """分发规则详情接口""" def distribution_details(self): hearders = GSTORE['headers'] s = GSTORE['s'] data= { "identity": "ent" } response = s.get(f"{cfg.target_host_app}/entnicheNew/distribute/detail?id=67820b98e7364a57c3b7eaf5", headers=hearders, data=data) return response """采购单位画像数据""" def prchasing_profile_data(self): hearders = GSTORE['headers'] s = GSTORE['s'] params= { "buyer":"黎明化工研究设计院有限责任公司", "exactMatch":0 } response = s.post(f"{cfg.target_host}/entnicheNew/portrait/buyer/getData", headers=hearders, params=params) return response """最新项目动态""" def latest_project_news(self): hearders = GSTORE['headers'] s = GSTORE['s'] params = { "buyer": "黎明化工研究设计院有限责任公司", "PageNum": 1, "pageSize": 5 } response = s.post(f"{cfg.target_host}/entnicheNew/portrait/buyer/getNewMsg", headers=hearders, params=params) return response #搜索结果数据导出 def searchexport(self): hearders = GSTORE['headers'] data ={ "searchvalue":"软件", "selectIds":"ABCY1xWZC4%2FMD8vM3dzc1w4PiRfTBZmYUFkKD8jJiEgaGdzYFJUCnA%3D" } response = self.s.post(f"{cfg.target_host}/front/wx_dataExport/searchExport", headers=hearders,data=data) return response #电销 #个人信息查询 def personal_info(self): header = GSTORE['headers'] session=self.get_session() json = { "sid":session } response = self.s.post(f"{cfg.target_host}/jyapi/biService/myInfo", headers=header,json=json) return response #是否登录 def hassign(self): header = GSTORE['headers'] response = self.s.post(f"{cfg.target_host}/front/hasSign",headers=header) return response apimgr = APIMgr()