Python API

After running ICCS and MCCC alignment across many events, the accumulated quality metrics span stations and events in ways that are natural to analyse with pandas and matplotlib but impossible from the CLI or TUI. The Python API is the primary interface for that kind of post-processing quality analysis: you query AimbatSeismogram, AimbatStation, and AimbatEvent records directly, build DataFrames, and apply whatever aggregation or visualisation you need.

The same API also drives the CLI and TUI internally, so it covers the full workflow — data ingestion, parameter management, alignment, snapshots — not just quality analysis. See the full API reference for a complete listing.

Writing seismogram data

AimbatSeismogram.data is backed by an IO cache. Assigning a new array replaces the cached value and, for data types that support it, writes through to the source file on disk. In-place mutation of the returned array is not possible: each access returns the cached array, which is marked read-only, so element assignment raises an error rather than silently diverging from the file:

seis.data = my_modified_array  # replaces cache (and writes to disk if supported)
arr = seis.data
arr[0] = 42                    # raises ValueError — the cached array is read-only
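
The read-only behaviour mirrors NumPy's writeable flag. A plain array with the flag cleared behaves the same way (an illustration with an ordinary array, not the AIMBAT cache itself):

```python
import numpy as np

# A read-only array: element assignment raises ValueError instead of
# silently failing, which is the behaviour described above.
arr = np.arange(5.0)
arr.flags.writeable = False

try:
    arr[0] = 42
    mutated = True
except ValueError:
    mutated = False
```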

AIMBAT does not write seismogram data in normal usage. All processing results are stored as parameters in the database; source files are treated as read-only.

Core Concepts

The API is built on three main components:

  1. Models: SQLModel classes that represent the database schema (aimbat.models) as Python objects.
  2. Core Functions: High-level operations that manipulate those models (aimbat.core).
  3. Database Session: A SQLAlchemy session used to track changes and interact with the project database.

Project Location

By default AIMBAT reads and writes aimbat.db in the current directory. Set AIMBAT_PROJECT to use a different path:

export AIMBAT_PROJECT=/path/to/my/project.db

The aimbat.db.engine singleton picks this up automatically, so scripts that import it will use the same database as the CLI.
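
The same setting works from Python, as long as it happens before aimbat.db is imported anywhere in the process (a sketch; that the engine singleton reads the variable once at import time is an assumption based on the description above):

```python
import os

# Hypothetical project path. Set this before any `from aimbat.db import engine`
# runs, since the engine singleton is assumed to read AIMBAT_PROJECT at
# import time.
os.environ["AIMBAT_PROJECT"] = "/path/to/my/project.db"
```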

Session Management

Every database operation requires a Session. Use it as a context manager so it is always closed cleanly:

from sqlmodel import Session
from aimbat.db import engine

with Session(engine) as session:
    # query or modify data here
    pass

Changes accumulate in the session and are written to disk only when session.commit() is called (or when you call a core function that commits internally). If an exception is raised before committing, the session is rolled back automatically.
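
The project database is SQLite, so the commit-or-discard semantics are easy to see with the stdlib sqlite3 module alone (an analogy for the transaction behaviour, not AIMBAT code):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE picks (x INTEGER)")

# Committed changes persist.
conn.execute("INSERT INTO picks VALUES (1)")
conn.commit()

# Uncommitted changes are discarded on rollback.
conn.execute("INSERT INTO picks VALUES (2)")
conn.rollback()

rows = conn.execute("SELECT x FROM picks").fetchall()
print(rows)  # [(1,)]
```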

Creating a Project

from aimbat.db import engine
from aimbat.core import create_project

create_project(engine)

This is a one-time operation that creates the schema and the SQLite triggers that enforce database constraints and track modification times. It raises RuntimeError if the schema already exists.

Adding Data

The central function is add_data_to_project:

from sqlmodel import Session
from aimbat.db import engine
from aimbat.core import add_data_to_project
from aimbat.io import DataType

with Session(engine) as session:
    add_data_to_project(session, paths, DataType.SAC)

The DataType enum controls what is read from each source:

DataType       What is created
SAC            Event + Station + Seismogram
JSON_EVENT     Event only (no seismogram)
JSON_STATION   Station only (no seismogram)

JSON formats

Event (DataType.JSON_EVENT):

{
    "time": "2024-03-15T14:22:11Z",
    "latitude": 37.5,
    "longitude": 143.0,
    "depth": 35.0
}

Station (DataType.JSON_STATION):

{
    "name": "ANMO",
    "network": "IU",
    "location": "00",
    "channel": "BHZ",
    "latitude": 34.946,
    "longitude": -106.457,
    "elevation": 1820.0
}

Providing event or station metadata externally

SAC files from some sources omit event or station headers. In that case, add the metadata separately first and then link the SAC files to the resulting database records using event_id and station_id:

from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import add_data_to_project
from aimbat.io import DataType
from aimbat.models import AimbatEvent, AimbatStation

with Session(engine) as session:
    add_data_to_project(session, [event_json], DataType.JSON_EVENT)
    add_data_to_project(session, [station_json], DataType.JSON_STATION)

    event = session.exec(select(AimbatEvent)).one()
    station = session.exec(select(AimbatStation)).one()

    add_data_to_project(
        session,
        sac_files,
        DataType.SAC,
        event_id=event.id,
        station_id=station.id,
    )

Quality Analysis

After alignment has been run across a set of events, each seismogram carries quality metrics that can be queried directly from the database. The sections below show the most common patterns.

Quality data model

Per-seismogram metrics are stored in AimbatSeismogramQuality and accessed via seismogram.quality:

Attribute      Description
iccs_cc        ICCS cross-correlation with the stack
mccc_cc_mean   MCCC waveform quality — mean CC across seismogram pairs
mccc_cc_std    MCCC waveform consistency — std of CC across pairs
mccc_error     MCCC timing precision (pd.Timedelta, SEM from covariance matrix)

The per-event MCCC global array fit is stored in AimbatEventQuality and accessed via event.quality:

Attribute   Description
mccc_rmse   Global array fit (pd.Timedelta)

Build a per-seismogram DataFrame across all events

The most flexible starting point is a flat DataFrame with one row per seismogram:

from sqlalchemy.orm import selectinload
from sqlmodel import Session, select
import pandas as pd

from aimbat.db import engine
from aimbat.models import AimbatSeismogram
from aimbat.utils import rel

with Session(engine) as session:
    seismograms = session.exec(
        select(AimbatSeismogram).options(
            selectinload(rel(AimbatSeismogram.station)),
            selectinload(rel(AimbatSeismogram.event)),
            selectinload(rel(AimbatSeismogram.quality)),
        )
    ).all()

    rows = []
    for seis in seismograms:
        q = seis.quality
        rows.append({
            "station": f"{seis.station.network}.{seis.station.name}",
            "event_time": seis.event.time,
            "iccs_cc": q.iccs_cc if q else None,
            "mccc_cc_mean": q.mccc_cc_mean if q else None,
            "mccc_error_s": q.mccc_error.total_seconds() if (q and q.mccc_error) else None,
        })

df = pd.DataFrame(rows)

From here you can groupby station, pivot on event, filter by quality threshold, or feed the result directly into matplotlib.
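
For example, a few common aggregations on a toy DataFrame with the same columns (hand-made rows, for illustration only):

```python
import pandas as pd

# Toy rows with the same columns as the DataFrame built above.
df = pd.DataFrame([
    {"station": "IU.ANMO", "event_time": "2024-01-12",
     "iccs_cc": 0.95, "mccc_cc_mean": 0.91, "mccc_error_s": 0.02},
    {"station": "IU.ANMO", "event_time": "2024-02-28",
     "iccs_cc": 0.80, "mccc_cc_mean": 0.78, "mccc_error_s": 0.05},
    {"station": "IU.COLA", "event_time": "2024-01-12",
     "iccs_cc": 0.60, "mccc_cc_mean": 0.55, "mccc_error_s": 0.11},
])

# Mean ICCS correlation per station, worst first.
per_station = df.groupby("station")["iccs_cc"].mean().sort_values()

# Keep only high-quality seismograms.
good = df[df["iccs_cc"] > 0.7]

# Station x event matrix of MCCC timing error (NaN where not recorded).
pivot = df.pivot(index="station", columns="event_time", values="mccc_error_s")
```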

Station-level quality summary

SeismogramQualityStats.from_station aggregates all per-seismogram metrics across every event recorded at a station:

from aimbat.models import AimbatSeismogram, AimbatStation, SeismogramQualityStats

with Session(engine) as session:
    stations = session.exec(
        select(AimbatStation).options(
            selectinload(rel(AimbatStation.seismograms)).selectinload(
                rel(AimbatSeismogram.quality)
            )
        )
    ).all()
    stats = [SeismogramQualityStats.from_station(s) for s in stations]

Each stats item exposes cc_mean, mccc_cc_mean, and mccc_error as (mean, SEM) pairs aggregated across all events at that station.

Event-level quality summary

SeismogramQualityStats.from_event aggregates per-seismogram metrics for a single event and also carries the global mccc_rmse array-fit value:

from aimbat.models import AimbatEvent, AimbatSeismogram, SeismogramQualityStats

with Session(engine) as session:
    events = session.exec(
        select(AimbatEvent).options(
            selectinload(rel(AimbatEvent.seismograms)).selectinload(
                rel(AimbatSeismogram.quality)
            ),
            selectinload(rel(AimbatEvent.quality)),
        )
    ).all()
    stats = [SeismogramQualityStats.from_event(e) for e in events]

mccc_rmse on each stats object is the global array fit for that event — useful for comparing event difficulty across a dataset.
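
A simple ranking pattern, shown here with stand-in values (in practice each Timedelta would come from a stats object's mccc_rmse):

```python
from pandas import Timedelta

# Stand-in values keyed by event origin time, for illustration only.
rmse_by_event = {
    "2024-01-12T08:14:33": Timedelta(seconds=0.04),
    "2024-02-28T21:07:55": Timedelta(seconds=0.12),
    "2024-03-09T03:51:20": Timedelta(seconds=0.07),
}

# Worst-fitting (hardest) events first.
ranked = sorted(rmse_by_event.items(), key=lambda kv: kv[1], reverse=True)
print([t for t, _ in ranked])
```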

Worked Example

The script below builds a complete project from scratch. It loads 3 events, 10 stations, and 20 seismograms where the SAC files carry waveform data but no event or station headers — all metadata is provided via JSON — and takes an initial snapshot of each event before any processing.

"""
Load a project from SAC files that carry no event/station headers.

Layout:
  - 3 events
  - 10 broadband stations
  - 20 seismograms: events 1 and 2 recorded at 7 stations each,
    event 3 recorded at 6 stations
"""

import json
from pathlib import Path
from typing import Any

from sqlmodel import Session, select

from aimbat.db import engine
from aimbat.core import (
    add_data_to_project,
    create_project,
    create_snapshot,
)
from aimbat.io import DataType
from aimbat.models import AimbatEvent, AimbatStation

# ------------------------------------------------------------------ #
# Metadata                                                            #
# ------------------------------------------------------------------ #

EVENTS: list[dict[str, Any]] = [
    {
        "time": "2024-01-12T08:14:33Z",
        "latitude": 37.52,
        "longitude": 143.04,
        "depth": 35.0,
    },
    {
        "time": "2024-02-28T21:07:55Z",
        "latitude": -23.11,
        "longitude": -67.89,
        "depth": 120.0,
    },
    {
        "time": "2024-03-09T03:51:20Z",
        "latitude": 51.72,
        "longitude": 178.35,
        "depth": 55.0,
    },
]

STATIONS: list[dict[str, Any]] = [
    {
        "name": "ANMO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 34.946,
        "longitude": -106.457,
        "elevation": 1820.0,
    },
    {
        "name": "COLA",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 64.874,
        "longitude": -147.861,
        "elevation": 84.0,
    },
    {
        "name": "GUMO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 13.589,
        "longitude": 144.868,
        "elevation": 74.0,
    },
    {
        "name": "HRV",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 42.506,
        "longitude": -71.558,
        "elevation": 200.0,
    },
    {
        "name": "MAJO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 36.536,
        "longitude": 138.204,
        "elevation": 399.0,
    },
    {
        "name": "MIDW",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 28.216,
        "longitude": -177.370,
        "elevation": 150.0,
    },
    {
        "name": "POHA",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 19.757,
        "longitude": -155.531,
        "elevation": 1936.0,
    },
    {
        "name": "SSPA",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 40.636,
        "longitude": -77.888,
        "elevation": 270.0,
    },
    {
        "name": "TATO",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 24.975,
        "longitude": 121.498,
        "elevation": 75.0,
    },
    {
        "name": "YSS",
        "network": "IU",
        "location": "00",
        "channel": "BHZ",
        "latitude": 46.958,
        "longitude": 142.760,
        "elevation": 89.0,
    },
]

# Which stations recorded each event (indices into STATIONS list).
# 7 + 7 + 6 = 20 seismograms total.
EVENT_STATION_MAP = {
    0: [0, 1, 2, 3, 4, 5, 6],  # event 1 — 7 seismograms
    1: [0, 1, 2, 3, 4, 5, 6],  # event 2 — 7 seismograms
    2: [0, 1, 2, 3, 4, 5],  # event 3 — 6 seismograms
}

# ------------------------------------------------------------------ #
# Helpers                                                             #
# ------------------------------------------------------------------ #


def write_json(data: dict, path: Path) -> Path:
    path.write_text(json.dumps(data))
    return path


def sac_path(event_idx: int, station_idx: int) -> Path:
    """Return the path to the SAC file for a given event/station pair."""
    return Path(f"data/ev{event_idx + 1:02d}_st{station_idx + 1:02d}.sac")


# ------------------------------------------------------------------ #
# Main                                                                #
# ------------------------------------------------------------------ #

workdir = Path("json_metadata")
workdir.mkdir(exist_ok=True)

# 1. Initialise project
create_project(engine)

with Session(engine) as session:

    # 2. Register events from JSON
    event_files = [
        write_json(ev, workdir / f"event_{i:02d}.json") for i, ev in enumerate(EVENTS)
    ]
    add_data_to_project(session, event_files, DataType.JSON_EVENT)

    # 3. Register stations from JSON
    station_files = [
        write_json(st, workdir / f"station_{i:02d}.json")
        for i, st in enumerate(STATIONS)
    ]
    add_data_to_project(session, station_files, DataType.JSON_STATION)

    # 4. Retrieve the newly created records
    events = session.exec(select(AimbatEvent)).all()
    stations = session.exec(select(AimbatStation)).all()

    # Build lookup maps by (time, network+name) so insertion order doesn't matter
    event_by_time = {e.time.isoformat()[:19]: e for e in events}
    station_by_key = {(s.network, s.name): s for s in stations}

    # 5. Add SAC files, linking each to its pre-registered event and station
    for ev_idx, st_indices in EVENT_STATION_MAP.items():
        ev_time = EVENTS[ev_idx]["time"][:19]
        db_event = event_by_time[ev_time]

        for st_idx in st_indices:
            st_meta = STATIONS[st_idx]
            db_station = station_by_key[(st_meta["network"], st_meta["name"])]

            add_data_to_project(
                session,
                [sac_path(ev_idx, st_idx)],
                DataType.SAC,
                event_id=db_event.id,
                station_id=db_station.id,
            )

    # 6. Snapshot the initial state before any processing
    events = session.exec(select(AimbatEvent)).all()
    for event in events:
        create_snapshot(session, event, comment="initial import")

print("Project ready.")
print(f"  Events:   {len(events)}")
print(f"  Stations: {len(stations)}")

Querying the Database

Models can be queried directly using SQLModel's select:

from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.models import AimbatEvent, AimbatSeismogram

with Session(engine) as session:
    events = session.exec(select(AimbatEvent)).all()
    for event in events:
        print(f"{event.time}  {len(event.seismograms)} seismograms")

    # Filter — seismograms marked as selected
    selected = session.exec(
        select(AimbatSeismogram).where(
            AimbatSeismogram.parameters.has(select=True)  # type: ignore[attr-defined]
        )
    ).all()

Deduplicating Events

add_data_to_project deduplicates stations automatically by SEED code (network, name, location, channel), so importing the same station from multiple sources never creates duplicate records.

Events are a different story: they are deduplicated by exact origin time. When two data sources report the same earthquake with times that differ by a second or two, they are stored as separate AimbatEvent records. The script below detects such near-duplicates, merges their seismograms into the record with the most data, averages the location and depth, and removes the extras.

"""
Deduplicate events that were imported from sources reporting slightly different
origin times for the same earthquake.

Background
----------
``add_data_to_project`` deduplicates stations by SEED code
``(network, name, location, channel)`` — so importing the same station twice,
even with different coordinates, always reuses the existing record.  Station
duplicates therefore cannot arise through the normal import path.

Events are deduplicated by exact origin time.  When two data sources report
the same earthquake with origin times that differ by a second or two, they are
stored as *separate* ``AimbatEvent`` records. This script finds such
near-duplicate events, merges their seismograms into the canonical record
(the one with the most seismograms), averages the location and depth, then
removes the duplicates.

Run this script *before* starting any processing, and take a snapshot
afterwards so the clean state is recoverable.
"""

from pandas import Timedelta
from sqlmodel import Session, select

from aimbat.db import engine
from aimbat.models import AimbatEvent

# Merge events whose origin times differ by less than this value.
TIME_TOLERANCE = Timedelta(seconds=10)


def _mean(values: list[float]) -> float:
    return sum(values) / len(values)


def _mean_opt(values: list[float | None]) -> float | None:
    clean = [v for v in values if v is not None]
    return sum(clean) / len(clean) if clean else None


def deduplicate_events(session: Session, tolerance: Timedelta = TIME_TOLERANCE) -> int:
    """Merge event records whose origin times are within *tolerance*.

    Events are sorted by time and clustered greedily: a new cluster begins
    whenever the gap to the previous event exceeds *tolerance*.

    For each cluster the record with the most seismograms is kept as the
    canonical entry; its location and depth are updated to the group mean.

    Returns the number of duplicate records removed.
    """
    events = sorted(
        session.exec(select(AimbatEvent)).all(),
        key=lambda e: e.time,
    )

    # Build clusters of near-simultaneous events.
    clusters: list[list[AimbatEvent]] = []
    for event in events:
        if clusters and event.time - clusters[-1][-1].time <= tolerance:
            clusters[-1].append(event)
        else:
            clusters.append([event])

    removed = 0
    for cluster in clusters:
        if len(cluster) < 2:
            continue

        canonical = max(cluster, key=lambda e: len(e.seismograms))
        duplicates = [e for e in cluster if e.id != canonical.id]

        # Set location / depth to the group mean.
        canonical.latitude = _mean([e.latitude for e in cluster])
        canonical.longitude = _mean([e.longitude for e in cluster])
        canonical.depth = _mean_opt([e.depth for e in cluster])

        for dup in duplicates:
            for seis in list(dup.seismograms):
                seis.event_id = canonical.id
                session.add(seis)
            session.flush()  # apply FK changes before deleting the row
            session.delete(dup)
            removed += 1

        session.add(canonical)

    session.commit()
    return removed


with Session(engine) as session:
    n = deduplicate_events(session)

print(f"Removed {n} duplicate event(s).")
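
The greedy clustering step can be exercised on its own with plain timestamps, no database required (values made up for illustration):

```python
from pandas import Timedelta, Timestamp

times = [
    Timestamp("2024-01-12T08:14:33"),
    Timestamp("2024-01-12T08:14:35"),  # 2 s later: same cluster
    Timestamp("2024-02-28T21:07:55"),  # far away: new cluster
]
tolerance = Timedelta(seconds=10)

# A new cluster begins whenever the gap to the previous time
# exceeds the tolerance, exactly as in deduplicate_events above.
clusters: list[list[Timestamp]] = []
for t in times:
    if clusters and t - clusters[-1][-1] <= tolerance:
        clusters[-1].append(t)
    else:
        clusters.append([t])

print([len(c) for c in clusters])  # [2, 1]
```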

Running Alignment

from sqlmodel import Session, select
from aimbat.db import engine
from aimbat.core import (
    create_iccs_instance,
    create_snapshot,
    run_iccs,
    run_mccc,
)
from aimbat.models import AimbatEvent

with Session(engine) as session:
    event = session.exec(select(AimbatEvent)).first()
    assert event is not None

    bound = create_iccs_instance(session, event)

    run_iccs(session, bound.iccs, autoflip=True, autoselect=True)
    create_snapshot(session, event, comment="after ICCS")

    run_mccc(session, event, bound.iccs, all_seismograms=False)
    create_snapshot(session, event, comment="after MCCC")