File size: 1,806 Bytes
2b7aae2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import { getPortPromise } from 'portfinder';
import { Options, PythonShell } from 'python-shell';
import { defaultsDeep } from 'lodash';
import ZeroClient, { Logger } from './ZeroClient';

export default class PyProcessor extends ZeroClient {
	private readonly scriptPath: string;
	private readonly options: Options;
	private pyShell: PythonShell;

	private retryCount: number = 0;
	private retryDelay: number = 3000;

	constructor(scriptPath: string, options: Options = {}, logger: Logger = console) {
		super(logger);
		this.scriptPath = scriptPath;
		this.options = options;
	}

	async bind(port?: string | number) {
		const freePort =
			port ||
			(await getPortPromise({
				port: 12022,
				stopPort: 12122,
			}));

		// "./streamPredictor.py", "--inspect"
		const options = defaultsDeep(
			{
				args: [...(this.options.args || []), '-p', `${freePort}`],
			},
			this.options
		);

		this.logger.info(`[python-shell]: starting python shell. path: ${this.scriptPath}`);

		this.pyShell = new PythonShell(this.scriptPath, options);

		this.pyShell.stdout.on('data', (data) => this.logger.info(data));

		this.pyShell.on('pythonError', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} pythonError:`, err));
		this.pyShell.on('stderr', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} stderr:`, err));
		this.pyShell.on('error', (err) => this.logger.error(`[python-shell]: ${this.scriptPath} error:`, err));
		this.pyShell.on('close', () => {
			// python子进程关闭事件
			if (this.retryCount < 5) {
				this.retryCount++;
				this.logger.info(`[python-shell]: ${this.scriptPath} will retry ${this.retryCount}th time after 3 seconds`);
				setTimeout(() => {
					this.bind();
				}, this.retryDelay);
			}
		});

		super.bind(`tcp://127.0.0.1:${freePort}`);
	}
}