1 Dependencies
The dependencies used in ddmpy are listed below:
from __future__ import print_function
from collections import OrderedDict
import traceback
import time
import numpy as np
import os
from scipy import io
import scipy.io as spio
import scipy.sparse as ssp
from scipy.sparse import coo_matrix
from scipy.sparse import csr_matrix
from scipy.sparse import csc_matrix
import scipy.linalg as la
import scipy.sparse.linalg as sla
from scipy.sparse.linalg import inv
from scipy.sparse.linalg import eigsh
import matplotlib.pyplot as plt
from csv import *
Dependencies.
2 Class TimeIt
The class TimeIt below is used to record the time spent in any function call decorated with @TimeIt():
class TimeIt:
    debug = False
    max_events = None
    events = []
    stack = []
    t0 = time.time()

    def __init__(self, name=""):
        self.name = name

    def __enter__(self):
        self.begin = time.time() - self.t0
        self.end = np.nan
        self.duration = np.nan
        self.level = len(self.stack)
        self.events.append(self)
        self.stack.append(len(self.events)-1)
        if TimeIt.debug:
            print(self.__str__(which="current"))
        return self

    def __exit__(self, type, value, traceback):
        self.end = time.time() - self.t0
        self.duration = self.end - self.begin
        if TimeIt.debug:
            print(self.__str__(which="current"))
        if self.stack:
            self.stack.pop()
        return False

    def __call__(self, f):
        def timed_f(*args, **kwargs):
            # We create a default name
            name = self.name
            if name == "":
                name = type(args[0]).__name__ + " " + f.__name__
            # We time and run the function
            # We create a new TimeIt instance because self is created
            # at function declaration and is the same object for all
            # executions of the function
            with TimeIt(name) as t:
                res = f(*args, **kwargs)
            # Store the duration in the object
            if len(args) > 0 and isinstance(args[0], LinearOperator):
                solver = args[0]
                key = "t_" + name.replace(" ", "_")
                if key in solver.parameters:
                    solver.parameters[key] += t.duration
                else:
                    solver.parameters[key] = t.duration
            return res
        return timed_f

    def __str__(self, which="all"):
        if which == "current":
            s = "{:50s} | {:2d} | {:12.7f} | {:12.7f} |\
 {:12.7f}".format(
                "! "*self.level + self.name, self.level,
                self.begin, self.duration, self.end)
        elif which == "all":
            s = "\n".join(
                [t.__str__(which="current")
                 for t in self.events[:TimeIt.max_events]])
        else:  # "stack"
            s = "\n".join(
                [self.events[i].__str__(which="current")
                 for i in self.stack])
        return s
TimeIt.
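The snippet below is a minimal usage sketch, assuming the Section 1 imports and the definitions of this document are in scope (e.g. with from DDMPY import * as in the test example further down); the do_work function is hypothetical and only provides something to time. TimeIt can be used both as a decorator and as a context manager, and the class-level events list keeps a record of every timed call.

# Minimal TimeIt usage sketch; do_work is hypothetical.
@TimeIt("do work")
def do_work(n):
    # dummy computation whose duration is recorded
    return sum(i * i for i in range(n))

with TimeIt("outer block"):      # context-manager usage
    do_work(200000)              # decorated-function usage (nested event)

# Print every recorded event: name, nesting level, begin, duration and end
print(TimeIt.events[0].__str__(which="all"))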
3 Class LinearOperator
The LinearOperator class below defines the base tools shared by all solvers, such as storing keyword arguments as parameters and providing the setup() and solve() interface.
class LinearOperator(object):
    defaults = {}

    def __init__(self, A=None, **kwargs):
        """Constructor of the solver

        Store the keyword arguments as parameters for the solver,
        performs all the analysis steps that are possible without
        having the matrix A and performs the setup if the matrix A
        is provided
        """
        if hasattr(self, "defaults"):
            self.parameters = OrderedDict(
                sorted(self.defaults.items(),
                       key=lambda x: str.lower(x[0])))
            self.parameters.update(kwargs)
        else:
            self.parameters = OrderedDict(kwargs.items())
        self.setup_performed = False
        if A is not None:
            self.setup(A)

    # abstract method
    def setup(self, A, **kwargs):
        """Setups the solver to work on a particular matrix A

        Store the keyword arguments as parameters for the solver and
        performs all computational steps that are possible without
        having the RHS.
        """
        self.A = A
        self.shape = A.shape
        self.parameters.update(kwargs)
        # some default parameters are solver types instead of
        # instances and a type v has to be replaced by an
        # instance of v
        for k, v in self.parameters.items():
            if type(v) == type:
                instance = v.__new__(v)
                instance.__init__()
                self.parameters[k] = instance
        self.setup_performed = True

    # abstract method
    def solve(self, b):
        """Performs a solve with b as a right-hand side vector.

        A subclass may allow multiple rhs to be specified as a matrix.
        """
        pass

    def dot(self, b):
        if self.setup_performed:
            return self.solve(b)
        else:
            raise RuntimeError("The operator was not setup before"
                               " performing the solve")

    def __matmul__(self, b):
        # self @ b
        return self.dot(b)

    def matvec(self, b):
        # scipy.sparse.linalg.aslinearoperator()
        return self.dot(b)

    def __str__(self, level=0):
        s = [self.__class__.__name__]
        for key, value in self.parameters.items():
            k = str(key)
            if k == "xexa" or k == "M" or k == "x0" or k == "W":
                # no need to print matrix/vector-like object
                v = "True"
            elif isinstance(value, LinearOperator):
                v = value.__str__(level+1)
            else:
                v = str(value)
            s.append(" "*(1+level) + k + " : " + v)
        return "\n".join(s)

    def __getattr__(self, item):
        if item != "parameters" and item in self.parameters:
            return self.parameters[item]
        elif item in self.__dict__:
            return self.__dict__[item]
        else:
            raise AttributeError("No {} in {}.".format(item, self))
LinearOperator.
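To make the intended usage concrete, here is a minimal sketch of a subclass; the DiagonalSolver name and its behaviour are illustrative only and are not part of ddmpy. The options go into defaults, the one-time work goes into setup(), the application goes into solve(), and dot(), @ and matvec() are inherited from LinearOperator.

# Illustrative LinearOperator subclass: "solves" with the diagonal of A,
# i.e. returns omega * D^{-1} b (a Jacobi-like operator).
import numpy as np
import scipy.sparse as ssp

class DiagonalSolver(LinearOperator):
    defaults = {"omega": 1.0}

    def setup(self, A, **kwargs):
        LinearOperator.setup(self, A, **kwargs)
        # extract the diagonal once, during the setup phase
        self.d = A.diagonal() if ssp.issparse(A) else np.diag(A).copy()

    def solve(self, b):
        return self.omega * (b / self.d.reshape(-1, 1))

A = np.diag(np.arange(1.0, 6.0))     # small SPD example matrix
b = np.ones((5, 1))
D = DiagonalSolver(A)                # the constructor calls setup(A)
print(D.dot(b))                      # dot()/@ dispatch to solve()
print(D)                             # parameters reported by __str__()

Such an object (or the class itself) can be passed as the M parameter of the iterative solvers below: LinearOperator.setup() replaces a class given as a parameter by an instance, and ConjGrad.setup() then calls M.setup(A) when setup_M is enabled.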
4 Iterative Linear Solver
4.1 Conjugate gradient
4.1.1 The cg() function:
def apply_strat(strat, W, n_iter):
    """Reduce the cg coefficients using the deflation parameters
    stored in strat.

    Parameters
    ----------
    strat : list object
        [Wgiven, k, l1, l2, 'S0/S1/S2/S3/S4', 'LD/MD/LMD', s, 'R/HR', sigma]
        - Wgiven : boolean, set to True if the user uses its own
          deflation subspace.
        - k : integer, number of eigenvectors to compute.
        - [l1, l2] : integers, used to reduce information from the cg
          iterations.
        - str['S0|S1|S2|S3|S4'] : used to indicate how to reduce:
          'S0': no deflation, 'S1': l1=0|l2=k=n_iter, 'S2': l1=0|l2=n_iter,
          'S3': user parameters (default).
        - str['LD|MD|LMD'] : used to indicate which part of the spectrum
          needs to be computed. 'LD' Least Dominant, 'MD' Most Dominant,
          'LMD' from both sides.
        - s : integer, current system to solve.
        - str['R|HR'] : if 'R' use the Lanczos Ritz procedure to compute
          eigenpairs, if 'HR' use Harmonic Ritz.
        - sigma : real, find eigenvalues near sigma.
    W : matrix-like object, the deflation subspace.
    n_iter : integer, number of iterations computed by cg to solve the
        s-1 system.

    Explicit return: strat.
    Implicit returns: the _cg_omega, _cg_gamma, _cg_d, _cg_z, _cg_p and
    _cg_mu global variables.
    """
    global _cg_omega, _cg_gamma, _cg_d, _cg_z, _cg_p, _cg_mu
    # strat[4] == 'S3' default:
    strat[2] = max(0, min(strat[2], n_iter-1))
    strat[3] = min(n_iter-1, strat[3])
    diff = strat[3] - strat[2] + 1
    strat[1] = min(diff, strat[1])
    if strat[4] == "S1":
        strat[1] = n_iter; strat[2] = 0; strat[3] = n_iter-1
    elif strat[4] == "S2":
        strat[2] = 0; strat[3] = n_iter-1
    elif strat[4] == "S4":
        strat[3] = n_iter - 1 - strat[2]; strat[2] = n_iter - min(diff, n_iter - 1)
    _cg_omega = _cg_omega[strat[2]:strat[3]]
    _cg_gamma = _cg_gamma[strat[2]:strat[3]]
    _cg_d = _cg_d[strat[2]:strat[3]]
    _cg_z = _cg_z[..., strat[2]:strat[3] + 1]
    _cg_p = _cg_p[..., strat[2]:strat[3] + 1]
    if W is not None:
        _cg_mu = _cg_mu[..., strat[2]:strat[3] + 1]
    else:
        _cg_mu = 0
    return strat
def lanczos_hr(strat, n_iter, A, M, W):
    """Using CG coefficients computed from a previous solve, build and
    solve the corresponding Rayleigh/Harmonic Ritz problem.

    Parameters
    ----------
    strat : list object
        [Wgiven, k, l1, l2, 'S0/S1/S2/S3/S4', 'LD/MD/LMD', s, 'R/HR', sigma]
        - Wgiven : boolean, set to True if the user uses its own
          deflation subspace.
        - k : integer, number of eigenvectors to compute.
        - [l1, l2] : integers, used to reduce information from the cg
          iterations.
        - str['S0|S1|S2|S3|S4'] : used to indicate how to reduce:
          'S0': no deflation, 'S1': l1=0|l2=k=n_iter, 'S2': l1=0|l2=n_iter,
          'S3': user parameters (default).
        - str['LD|MD|LMD'] : used to indicate which part of the spectrum
          needs to be computed. 'LD' Least Dominant, 'MD' Most Dominant,
          'LMD' from both sides.
        - s : integer, current system to solve.
        - str['R|HR'] : if 'R' use the Lanczos Ritz procedure to compute
          eigenpairs, if 'HR' use Harmonic Ritz.
        - sigma : real, find eigenvalues near sigma.
    W : matrix-like object, the deflation subspace.
    A : matrix-like object from Ax=b.
    M : matrix-like object, a preconditioner.
    n_iter : integer, number of iterations computed by cg to solve the
        s-1 system.

    Output: W, the updated deflation matrix.
    """
    global _cg_omega, _cg_gamma, _cg_d, _cg_z, _cg_p, _cg_mu, _WTA, _KER
    m = min(n_iter, len(_cg_omega))
    if m <= 1:
        return None
    if strat[5] == 'LD':
        which = 'SM'
    elif strat[5] == 'MD':
        which = 'LM'
    elif strat[5] == 'LMD':
        which = 'BE'
    if strat[7] == "HR":
        alpha = np.zeros(m)
        alpha[0] = (_cg_d[0]/_cg_omega[0]) * (1+_cg_gamma[0])
        beta = np.zeros(m-1)
        for i in range(m-1):
            alpha[i+1] = (_cg_d[i+1]/_cg_omega[i+1]) * (1/_cg_gamma[i+1])
            beta[i] = -(_cg_d[i+1]/_cg_omega[i])
        Gtild = (np.diag(alpha) + np.diag(beta, 1) + np.diag(beta, -1))
        Dtild = np.diag(_cg_d)
        alpha = np.zeros(m)
        alpha[0] = 1/_cg_omega[0]
        beta = np.zeros(m)
        beta[0] = -_cg_gamma[0]
        for i in range(m-1):
            alpha[i+1] = 1/_cg_omega[i+1]
            beta[i+1] = -_cg_gamma[i+1]
        data = np.array([alpha, -1*alpha])
        Ltild = ssp.diags(data, np.array([0, -1]), shape=(m+1, m))
        data = np.array([np.ones(m+1), beta])
        Utild = ssp.diags(data, np.array([0, +1]), shape=(m+1, m+1))
        M = ssp.identity(A.shape[0], format="csc") if M is None else M
        # Compute the eigenvalue problem:
        if strat[9] == False:
            if strat[1] >= strat[3]-strat[2]+1:
                strat[1] = strat[3]-strat[2]-1
        if W is not None:
            DL = _cg_mu@Ltild
            G = np.block([[_WTA@M@_WTA.T, _KER@DL],
                          [DL.T@_KER, Gtild]])
            DL = np.zeros((_KER.shape[0], Dtild.shape[1]))
            F = np.block([[_KER, DL],
                          [DL.T, Dtild]])
            HRvalue, HRvector = eigen(A=G, B=F, B_I=None, n_v=strat[1],
                                      sigma=strat[8], which=which,
                                      dense=strat[9], local_solver=None)
            W = np.hstack((W, _cg_p[..., 0:m]))@HRvector
            # alternative: np.hstack((_WTA.T, (W@_cg_mu + (_cg_p@Utild))@Ltild))
        else:
            HRvalue, HRvector = eigen(A=Gtild, B=Dtild, B_I=None,
                                      n_v=strat[1], sigma=strat[8],
                                      which=which, dense=strat[9],
                                      local_solver=None)
            W = _cg_p[..., 0:m]@HRvector
            # alternative: (_cg_p@Utild)@Ltild
    elif strat[7] == "RR":
        alpha = np.zeros(m)
        alpha[0] = 1/_cg_omega[0]
        beta = np.zeros(m-1)
        for i in range(m-1):
            alpha[i+1] = (1/_cg_omega[i+1] + _cg_gamma[i]/_cg_omega[i])
            beta[i] = (np.sqrt(max(_cg_gamma[i], 0)) / _cg_omega[i])
        Tm = (np.diag(alpha) + np.diag(beta, 1) + np.diag(beta, -1))
        RRvalue, RRvector = eigen(A=Tm, B=None, B_I=None, n_v=strat[1],
                                  sigma=strat[8], which=which,
                                  dense=strat[9], local_solver=None)
        W = _cg_z[..., 0:m]@RRvector
    return W
def cg(A, b, x0=None, tol=1e-5, maxiter=None, xtype=None, M=None,
       callback=None, ritz=False, save_x=False, debug=False,
       true_res=False, xexa=None, W=None, strat=None, tags=None):
    """Solve the linear problem Ax=b using the Conjugate Gradient
    algorithm. Interface compatible with scipy.sparse.linalg.cg

    Parameters
    ----------
    A : matrix-like object
        A is the linear operator on which we want to perform the solve
        operation. The only requirement on A is to provide a method
        dot(self, x) where x is a vector-like object.
    b : vector-like object
        b is the right-hand side of the system to solve. The only
        requirement on b is to provide the following methods:
        __len__(self) (not necessary if maxiter is provided),
        copy(self), __add__(self, w), __sub__(self, w),
        __rmul__(self, a), dot(self, w), where w is a vector-like
        object and a is a scalar.
    x0 : vector-like object
        starting guess for the solution, optional
    tol : float
        relative tolerance to achieve, optional, default: 1e-5
    maxiter : integer
        maximum number of iterations, optional, default: len(b)
    xtype : not used (compatibility with scipy.sparse.linalg.cg)
    M : matrix-like object
        preconditioner for A, optional
    callback : function
        after each iteration, callback(x) is called, where x is the
        current solution, optional
    ritz : boolean
        store the dot products in order to compute the Ritz values
        later, optional
    save_x : boolean
        store the value of x at each iteration, optional
    debug : boolean
        print debug info, optional
    true_res : boolean
        recompute the residual at each iteration, optional
    xexa : vector-like object
        exact solution, optional
    W : matrix-like object
        W is the deflation subspace
    strat : list object
        [Wgiven, k, l1, l2, 'S0/S1/S2/S3/S4', 'LD/MD/LMD', s, 'R/HR', sigma]
        - Wgiven : boolean, set to True if the user uses its own
          deflation subspace.
        - k : integer, number of eigenvectors to compute.
        - [l1, l2] : integers, used to reduce information from the cg
          iterations.
        - str['S0|S1|S2|S3|S4'] : used to indicate how to reduce:
          'S0': no deflation, 'S1': l1=0|l2=k=n_iter, 'S2': l1=0|l2=n_iter,
          'S3': user parameters (default).
        - str['LD|MD|LMD'] : used to indicate which part of the spectrum
          needs to be computed. 'LD' Least Dominant, 'MD' Most Dominant,
          'LMD' from both sides.
        - s : integer, current system to solve.
        - str['R|HR'] : if 'R' use the Lanczos Ritz procedure to compute
          eigenpairs, if 'HR' use Harmonic Ritz.
        - sigma : real, find eigenvalues near sigma.
    tags : list object
        ['A0/A1/A2/A3/A4', str(Irun), Stheta, rep, irr, str(enum_c), ISI, Ipart]
        - tags[0] in str['A0/A1/A2/A3/A4'], used to indicate which
          matrix is used.
        - tags[1] = str(Irun), where Irun is the run's number.
        - tags[2] = Stheta, a string in ['1E2/1E4/1E6'] used to indicate
          the eigenvalue order of the LD, MD or LMD.
        - tags[3] = rep, the repository name for the outputs.
        - tags[4] = irr = "1" or "2", used to write the header only once.
        - tags[5] = str(enum_c), where enum_c is an integer used to
          count cg() executions.
        - tags[6] = ISI, where ISI is a string in str['S0|S1|S2|S3|S4'].
        - tags[7] = Ipart, where Ipart is a string in str['LD|MD|LMD'].
    """
    bb = b.T.dot(b)                  # $||b||^2 = b^T b$
    maxiter = A.shape[0] if maxiter is None else maxiter
    strat = ([False, 0, 0, 0, 'S0', 'LD', 1, 'HR', 1e-16, True]
             if strat is None else strat)
    with TimeIt("Instrumentation"):
        global _cg_n_iter
        _cg_n_iter = 0
        if ritz or strat[4] != 'S0':
            global _cg_omega, _cg_gamma
            _cg_omega = np.zeros(maxiter)
            _cg_gamma = np.zeros(maxiter)
        if strat[4] != 'S0':
            global _cg_p, _cg_z, _cg_d, _cg_mu, _WTA, _KER
            _cg_p = np.zeros((A.shape[0], maxiter))
            _cg_z = np.zeros((A.shape[0], maxiter))
            _cg_d = np.zeros(maxiter)
            if W is not None:
                _cg_mu = np.zeros((W.shape[1], maxiter+1))
    # Initialization
    if W is None:
        x = 0 * b if x0 is None else x0
    else:
        _WTA = W.T@A
        _KER = _WTA@W
        # $x = x_{-1} + W (W^T A W)^{-1} W^T r_{-1}$
        x = (W@la.solve(_KER, W.T@b)) if x0 is None \
            else x0 + (W@la.solve(_KER, W.T@(b - A.dot(x0))))
    r = b - A.dot(x)                 # $r = b - \A x$
    rr = r.T.dot(r)                  # $||r||^2 = r^T r$
    print("Iteration: {}, "
          "||r||_2/||b||_2 = {}".format(0, np.sqrt(rr/bb)[0, 0]))
    if rr / bb <= tol * tol:         # $\frac{||r||}{||b||} \leq \varepsilon$
        if W is None:
            if strat[4] != 'S0':
                strat = apply_strat(strat, W, _cg_n_iter)
                return x, 0, W
            else:
                return x, 0
        else:
            strat = apply_strat(strat, W, _cg_n_iter)
            return x, 0, W
    z = r if M is None else M.dot(r)  # $z = \M r$
    if W is not None:
        mu = la.solve(_KER, _WTA.dot(z))  # $\mu = (W^T A W)^{-1} (W^T A) z$
    p = z.copy() if W is None else z - W.dot(mu)  # $p = z$ or $p = z - W \mu$
    rz = r.T.dot(z)                  # $r^T z$
    with TimeIt("Instrumentation"):
        if save_x:
            global _cg_x
            _cg_x = [x]
        if strat:
            if strat[4] != 'S0':
                _cg_z[..., 0:1] = z.copy()/np.sqrt(rz)
                if W is not None:
                    _cg_mu[..., 0:1] = mu.copy()
                drerr = (False if xexa is None else
                         np.asscalar(np.sqrt((x-xexa).T.dot(A.dot(x-xexa)))))
                csv_cg(0, [0, la.norm(b-A.dot(x))/la.norm(b),
                           la.norm(r)/la.norm(b), drerr, W,
                           strat[6], strat[0]],
                       strat[4]+"-"+strat[5]+"-"+strat[7]+".csv", tags)
    for i in range(maxiter):
        Ap = A.dot(p)                          # $\A p$
        alpha = (rz / (p.T.dot(Ap)))[0, 0]     # $\alpha = \frac{r^T z}{p^T \A p}$
        x += alpha * p                         # $x = x + \alpha p$
        if true_res:
            r = b - A.dot(x)                   # $r = b - \A x$
        else:
            r -= alpha * Ap                    # $r = r - \alpha\ \A p$
        if W is not None:
            # Reorthogonalisation
            chek = np.matmul(W.T, r)
            # $r = r - W (W^T W)^{-1} (W^T r)$
            r = r - np.matmul(W, la.solve(W.T@W, chek))
        with TimeIt("Instrumentation"):
            if ritz or strat[4] != 'S0':
                if i > 0:
                    _cg_gamma[i-1] = beta
                _cg_omega[i] = alpha
            _cg_n_iter += 1
            if callback:
                callback(x)
            if save_x:
                _cg_x.append(x.copy())
            if strat:
                if strat[4] != 'S0':
                    _cg_p[..., i:i+1] = p.copy()
                    _cg_d[i] = p.T.dot(Ap)
                    drerr = (False if xexa is None else
                             np.asscalar(np.sqrt((x-xexa).T.dot(A.dot(x-xexa)))))
                    csv_cg(i+1, [i+1, la.norm(b-A.dot(x))/la.norm(b),
                                 la.norm(r)/la.norm(b), drerr, W,
                                 strat[6], strat[0], strat[4]],
                           strat[4]+"-"+strat[5]+"-"+strat[7]+".csv", tags)
        rr = r.T.dot(r)              # $||r||^2 = r^T r$
        if debug:
            print("Iteration: {}, "
                  "||r||_2/||b||_2 = {}"
                  .format(i, np.sqrt(rr/bb)[0, 0]))
        if rr / bb <= tol * tol or rr / bb > 3.0:
            # $\frac{||r||}{||b||} \leq \varepsilon$
            if W is None:
                if strat[4] != 'S0':
                    strat = apply_strat(strat, W, _cg_n_iter)
                    return x, 0, lanczos_hr(strat, _cg_n_iter, A, M, W)
                else:
                    return x, 0
            else:
                strat = apply_strat(strat, W, _cg_n_iter)
                return x, 0, lanczos_hr(strat, _cg_n_iter, A, M, W)
        z = r if M is None else M.dot(r)  # $z = \M r$
        rz, rzold = r.T.dot(z), rz
        with TimeIt("Instrumentation"):
            if strat and strat[4] != 'S0':
                if i < maxiter-2:
                    _cg_z[..., i+1:i+2] = z.copy()/np.sqrt(rz)
        beta = (rz / rzold)[0, 0]    # $\beta = \frac{r_i^T z_i}{r_{i-1}^T z_{i-1}}$
        if W is not None:
            mu = la.solve(_KER, _WTA.dot(z))  # $\mu = (W^T A W)^{-1} (W^T A) z$
            if strat[4] != 'S0':
                _cg_mu[..., i+1:i+2] = mu.copy()
        # $p = z + \beta p$ or $p = z + \beta p - W \mu$
        p = z + beta * p if W is None else z + beta * p - W.dot(mu)
    if W is None:
        if strat[4] != 'S0':
            strat = apply_strat(strat, W, _cg_n_iter)
            return x, i, lanczos_hr(strat, _cg_n_iter, A, M, W)
        return x, i
    else:
        strat = apply_strat(strat, W, _cg_n_iter)
        return x, i, lanczos_hr(strat, _cg_n_iter, A, M, W)
cg() standalone function.
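As a minimal illustration of the standalone interface, the sketch below solves a small system by calling cg() directly; the tridiagonal test matrix is made up for this example. Note that the right-hand side must be a column vector and that, with the default strat (no deflation), the function returns the pair (x, info), where info is 0 when the tolerance has been reached.

# Direct call of the standalone cg() on an illustrative SPD tridiagonal system.
import numpy as np
import scipy.linalg as la

n = 50
A = (np.diag(2.0 * np.ones(n)) + np.diag(-1.0 * np.ones(n - 1), 1)
     + np.diag(-1.0 * np.ones(n - 1), -1))   # 1D Laplacian, symmetric positive definite
b = np.ones((n, 1))                          # the right-hand side must be a column vector
x, info = cg(A, b, tol=1e-10, maxiter=5*n)   # default strat: no deflation
print("info =", info,
      "||b-A@x||_2/||b||_2 =", la.norm(b - A@x) / la.norm(b))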
4.1.2 The ConjGrad solver.
This standalone cg() function is also available through a ConjGrad solver class with the following parameters:
- x0, an initial guess for the solution.
- tol, the stopping criterion (relative backward error).
- maxiter, the maximum number of iterations.
- M, a preconditioner.
- callback, a function called at each iteration with the current iterate solution as an argument.
- ritz, a boolean to enable the computation of the eigenvalues of the Hessenberg matrix (Ritz values of A) from coefficients computed during the iterations (see the ritz_values() method in Listing~ref:lst:py:ritzvalues).
- save_x, a boolean to toggle the conservation of the intermediate solution at each iteration in an attribute self.x_.
- setup_M, a boolean to toggle the call of M.setup().
- debug, to toggle the printing of debug info.
- true_res, to toggle the computation of the true residual r = b - A @ x at each iteration instead of using an iterative formula.
- W, the deflation subspace.
- strat, a list of parameters used to set up the deflation.
class ConjGrad(LinearOperator):
    """Solve Ax=b using the CG algorithm

    Optional parameters are:
    - x0, initial guess for the solution, default=0*b
    - tol, relative tolerance to achieve, default=1e-5
    - maxiter, maximum number of iterations, default=len(b)
    - M, preconditioner for A
    - callback, after each iteration, callback(x) is called, where x
      is the current solution
    - ritz=True/False, whether to compute the ritz values (approximate
      eigenvalues of A), default=False
    - save_x=True/False, whether to store x at each iteration,
      default=False
    - setup_M=True/False, whether to try and call M.setup(A) during
      the setup phase
    - debug=True/False, whether to print debug info
    - true_res=True/False, whether to recompute the true residual
      instead of using a recurrence formula
    - W, deflation subspace, default=None
    - strat : list [True/False, k, "S0/S1/S2/S3", "LD/MD/LMD"]
      set True if W is given, k>0 to use deflated cg and compute k
      approximate eigenvectors close to "LD" Least Dominant, "MD" Most
      Dominant, or "LMD" to compute k close to "LD" and k close to "MD";
      "S0" => k=l=0, "S1" => k=l=infty, "S2" => l=infty k=1,..,l,
      "S3" => l=[l1,l2] k=l2-l1+1
    """
    defaults = {"x0": None,
                "tol": 1e-5,
                "maxiter": None,
                "M": None,
                "callback": None,
                "ritz": False,
                "save_x": False,
                "setup_M": True,
                "debug": False,
                "true_res": False,
                "xexa": None,
                "W": None,
                "strat": [False, 0, 0, 0, 'S0', 'LD', 1, 'R', 1e-16, True],
                "tags": ["A", "0", "N", "EXP", "1", "1", "0", "0"]}

    @TimeIt()
    def setup(self, A, **kwargs):
        LinearOperator.setup(self, A, **kwargs)
        # We setup the preconditioner if possible and
        # asked for by the user
        if self.setup_M:
            if hasattr(self.M, "setup"):
                self.M.setup(A)

    @TimeIt()
    def solve(self, b):
        # For deflation, the preconditioner first
        # orthogonalizes x0, through its self.M.x0 method
        x0_f = getattr(self.M, "x0", None)
        if x0_f is not None:
            r0 = b if self.x0 is None else (b - self.A.dot(self.x0))
            self.x0 = x0_f(r0)
        # reshape b
        if b.ndim == 1:
            b = b.reshape((-1, 1))
        # Call the standalone cg function
        if self.W is None:
            if self.strat[4] == 'S0':
                x, self.i = cg(self.A, b, self.x0, self.tol,
                               self.maxiter, None, self.M,
                               self.callback, self.ritz, self.save_x,
                               self.debug, self.true_res, self.xexa,
                               self.W, self.strat, self.tags)
            else:
                x, self.i, self.W = cg(self.A, b, self.x0, self.tol,
                                       self.maxiter, None, self.M,
                                       self.callback, self.ritz,
                                       self.save_x, self.debug,
                                       self.true_res, self.xexa,
                                       self.W, self.strat, self.tags)
        else:
            x, self.i, self.W = cg(self.A, b, self.x0, self.tol,
                                   self.maxiter, None, self.M,
                                   self.callback, self.ritz,
                                   self.save_x, self.debug,
                                   self.true_res, self.xexa,
                                   self.W, self.strat, self.tags)
        self.n_iter = _cg_n_iter
        self.parameters["n_iter"] = self.n_iter
        if self.ritz:
            self.omega = _cg_omega
            self.gamma = _cg_gamma
        if self.save_x:
            self.x_ = _cg_x
        if self.strat[4] == 'S0':
            return x
        else:
            return x, self.W

    def ritz_values(self):
        """Compute the ritz values of the Hessenberg matrix.

        Call this function after a solve has been performed with
        self.ritz==True
        """
        self.n_iter = min(self.n_iter, self.strat[3]-self.strat[2])
        if self.n_iter > 1:
            alpha = np.zeros(self.n_iter)
            alpha[0] = 1/self.omega[0]
            beta = np.zeros(self.n_iter-1)
            for i in range(self.n_iter-1):
                alpha[i+1] = (1/self.omega[i+1]
                              + self.gamma[i]/self.omega[i])
                beta[i] = (np.sqrt(max(self.gamma[i], 0))
                           / self.omega[i])
            T = (np.diag(alpha) + np.diag(beta, 1) + np.diag(beta, -1))
            lambda_ = la.eigvalsh(T)
        else:
            lambda_ = np.array([1])
        return lambda_
ConjGrad Class (1/2).
- Computing the eigenvalues of the tridiagonal matrix.
If the ritz parameter is set to True, it is possible, after the solve, to compute the eigenvalues of the tridiagonal Lanczos matrix \(\cal T\) of the CG algorithm citep:BurkittIdentityconjugategradient1988. These eigenvalues are Ritz values of the preconditioned matrix \(\M\A\). The condition number \(\kappa({\cal T}) = \lambda_{\max}/\lambda_{\min}\) gives an approximation of the condition number of the preconditioned matrix \(\kappa(\M\A)\).

    def ritz_values(self):
        """Compute the ritz values of the Hessenberg matrix.

        Call this function after a solve has been performed with
        self.ritz==True
        """
        if self.n_iter > 1:
            alpha = np.zeros(self.n_iter)
            alpha[0] = 1/self.omega[0]
            beta = np.zeros(self.n_iter-1)
            for i in range(self.n_iter-1):
                alpha[i+1] = (1/self.omega[i+1]
                              + self.gamma[i]/self.omega[i])
                beta[i] = (np.sqrt(max(self.gamma[i], 0))
                           / self.omega[i])
            T = (np.diag(alpha) + np.diag(beta, 1) + np.diag(beta, -1))
            lambda_ = la.eigvalsh(T)
        else:
            lambda_ = np.array([1])
        return lambda_
ConjGrad Lanczos Rayleigh-Ritz Spectral Approximation (2/2).
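Concretely, writing \(\alpha_i\) and \(\beta_i\) for the CG coefficients alpha and beta of iteration \(i\) (stored in the omega and gamma attributes) and \(m\) for the number of iterations, the matrix assembled by ritz_values() is the classical Lanczos tridiagonal matrix

\[
{\cal T}_m =
\begin{pmatrix}
\frac{1}{\alpha_1} & \frac{\sqrt{\beta_1}}{\alpha_1} & & \\
\frac{\sqrt{\beta_1}}{\alpha_1} & \frac{1}{\alpha_2}+\frac{\beta_1}{\alpha_1} & \ddots & \\
 & \ddots & \ddots & \frac{\sqrt{\beta_{m-1}}}{\alpha_{m-1}} \\
 & & \frac{\sqrt{\beta_{m-1}}}{\alpha_{m-1}} & \frac{1}{\alpha_m}+\frac{\beta_{m-1}}{\alpha_{m-1}}
\end{pmatrix},
\]

and its extreme eigenvalues, returned by la.eigvalsh, give the estimate \(\kappa({\cal T}_m) = \lambda_{\max}/\lambda_{\min} \approx \kappa(\M\A)\).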
4.1.3 Testing the ConjGrad solver:
# Dependencies:
from DDMPY import *

# Test to solve the linear system A@x=b of size n:
n = 100                               # size of the linear operator A
A = np.diag(np.arange(1, n+1, 1), 0)  # set A to be the diagonal matrix A=diag(1,2,...,n)
B = 1.0*np.random.rand(n, n).T        # build a random matrix
Q, R = la.qr(B, mode='full')          # compute a QR decomposition
A = Q.T@A@Q                           # apply an orthogonal change of basis (same spectrum)
s = ConjGrad(A, ritz=True, tol=1e-15, maxiter=n, true_res=True)  # build the solver
xexa = np.random.rand(A.shape[0], 1)  # build a random exact solution
b = A@xexa                            # build the corresponding rhs
x = s.dot(b)                          # solve A@x=b where x is the unknown
print(s)                              # print information about the solve step
# print the A-norm of the direct error
print("||x-xexa||_{A} =", np.sqrt((x-xexa).T@A@(x-xexa)))
# print the backward error
print("||b-A@x||_{2}/||b||_{2} =", la.norm(b-A@x)/la.norm(b))
# print the Ritz values
print("Ritz values computed using a Lanczos Rayleigh-Ritz process:")
ritz = s.ritz_values()
print("The five smallest Ritz values: ", ritz[0:5])
Example using the ConjGrad class.
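As a complementary sketch (the diagonal matrix below is again illustrative), the callback and save_x parameters of ConjGrad can be used to record the convergence history and the intermediate iterates:

# Monitoring the convergence of ConjGrad with callback and save_x.
n = 100
A = np.diag(np.arange(1.0, n + 1))               # SPD test matrix
xexa = np.ones((n, 1))
b = A@xexa
history = []                                     # relative backward error per iteration
s = ConjGrad(A, tol=1e-12, maxiter=n, save_x=True,
             callback=lambda x: history.append(la.norm(b - A@x) / la.norm(b)))
x = s.dot(b)
print(s)                                         # n_iter now appears among the parameters
print("last recorded backward errors:", history[-3:])
print("stored iterates:", len(s.x_))             # intermediate solutions kept by save_x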
5 Solving Generalized Eigenproblems in Python.
In scipy, two methods are available to solve generalized eigenproblems \(\A u = \lambda \B u\):
- the scipy.linalg.eigh() function for dense matrices, which uses a Lapack implementation cite:AndersonLAPACKUsersguide1999.
- the scipy.sparse.linalg.eigsh() function for sparse matrices, which relies on the ARPACK solver cite:LehoucqARPACKusersguide1998a.
The dense version computes all the eigenpairs and is therefore quite costly. The sparse version computes a selection of eigenpairs using an iterative method; it is much faster but not as robust: it does not converge for some matrices. As a result, the ddmpy package provides an eigen() function that calls the appropriate version depending on the type of the input matrices (lines 2-12 and 13-22 of Listing~ref:lst:py:eigen for dense and sparse matrices, respectively). If the sparse version does not converge, it falls back to the dense version (lines 20-22). Its parameters are:
- A and B, representing the matrices \(\A\) and \(\B\).
- B_I, representing \(\B^{-1}\) (optional).
- n_v, the number of eigenpairs to compute.
- dense, whether to force the dense version.
- local_solver, a LinearOperator to be used to factorize \(\B\).
def eigen(A, B, B_I=None, n_v=None, sigma=None, which='SM',
          dense=True, local_solver=None):
    if dense or n_v is None:
        try:
            with TimeIt("eigen_dense"):
                if ssp.issparse(A) and ssp.issparse(B):
                    w, v = la.eigh(A.A, B.A)
                else:
                    w, v = la.eigh(A, B)
                if n_v is None:
                    n_v = A.shape[0]
                else:
                    n_v = min(A.shape[0], n_v)
                s_v = v.shape[1]
                if which == 'SM':
                    w, v = w[:n_v], v[:, :n_v]
                elif which == 'LM':
                    w, v = w[s_v-n_v:s_v], v[:, s_v-n_v:s_v]
                elif which == 'BE':
                    if n_v % 2 == 0:
                        in_v = int(n_v/2)
                    else:
                        in_v = int((n_v+1)/2)
                    w, v = (np.hstack((w[:in_v], w[s_v-in_v:s_v])),
                            np.hstack((v[:, :in_v], v[:, s_v-in_v:s_v])))
        except la.LinAlgError as err:
            print(err, '=> sparse computation with n_v=4')
            w, v = eigen(A, B, B_I, 4, sigma, which, dense=False,
                         local_solver=local_solver)
    else:
        try:
            with TimeIt("eigen_sparse"):
                if B_I is None and local_solver is not None:
                    B_I = local_solver
                    B_I.setup(B)
                if sigma is None:
                    w, v = sla.eigsh(A, n_v, which='SM', M=B, Minv=B_I)
                else:
                    w, v = sla.eigsh(A, n_v, sigma=sigma, which=which,
                                     M=B, Minv=B_I)
        except (sla.ArpackNoConvergence, sla.ArpackError) as err:
            print(err, '=> dense computation')
            w, v = eigen(A, B, B_I, n_v, dense=True)
    return w, v
eigen() function for solving generalized eigenproblems.
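As a usage sketch (the small diagonal matrices below are illustrative only), the calls below compute the four smallest generalized eigenpairs of \(\A u = \lambda \B u\), first through the dense path and then through the sparse path, which falls back to the dense one if ARPACK does not converge:

# Usage sketch for eigen() on a small generalized eigenproblem A u = lambda B u.
import numpy as np
import scipy.sparse as ssp

n = 50
A = np.diag(np.arange(1.0, n + 1))   # stiffness-like SPD matrix
B = 2.0 * np.eye(n)                  # mass-like SPD matrix
# Dense path (scipy.linalg.eigh): the 4 smallest eigenvalues, here 0.5, 1.0, 1.5, 2.0
w, v = eigen(A, B, n_v=4, which='SM', dense=True)
print("dense :", w)
# Sparse path (ARPACK); if it does not converge, eigen() falls back to the dense path
w, v = eigen(ssp.csc_matrix(A), ssp.csc_matrix(B), n_v=4, which='SM', dense=False)
print("sparse:", w)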