Initial version: downloads historical market data and creates a graph from peak to trough.

This commit is contained in:
2026-05-15 12:54:44 -04:00
commit e234bec1fc
15 changed files with 43598 additions and 0 deletions
View File
+166
View File
@@ -0,0 +1,166 @@
from __future__ import annotations
import warnings
from pathlib import Path
import pandas as pd
import yfinance as yf
from dotenv import load_dotenv
# Module-level setup: runs once at import time.
# NOTE(review): nothing in the visible code reads environment variables;
# presumably load_dotenv() is here for other parts of the project — confirm.
load_dotenv()
# Silence every FutureWarning for the whole process (not just this module).
warnings.filterwarnings("ignore", category=FutureWarning)
# Yahoo ticker symbol -> index name. The name is used as the key of the
# returned data dict and as the stem of the raw CSV file (``<name>.csv``).
INDICES = {
"^GSPC": "S&P_500",
"^IXIC": "NASDAQ",
"^DJI": "DOW_JONES",
"^RUT": "RUSSELL_2000",
}
# Index name -> alternate ticker that is downloaded to back-fill dates
# earlier than the first row returned for the primary ticker.
FILL_TICKERS = {
"DOW_JONES": "DJI",
}
# Date window passed to ``yf.download`` as ``start`` / ``end`` (ISO dates).
START_DATE = "1986-01-01"
END_DATE = "2026-05-15"
def download_raw_prices(output_dir: Path) -> dict[str, pd.DataFrame]:
    """Download adjusted-close history for every entry in ``INDICES``.

    Each series is written to ``<output_dir>/raw/<name>.csv`` (the directory
    is created if needed). Indices listed in ``FILL_TICKERS`` are handed to
    ``_fill_with_alternate_ticker``, which back-fills them and writes the CSV
    itself.

    Args:
        output_dir: Base directory; raw CSVs go into its ``raw`` subfolder.

    Returns:
        Mapping of index name to a single-column (``adj_close``) frame indexed
        by date. Tickers for which Yahoo returned nothing are omitted.
    """
    raw_dir = output_dir / "raw"
    raw_dir.mkdir(parents=True, exist_ok=True)

    prices_by_name: dict[str, pd.DataFrame] = {}
    for ticker, name in INDICES.items():
        print(f"Downloading {name} ({ticker}) from Yahoo...")
        downloaded = yf.download(
            ticker,
            start=START_DATE,
            end=END_DATE,
            auto_adjust=False,
            progress=False,
        )
        if downloaded.empty:
            print(f" WARNING: No data returned for {ticker}")
            continue

        # Collapse a (field, ticker) MultiIndex header down to the field names.
        if isinstance(downloaded.columns, pd.MultiIndex):
            downloaded.columns = downloaded.columns.get_level_values(0)

        prices = downloaded[["Adj Close"]].copy()
        prices.columns = ["adj_close"]
        prices.index = pd.to_datetime(prices.index)

        if prices.empty or name not in FILL_TICKERS:
            prices.to_csv(raw_dir / f"{name}.csv")
        else:
            # The fill helper saves the combined series to disk on its own.
            prices = _fill_with_alternate_ticker(prices, name, raw_dir)

        first_day = prices.index.min().date()
        last_day = prices.index.max().date()
        print(f" Saved {len(prices)} rows ({first_day} to {last_day})")
        prices_by_name[name] = prices

    return prices_by_name
