Tulitula commited on
Commit
af6584b
·
verified ·
1 Parent(s): 6ab1999

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +120 -164
app.py CHANGED
@@ -1,178 +1,134 @@
1
- import os, io, math, warnings
2
- warnings.filterwarnings("ignore")
3
 
4
- from typing import List, Tuple, Dict
5
- import numpy as np
6
  import pandas as pd
7
- import matplotlib.pyplot as plt
8
- import gradio as gr
9
- from PIL import Image
10
- import requests
11
  import yfinance as yf
12
-
13
- # ---------------- config ----------------
14
- DATA_DIR = "data"
15
- MAX_TICKERS = 30
16
- DEFAULT_LOOKBACK_YEARS = 5
17
- MARKET_TICKER = "VOO"
18
- POS_COLS = ["ticker", "weight_exposure", "beta", "er_p", "sigma_p"]
19
-
20
- FRED_MAP = [
21
- (1, "DGS1"), (2, "DGS2"), (3, "DGS3"),
22
- (5, "DGS5"), (7, "DGS7"), (10, "DGS10"),
23
- (20, "DGS20"), (30, "DGS30"), (100, "DGS30")
24
- ]
25
-
26
- # ---------------- helpers ----------------
27
- def ensure_data_dir():
28
- os.makedirs(DATA_DIR, exist_ok=True)
29
-
30
- def fred_series_for_horizon(years: float) -> str:
31
- y = max(1.0, min(100.0, float(years)))
32
- for cutoff, code in FRED_MAP:
33
- if y <= cutoff:
34
- return code
35
- return "DGS30"
36
-
37
- def fetch_fred_yield_annual(code: str) -> float:
38
- url = f"https://fred.stlouisfed.org/graph/fredgraph.csv?id={code}"
39
- try:
40
- r = requests.get(url, timeout=10)
41
- r.raise_for_status()
42
- df = pd.read_csv(io.StringIO(r.text))
43
- s = pd.to_numeric(df.iloc[:, 1], errors="coerce").dropna()
44
- return float(s.iloc[-1] / 100.0) if len(s) else 0.03
45
- except Exception:
46
- return 0.03
47
-
48
- def fetch_prices_monthly(tickers: List[str], years: int) -> pd.DataFrame:
49
- start = pd.Timestamp.today(tz="UTC") - pd.DateOffset(years=years, days=7)
50
- end = pd.Timestamp.today(tz="UTC")
51
- frames = []
52
- for t in tickers:
53
- try:
54
- s = yf.download(
55
- t, start=start.date(), end=end.date(),
56
- interval="1mo", auto_adjust=True, progress=False
57
- )["Close"]
58
- if isinstance(s, pd.Series) and s.dropna().size > 0:
59
- frames.append(s.rename(t))
60
- except Exception:
61
- pass
62
- if frames:
63
- return pd.concat(frames, axis=1).dropna(how="any").fillna(method="ffill")
64
- return pd.DataFrame()
65
-
66
- def monthly_returns(prices: pd.DataFrame) -> pd.DataFrame:
67
- return prices.pct_change().dropna()
68
-
69
- def annualize_mean(m):
70
- return np.asarray(m, dtype=float) * 12.0
71
-
72
- def annualize_sigma(s):
73
- return np.asarray(s, dtype=float) * math.sqrt(12.0)
74
-
75
- def estimate_all_moments_aligned(symbols: List[str], years: int, rf_ann: float):
76
- px = fetch_prices_monthly(symbols, years)
77
- rets = monthly_returns(px)
78
- rf_m = rf_ann / 12.0
79
- mu = rets.mean()
80
- sigma = rets.std(ddof=1)
81
- betas = {}
82
- mkt = rets[MARKET_TICKER]
83
- var_m = np.var(mkt - rf_m, ddof=1)
84
- for s in symbols:
85
- if s == MARKET_TICKER:
86
- betas[s] = 1.0
87
- else:
88
- ex_s = rets[s] - rf_m
89
- betas[s] = np.cov(ex_s, mkt - rf_m, ddof=1)[0,1] / var_m
90
- erp = annualize_mean(mu[MARKET_TICKER]) - rf_ann
91
- sigma_mkt = annualize_sigma(sigma[MARKET_TICKER])
92
- covA = pd.DataFrame(np.cov(rets.T) * 12.0, index=symbols, columns=symbols)
93
- return betas, covA, erp, sigma_mkt
94
-
95
- def capm_er(beta: float, rf_ann: float, erp_ann: float) -> float:
96
- return float(rf_ann + beta * erp_ann)
97
-
98
- def portfolio_stats(weights: Dict[str, float], cov_ann: pd.DataFrame,
99
- betas: Dict[str, float], rf_ann: float, erp_ann: float):
100
- tickers = list(weights.keys())
101
- w = np.array(list(weights.values()))
102
- w_expo = w / sum(abs(w))
103
- beta_p = np.dot([betas[t] for t in tickers], w_expo)
104
- er_p = capm_er(beta_p, rf_ann, erp_ann)
105
- cov = cov_ann.loc[tickers, tickers].to_numpy()
106
- sigma_p = math.sqrt(max(w_expo @ cov @ w_expo, 0.0))
107
- return beta_p, er_p, sigma_p
108
-
109
- def efficient_same_sigma(sigma_target, rf_ann, erp_ann, sigma_mkt):
110
- a = sigma_target / sigma_mkt
111
- return a, 1 - a, rf_ann + a * erp_ann
112
-
113
- def efficient_same_return(mu_target, rf_ann, erp_ann, sigma_mkt):
114
- a = (mu_target - rf_ann) / erp_ann
115
- return a, 1 - a, abs(a) * sigma_mkt
116
-
117
- def build_synthetic_dataset(symbols: List[str], years: int, rf_ann: float, erp_ann: float):
118
- betas, covA, _, _ = estimate_all_moments_aligned(symbols, years, rf_ann)
119
- rng = np.random.default_rng(42)
120
- rows = []
121
- for _ in range(1000):
122
- k = rng.integers(2, len(symbols)+1)
123
- picks = list(rng.choice(symbols, size=k, replace=False))
124
- raw = rng.dirichlet(np.ones(k))
125
- gross = 1.0 + rng.gamma(2.0, 0.5)
126
- w = gross * raw
127
- stats = portfolio_stats({picks[i]: w[i] for i in range(k)}, covA, betas, rf_ann, erp_ann)
128
- rows.append({
129
- "tickers": ",".join(picks),
130
- "weights": ",".join(f"{x:.4f}" for x in w),
131
- "beta_p": stats[0], "er_p": stats[1], "sigma_p": stats[2]
132
  })
