File size: 7,291 Bytes
5f9a0c8
327c8e7
 
5f9a0c8
 
175f732
327c8e7
5f9a0c8
b988502
cee6a7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f9a0c8
327c8e7
 
 
 
5f9a0c8
 
 
 
 
 
 
 
b988502
 
5f9a0c8
b988502
cee6a7f
5f9a0c8
 
 
327c8e7
cee6a7f
5f9a0c8
 
 
 
 
cee6a7f
 
 
 
 
 
 
 
327c8e7
5f9a0c8
 
 
b988502
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f9a0c8
b988502
 
5f9a0c8
 
 
cee6a7f
b988502
cee6a7f
 
 
 
 
b988502
5f9a0c8
b988502
 
 
 
5f9a0c8
b988502
327c8e7
 
cee6a7f
5f9a0c8
 
327c8e7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import gradio as gr
from gradio.components import File as InputFile, Number as InputNumber, Checkbox as InputCheckbox
from gradio.components import File as OutputFile
import yfinance as yf
import pandas as pd
import pandas_ta as ta
from scipy.signal import find_peaks
import csv
import os
from scipy.signal import argrelextrema
from collections import deque
import numpy as np

def getHigherLows(data: np.array, order=5, K=2):  
  # Get lows
  low_idx = argrelextrema(data, np.less, order=order)[0]
  lows = data[low_idx]
  # Ensure consecutive lows are higher than previous lows
  extrema = []
  ex_deque = deque(maxlen=K)
  for i, idx in enumerate(low_idx):
    if i == 0:
      ex_deque.append(idx)
      continue
    if lows[i] < lows[i-1]:
      ex_deque.clear()

    ex_deque.append(idx)
    if len(ex_deque) == K:
      extrema.append(ex_deque.copy())

  return extrema

def getLowerHighs(data: np.array, order=5, K=2):
  # Get highs
  high_idx = argrelextrema(data, np.greater, order=order)[0]
  highs = data[high_idx]
  # Ensure consecutive highs are lower than previous highs
  extrema = []
  ex_deque = deque(maxlen=K)
  for i, idx in enumerate(high_idx):
    if i == 0:
      ex_deque.append(idx)
      continue
    if highs[i] > highs[i-1]:
      ex_deque.clear()

    ex_deque.append(idx)
    if len(ex_deque) == K:
      extrema.append(ex_deque.copy())

  return extrema

def getHigherHighs(data: np.array, order=5, K=2):
  # Get highs
  high_idx = argrelextrema(data, np.greater, order=5)[0]
  highs = data[high_idx]
  # Ensure consecutive highs are higher than previous highs
  extrema = []
  ex_deque = deque(maxlen=K)
  for i, idx in enumerate(high_idx):
    if i == 0:
      ex_deque.append(idx)
      continue
    if highs[i] < highs[i-1]:
      ex_deque.clear()

    ex_deque.append(idx)
    if len(ex_deque) == K:
      extrema.append(ex_deque.copy())

  return extrema

def getLowerLows(data: np.array, order=5, K=2):
  # Get lows
  low_idx = argrelextrema(data, np.less, order=order)[0]
  lows = data[low_idx]
  # Ensure consecutive lows are lower than previous lows
  extrema = []
  ex_deque = deque(maxlen=K)
  for i, idx in enumerate(low_idx):
    if i == 0:
      ex_deque.append(idx)
      continue
    if lows[i] > lows[i-1]:
      ex_deque.clear()

    ex_deque.append(idx)
    if len(ex_deque) == K:
      extrema.append(ex_deque.copy())

  return extrema


def check_trend(hist):
    
    close = hist['Close'].values
    order = 5
    K = 2

    hh = getHigherHighs(close, order, K)
    hl = getHigherLows(close, order, K)
    ll = getLowerLows(close, order, K)
    lh = getLowerHighs(close, order, K)

    # Get the most recent top and bottom labels
    top_labels = hh if len(hh) > 0 and hh[-1][-1] > (lh[-1][-1] if len(lh) > 0 else 0) else lh
    bottom_labels = hl if len(hl) > 0 and hl[-1][-1] > (ll[-1][-1] if len(ll) > 0 else 0) else ll

    # Check if the most recent top and bottom labels form a pair
    if top_labels == hh and bottom_labels == hl:
        return True, 'bullish'
    elif top_labels == lh and bottom_labels == ll:
        return True, 'bearish'
    else:
        return False, 'uncertain'



