Spaces:
Runtime error
Runtime error
| from fmpy import * | |
| from fmpy import read_model_description, extract | |
| from fmpy.fmi2 import FMU2Slave | |
| import numpy as np | |
| import shutil | |
| import pandas as pd | |
| import random | |
| import plotly.graph_objects as go | |
| import json | |
| from fmpy import * | |
| from fmpy import read_model_description, extract | |
| from fmpy.fmi2 import FMU2Slave | |
| import numpy as np | |
| import shutil | |
| import pandas as pd | |
| import random | |
| import plotly.graph_objects as go | |
| import json | |
# Patient profile table, read once when the module is loaded and queried by
# getProfileFromID(). The path is relative to the current working directory.
df_profile = pd.read_csv(filepath_or_buffer="profile_processed.csv")
def getProfileFromID(id):
    """Return the profile values of the first ``df_profile`` row matching *id*.

    The row is selected by comparing the ``ID`` column with *id*; the first
    positional column is dropped and the remaining values are returned as a
    plain list (callers in this file read them as age, weight, height, gender
    in that order -- presumably the CSV column order; verify against the data).

    Raises:
        IndexError: If no row has the requested ID.
    """
    matching_rows = df_profile.loc[df_profile["ID"] == id]
    profile_values = matching_rows.iloc[0, 1:]
    return profile_values.tolist()
def simulation(id, kp, ki, kd, bis_target=40, min_noise=50, max_noise=150):
    """Run the closed-loop FMU co-simulation for one patient under PID control.

    The patient's profile is looked up with ``getProfileFromID`` and written to
    the FMU's ``amesim_interface.*`` variables. A discrete PID controller then
    sets ``Infusion_rate_mLh`` from the error between the FMU's ``BIS_Index``
    output and the active target, for 7000 steps of size 1. The target is
    ``bis_target`` except while ``2400 <= time < 4500``, where it is 60. A
    randomly drawn, fixed-magnitude impulse is added to the commanded rate
    while ``start < time < start + 50`` in every 15th 100-step window.

    Args:
        id: Patient ID passed to ``getProfileFromID``.
        kp: Proportional gain.
        ki: Integral gain.
        kd: Derivative gain.
        bis_target: BIS set-point used outside the 2400-4500 window.
        min_noise: One bound (inclusive) of the impulse magnitude.
        max_noise: The other bound (inclusive) of the impulse magnitude.
            Both bounds are truncated to ``int`` and may be given in either
            order.

    Returns:
        tuple: ``(result, impulsive_noise)`` where ``result`` is a structured
        NumPy array with float64 fields ``time``, ``BIS`` and ``Infusion``
        (one record per step) and ``impulsive_noise`` is the drawn magnitude.
    """
    profile = getProfileFromID(id)
    age, weight, height, gender = profile[0], profile[1], profile[2], profile[3]

    fmu_path = 'Pharmacokinetics_4_comportmental_model_PI_ref_FMU_base4_OAAS_lnx.fmu'
    model_description = read_model_description(fmu_path)
    vrs = {
        variable.name: variable.valueReference
        for variable in model_description.modelVariables
    }

    start_time = 0.0
    stop_time = 7000
    step_size = 1
    initial_bis = 95.6
    max_infusion_rate = 200

    # Slider values may arrive as floats and in the wrong order; randint()
    # needs integers with low <= high.
    noise_low, noise_high = sorted((int(min_noise), int(max_noise)))

    # Written in this order after initialization, as in the original script.
    parameters = {
        "amesim_interface.Age_year": age,
        "amesim_interface.BIS0": initial_bis,
        "amesim_interface.BISmin": 8.9,
        "amesim_interface.Drug_concentration_mgmL": 20,
        "amesim_interface.EC50": 2.23,
        "amesim_interface.Gamma": 1.58,
        "amesim_interface.Gender_0male_1female": gender,
        "amesim_interface.Height_cm": height,
        "amesim_interface.Infusion_rate_mLh": max_infusion_rate,
        "amesim_interface.Weight_kg": weight,
    }

    rows = []  # (time, BIS, infusion rate) per step
    unzipdir = extract(fmu_path)
    try:
        fmu = FMU2Slave(guid=model_description.guid,
                        unzipDirectory=unzipdir,
                        modelIdentifier=model_description.coSimulation.modelIdentifier,
                        instanceName='instance1')
        fmu.instantiate()
        try:
            fmu.setupExperiment(startTime=start_time)
            fmu.enterInitializationMode()
            fmu.exitInitializationMode()
            for name, value in parameters.items():
                fmu.setReal([vrs[name]], [value])

            vr_input = vrs["amesim_interface.Infusion_rate_mLh"]
            vr_output = vrs["amesim_interface.BIS_Index"]

            impulsive_noise = random.randint(noise_low, noise_high)
            print("noise level:", impulsive_noise)

            t = start_time
            target = bis_target
            integral = 0
            last_error = 0
            while t < stop_time:
                # NOTE(review): the integrator is cleared on EVERY step once
                # t >= 2400 (not only at the set-point change), so from then
                # on the integral term equals the current error. Behaviour is
                # kept as in the original -- confirm whether this is intended.
                if 2.4e3 <= t < 4.5e3:
                    target = 60
                    integral = 0
                if t >= 4.5e3:
                    target = bis_target
                    integral = 0

                # The FMU output is not read for the first steps; the
                # configured initial BIS value is used instead.
                bis = fmu.getReal([int(vr_output)])[0] if t > step_size else initial_bis

                error = bis - target
                integral = integral + error
                derivative = error - last_error
                last_error = error
                infusion_rate = np.clip(
                    kp * error + ki * integral + kd * derivative, 0, max_infusion_rate
                )

                # Impulsive disturbance, added after clipping so it can push
                # the commanded rate above the controller's limit.
                window = t // 100
                window_start = 100 * window
                if window_start < t < window_start + 50 and window % 15 == 0:
                    infusion_rate += impulsive_noise
                fmu.setReal([vr_input], [int(infusion_rate)])

                fmu.doStep(currentCommunicationPoint=t, communicationStepSize=step_size)
                t += step_size

                # Record the rate actually held by the FMU together with the
                # BIS value the controller acted on in this step.
                applied_rate, _ = fmu.getReal([int(vr_input), int(vr_output)])
                rows.append((t, bis, applied_rate))

            fmu.terminate()
        finally:
            # Release the FMU instance even when a step fails.
            fmu.freeInstance()
    finally:
        # Always remove the extracted FMU directory.
        shutil.rmtree(unzipdir, ignore_errors=True)

    result = np.array(
        rows,
        dtype=np.dtype([('time', np.float64), ('BIS', np.float64), ('Infusion', np.float64)]),
    )
    return result, impulsive_noise
