import { pack, unpack } from 'msgpackr';
import { Request } from 'zeromq';
import { AsyncQueue } from './async-queue';

/**
 * Reply envelope decoded from the server.
 * `code === 0` is treated as success; any other code causes `msg` to be
 * used as the rejection reason.
 */
interface Response {
  code: number;
  msg: string;
  data?: any;
}

/** Minimal logging sink; the global `console` satisfies it. */
export interface Logger {
  info: (...data: any[]) => void;
  error: (...data: any[]) => void;
}

/** Positional arguments forwarded in the `args` field of a call. */
type PyArgs = any[];
/** Keyword arguments forwarded in the `kwargs` field of a call. */
type PyKwargs = Record<string, any>;

/**
 * Request/reply client that msgpack-encodes calls, sends them over a ZeroMQ
 * `Request` socket, and funnels every call through an `AsyncQueue`.
 */
export default class ZeroClient {
  /** Number of retries after the first failed attempt (3 attempts in total). */
  private static readonly MAX_RETRIES = 2;
  /** Pause between a failed attempt and the next retry. */
  private static readonly RETRY_DELAY_MS = 3000;

  logger: Logger;
  private socket?: Request;
  private queue: AsyncQueue = new AsyncQueue();
  private url?: string;

  /**
   * @param logger Sink for retry diagnostics. Defaults to `console`.
   */
  constructor(logger: Logger = console) {
    this.logger = logger;
  }

  /**
   * (Re)creates the socket and connects it.
   *
   * @param url Endpoint to connect to. When omitted (or empty) the URL from a
   *   previous `bind` call is reused.
   * @throws Error when no URL has ever been provided.
   */
  bind(url?: string): void {
    if (url) this.url = url;
    // Release the previous socket first so re-binding does not leak it.
    this.closeSocket();
    this.openSocket();
  }

  /** Closes the current socket if there is one and it is still open. */
  private closeSocket(): void {
    if (this.socket && !this.socket.closed) this.socket.close();
  }

  /** Creates a fresh socket connected to the remembered URL and stores it. */
  private openSocket(): Request {
    if (!this.url) {
      throw new Error('ZeroClient: no url configured; call bind(url) first');
    }
    const socket = new Request({
      sendTimeout: 15e3,
      receiveTimeout: 300e3,
    });
    socket.connect(this.url);
    this.socket = socket;
    return socket;
  }

  /**
   * Sends one packed payload and waits for the reply frames, retrying up to
   * `MAX_RETRIES` times. After each failure the socket is closed and a new
   * one is opened on the next attempt.
   *
   * @param payload Value to msgpack-encode and send.
   * @returns The frames returned by `socket.receive()`.
   * @throws The error of the last attempt once retries are exhausted, or an
   *   `Error` immediately when no URL is configured.
   */
  private async __request(payload: unknown): Promise<Uint8Array[]> {
    // A missing URL is a configuration error: fail fast instead of retrying.
    if (!this.url) {
      throw new Error('ZeroClient: no url configured; call bind(url) first');
    }

    for (let attempt = 0; ; attempt++) {
      try {
        const socket =
          this.socket && !this.socket.closed ? this.socket : this.openSocket();
        await socket.send(pack(payload));
        return await socket.receive();
      } catch (err: unknown) {
        if (attempt >= ZeroClient.MAX_RETRIES) throw err;

        const detail = err instanceof Error ? err.stack : String(err);
        this.logger.info(`请求失败,${detail}`);
        this.logger.error(`3s后重试第${attempt + 1}次`);

        // Drop the failed socket; the next attempt opens a new one.
        this.closeSocket();
        await new Promise<void>((resolve) =>
          setTimeout(resolve, ZeroClient.RETRY_DELAY_MS),
        );
      }
    }
  }

  /**
   * Queues a remote call and resolves with the `data` field of the reply.
   *
   * @param method Name placed in the `method` field of the message.
   * @param args Either an array of positional arguments, or (when not an
   *   array) the keyword arguments; in the latter case `kwargs` is ignored.
   * @param kwargs Keyword arguments, used only when `args` is an array.
   * @returns The reply's `data` when its `code` is `0`.
   * @throws Rejects with the reply's `msg` string when `code` is non-zero
   *   (kept as a plain string for backward compatibility), or with the
   *   transport error once retries are exhausted.
   */
  async request(
    method: string,
    args: PyArgs | PyKwargs | null = null,
    kwargs: PyKwargs | null = null,
  ): Promise<any> {
    const [positional, keyword] = Array.isArray(args)
      ? [args, kwargs]
      : [undefined, args];

    const msg: Record<string, unknown> = { method };
    if (positional) msg.args = positional;
    if (keyword) msg.kwargs = keyword;

    return this.queue.addTask([
      async (opt: unknown) => {
        const [frame] = await this.__request(opt);
        // The wire payload is not validated beyond this shape assertion.
        const reply = unpack(frame) as Response;
        if (reply.code === 0) return reply.data;
        return Promise.reject(reply.msg);
      },
      msg,
    ]);
  }
}