133
- return pd.DataFrame(rows)
134
 
135
- def select_risk_profiles(df):
136
- high = df.sort_values("er_p", ascending=False).head(1)
137
- low = df.sort_values("sigma_p", ascending=True).head(1)
138
- med_idx = ((df["er_p"] - df["er_p"].median())**2 + (df["sigma_p"] - df["sigma_p"].median())**2).idxmin()
139
- medium = df.loc[[med_idx]]
 
 
 
 
 
 
140
  return high, medium, low
141
 
142
- # ---------------- main compute ----------------
143
- def compute(years_lookback, tickers_df):
144
- tickers_df["ticker"] = tickers_df["ticker"].str.upper().str.strip()
145
- tickers = tickers_df["ticker"].tolist()
146
- amounts = tickers_df["amount_usd"].tolist()
147
- rf_ann = fetch_fred_yield_annual(fred_series_for_horizon(5))
148
- betas, covA, erp_ann, sigma_mkt = estimate_all_moments_aligned(tickers + [MARKET_TICKER], years_lookback, rf_ann)
149
- weights = {t: a for t, a in zip(tickers, amounts)}
150
- beta_p, er_p, sigma_p = portfolio_stats(weights, covA, betas, rf_ann, erp_ann)
151
- eff_sigma = efficient_same_sigma(sigma_p, rf_ann, erp_ann, sigma_mkt)
152
- eff_return = efficient_same_return(er_p, rf_ann, erp_ann, sigma_mkt)
153
- synth_df = build_synthetic_dataset(tickers + [MARKET_TICKER], years_lookback, rf_ann, erp_ann)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
  high, medium, low = select_risk_profiles(synth_df)