def plot_result(result, show_original):
    """Build the BIS and infusion-rate figures for a simulation result.

    Args:
        result: Structured array (or anything ``pd.DataFrame`` accepts) with
            ``BIS`` and ``Infusion`` columns, one row per step.
        show_original: When true, overlay the baseline run stored in
            ``result_impulsive.npy`` in red on both figures. When false, the
            current ``result`` is saved to that file and becomes the baseline.

    Returns:
        tuple: ``(fig1, fig2)`` -- the BIS figure and the infusion-rate figure.
    """
    baseline_path = "result_impulsive.npy"
    df = pd.DataFrame(result)

    fig1 = go.Figure(data=go.Scatter(x=df.index, y=df['BIS'], mode='lines', name='BIS'))
    fig1.update_layout(height=400, width=1200, title_text="BIS evolution")

    fig2 = go.Figure(
        data=go.Scatter(x=df.index, y=df['Infusion'], mode='lines', name='Infusion rate')
    )
    fig2.update_layout(height=400, width=1200, title_text="Infusion rate evolution")

    if not show_original:
        # Remember this run as the baseline for later comparisons.
        np.save(baseline_path, result)
        return fig1, fig2

    try:
        result_baseline = np.load(baseline_path)
    except FileNotFoundError:
        # No run has been stored yet (the overlay was requested before any
        # plain simulation); show the current run alone instead of failing.
        print("no baseline result saved yet; skipping original overlay")
        return fig1, fig2

    df_original = pd.DataFrame(result_baseline)
    fig1.add_trace(go.Scatter(x=df_original.index, y=df_original['BIS'], mode='lines', name='BIS original', line=dict(color="red"), opacity=0.5))
    fig2.add_trace(go.Scatter(x=df_original.index, y=df_original['Infusion'], mode='lines', name='Infusion rate original', line=dict(color="red"), opacity=0.5))
    return fig1, fig2
def gradio_display_profile(id):
    """Return a one-row DataFrame describing the patient with the given ID.

    The profile list from ``getProfileFromID`` is read positionally as age,
    weight, height and gender code; a gender code of 0 is shown as "Male" and
    any other value as "Female".
    """
    profile = getProfileFromID(id)
    gender_label = "Female" if profile[3] != 0 else "Male"
    return pd.DataFrame(
        {
            "age": [profile[0]],
            "weight": [profile[1]],
            "height": [profile[2]],
            "gender": [gender_label],
        }
    )
