diff options
Diffstat (limited to 'market.py')
-rwxr-xr-x | market.py | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/market.py b/market.py new file mode 100755 index 0000000..6ac95ee --- /dev/null +++ b/market.py | |||
@@ -0,0 +1,272 @@ | |||
1 | #!/usr/bin/python | ||
2 | import argparse | ||
3 | from collections import namedtuple | ||
4 | import requests | ||
5 | import os | ||
6 | import sys | ||
7 | import time | ||
8 | |||
9 | from textual.app import App, ComposeResult | ||
10 | from textual.widgets import DataTable | ||
11 | |||
12 | # Rapid API key must be set in the environment. | ||
13 | RAPIDAPI_KEY = os.environ.get('RAPIDAPI_KEY') | ||
14 | |||
15 | # The file where the application's state is persistent. | ||
16 | STATE_FILE = "state.txt" | ||
17 | |||
18 | # API endpoint. | ||
19 | Endpoint = namedtuple('Endpoint', ['url', 'headers', 'update_delay']) | ||
20 | |||
21 | # API endpoints. | ||
22 | ENDPOINTS = { | ||
23 | 'stock': Endpoint(url="https://realstonks.p.rapidapi.com/", headers={ | ||
24 | "X-RapidAPI-Key": f"{RAPIDAPI_KEY}", | ||
25 | "X-RapidAPI-Host": "realstonks.p.rapidapi.com" | ||
26 | }, update_delay=5 * 60), # 5 minutes | ||
27 | 'currency': Endpoint( | ||
28 | url="https://exchange-rate-api1.p.rapidapi.com/convert", headers={ | ||
29 | "X-RapidAPI-Key": f"{RAPIDAPI_KEY}", | ||
30 | "X-RapidAPI-Host": "realstonks.p.rapidapi.com" | ||
31 | }, update_delay=60 * 60) # 1 hour | ||
32 | } | ||
33 | |||
34 | # Application state. | ||
35 | State = namedtuple('State', ['stocks', 'exchanges']) | ||
36 | |||
37 | # Stock quote. | ||
38 | Stock = namedtuple('Stock', | ||
39 | ['sticker', 'price', 'change_point', 'change_percent']) | ||
40 | |||
41 | # Exchange rate. | ||
42 | Exchange = namedtuple('Exchange', ['source', 'target', 'rate']) | ||
43 | |||
44 | |||
45 | def get_stock(stickers: list[str]) -> list[Stock]: | ||
46 | """Query the stock prices for the given stickers. | ||
47 | |||
48 | The result may not have prices for all the input stickers if some of the | ||
49 | queries fail. This function attempts to get as many prices as possible such | ||
50 | that failure in a query does not preclude other stocks from being queried. | ||
51 | """ | ||
52 | # This API does not allow querying multiple stickers in a single request. | ||
53 | # Free tier: 100,000 requests/month. | ||
54 | # | ||
55 | # Make sure that a request failure does not preclude from getting other | ||
56 | # stocks. | ||
57 | # | ||
58 | # Example response: | ||
59 | # { | ||
60 | # "price": 466.4, | ||
61 | # "change_point": 7.4, | ||
62 | # "change_percentage": 1.61, | ||
63 | # "total_vol": "11.29M" | ||
64 | # } | ||
65 | stocks = [] | ||
66 | for sticker in stickers: | ||
67 | try: | ||
68 | endpoint = ENDPOINTS['stock'] | ||
69 | response = requests.get(f"{endpoint.url}{sticker}", | ||
70 | headers=endpoint.headers).json() | ||
71 | stocks.append( | ||
72 | Stock(sticker, float(response['price']), | ||
73 | float(response['change_point']), | ||
74 | float(response['change_percentage']))) | ||
75 | except Exception as e: | ||
76 | print(e) | ||
77 | return stocks | ||
78 | |||
79 | |||
80 | def get_exchange_rate(source: str, target: str) -> float: | ||
81 | """Get the exchange rate between two currencies. Return 0 on failure.""" | ||
82 | # Free tier: | ||
83 | # | ||
84 | # Example response: | ||
85 | # { | ||
86 | # "code": "0", | ||
87 | # "msg": "success", | ||
88 | # "convert_result": { | ||
89 | # "base": "USD", | ||
90 | # "target": "EUR", | ||
91 | # "rate": 0.9063 | ||
92 | # }, | ||
93 | # "time_update": { | ||
94 | # "time_unix": 1690556940, | ||
95 | # "time_utc": "2023-07-28T08:09:00Z", | ||
96 | # "time_zone": "America/Los_Angeles" | ||
97 | # } | ||
98 | # } | ||
99 | try: | ||
100 | query = {"base": source, "target": target} | ||
101 | endpoint = ENDPOINTS['currency'] | ||
102 | response = requests.get(endpoint.url, headers=endpoint.headers, | ||
103 | params=query).json() | ||
104 | return float(response['convert_result']['rate']) | ||
105 | except Exception as e: | ||
106 | print(e) | ||
107 | return 0.0 | ||
108 | |||
109 | |||
110 | def update_stocks(state: State) -> State: | ||
111 | stickers = [stock.sticker for stock in state.stocks] | ||
112 | updated_stocks = get_stock(stickers) | ||
113 | # Note that updated_stocks may not have all the stocks in the input. | ||
114 | updated_stocks_stickers = [stock.sticker for stock in updated_stocks] | ||
115 | missing_stocks = [stock for stock in state.stocks if | ||
116 | stock.sticker not in updated_stocks_stickers] | ||
117 | stocks = updated_stocks + missing_stocks | ||
118 | return State(stocks, state.exchanges) | ||
119 | |||
120 | |||
121 | def update_exchanges(state: State) -> State: | ||
122 | exchanges = [] | ||
123 | for exchange in state.exchanges: | ||
124 | rate = get_exchange_rate(exchange.source, exchange.target) | ||
125 | if rate != 0: | ||
126 | exchanges.append(Exchange(exchange.source, exchange.target, rate)) | ||
127 | else: | ||
128 | exchanges.append(exchange) | ||
129 | return State(state.stocks, exchanges) | ||
130 | |||
131 | |||
132 | def format_delta(stock: Stock, percent: bool = False) -> str: | ||
133 | sign = "+" if stock.change_point >= 0 else "-" | ||
134 | change = f"{sign}{abs(stock.change_point)}{'%' if percent else ''}" | ||
135 | return change | ||
136 | |||
137 | |||
138 | def format_exchange_name(exchange: Exchange) -> str: | ||
139 | return f"{exchange.source}/{exchange.target}" | ||
140 | |||
141 | |||
142 | def load_state(filepath: str) -> State: | ||
143 | stocks = [] | ||
144 | exchanges = [] | ||
145 | |||
146 | lines = [] | ||
147 | with open(filepath, 'r') as file: | ||
148 | lines = file.readlines() | ||
149 | |||
150 | for line in lines: | ||
151 | values = line.split(' ') | ||
152 | key = values[0] | ||
153 | if '/' in key: | ||
154 | source, target = key.split('/') | ||
155 | rate = float(values[1]) | ||
156 | exchanges.append(Exchange(source, target, rate)) | ||
157 | else: | ||
158 | sticker = key | ||
159 | price = float(values[1]) | ||
160 | change_point = float(values[2]) | ||
161 | change_percent = float(values[3]) | ||
162 | stocks.append( | ||
163 | Stock(sticker, price, change_point, change_percent)) | ||
164 | |||
165 | return State(stocks, exchanges) | ||
166 | |||
167 | |||
168 | def save_state(state: State, filepath: str): | ||
169 | with open(filepath, 'w') as file: | ||
170 | for stock in state.stocks: | ||
171 | values = [str(x) for x in list(stock)] | ||
172 | file.write(f"{' '.join(values)}\n") | ||
173 | |||
174 | for exchange in state.exchanges: | ||
175 | file.write(f"{format_exchange_name(exchange)} {exchange.rate}\n") | ||
176 | |||
177 | |||
178 | class Updater: | ||
179 | def __init__(self, update, delay): | ||
180 | self.update = update | ||
181 | self.delay = delay | ||
182 | self.last_update_time = 0 | ||
183 | |||
184 | |||
185 | def update_stub(msg: str, state: State) -> State: | ||
186 | print(msg) | ||
187 | return state | ||
188 | |||
189 | |||
190 | def make_updaters(use_stubs: bool) -> list[Updater]: | ||
191 | updaters = [] | ||
192 | if use_stubs: | ||
193 | updaters = [ | ||
194 | Updater(lambda s: update_stub("Update stocks", s), 1), | ||
195 | Updater(lambda s: update_stub("Update exchange", s), 5) | ||
196 | ] | ||
197 | else: | ||
198 | updaters = [ | ||
199 | Updater(update_stocks, ENDPOINTS['stock'].update_delay), | ||
200 | Updater(update_exchanges, ENDPOINTS['currency'].update_delay) | ||
201 | ] | ||
202 | return updaters | ||
203 | |||
204 | |||
205 | def update_state(t: float, updaters: list[Updater], state: State) -> State: | ||
206 | for updater in updaters: | ||
207 | if t - updater.last_update_time >= updater.delay: | ||
208 | state = updater.update(state) | ||
209 | updater.last_update_time = t | ||
210 | return state | ||
211 | |||
212 | |||
213 | class MarketApp(App): | ||
214 | TITLE = "Market Watch" | ||
215 | |||
216 | def __init__(self, updaters: list[Updater]): | ||
217 | super().__init__() | ||
218 | self.state = None | ||
219 | self.table = None | ||
220 | self.updaters = updaters | ||
221 | self.min_update_delay = min([updater.delay for updater in updaters]) | ||
222 | |||
223 | def render(self): | ||
224 | assert self.state is not None | ||
225 | assert self.table is not None | ||
226 | |||
227 | # Stock/ex | Price | Change | ||
228 | # xyz | xxx | xxx | ||
229 | # usd/eur | xxx | <empty> | ||
230 | table = self.table | ||
231 | table.clear(columns=True) | ||
232 | table.add_columns("Stock", "Price($)", "Change($)", "%") | ||
233 | for stock in self.state.stocks: | ||
234 | table.add_row(stock.sticker, stock.price, format_delta(stock), | ||
235 | format_delta(stock, percent=True)) | ||
236 | for exchange in self.state.exchanges: | ||
237 | table.add_row(format_exchange_name(exchange), exchange.rate, "", "") | ||
238 | |||
239 | def compose(self) -> ComposeResult: | ||
240 | self.state = load_state(STATE_FILE) | ||
241 | |||
242 | table = DataTable() | ||
243 | table.show_cursor = False | ||
244 | self.table = table | ||
245 | yield table | ||
246 | |||
247 | self.render() | ||
248 | |||
249 | self.update() | ||
250 | self.set_interval(self.min_update_delay, self.update) | ||
251 | |||
252 | def update(self) -> None: | ||
253 | t = time.time() | ||
254 | self.state = update_state(t, self.updaters, self.state) | ||
255 | self.render() | ||
256 | save_state(self.state, STATE_FILE) | ||
257 | |||
258 | |||
259 | def main(): | ||
260 | parser = argparse.ArgumentParser() | ||
261 | parser.add_argument("--stub", action='store_true', | ||
262 | help="Use stub update functions") | ||
263 | args = parser.parse_args() | ||
264 | |||
265 | updaters = make_updaters(args.stub) | ||
266 | |||
267 | app = MarketApp(updaters) | ||
268 | app.run() | ||
269 | |||
270 | |||
271 | if __name__ == '__main__': | ||
272 | sys.exit(main()) | ||