155
- return {
156
- "user": (beta_p, er_p, sigma_p, weights),
157
- "eff_sigma": eff_sigma,
158
- "eff_return": eff_return,
159
- "high": high,
160
- "medium": medium,
161
- "low": low
 
 
162
  }
163
 
164
- # ---------------- UI ----------------
 
 
 
 
 
 
 
165
  with gr.Blocks() as demo:
166
- gr.Markdown("## Efficient Portfolio Advisor with Synthetic Risk Profiles")
167
- table = gr.Dataframe(headers=["ticker", "amount_usd"], datatype=["str", "number"], row_count=3)
168
- lookback = gr.Slider(1, 10, value=DEFAULT_LOOKBACK_YEARS, step=1, label="Lookback years")
169
- run_btn = gr.Button("Compute")
170
- output = gr.Textbox(label="Results")
171
- def run_app(lookback, table):
172
- res = compute(lookback, table)
173
- return str(res)
174
- run_btn.click(fn=run_app, inputs=[lookback, table], outputs=[output])
 
 
 
 
175
 
176
  if __name__ == "__main__":
177
- ensure_data_dir()
178
  demo.launch()
 
1
+ # app.py - Part 1
 
2
 
 
 
3
  import pandas as pd
4
+ import numpy as np
 
 
 
5
  import yfinance as yf
6
+ import gradio as gr
7
+ from itertools import combinations_with_replacement
8
+
9
+ # -------------------
10
+ # Helper functions
11
+ # -------------------
12
+
13
+ def fetch_live_data(tickers, period="1y"):
14
+ """Fetch historical adjusted close prices for given tickers."""
15
+ data = yf.download(tickers, period=period)["Adj Close"]
16
+ return data.dropna()
17
+
18
+ def calculate_portfolio_metrics(weights, mean_returns, cov_matrix, risk_free_rate=0.045):
19
+ """Return expected portfolio return, volatility, and beta."""
20
+ weights = np.array(weights)
21
+ portfolio_return = np.sum(mean_returns * weights)
22
+ portfolio_volatility = np.sqrt(np.dot(weights.T, np.dot(cov_matrix, weights)))
23
+ beta = np.sum(weights) # Placeholder if no real beta calc
24
+ return portfolio_return, portfolio_volatility, beta
25
+
26
+ def generate_synthetic_portfolios(tickers, num_portfolios=1000):
27
+ """Generate synthetic portfolios from live data for given tickers."""
28
+ df_prices = fetch_live_data(tickers)
29
+ returns = df_prices.pct_change().dropna()
30
+ mean_returns = returns.mean()
31
+ cov_matrix = returns.cov()
32
+
33
+ synthetic_data = []
34
+ for _ in range(num_portfolios):
35
+ weights = np.random.random(len(tickers))
36
+ weights /= np.sum(weights)
37
+ er, sigma, beta = calculate_portfolio_metrics(weights, mean_returns, cov_matrix)
38
+ synthetic_data.append({
39
+ "weights": weights,
40
+ "er_p": er,
41
+ "sigma_p": sigma,
42
+ "beta_p": beta
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  })
 
44
 
45
+ return pd.DataFrame(synthetic_data)
46
+
47
+ def select_risk_profiles(synth_df):
48
+ """Select high/high, medium/medium, low/low risk profiles from synthetic dataset."""
49
+ high = synth_df.sort_values("er_p", ascending=False).iloc[0]
50
+ low = synth_df.sort_values("sigma_p", ascending=True).iloc[0]
51
+
52
+ median_idx = ((synth_df["sigma_p"] - synth_df["sigma_p"].median()).abs() +
53
+ (synth_df["er_p"] - synth_df["er_p"].median()).abs()).idxmin()
54
+ medium = synth_df.loc[median_idx]
55
+
56
  return high, medium, low
57
 
