import { getPortPromise } from 'portfinder';
import { Options, PythonShell } from 'python-shell';
import { defaultsDeep } from 'lodash';
import ZeroClient, { Logger } from './ZeroClient';

/**
 * ZeroClient that launches a Python script through python-shell and then
 * binds itself to the local TCP port handed to that script.
 *
 * The script is started with `-p <port>` appended to any caller-supplied
 * args. Whenever the Python process emits `close`, the processor schedules
 * another `bind()` after `retryDelay` milliseconds, up to `maxRetries` times.
 */
export default class PyProcessor extends ZeroClient {
  private readonly scriptPath: string;

  private readonly options: Options;

  /** Most recently spawned shell; undefined until `bind()` has run. */
  private pyShell?: PythonShell;

  /** Number of restarts scheduled so far; never reset. */
  private retryCount = 0;

  /** Delay, in milliseconds, before a restart attempt. */
  private retryDelay = 3000;

  /** Maximum number of restart attempts after the Python process closes. */
  private readonly maxRetries = 5;

  /**
   * @param scriptPath Path of the Python script passed to PythonShell.
   * @param options python-shell options; `args` is extended with `-p <port>` at bind time.
   * @param logger Logger forwarded to the ZeroClient base class (defaults to `console`).
   */
  constructor(scriptPath: string, options: Options = {}, logger: Logger = console) {
    super(logger);
    this.scriptPath = scriptPath;
    this.options = options;
  }

  /**
   * Starts the Python script and binds the base client to `tcp://127.0.0.1:<port>`.
   *
   * @param port Port to use. When omitted (or falsy), a free port between
   *   12022 and 12122 is looked up with portfinder.
   */
  async bind(port?: string | number): Promise<void> {
    const freePort =
      port ||
      (await getPortPromise({
        port: 12022,
        stopPort: 12122,
      }));

    // Example values noted by the original author: "./streamPredictor.py", "--inspect"
    // The first argument wins in defaultsDeep, so our extended `args` take
    // precedence and every other field is filled in from this.options.
    const options: Options = defaultsDeep(
      {
        args: [...(this.options.args || []), '-p', `${freePort}`],
      },
      this.options
    );

    this.logger.info(`[python-shell]: starting python shell. path: ${this.scriptPath}`);
    const shell = new PythonShell(this.scriptPath, options);
    this.pyShell = shell;

    shell.stdout.on('data', (data) => this.logger.info(data));
    shell.on('pythonError', (err) =>
      this.logger.error(`[python-shell]: ${this.scriptPath} pythonError:`, err)
    );
    shell.on('stderr', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} stderr:`, err));
    shell.on('error', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} error:`, err));
    // Python child process close event.
    shell.on('close', () => this.scheduleRetry());

    super.bind(`tcp://127.0.0.1:${freePort}`);
  }

  /** Schedules one more `bind()` after `retryDelay` ms unless the retry budget is spent. */
  private scheduleRetry(): void {
    if (this.retryCount >= this.maxRetries) {
      return;
    }
    this.retryCount++;
    this.logger.info(
      `[python-shell]: ${this.scriptPath} will retry ${this.retryCount}th time after ${
        this.retryDelay / 1000
      } seconds`
    );
    setTimeout(() => {
      // The timer callback cannot propagate a rejection to any caller, so log
      // it here instead of leaving an unhandled promise rejection.
      this.bind().catch((err: unknown) =>
        this.logger.error(`[python-shell]: ${this.scriptPath} retry failed:`, err)
      );
    }, this.retryDelay);
  }
}