def _fill_with_alternate_ticker(
    yahoo_df: pd.DataFrame,
    name: str,
    raw_dir: Path,
    overlap_days: int = 30,
) -> pd.DataFrame:
    """Back-fill the start of ``yahoo_df`` using the index's alternate ticker.

    The alternate ticker (``FILL_TICKERS[name]``) is downloaded from
    ``START_DATE`` up to a short window *past* the first date of ``yahoo_df``.
    Dates present in both series are used to compute a median
    ``adj_close / raw_close`` ratio, which rescales the alternate closes so
    they line up with the primary series. Only rows strictly before the
    primary series' first date are then prepended.

    The result is always written to ``<raw_dir>/<name>.csv``.

    Args:
        yahoo_df: Primary series with an ``adj_close`` column and a datetime
            index; must be non-empty.
        name: Index name; key into ``FILL_TICKERS`` and CSV file stem.
        raw_dir: Directory the CSV is written to.
        overlap_days: Calendar days after the primary start date to include
            in the alternate download so an alignment ratio can be computed.
            ``0`` disables the overlap (ratio falls back to 1.0).

    Returns:
        The combined, date-sorted frame, or ``yahoo_df`` unchanged when the
        alternate ticker has no usable rows before the primary start date.
    """
    fill_ticker = FILL_TICKERS[name]
    yahoo_start = yahoo_df.index.min()
    filepath = raw_dir / f"{name}.csv"

    print(f" Fetching {fill_ticker} from Yahoo to fill gap before {yahoo_start.date()}...")

    # yfinance treats ``end`` as exclusive. Stopping exactly at yahoo_start
    # would leave no shared dates, so the alignment ratio could never be
    # computed; extend the request by a small overlap window instead.
    fetch_end = yahoo_start + pd.Timedelta(days=max(overlap_days, 0))
    fill_df = yf.download(
        fill_ticker,
        start=START_DATE,
        end=fetch_end.strftime("%Y-%m-%d"),
        progress=False,
    )
    if isinstance(fill_df.columns, pd.MultiIndex):
        fill_df.columns = fill_df.columns.get_level_values(0)

    if fill_df.empty:
        print(f" No data from {fill_ticker} for the gap period")
        yahoo_df.to_csv(filepath)
        return yahoo_df

    close_col = "Close" if "Close" in fill_df.columns else "Adj Close"
    fill_df = fill_df[[close_col]].copy()
    fill_df.columns = ["raw_close"]
    fill_df.index = pd.to_datetime(fill_df.index)
    fill_df = fill_df.dropna()

    # Rows that actually extend the history (strictly before the primary start).
    gap_df = fill_df[fill_df.index < yahoo_start].copy()
    if gap_df.empty:
        print(f" No data from {fill_ticker} for the gap period")
        yahoo_df.to_csv(filepath)
        return yahoo_df

    # Dates present in both series drive the scaling factor.
    merged_overlap = yahoo_df.join(fill_df, how="inner")
    ratios = (merged_overlap["adj_close"] / merged_overlap["raw_close"]).dropna()
    if not ratios.empty:
        ratio = ratios.median()
        print(f" Alignment ratio (adj_close/raw_close): {ratio:.6f}")
    else:
        # No shared dates: use the alternate closes unscaled.
        ratio = 1.0

    gap_df["adj_close"] = gap_df["raw_close"] * ratio

    combined = pd.concat([gap_df[["adj_close"]], yahoo_df])
    # On any duplicate date prefer the primary series (concatenated last).
    combined = combined[~combined.index.duplicated(keep="last")]
    combined = combined.sort_index()

    print(f" Filled {len(gap_df)} rows for {name} (new start: {combined.index.min().date()})")
    combined.to_csv(filepath)
    return combined
def compute_monthly_returns(all_data: dict[str, pd.DataFrame]) -> pd.DataFrame:
frames = []
for name, df in all_data.items():
monthly = df["adj_close"].resample("ME").last()
monthly_returns = monthly.pct_change() * 100
monthly_returns = monthly_returns.dropna().reset_index()
monthly_returns.columns = ["date", "monthly_return"]
monthly_returns.insert(0, "index", name)
frames.append(monthly_returns)
combined = pd.concat(frames, ignore_index=True)
combined["date"] = combined["date"].dt.strftime("%Y-%m")
return combined
def compute_annual_returns(all_data: dict[str, pd.DataFrame]) -> pd.DataFrame:
frames = []
for name, df in all_data.items():
annual = df["adj_close"].resample("YE").last()
annual_returns = annual.pct_change() * 100
annual_returns = annual_returns.dropna().reset_index()
annual_returns.columns = ["date", "annual_return"]
annual_returns["year"] = annual_returns["date"].dt.year
annual_returns.insert(0, "index", name)
frames.append(annual_returns[["index", "year", "annual_return"]])
combined = pd.concat(frames, ignore_index=True)
return combined
def validate_coverage(all_data: dict[str, pd.DataFrame]) -> None:
    """Print a per-index summary of date coverage and missing business days.

    For every frame this reports the first/last date, the span in years, how
    many business days (Mon-Fri) in that range have no row, and the longest
    run of *consecutive* missing business days. Warnings are printed when the
    span is under 35 years or when more than 5 business days in a row are
    missing.

    Args:
        all_data: Mapping of index name to a frame with a datetime index.
    """
    print("\n--- Data Coverage Summary ---")
    for name, df in all_data.items():
        if df.empty:
            # min()/max() of an empty index are NaT, which date_range rejects.
            print(f" {name}: no data")
            continue

        first_date = df.index.min()
        last_date = df.index.max()
        span_years = (last_date - first_date).days / 365.25

        all_dates = pd.date_range(start=first_date, end=last_date, freq="B")
        is_missing = pd.Series(~all_dates.isin(df.index))
        missing_count = int(is_missing.sum())

        max_gap_days = 0
        if missing_count:
            # Each observed day opens a new group; the missing days that
            # follow it share the group id, so the per-group sum of the
            # missing flags is the length of that run of missing days.
            run_id = (~is_missing).cumsum()
            max_gap_days = int(is_missing.groupby(run_id).sum().max())

        print(f" {name}: {first_date.date()} to {last_date.date()} ({span_years:.1f} years)")
        print(f" Missing trading days: {missing_count}, max gap: {max_gap_days} days")
        if span_years < 35:
            print(" WARNING: Less than 35 years of data!")
        if max_gap_days > 5:
            print(f" WARNING: Gap of {max_gap_days} consecutive trading days detected!")
    print()