58
+ def find_efficient_same_sigma(user_er, user_sigma, synth_df):
59
+ """Find portfolio with same sigma but highest return."""
60
+ close_sigma = synth_df[np.isclose(synth_df["sigma_p"], user_sigma, atol=0.002)]
61
+ if close_sigma.empty:
62
+ return synth_df.iloc[0]
63
+ return close_sigma.sort_values("er_p", ascending=False).iloc[0]
64
+
65
+ def find_efficient_same_return(user_er, user_sigma, synth_df):
66
+ """Find portfolio with same return but lowest sigma."""
67
+ close_return = synth_df[np.isclose(synth_df["er_p"], user_er, atol=0.002)]
68
+ if close_return.empty:
69
+ return synth_df.iloc[0]
70
+ return close_return.sort_values("sigma_p", ascending=True).iloc[0]
71
+ # -------------------
72
+ # Main compute function
73
+ # -------------------
74
+
75
+ def compute(user_tickers):
76
+ # Convert comma-separated string into ticker list
77
+ tickers = [t.strip().upper() for t in user_tickers.split(",") if t.strip()]
78
+ if len(tickers) < 2:
79
+ return "Please enter at least two tickers.", None
80
+
81
+ # Fetch live data & compute user portfolio metrics (equal weights for now)
82
+ df_prices = fetch_live_data(tickers)
83
+ if df_prices.empty:
84
+ return "Could not fetch data. Check tickers.", None
85
+
86
+ returns = df_prices.pct_change().dropna()
87
+ mean_returns = returns.mean()
88
+ cov_matrix = returns.cov()
89
+ user_weights = np.ones(len(tickers)) / len(tickers)
90
+ user_er, user_sigma, user_beta = calculate_portfolio_metrics(user_weights, mean_returns, cov_matrix)
91
+
92
+ # Generate synthetic dataset
93
+ synth_df = generate_synthetic_portfolios(tickers, num_portfolios=1000)
94
+
95
+ # Select profiles
96
+ eff_sigma = find_efficient_same_sigma(user_er, user_sigma, synth_df)
97
+ eff_return = find_efficient_same_return(user_er, user_sigma, synth_df)
98
  high, medium, low = select_risk_profiles(synth_df)
99
+
100
+ # Prepare results DataFrame
101
+ portfolios = {
102
+ "User Portfolio": [user_er, user_sigma, user_beta, user_weights],
103
+ "Efficient (Same Sigma)": [eff_sigma.er_p, eff_sigma.sigma_p, eff_sigma.beta_p, eff_sigma.weights],
104
+ "Efficient (Same Return)": [eff_return.er_p, eff_return.sigma_p, eff_return.beta_p, eff_return.weights],
105
+ "High Risk / High Return": [high.er_p, high.sigma_p, high.beta_p, high.weights],
106
+ "Medium Risk / Medium Return": [medium.er_p, medium.sigma_p, medium.beta_p, medium.weights],
107
+ "Low Risk / Low Return": [low.er_p, low.sigma_p, low.beta_p, low.weights],
108
  }
109
 
110
+ df_out = pd.DataFrame(portfolios, index=["Expected Return", "Sigma", "Beta", "Weights"])
111
+
112
+ return df_out.to_markdown(), df_out
113
+
114
+ # -------------------
115
+ # Gradio Interface
116
+ # -------------------
117
+
118
  with gr.Blocks() as demo:
119
+ gr.Markdown("## Portfolio Optimizer and Risk Profiles")
120
+ tickers_input = gr.Textbox(label="Enter tickers (comma separated)", placeholder="AAPL, MSFT, GOOG")
121
+ output_md = gr.Markdown()
122
+ output_df = gr.Dataframe(headers=["Portfolio", "Value"], interactive=False)
123
+
124
+ def run_and_display(tickers):
125
+ md, df = compute(tickers)
126
+ if df is None:
127
+ return md, None
128
+ return md, df
129
+
130
+ run_btn = gr.Button("Run Analysis")
131
+ run_btn.click(fn=run_and_display, inputs=tickers_input, outputs=[output_md, output_df])
132
 
133
  if __name__ == "__main__":
 
134
  demo.launch()