Python API
After running ICCS and MCCC alignment across many events, the accumulated
quality metrics span stations and events in ways that are natural to analyse
with pandas and matplotlib but impossible from the CLI or TUI. The Python API
is the primary interface for that kind of post-processing quality analysis: you
query AimbatSeismogram, AimbatStation, and AimbatEvent records directly,
build DataFrames, and apply whatever aggregation or visualisation you need.
The same API also drives the CLI and TUI internally, so it covers the full workflow — data ingestion, parameter management, alignment, snapshots — not just quality analysis. See the full API reference for a complete listing.
Writing seismogram data
AimbatSeismogram.data is backed by
an IO cache. Assigning a new array replaces the cached value and, for data
types that support it, writes through to the source file on disk. In-place
mutation of the returned array does not persist: the cached array is returned
read-only, so attempts to modify it in place raise an error instead of silently
diverging from the file on disk:
seis.data = my_modified_array  # replaces cache (and writes to disk if supported)
arr = seis.data
arr[0] = 42  # raises ValueError — the cached array is read-only
AIMBAT does not write seismogram data in normal usage. All processing results are stored as parameters in the database; source files are treated as read-only.
Core Concepts
The API is built on three main components:
- Models: SQLModel classes that represent the database schema
  (aimbat.models) as Python objects.
- Core Functions: High-level operations that manipulate those models
  (aimbat.core).
- Database Session: A SQLAlchemy session used to track changes and interact
  with the project database.
Project Location
By default AIMBAT reads and writes aimbat.db in the current directory. Set
AIMBAT_PROJECT to use a different path:
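For example (the path is illustrative):

```shell
export AIMBAT_PROJECT=/path/to/my_project.db
```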
The aimbat.db.engine singleton picks this up automatically, so scripts that
import it will use the same database as the CLI.
Session Management
Every database operation requires a Session. Use it as a context manager so
it is always closed cleanly:
from sqlmodel import Session
from aimbat.db import engine
with Session(engine) as session:
    # query or modify data here
    pass
Changes accumulate in the session and are written to disk only when
session.commit() is called (or when you call a core function that commits
internally). If an exception is raised before committing, the session is rolled
back automatically.
Creating a Project
Call create_project once per project. It creates the schema and the SQLite
triggers that enforce database constraints and track modification times,
and it raises RuntimeError if the schema already exists.
Adding Data
The central function is add_data_to_project:
from sqlmodel import Session
from aimbat.db import engine
from aimbat.core import add_data_to_project
from aimbat.io import DataType
with Session(engine) as session:
    add_data_to_project(session, paths, DataType.SAC)
The DataType enum controls what is read from each source:
| DataType | What is created |
|---|---|
| SAC | Event + Station + Seismogram |
| JSON_EVENT | Event only (no seismogram) |
| JSON_STATION | Station only (no seismogram) |
JSON formats
Event (DataType.JSON_EVENT):
{
    "time": "2024-01-12T08:14:33Z",
    "latitude": 37.52,
    "longitude": 143.04,
    "depth": 35.0
}
Station (DataType.JSON_STATION):
{
    "name": "ANMO",
    "network": "IU",
    "location": "00",
    "channel": "BHZ",
    "latitude": 34.946,
    "longitude": -106.457,
    "elevation": 1820.0
}
Providing event or station metadata externally
SAC files from some sources omit event or station headers. In that case, add
the metadata separately first and then link the SAC files to the resulting
database records using event_id and station_id:
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import add_data_to_project
from aimbat.io import DataType
from aimbat.models import AimbatEvent, AimbatStation

with Session(engine) as session:
    add_data_to_project(session, [event_json], DataType.JSON_EVENT)
    add_data_to_project(session, [station_json], DataType.JSON_STATION)
    event = session.exec(select(AimbatEvent)).one()
    station = session.exec(select(AimbatStation)).one()
    add_data_to_project(
        session,
        sac_files,
        DataType.SAC,
        event_id=event.id,
        station_id=station.id,
    )
Quality Analysis
After alignment has been run across a set of events, each seismogram carries quality metrics that can be queried directly from the database. The sections below show the most common patterns.
Quality data model
Per-seismogram metrics are stored in AimbatSeismogramQuality and accessed via
seismogram.quality:
| Attribute | Description |
|---|---|
| iccs_cc | ICCS cross-correlation with the stack |
| mccc_cc_mean | MCCC waveform quality — mean CC across seismogram pairs |
| mccc_cc_std | MCCC waveform consistency — std of CC across pairs |
| mccc_error | MCCC timing precision (pd.Timedelta, SEM from covariance matrix) |
The per-event MCCC global array fit is stored in AimbatEventQuality and
accessed via event.quality:
| Attribute | Description |
|---|---|
| mccc_rmse | Global array fit (pd.Timedelta) |
Build a per-seismogram DataFrame across all events
The most flexible starting point is a flat DataFrame with one row per seismogram:
from sqlalchemy.orm import selectinload
from sqlmodel import Session, select
import pandas as pd
from aimbat.db import engine
from aimbat.models import AimbatSeismogram
from aimbat.utils import rel
with Session(engine) as session:
    seismograms = session.exec(
        select(AimbatSeismogram).options(
            selectinload(rel(AimbatSeismogram.station)),
            selectinload(rel(AimbatSeismogram.event)),
            selectinload(rel(AimbatSeismogram.quality)),
        )
    ).all()

    rows = []
    for seis in seismograms:
        q = seis.quality
        rows.append({
            "station": f"{seis.station.network}.{seis.station.name}",
            "event_time": seis.event.time,
            "iccs_cc": q.iccs_cc if q else None,
            "mccc_cc_mean": q.mccc_cc_mean if q else None,
            "mccc_error_s": q.mccc_error.total_seconds() if (q and q.mccc_error) else None,
        })

df = pd.DataFrame(rows)
From here you can groupby station, pivot on event, filter by quality threshold, or feed the result directly into matplotlib.
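As a quick illustration of that kind of aggregation, here is a per-station summary computed on a toy stand-in for the DataFrame built above:

```python
import pandas as pd

# Toy stand-in for the per-seismogram DataFrame built above.
df = pd.DataFrame(
    {
        "station": ["IU.ANMO", "IU.ANMO", "IU.COLA", "IU.COLA"],
        "iccs_cc": [0.95, 0.91, 0.62, 0.70],
    }
)

# Mean ICCS correlation per station, weakest first; low values flag
# stations whose waveforms correlate poorly with the stack.
summary = df.groupby("station")["iccs_cc"].mean().sort_values()
print(summary)
```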
Station-level quality summary
SeismogramQualityStats.from_station aggregates all per-seismogram metrics
across every event recorded at a station:
from sqlalchemy.orm import selectinload
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.models import AimbatSeismogram, AimbatStation, SeismogramQualityStats
from aimbat.utils import rel

with Session(engine) as session:
    stations = session.exec(
        select(AimbatStation).options(
            selectinload(rel(AimbatStation.seismograms)).selectinload(
                rel(AimbatSeismogram.quality)
            )
        )
    ).all()
    stats = [SeismogramQualityStats.from_station(s) for s in stations]
Each stats item exposes cc_mean, mccc_cc_mean, and mccc_error as
(mean, SEM) pairs aggregated across all events at that station.
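Because each pair unpacks as (mean, SEM), ranking stations is a one-liner. A self-contained sketch, using namedtuple stand-ins for the station and stats objects (in practice these come from the query above):

```python
from collections import namedtuple

# Toy stand-ins for AimbatStation and SeismogramQualityStats; only
# the fields used below are modelled.
Station = namedtuple("Station", "network name")
Stats = namedtuple("Stats", "cc_mean")

stations = [Station("IU", "ANMO"), Station("IU", "COLA")]
stats = [Stats((0.91, 0.01)), Stats((0.68, 0.03))]

# Rank stations by mean ICCS correlation, weakest first.
ranked = sorted(zip(stations, stats), key=lambda p: p[1].cc_mean[0])
for station, st in ranked:
    mean, sem = st.cc_mean
    print(f"{station.network}.{station.name}: {mean:.3f} +/- {sem:.3f}")
```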
Event-level quality summary
SeismogramQualityStats.from_event aggregates per-seismogram metrics for a
single event and also carries the global mccc_rmse array-fit value:
from sqlalchemy.orm import selectinload
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.models import AimbatEvent, AimbatSeismogram, SeismogramQualityStats
from aimbat.utils import rel

with Session(engine) as session:
    events = session.exec(
        select(AimbatEvent).options(
            selectinload(rel(AimbatEvent.seismograms)).selectinload(
                rel(AimbatSeismogram.quality)
            ),
            selectinload(rel(AimbatEvent.quality)),
        )
    ).all()
    stats = [SeismogramQualityStats.from_event(e) for e in events]
mccc_rmse on each stats object is the global array fit for that event —
useful for comparing event difficulty across a dataset.
Worked Example
The script below builds a complete project from scratch. It loads 3 events, 10 stations, and 20 seismograms where the SAC files carry waveform data but no event or station headers — all metadata is provided via JSON — and takes an initial snapshot of each event before any processing.
"""
Load a project from SAC files that carry no event/station headers.
Layout:
- 3 events
- 10 broadband stations
- 20 seismograms: events 1 and 2 recorded at 7 stations each,
event 3 recorded at 6 stations
"""
import json
from pathlib import Path
from typing import Any
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import (
    add_data_to_project,
    create_project,
    create_snapshot,
)
from aimbat.io import DataType
from aimbat.models import AimbatEvent, AimbatStation
# ------------------------------------------------------------------ #
# Metadata #
# ------------------------------------------------------------------ #
EVENTS: list[dict[str, Any]] = [
    {
        "time": "2024-01-12T08:14:33Z",
        "latitude": 37.52,
        "longitude": 143.04,
        "depth": 35.0,
    },
    {
        "time": "2024-02-28T21:07:55Z",
        "latitude": -23.11,
        "longitude": -67.89,
        "depth": 120.0,
    },
    {
        "time": "2024-03-09T03:51:20Z",
        "latitude": 51.72,
        "longitude": 178.35,
        "depth": 55.0,
    },
]

STATIONS: list[dict[str, Any]] = [
    {
        "name": "ANMO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 34.946,
        "longitude": -106.457,
        "elevation": 1820.0,
    },
    {
        "name": "COLA",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 64.874,
        "longitude": -147.861,
        "elevation": 84.0,
    },
    {
        "name": "GUMO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 13.589,
        "longitude": 144.868,
        "elevation": 74.0,
    },
    {
        "name": "HRV",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 42.506,
        "longitude": -71.558,
        "elevation": 200.0,
    },
    {
        "name": "MAJO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 36.536,
        "longitude": 138.204,
        "elevation": 399.0,
    },
    {
        "name": "MIDW",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 28.216,
        "longitude": -177.370,
        "elevation": 150.0,
    },
    {
        "name": "POHA",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 19.757,
        "longitude": -155.531,
        "elevation": 1936.0,
    },
    {
        "name": "SSPA",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 40.636,
        "longitude": -77.888,
        "elevation": 270.0,
    },
    {
        "name": "TATO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 24.975,
        "longitude": 121.498,
        "elevation": 75.0,
    },
    {
        "name": "YSS",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 46.958,
        "longitude": 142.760,
        "elevation": 89.0,
    },
]

# Which stations recorded each event (indices into STATIONS list).
# 7 + 7 + 6 = 20 seismograms total.
EVENT_STATION_MAP = {
    0: [0, 1, 2, 3, 4, 5, 6],  # event 1 — 7 seismograms
    1: [0, 1, 2, 3, 4, 5, 6],  # event 2 — 7 seismograms
    2: [0, 1, 2, 3, 4, 5],  # event 3 — 6 seismograms
}
# ------------------------------------------------------------------ #
# Helpers #
# ------------------------------------------------------------------ #
def write_json(data: dict, path: Path) -> Path:
    path.write_text(json.dumps(data))
    return path


def sac_path(event_idx: int, station_idx: int) -> Path:
    """Return the path to the SAC file for a given event/station pair."""
    return Path(f"data/ev{event_idx + 1:02d}_st{station_idx + 1:02d}.sac")
# ------------------------------------------------------------------ #
# Main #
# ------------------------------------------------------------------ #
workdir = Path("json_metadata")
workdir.mkdir(exist_ok=True)
# 1. Initialise project
create_project(engine)
with Session(engine) as session:
    # 2. Register events from JSON
    event_files = [
        write_json(ev, workdir / f"event_{i:02d}.json") for i, ev in enumerate(EVENTS)
    ]
    add_data_to_project(session, event_files, DataType.JSON_EVENT)

    # 3. Register stations from JSON
    station_files = [
        write_json(st, workdir / f"station_{i:02d}.json")
        for i, st in enumerate(STATIONS)
    ]
    add_data_to_project(session, station_files, DataType.JSON_STATION)

    # 4. Retrieve the newly created records
    events = session.exec(select(AimbatEvent)).all()
    stations = session.exec(select(AimbatStation)).all()

    # Build lookup maps by time and (network, name) so insertion order doesn't
    # matter. isoformat() yields the same "YYYY-MM-DDTHH:MM:SS" prefix as the
    # JSON timestamps, so the keys match.
    event_by_time = {e.time.isoformat()[:19]: e for e in events}
    station_by_key = {(s.network, s.name): s for s in stations}

    # 5. Add SAC files, linking each to its pre-registered event and station
    for ev_idx, st_indices in EVENT_STATION_MAP.items():
        ev_time = EVENTS[ev_idx]["time"][:19]
        db_event = event_by_time[ev_time]
        for st_idx in st_indices:
            st_meta = STATIONS[st_idx]
            db_station = station_by_key[(st_meta["network"], st_meta["name"])]
            add_data_to_project(
                session,
                [sac_path(ev_idx, st_idx)],
                DataType.SAC,
                event_id=db_event.id,
                station_id=db_station.id,
            )

    # 6. Snapshot the initial state before any processing
    events = session.exec(select(AimbatEvent)).all()
    for event in events:
        create_snapshot(session, event, comment="initial import")

print("Project ready.")
print(f"  Events:   {len(events)}")
print(f"  Stations: {len(stations)}")
Querying the Database
Models can be queried directly using SQLModel's select:
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.models import AimbatEvent, AimbatSeismogram
with Session(engine) as session:
    events = session.exec(select(AimbatEvent)).all()
    for event in events:
        print(f"{event.time} {len(event.seismograms)} seismograms")

    # Filter — seismograms marked as selected
    selected = session.exec(
        select(AimbatSeismogram).where(
            AimbatSeismogram.parameters.has(select=True)  # type: ignore[attr-defined]
        )
    ).all()
Deduplicating Events
add_data_to_project deduplicates stations automatically by SEED code
(network, name, location, channel), so importing the same station from
multiple sources never creates duplicate records.
Events are a different story: they are deduplicated by exact origin time.
When two data sources report the same earthquake with times that differ by a
second or two, they are stored as separate AimbatEvent records. The script
below detects such near-duplicates, merges their seismograms into the record
with the most data, averages the location and depth, and removes the extras.
"""
Deduplicate events that were imported from sources reporting slightly different
origin times for the same earthquake.
Background
----------
``add_data_to_project`` deduplicates stations by SEED code
``(network, name, location, channel)`` — so importing the same station twice,
even with different coordinates, always reuses the existing record. Station
duplicates therefore cannot arise through the normal import path.
Events are deduplicated by exact origin time. When two data sources report
the same earthquake with origin times that differ by a second or two, they are
stored as *separate* ``AimbatEvent`` records. This script finds such
near-duplicate events, merges their seismograms into the canonical record
(the one with the most seismograms), averages the location and depth, then
removes the duplicates.
Run this script *before* starting any processing, and take a snapshot
afterwards so the clean state is recoverable.
"""
from pandas import Timedelta
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.models import AimbatEvent
# Merge events whose origin times differ by less than this value.
TIME_TOLERANCE = Timedelta(seconds=10)
def _mean(values: list[float]) -> float:
    return sum(values) / len(values)


def _mean_opt(values: list[float | None]) -> float | None:
    clean = [v for v in values if v is not None]
    return sum(clean) / len(clean) if clean else None


def deduplicate_events(session: Session, tolerance: Timedelta = TIME_TOLERANCE) -> int:
    """Merge event records whose origin times are within *tolerance*.

    Events are sorted by time and clustered greedily: a new cluster begins
    whenever the gap to the previous event exceeds *tolerance*.

    For each cluster the record with the most seismograms is kept as the
    canonical entry; its location and depth are updated to the group mean.

    Returns the number of duplicate records removed.
    """
    events = sorted(
        session.exec(select(AimbatEvent)).all(),
        key=lambda e: e.time,
    )

    # Build clusters of near-simultaneous events.
    clusters: list[list[AimbatEvent]] = []
    for event in events:
        if clusters and event.time - clusters[-1][-1].time <= tolerance:
            clusters[-1].append(event)
        else:
            clusters.append([event])

    removed = 0
    for cluster in clusters:
        if len(cluster) < 2:
            continue
        canonical = max(cluster, key=lambda e: len(e.seismograms))
        duplicates = [e for e in cluster if e.id != canonical.id]

        # Set location / depth to the group mean.
        canonical.latitude = _mean([e.latitude for e in cluster])
        canonical.longitude = _mean([e.longitude for e in cluster])
        canonical.depth = _mean_opt([e.depth for e in cluster])

        for dup in duplicates:
            for seis in list(dup.seismograms):
                seis.event_id = canonical.id
                session.add(seis)
            session.flush()  # apply FK changes before deleting the row
            session.delete(dup)
            removed += 1

        session.add(canonical)

    session.commit()
    return removed


with Session(engine) as session:
    n = deduplicate_events(session)
    print(f"Removed {n} duplicate event(s).")
Running Alignment
from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import (
    create_iccs_instance,
    create_snapshot,
    run_iccs,
    run_mccc,
)
from aimbat.models import AimbatEvent

with Session(engine) as session:
    event = session.exec(select(AimbatEvent)).first()
    assert event is not None

    bound = create_iccs_instance(session, event)
    run_iccs(session, bound.iccs, autoflip=True, autoselect=True)
    create_snapshot(session, event, comment="after ICCS")

    run_mccc(session, event, bound.iccs, all_seismograms=False)
    create_snapshot(session, event, comment="after MCCC")