1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- # coding:utf-8
- '''
- 训练客户端
- '''
- import nsq
- import json
- from machine_models import train_model
- from loguru import logger
- from queue import Queue
- import time
- from threading import Thread
# Send log output to a timestamped file under ./logs; loguru rotates it at 00:00.
logger.add('./logs/runtime_{time}.log', rotation='00:00')

# Task queue shared by the NSQ callback (producer) and the training worker
# (consumer). It holds at most 10000 pending tasks.
queueSave = Queue(10000)
def train_start():
    """Worker loop: take queued training tasks and run them one at a time.

    Runs forever, so it is meant to be used as the target of a background
    thread. Every item taken from the module-level ``queueSave`` is passed
    unchanged to ``train_model``.
    """
    while True:
        # Block until a task is available instead of polling empty() and
        # sleeping, so a new task starts immediately rather than up to 5
        # seconds after it was queued.
        params = queueSave.get()
        try:
            train_model(params)
        except Exception:
            # A single failing task must not kill the worker thread; otherwise
            # every later task would sit in the queue and never be trained.
            logger.exception("train_model failed for task: {}", params)
def handler(message):
    """NSQ message callback: decode the JSON body and queue it for training.

    :param message: NSQ message object; its ``body`` attribute is parsed as JSON.
    :return: always ``True``, including when the body could not be decoded or
        queued (the failure is only logged).
        NOTE(review): presumably ``True`` tells pynsq to mark the message as
        finished, so bad messages are dropped rather than redelivered - confirm.
    """
    try:
        task = json.loads(message.body)
        # NOTE(review): put() blocks while the bounded queue is full, which
        # would stall this callback - confirm that is acceptable.
        queueSave.put(task)
    except Exception as e:
        # loguru formats messages with str.format(), so the exception needs a
        # "{}" placeholder; without one it was silently left out of the log.
        logger.warning("start--> {}", e)
    return True
# NSQ consumer: subscribes to topic 'machine_train' on channel 'NO.1' and hands
# every received message to handler(). It connects straight to the nsqd
# instance listed in nsqd_tcp_addresses.
# NOTE(review): the lookupd_* options presumably only matter when nsqlookupd
# addresses are configured, which is not the case here - confirm.
r = nsq.Reader(
    topic='machine_train',
    channel='NO.1',
    message_handler=handler,
    nsqd_tcp_addresses=['192.168.3.13:4150'],
    lookupd_poll_interval=5,
    lookupd_connect_timeout=10000,
    lookupd_request_timeout=10000,
)
if __name__ == '__main__':
    # train_start() loops forever, so joining a non-daemon worker after
    # nsq.run() could never return and the process would hang on shutdown.
    # Running it as a daemon thread lets the process exit once nsq.run()
    # returns. Trade-off: a training job that is in progress at that moment is
    # terminated together with the process.
    train_thread = Thread(target=train_start, daemon=True)
    train_thread.start()

    # Blocks while the NSQ reader consumes messages.
    nsq.run()
|