""" Sector Analyzer Get all important stocks for a sector and perform peer comparison analysis """ import yfinance as yf import pandas as pd import numpy as np from typing import List, Dict, Optional, Tuple from fundamental_analysis.calculator import calculate_metrics_for_ticker import warnings warnings.filterwarnings('ignore') # Predefined sector/industry stock lists # These can be expanded or replaced with dynamic fetching from financial APIs SECTOR_STOCKS = { 'Technology': { 'Mega Cap': ['AAPL', 'MSFT', 'GOOGL', 'META', 'NVDA', 'TSLA', 'AVGO', 'ORCL', 'ADBE', 'CRM'], 'Large Cap': ['AMD', 'INTC', 'QCOM', 'TXN', 'AMAT', 'ADI', 'LRCX', 'KLAC', 'SNPS', 'CDNS'], 'Mid Cap': ['SNOW', 'CRWD', 'NET', 'DDOG', 'ZS', 'OKTA', 'MDB', 'TEAM', 'WDAY', 'PANW'] }, 'Financial': { 'Banks': ['JPM', 'BAC', 'WFC', 'C', 'GS', 'MS', 'USB', 'PNC', 'TFC', 'SCHW'], 'Insurance': ['BRK-B', 'UNH', 'PGR', 'MET', 'PRU', 'AIG', 'ALL', 'TRV', 'AXP', 'CB'], 'Asset Management': ['BLK', 'BX', 'KKR', 'APO', 'TROW', 'IVZ', 'BEN', 'AMG'] }, 'Healthcare': { 'Pharma': ['JNJ', 'PFE', 'ABBV', 'MRK', 'LLY', 'TMO', 'ABT', 'BMY', 'AMGN', 'GILD'], 'Biotech': ['VRTX', 'REGN', 'BIIB', 'ILMN', 'MRNA', 'BNTX', 'ALNY', 'SGEN', 'INCY', 'EXAS'], 'Medical Devices': ['MDT', 'DHR', 'SYK', 'BSX', 'EW', 'ZBH', 'BAX', 'IDXX', 'ALGN', 'HOLX'] }, 'Consumer': { 'Retail': ['AMZN', 'WMT', 'HD', 'LOW', 'TGT', 'COST', 'TJX', 'ROST', 'DG', 'DLTR'], 'Consumer Goods': ['PG', 'KO', 'PEP', 'PM', 'MDLZ', 'CL', 'EL', 'KMB', 'GIS', 'K'], 'Restaurants': ['MCD', 'SBUX', 'CMG', 'YUM', 'QSR', 'DPZ', 'DRI', 'TXRH', 'WING', 'SHAK'] }, 'Industrial': { 'Aerospace': ['BA', 'RTX', 'LMT', 'NOC', 'GD', 'HON', 'GE', 'TDG', 'HWM', 'LDOS'], 'Manufacturing': ['CAT', 'DE', 'ETN', 'EMR', 'ITW', 'ROK', 'PH', 'IR', 'AME', 'XYL'], 'Transportation': ['UPS', 'FDX', 'UNP', 'NSC', 'CSX', 'DAL', 'UAL', 'AAL', 'LUV', 'JBHT'] }, 'Energy': { 'Oil & Gas': ['XOM', 'CVX', 'COP', 'EOG', 'SLB', 'MPC', 'PSX', 'VLO', 'OXY', 'HES'], 'Utilities': ['NEE', 'DUK', 'SO', 'D', 'AEP', 'EXC', 'SRE', 'XEL', 'WEC', 'ES'] }, 'Materials': ['LIN', 'APD', 'SHW', 'ECL', 'NEM', 'FCX', 'NUE', 'VMC', 'MLM', 'DOW'], 'Real Estate': ['PLD', 'AMT', 'CCI', 'EQIX', 'PSA', 'DLR', 'WELL', 'AVB', 'EQR', 'VICI'], 'Communication': ['GOOGL', 'META', 'NFLX', 'DIS', 'CMCSA', 'T', 'VZ', 'TMUS', 'CHTR', 'PARA'] } class SectorAnalyzer: """Analyze stocks within a sector for peer comparison""" def __init__(self, sector: str, subsector: Optional[str] = None): """ Initialize sector analyzer Args: sector: Main sector name (e.g., 'Technology', 'Healthcare') subsector: Optional subsector/industry (e.g., 'Banks', 'Pharma') """ self.sector = sector self.subsector = subsector self.tickers = [] self.metrics_data = {} def get_sector_tickers(self) -> List[str]: """ Get list of tickers for the sector/subsector Returns: List of ticker symbols """ if self.sector not in SECTOR_STOCKS: print(f"Warning: Sector '{self.sector}' not found in predefined list") print(f"Available sectors: {list(SECTOR_STOCKS.keys())}") return [] sector_data = SECTOR_STOCKS[self.sector] # If sector has subsectors (nested dict) if isinstance(sector_data, dict) and any(isinstance(v, list) for v in sector_data.values()): if self.subsector: if self.subsector in sector_data: self.tickers = sector_data[self.subsector] else: print(f"Warning: Subsector '{self.subsector}' not found") print(f"Available subsectors: {list(sector_data.keys())}") return [] else: # Flatten all subsectors self.tickers = [ticker for subsector_list in sector_data.values() for ticker in subsector_list] else: # Direct list of tickers self.tickers = sector_data print(f"Found {len(self.tickers)} tickers for {self.sector}" + (f" > {self.subsector}" if self.subsector else "")) return self.tickers def calculate_sector_metrics(self, tickers: Optional[List[str]] = None) -> pd.DataFrame: """ Calculate metrics for all stocks in the sector Args: tickers: Optional custom list of tickers (uses sector tickers if None) Returns: DataFrame with all stocks and their key metrics """ if tickers is None: tickers = self.tickers if self.tickers else self.get_sector_tickers() if not tickers: print("No tickers to analyze") return pd.DataFrame() print(f"\nCalculating metrics for {len(tickers)} stocks...") print("=" * 80) results = [] failed_tickers = [] for i, ticker in enumerate(tickers, 1): print(f"\n[{i}/{len(tickers)}] Processing {ticker}...") try: metrics_df, summary = calculate_metrics_for_ticker(ticker) if metrics_df.empty: print(f"✗ Failed to get data for {ticker}") failed_tickers.append(ticker) continue # Extract key metrics for comparison key_metrics = self._extract_key_metrics(ticker, metrics_df) results.append(key_metrics) # Store full metrics for later reference self.metrics_data[ticker] = metrics_df except Exception as e: print(f"✗ Error processing {ticker}: {str(e)}") failed_tickers.append(ticker) continue if not results: print("\n✗ No data collected for any tickers") return pd.DataFrame() # Create comparison DataFrame comparison_df = pd.DataFrame(results) comparison_df = comparison_df.set_index('Ticker') print("\n" + "=" * 80) print(f"✓ Successfully processed {len(results)}/{len(tickers)} stocks") if failed_tickers: print(f"✗ Failed: {', '.join(failed_tickers)}") return comparison_df def _extract_key_metrics(self, ticker: str, metrics_df: pd.DataFrame) -> Dict: """Extract key metrics for peer comparison""" def get_metric_value(metric_name: str) -> Optional[float]: """Helper to safely extract metric value""" row = metrics_df[metrics_df['Metric'] == metric_name] if not row.empty and row.iloc[0]['Status'] == 'Available': return row.iloc[0]['Value'] return None key_metrics = { 'Ticker': ticker, # Valuation 'Market_Cap': get_metric_value('Market Capitalization'), 'PE_Ratio': get_metric_value('P/E Ratio (TTM)'), 'PEG_Ratio': get_metric_value('PEG Ratio'), 'EV_EBITDA': get_metric_value('EV/EBITDA'), 'Price_FCF': get_metric_value('Price / FCF'), 'FCF_Yield_%': get_metric_value('FCF Yield (Enterprise) %'), 'Price_Book': get_metric_value('Price / Book'), # Profitability 'Gross_Margin_%': get_metric_value('Gross Margin %'), 'EBITDA_Margin_%': get_metric_value('EBITDA Margin %'), 'Net_Margin_%': get_metric_value('Net Margin %'), # Cash Flow 'Free_Cash_Flow': get_metric_value('Free Cash Flow'), 'Cash_Conversion': get_metric_value('Cash Conversion Ratio'), # Leverage 'Net_Debt_EBITDA': get_metric_value('Net Debt / EBITDA'), 'Debt_Equity': get_metric_value('Debt / Equity'), 'Current_Ratio': get_metric_value('Current Ratio'), # Returns 'ROE_%': get_metric_value('Return on Equity (ROE) %'), 'ROA_%': get_metric_value('Return on Assets (ROA) %'), 'ROIC_%': get_metric_value('Return on Invested Capital (ROIC) %'), # Growth 'Revenue_Growth_%': get_metric_value('Revenue Growth (YoY) %'), 'EPS_Growth_%': get_metric_value('EPS Growth (YoY) %'), # Capital Allocation 'Payout_Ratio_%': get_metric_value('Payout Ratio %'), 'Total_Payout_%': get_metric_value('Total Payout Ratio %'), } return key_metrics def get_peer_statistics(self, comparison_df: pd.DataFrame) -> pd.DataFrame: """ Calculate sector statistics (median, mean, percentiles) Args: comparison_df: DataFrame from calculate_sector_metrics Returns: DataFrame with sector statistics """ if comparison_df.empty: return pd.DataFrame() stats_df = pd.DataFrame({ 'Median': comparison_df.median(), 'Mean': comparison_df.mean(), 'Std_Dev': comparison_df.std(), 'Min': comparison_df.min(), 'Q1': comparison_df.quantile(0.25), 'Q3': comparison_df.quantile(0.75), 'Max': comparison_df.max(), 'Count': comparison_df.count() }) return stats_df def compare_stock_to_peers(self, ticker: str, comparison_df: pd.DataFrame) -> pd.DataFrame: """ Compare a specific stock to sector peers Args: ticker: Stock to compare comparison_df: Sector comparison data Returns: DataFrame showing stock vs sector statistics """ if ticker not in comparison_df.index: print(f"Ticker {ticker} not found in comparison data") return pd.DataFrame() stock_data = comparison_df.loc[ticker] sector_stats = self.get_peer_statistics(comparison_df) comparison = pd.DataFrame({ 'Stock_Value': stock_data, 'Sector_Median': sector_stats['Median'], 'Sector_Mean': sector_stats['Mean'], 'Percentile_Rank': comparison_df.rank(pct=True).loc[ticker] * 100, 'vs_Median': ((stock_data - sector_stats['Median']) / sector_stats['Median'] * 100) }) return comparison def rank_stocks(self, comparison_df: pd.DataFrame, metrics: Optional[List[str]] = None) -> pd.DataFrame: """ Rank stocks based on key metrics Args: comparison_df: Sector comparison data metrics: List of metrics to rank by (None = use default key metrics) Returns: DataFrame with rankings """ if comparison_df.empty: return pd.DataFrame() # Default key metrics for ranking (higher is better for most) if metrics is None: metrics = [ 'FCF_Yield_%', # Higher is better 'ROIC_%', # Higher is better 'ROE_%', # Higher is better 'Revenue_Growth_%', # Higher is better 'EPS_Growth_%', # Higher is better ] # Reverse ranking for these (lower is better) reverse_metrics = [ 'PE_Ratio', 'PEG_Ratio', 'EV_EBITDA', 'Net_Debt_EBITDA', 'Debt_Equity' ] # Calculate composite score scores = pd.DataFrame(index=comparison_df.index) for metric in metrics: if metric in comparison_df.columns: # Normalize and rank (higher is better) scores[f'{metric}_rank'] = comparison_df[metric].rank(pct=True, na_option='keep') # Calculate average rank across all metrics scores['Composite_Score'] = scores.mean(axis=1) scores['Rank'] = scores['Composite_Score'].rank(ascending=False, method='min') # Add key metrics for context result = scores[['Composite_Score', 'Rank']].copy() for metric in metrics: if metric in comparison_df.columns: result[metric] = comparison_df[metric] return result.sort_values('Rank') def analyze_sector(sector: str, subsector: Optional[str] = None, custom_tickers: Optional[List[str]] = None) -> Tuple[pd.DataFrame, pd.DataFrame, SectorAnalyzer]: """ Main function to analyze a sector Args: sector: Sector name subsector: Optional subsector name custom_tickers: Optional custom list of tickers Returns: Tuple of (comparison_df, sector_stats, analyzer_object) """ analyzer = SectorAnalyzer(sector, subsector) if custom_tickers: comparison_df = analyzer.calculate_sector_metrics(custom_tickers) else: analyzer.get_sector_tickers() comparison_df = analyzer.calculate_sector_metrics() sector_stats = analyzer.get_peer_statistics(comparison_df) return comparison_df, sector_stats, analyzer def list_available_sectors() -> None: """Print all available sectors and subsectors""" print("\nAVAILABLE SECTORS:") print("=" * 80) for sector, data in SECTOR_STOCKS.items(): if isinstance(data, dict) and any(isinstance(v, list) for v in data.values()): print(f"\n{sector}:") for subsector, tickers in data.items(): print(f" - {subsector} ({len(tickers)} stocks)") else: print(f"\n{sector}: {len(data)} stocks") if __name__ == "__main__": # Test with a sample sector print("SECTOR ANALYZER TEST") print("=" * 80) # Show available sectors list_available_sectors() # Test with a small subset of Technology stocks print("\n\nTesting with Technology > Mega Cap (first 3 stocks)...") print("=" * 80) test_tickers = ['AAPL', 'MSFT', 'GOOGL'] comparison_df, sector_stats, analyzer = analyze_sector('Technology', custom_tickers=test_tickers) if not comparison_df.empty: print("\n" + "=" * 80) print("COMPARISON DATA") print("=" * 80) # Show key valuation metrics valuation_cols = ['Market_Cap', 'PE_Ratio', 'PEG_Ratio', 'FCF_Yield_%', 'ROIC_%'] print("\nValuation Metrics:") print(comparison_df[valuation_cols].to_string()) # Show sector statistics print("\n" + "=" * 80) print("SECTOR STATISTICS") print("=" * 80) print(sector_stats.loc[valuation_cols].to_string()) # Rank stocks print("\n" + "=" * 80) print("STOCK RANKINGS") print("=" * 80) rankings = analyzer.rank_stocks(comparison_df) print(rankings.to_string()) # Compare AAPL to peers print("\n" + "=" * 80) print("AAPL vs PEERS") print("=" * 80) aapl_comparison = analyzer.compare_stock_to_peers('AAPL', comparison_df) print(aapl_comparison.loc[valuation_cols].to_string()) # Save results comparison_df.to_csv('sector_comparison.csv') sector_stats.to_csv('sector_statistics.csv') print("\n✓ Results saved to sector_comparison.csv and sector_statistics.csv")