def process_csv(csv_file, lookback, bullish_stoch_value, bearish_stoch_value, check_bullish_swing, check_bearish_swing):

    lookback = int(lookback)

    all_tickers = []
    with open(csv_file.name, 'r') as file:
        reader = csv.reader(file)
        next(reader)  # Skip the header
        for row in reader:
            all_tickers.append(row[0])  # Append the value in the first column

    ttl_tickers = len(all_tickers)
    bullish_tickers = []
    bearish_tickers = []
    for ticker in all_tickers:
        print(f"Processing {ticker} ({all_tickers.index(ticker)+1}/{ttl_tickers})")
        
        try:
            data = yf.Ticker(ticker)
            hist = data.history(period="1y", actions=False)
            if not hist.empty:
                
                hist.ta.ema(close='Close', length=20, append=True)
                hist.ta.ema(close='Close', length=50, append=True)
                hist.ta.ema(close='Close', length=100, append=True)
                hist.ta.sma(close='Close', length=150, append=True)
                stoch = hist.ta.stoch(high='High', low='Low', close='Close')

                trend_exists, trend_type = check_trend(hist)

            if all(hist['EMA_20'][-lookback:] > hist['EMA_50'][-lookback:]) and all(hist['EMA_50'][-lookback:] > hist['EMA_100'][-lookback:]) and all(hist['EMA_100'][-lookback:] > hist['SMA_150'][-lookback:]) and stoch['STOCHk_14_3_3'][-1] <= bullish_stoch_value and trend_exists and trend_type == 'bullish':
                bullish_tickers.append([ticker, hist['Close'][-1], hist['Volume'][-1], 'Bullish'])
            elif all(hist['EMA_20'][-lookback:] < hist['EMA_50'][-lookback:]) and all(hist['EMA_50'][-lookback:] < hist['EMA_100'][-lookback:]) and all(hist['EMA_100'][-lookback:] < hist['SMA_150'][-lookback:]) and stoch['STOCHk_14_3_3'][-1] >= bearish_stoch_value and trend_exists and trend_type == 'bearish':
                bearish_tickers.append([ticker, hist['Close'][-1], hist['Volume'][-1], 'Bearish'])


        except Exception as e:
            print(f"An error occurred with ticker {ticker}: {e}")

    df_bullish = pd.DataFrame(bullish_tickers, columns=['Ticker', 'Close', 'Volume', 'Trend'])
    df_bearish = pd.DataFrame(bearish_tickers, columns=['Ticker', 'Close', 'Volume', 'Trend'])

    base_filename = os.path.splitext(os.path.basename(csv_file.name))[0]
    output_bullish_xlsx = f'{base_filename}-bullish.xlsx'
    output_bearish_xlsx = f'{base_filename}-bearish.xlsx'
    df_bullish.to_excel(output_bullish_xlsx, index=False)
    df_bearish.to_excel(output_bearish_xlsx, index=False)

    output_bullish_txt = f'{base_filename}-bullish.txt'
    output_bearish_txt = f'{base_filename}-bearish.txt'
    with open(output_bullish_txt, 'w') as f:
        for ticker in df_bullish['Ticker']:
            f.write(f"{ticker}\n")
    with open(output_bearish_txt, 'w') as f:
        for ticker in df_bearish['Ticker']:
            f.write(f"{ticker}\n")

    return output_bullish_xlsx, output_bearish_xlsx, output_bullish_txt, output_bearish_txt

iface = gr.Interface(
    fn=process_csv, 
    inputs=[        
        InputFile(label="Upload CSV"), 
        InputNumber(label="EMA Loockback period (days, min 5 and max 60)", value=20),
        InputNumber(label="Bullish Stochastic Value (equal or below)", value=30),
        InputNumber(label="Bearish Stochastic Value (equal or above)", value=70),
        InputCheckbox(label="Check Bullish Swing (HH + HL) (slower, in beta)"),
        InputCheckbox(label="Check Bearish Swing (LH + LL) (slower, in beta)")
    ], 
    outputs=[
        OutputFile(label="Download Bullish XLSX"),
        OutputFile(label="Download Bearish XLSX"),
        OutputFile(label="Download Bullish TXT"),
        OutputFile(label="Download Bearish TXT")
    ],
    title="Stock Analysis",
    description="""Upload a CSV file with a column named 'Ticker' (from TradingView or other) and
                    get a filtered shortlist of bullish and bearish stocks in return.
                    The tool will find 'stacked' EMAs + SMAs and check for Stochastics above or below the set values."""
)

iface.launch()