// Source path: starry/backend/libs/ZeroClient.ts
// Last commit: 2b7aae2 (k-l-lambda) — "feat: add Python ML services (CPU mode) with model download"
import { pack, unpack } from 'msgpackr';
import { Request } from 'zeromq';
import { AsyncQueue } from './async-queue';
/**
 * Reply envelope obtained by unpacking the server's msgpack response.
 *
 * `ZeroClient.request` treats `code === 0` as success and resolves with
 * `data`; any other `code` makes it reject with `msg`.
 */
interface Response {
// Status code; 0 is the only value treated as success by the client.
code: number;
// Message used as the rejection reason when `code` is non-zero.
msg: string;
// Result payload returned to the caller on success; may be absent.
data?: any;
}
/**
 * Minimal logging sink accepted by the `ZeroClient` constructor.
 * The global `console` has this shape and is used as the default.
 */
export interface Logger {
// Informational output.
info: (...data: any[]) => void;
// Error output.
error: (...data: any[]) => void;
}
// Array of arguments, sent as the `args` field of the request message.
// Presumably the positional arguments of the remote (Python) method — confirm against the server.
type PyArgs = any[];
// String-keyed arguments, sent as the `kwargs` field of the request message.
// Presumably the keyword arguments of the remote (Python) method — confirm against the server.
type PyKwargs = Record<string, any>;
/**
 * RPC-style client that sends msgpack-encoded requests over a ZeroMQ
 * `Request` socket and decodes the msgpack-encoded reply.
 *
 * Requests are submitted through an `AsyncQueue` (presumably so that only one
 * send/receive exchange is in flight on the socket at a time — confirm against
 * the queue implementation). A failed exchange closes the socket, waits, and
 * reconnects, up to `MAX_RETRIES` additional attempts.
 */
export default class ZeroClient {
  /** Number of additional attempts made after the first failed exchange. */
  private static readonly MAX_RETRIES = 2;
  /** Pause before each retry, in milliseconds (the log message below states "3s"). */
  private static readonly RETRY_DELAY_MS = 3000;

  /** Sink for retry diagnostics; defaults to `console`. */
  logger: Logger;
  private socket: Request;
  private queue: AsyncQueue = new AsyncQueue();
  private url: string;

  /**
   * @param logger Destination for diagnostic messages. Defaults to `console`.
   */
  constructor(logger: Logger = console) {
    this.logger = logger;
  }

  /**
   * Creates a fresh socket and connects it to the endpoint.
   *
   * @param url Endpoint to connect to. When omitted, the URL from the previous
   *   call is reused (this is how the client reconnects after a failure).
   * @throws Error if no URL was given now or in an earlier call.
   */
  bind(url?: string) {
    if (url) this.url = url;
    if (!this.url) {
      throw new Error('ZeroClient: no endpoint url configured; call bind(url) first');
    }

    // Release the previous socket so that re-binding does not leak it.
    if (this.socket && !this.socket.closed) this.socket.close();

    this.socket = new Request({
      sendTimeout: 15e3,
      receiveTimeout: 300e3,
    });
    this.socket.connect(this.url);
  }

  /**
   * Performs one send/receive exchange, retrying on failure.
   *
   * After any failure the socket is closed, because a request socket whose
   * exchange was interrupted cannot be reused; the next attempt (or the next
   * request) reconnects via `bind()`.
   *
   * @param payload Value to msgpack-encode and send.
   * @returns The raw frames returned by the socket's `receive()`.
   * @throws The last error once all retries are exhausted, or an `Error`
   *   immediately when the client was never bound to a URL.
   */
  private async __request(payload) {
    // A missing endpoint is a configuration error; retrying cannot fix it.
    if (!this.url) {
      throw new Error('ZeroClient: no endpoint url configured; call bind(url) first');
    }

    for (let attempt = 0; ; attempt++) {
      try {
        if (!this.socket || this.socket.closed) this.bind();
        await this.socket.send(pack(payload));
        return await this.socket.receive();
      } catch (err) {
        // Always drop the failed socket, including on the final attempt, so the
        // next request starts from a clean connection.
        if (this.socket && !this.socket.closed) this.socket.close();

        if (attempt >= ZeroClient.MAX_RETRIES) throw err;

        const detail = err instanceof Error && err.stack ? err.stack : String(err);
        this.logger.error(`请求失败,${detail}`);
        this.logger.info(`3s后重试第${attempt + 1}次`);

        await new Promise((resolve) => setTimeout(resolve, ZeroClient.RETRY_DELAY_MS));
      }
    }
  }

  /**
   * Calls a remote method and resolves with its result.
   *
   * The second parameter may be either an array (sent as `args`, with the
   * third parameter sent as `kwargs`) or a plain object (sent as `kwargs`, in
   * which case the third parameter is ignored).
   *
   * @param method Name of the remote method.
   * @param args Array of arguments, or an object of keyword arguments.
   * @param kwargs Keyword arguments, used only when `args` is an array.
   * @returns The `data` field of the reply when its `code` is 0.
   * @throws Rejects with the reply's `msg` string when `code` is non-zero, or
   *   with the transport error when the exchange fails after all retries.
   */
  async request(method: string, args: PyArgs | PyKwargs = null, kwargs: PyKwargs = null): Promise<any> {
    const [args_, kwargs_] = Array.isArray(args) ? [args, kwargs] : [undefined, args];

    // Only include the fields that were actually supplied.
    const msg: any = { method };
    if (args_) msg.args = args_;
    if (kwargs_) msg.kwargs = kwargs_;

    return this.queue.addTask([
      async (opt) => {
        const [result] = await this.__request(opt);
        const obj = unpack(result) as Response;
        if (obj.code === 0) return obj.data;
        // Kept as a raw string rejection so existing callers see the same value.
        return Promise.reject(obj.msg);
      },
      msg,
    ]);
  }
}