Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 233 additions & 0 deletions sumpy/test/test_m2m.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""
Verification of compressed M2M translation.

Compares two approaches: 1. Compress coefficients -> embed -> M2M translate
2. M2M translate with full coefficients
"""
from __future__ import annotations

import os


os.environ["PYOPENCL_CTX"] = "0"

import math

import numpy as np
import pytest
import scipy.special as spsp
import sympy as sp

Check warning on line 19 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Stub file not found for "sympy" (reportMissingTypeStubs)

import sumpy.toys as t
from sumpy.array_context import _acf
from sumpy.expansion.local import (
LinearPDEConformingVolumeTaylorLocalExpansion,
)
from sumpy.expansion.multipole import (
LinearPDEConformingVolumeTaylorMultipoleExpansion,
VolumeTaylorMultipoleExpansion,
)
from sumpy.kernel import (
BiharmonicKernel,
HelmholtzKernel,
LaplaceKernel,
YukawaKernel,
)
from sumpy.tools import build_matrix


VERBOSE = False


def to_scalar(val):

Check warning on line 42 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Type annotation is missing for parameter "val" (reportMissingParameterType)

Check warning on line 42 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Type of parameter "val" is unknown (reportUnknownParameterType)
"""Convert symbolic or array value to scalar."""
if hasattr(val, "evalf"):

Check warning on line 44 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Argument type is unknown   Argument corresponds to parameter "obj" in function "hasattr" (reportUnknownArgumentType)
val = val.evalf()

Check warning on line 45 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Type of "evalf" is unknown (reportUnknownMemberType)
if hasattr(val, "item"):

Check warning on line 46 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Argument type is unknown   Argument corresponds to parameter "obj" in function "hasattr" (reportUnknownArgumentType)
val = val.item()

Check warning on line 47 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Type of "item" is unknown (reportUnknownMemberType)
return complex(val)

Check warning on line 48 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Argument type is unknown   Argument corresponds to parameter "real" in function "__new__" (reportUnknownArgumentType)


class NumericMatVecOperator:
"""Wrapper for symbolic matrix-vector operator with numeric
substitution."""

def __init__(self, symbolic_op, repl_dict):

Check warning on line 55 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Type annotation is missing for parameter "symbolic_op" (reportMissingParameterType)

Check warning on line 55 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Type of parameter "symbolic_op" is unknown (reportUnknownParameterType)
self.symbolic_op = symbolic_op
self.repl_dict = repl_dict
self.shape = symbolic_op.shape

def matvec(self, vec):
result = self.symbolic_op.matvec(vec)
out = []
for expr in result:
if hasattr(expr, "xreplace"):
out.append(complex(expr.xreplace(self.repl_dict).evalf()))
else:
out.append(complex(expr))
return np.array(out)


def get_repl_dict(kernel, extra_kwargs):
"""Numeric substitution for symbolic kernel parameters."""
repl_dict = {}
if "lam" in extra_kwargs:
repl_dict[sp.Symbol("lam")] = extra_kwargs["lam"]
if "k" in extra_kwargs:
repl_dict[sp.Symbol("k")] = extra_kwargs["k"]
return repl_dict


@pytest.mark.parametrize("knl,extra_kwargs", [
(LaplaceKernel(2), {}),
(YukawaKernel(2), {"lam": 0.1}),
(HelmholtzKernel(2), {"k": 0.5}),
(BiharmonicKernel(2), {}),
])
def test_m2m(knl, extra_kwargs):
"""Verify M2M errors by comparing formula vs direct computation."""
order = 7
dim = 2
repl_dict = get_repl_dict(knl, extra_kwargs)
global_const = to_scalar(knl.get_global_scaling_const())

# Set up source, centers, and target
source = np.array([[0.0], [0.1]])
strength = np.array([1.0])

m_center1 = np.array([0.0, 0.0])
offset_direction = np.array([-0.5, 0.25])
c2_c1_dist = 0.1
m_center2 = m_center1 + c2_c1_dist * offset_direction
h = m_center2 - m_center1

target = np.array([[2.0], [2.0]])

if VERBOSE:
print(f"M2M Coefficient Verification for {type(knl).__name__}:")
print(f"m_center1 = {m_center1}")
print(f"m_center2 = {m_center2}")
print(f"h = m_center2 - m_center1 = {h}")
print()
print(f"{'k':>3s} | {'ν(k)':>15s} | {'|ν(k)|':6s} | " # noqa: RUF001
f"{'difference by formula':>31s} | "
f"{'difference by direct computation':>31s} | "
f"{'abs err':>10s}")
print("-" * 120)

actx = _acf()

toy_ctx_full = t.ToyContext(
knl,
mpole_expn_class=VolumeTaylorMultipoleExpansion,
extra_kernel_kwargs=extra_kwargs
)

toy_ctx_local = t.ToyContext(
knl,
local_expn_class=LinearPDEConformingVolumeTaylorLocalExpansion,
extra_kernel_kwargs=extra_kwargs
)

p_full = t.PointSources(toy_ctx_full, source, weights=strength)
p2m_full = t.multipole_expand(actx, p_full, m_center1, order=order, rscale=1.0)

p_local = t.PointSources(toy_ctx_local, m_center2.reshape(2, 1), weights=strength)
p2l = t.local_expand(actx, p_local, target, order=order)

mexpn = LinearPDEConformingVolumeTaylorMultipoleExpansion(knl, order)

# Build matrix M
wrangler = mexpn.expansion_terms_wrangler
M_symbolic = wrangler.get_projection_matrix(rscale=1.0) # noqa: N806

Check failure on line 142 in sumpy/test/test_m2m.py

View workflow job for this annotation

GitHub Actions / basedpyright

Cannot access attribute "get_projection_matrix" for class "ExpansionTermsWrangler"   Attribute "get_projection_matrix" is unknown (reportAttributeAccessIssue)
numeric_op = NumericMatVecOperator(M_symbolic, repl_dict)
M = build_matrix(numeric_op, dtype=np.complex128) # noqa: N806
coeffs_full = (M @ p2l.coeffs) * global_const

# Get coefficient identifiers
stored_identifiers = mexpn.get_coefficient_identifiers()
full_identifiers = mexpn.get_full_coefficient_identifiers()
is_stored = [mi in stored_identifiers for mi in full_identifiers]
stored_indices = [i for i, st in enumerate(is_stored) if st]

mexpn_full = VolumeTaylorMultipoleExpansion(knl, order)
mexpn_full_idx = mexpn_full.get_full_coefficient_identifiers()

max_abs_error = 0.0

for k, nu_k in enumerate(full_identifiers):
k_card = sum(np.array(nu_k))
# assume all coefficient values are 1
alpha_k = 1

true_k_idx = mexpn_full_idx.index(nu_k)
basis_full = np.zeros(len(mexpn_full_idx), dtype=np.complex128)
basis_full[true_k_idx] = alpha_k
p2m_full_k = p2m_full.with_coeffs(basis_full)

# M^T @ alpha
basis_cmp = np.zeros(M.shape[0], dtype=np.complex128)
basis_cmp[stored_indices] = M[k, :] * alpha_k

# Embed back into full basis
basis_cmp_full = np.zeros(len(mexpn_full_idx), dtype=np.complex128)
for i, nu_i in enumerate(full_identifiers):
if basis_cmp[i] != 0:
true_i_idx = mexpn_full_idx.index(nu_i)
basis_cmp_full[true_i_idx] = basis_cmp[i]

p2m_cmp_k = p2m_full.with_coeffs(basis_cmp_full)

p2m2m_cmp = t.multipole_expand(
actx, p2m_cmp_k, m_center2, order=order
).eval(actx, target)
p2m2m_full = t.multipole_expand(
actx, p2m_full_k, m_center2, order=order
).eval(actx, target)

direct_diff = (p2m2m_cmp - p2m2m_full)[0]

error = 0.0 + 0.0j
for s, nu_js in enumerate(stored_identifiers):
nu_js_card = sum(np.array(nu_js))
inner_sum = 0.0 + 0.0j

if nu_js_card <= k_card:
start_idx = math.comb(order - k_card + dim, dim)
end_idx = math.comb(order - nu_js_card + dim, dim)

for idx in range(start_idx, end_idx):
nu_l = full_identifiers[idx]
nu_sum = tuple(a + b for a, b in zip(nu_l, nu_js, strict=True))

if nu_sum not in full_identifiers:
continue

derivative_idx = full_identifiers.index(nu_sum)
h_pow = np.prod(h ** np.array(nu_l))
fact_nu_l = np.prod(spsp.factorial(nu_l))

inner_sum += coeffs_full[derivative_idx] * h_pow / fact_nu_l

error += inner_sum * M[k, s]

abs_err = abs(error - direct_diff)
max_abs_error = max(max_abs_error, abs_err)

if VERBOSE:
print(f"{k:3d} | {nu_k!s:>15s} | {k_card:6d} | "
f"{error.real: .8e}{error.imag:+.8e}j | "
f"{direct_diff.real: .8e}{direct_diff.imag:+.8e}j | "
f"{abs_err:9.2e}")

if VERBOSE:
print(f"\nMaximum absolute error: {max_abs_error:.2e}")

assert max_abs_error < 1e-15, (
f"{type(knl).__name__}: error {max_abs_error:.2e}"
)


if __name__ == "__main__":
os.environ.setdefault("PYOPENCL_CTX", "0")
pytest.main([__file__, "-v", "-s"])
Loading