def gradio_simulation(id, kp, ki, kd, show_original, bis_target, min_noise, max_noise):
    """Gradio callback: simulate one patient and plot the outcome.

    Returns the BIS figure, the infusion-rate figure and the impulsive noise
    level drawn for this run, in that order.
    """
    trace, noise_level = simulation(
        id, kp, ki, kd,
        bis_target=bis_target,
        min_noise=min_noise,
        max_noise=max_noise,
    )
    bis_figure, infusion_figure = plot_result(trace, show_original)
    return bis_figure, infusion_figure, noise_level
def gradio_save(id, kp, ki, kd, bis_target, min_noise, max_noise):
    """Gradio callback: run a simulation and store it with the chosen gains.

    A fresh simulation is run with the given settings (so a new impulsive
    noise level is drawn), and the patient profile, the resulting BIS trace,
    the noise level and the PID gains are written as JSON to
    ``saved_data/patient-<id>.json``. The ``saved_data`` directory is created
    if it does not exist yet.

    Returns:
        str: ``"Saved"`` once the file has been written.
    """
    import os  # local import: the file has no module-level ``os`` import

    result, noise_level = simulation(id, kp, ki, kd, bis_target, min_noise, max_noise)
    patient_profile = getProfileFromID(id)

    # The profile is read positionally as age, weight, height, gender; the
    # BIS trace holds one value per simulation step.
    data = {
        'inputs': {
            'patient_profile': {
                'age': patient_profile[0],
                'weight': patient_profile[1],
                'height': patient_profile[2],
                'gender': patient_profile[3],
            },
            'bis_trace': result['BIS'].tolist(),
            'noise_level': noise_level,
        },
        'outputs': {
            'kp': kp,
            'ki': ki,
            'kd': kd,
        },
    }

    # open(..., 'w') does not create missing parent directories.
    os.makedirs('saved_data', exist_ok=True)
    with open(f'saved_data/patient-{id}.json', 'w') as f:
        json.dump(data, f)
    return "Saved"
import gradio as gr

# User interface: a narrow control column (targets, noise range, patient
# selection, PID gains, actions) next to a wide column holding the two plots.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("# BIS Target")
            bis_target = gr.Slider(minimum=0, maximum=100, step=1, value=30, label="BIS target")

            gr.Markdown("# Impulsive noise range")
            min_noise = gr.Slider(minimum=0, maximum=50, step=1, value=50, label="noise min")
            max_noise = gr.Slider(minimum=0, maximum=150, step=1, value=150, label="noise max")

            gr.Markdown("# Patient profile")
            # NOTE: ``id`` shadows the builtin at module level; the name is
            # kept because it matches the handlers' parameter name.
            id = gr.Number(value=1, precision=0, label="Patient ID")
            profile_output = gr.Dataframe(value=gradio_display_profile(1), label="Patient profile")
            # Refresh the profile table whenever another patient is selected.
            id.change(gradio_display_profile, inputs=[id], outputs=[profile_output])

            gr.Markdown("# PID parameters")
            with gr.Blocks():
                kp = gr.Slider(minimum=0, maximum=20, value=4, label="kp")
                ki = gr.Slider(minimum=0, maximum=1, value=0.01, label="ki")
                kd = gr.Slider(minimum=0, maximum=200, value=0, label="kd")
            button = gr.Button("Simulate")
            show_original = gr.Checkbox(label="Show original")
            # gradio_simulation returns the drawn noise level as its third
            # value; it needs an output component of its own.
            noise_output = gr.Number(label="Impulsive noise level", interactive=False)

            gr.Markdown("# Save the best parameters")
            save_result = gr.Button("Save")
            save_output = gr.Textbox(label="Save status")
        with gr.Column(scale=5):
            plot1 = gr.Plot(label="BIS evolution")
            plot2 = gr.Plot(label="Infusion rate evolution")

    # The number of outputs must match the number of values each handler returns.
    button.click(
        gradio_simulation,
        inputs=[id, kp, ki, kd, show_original, bis_target, min_noise, max_noise],
        outputs=[plot1, plot2, noise_output],
    )
    save_result.click(
        gradio_save,
        inputs=[id, kp, ki, kd, bis_target, min_noise, max_noise],
        outputs=[save_output],
    )

demo.launch()