// starry/backend/libs/PyProcessor.ts
// Last commit 2b7aae2 by k-l-lambda:
//   feat: add Python ML services (CPU mode) with model download
import { getPortPromise } from 'portfinder';
import { Options, PythonShell } from 'python-shell';
import { defaultsDeep } from 'lodash';
import ZeroClient, { Logger } from './ZeroClient';
/**
 * A ZeroClient that launches a Python script through python-shell and then
 * hands the script's local TCP address to `ZeroClient.bind`.
 *
 * The script receives the chosen port as `-p <port>` appended to any `args`
 * given in the constructor options. Whenever the Python process closes it is
 * relaunched after `retryDelay` milliseconds, at most `maxRetries` times over
 * the lifetime of this instance (the counter is never reset).
 */
export default class PyProcessor extends ZeroClient {
	private readonly scriptPath: string;
	private readonly options: Options;
	// Handle of the most recently launched Python process; unset until bind() runs.
	private pyShell?: PythonShell;
	// Number of relaunches performed so far.
	private retryCount: number = 0;
	// Delay before a relaunch, in milliseconds.
	private retryDelay: number = 3000;
	// Upper bound on relaunches after the Python process closes.
	private readonly maxRetries: number = 5;

	/**
	 * @param scriptPath Path of the Python script to run.
	 * @param options python-shell options; `args` are kept and `-p <port>` is appended on bind.
	 * @param logger Logger passed to ZeroClient; also receives the Python process output.
	 */
	constructor(scriptPath: string, options: Options = {}, logger: Logger = console) {
		super(logger);
		this.scriptPath = scriptPath;
		this.options = options;
	}

	/**
	 * Starts the Python script and binds this client to `tcp://127.0.0.1:<port>`.
	 *
	 * @param port Port to give the script. When omitted (or falsy) a free port
	 *   in the range 12022-12122 is looked up with portfinder.
	 */
	async bind(port?: string | number) {
		// `||` is deliberate: a falsy port (undefined, 0, '') means "pick one for me".
		const freePort =
			port ||
			(await getPortPromise({
				port: 12022,
				stopPort: 12122,
			}));

		// "./streamPredictor.py", "--inspect"
		// `args` defined here wins; every other key is filled in from this.options.
		const options = defaultsDeep(
			{
				args: [...(this.options.args || []), '-p', `${freePort}`],
			},
			this.options
		);

		this.logger.info(`[python-shell]: starting python shell. path: ${this.scriptPath}`);

		const shell = new PythonShell(this.scriptPath, options);
		this.pyShell = shell;

		shell.stdout.on('data', (data) => this.logger.info(data));
		shell.on('pythonError', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} pythonError:`, err));
		shell.on('stderr', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} stderr:`, err));
		shell.on('error', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} error:`, err));
		shell.on('close', () => {
			// The Python child process has closed.
			if (this.retryCount >= this.maxRetries) {
				// Report that no further relaunch will happen instead of stopping silently.
				this.logger.error(`[python-shell]: ${this.scriptPath} closed and retry limit (${this.maxRetries}) reached, giving up`);
				return;
			}

			this.retryCount++;
			this.logger.info(
				`[python-shell]: ${this.scriptPath} will retry ${this.retryCount}th time after ${this.retryDelay / 1000} seconds`
			);

			setTimeout(() => {
				// The relaunch is called without a port, so a fresh free port is looked up
				// even when the initial bind() was given an explicit one.
				// bind() is async: catch its rejection here, otherwise a failed port lookup
				// or spawn would surface as an unhandled promise rejection.
				this.bind().catch((err: unknown) => this.logger.error(`[python-shell]: ${this.scriptPath} retry failed:`, err));
			}, this.retryDelay);
		});

		super.bind(`tcp://127.0.0.1:${freePort}`);
	}
}