# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
#
import os, sys
try:
from mpi4py import MPI
except ImportError:# pragma: no cover
print("MPI capabilities not available")
import numpy as np
from genericROM.FE import FETools as FT
from genericROM.OperatorCompressors import ReducedQuadratureProcedure as RQP
from genericROM.BasicAlgorithms import EIM
from genericROM.DataCompressors import FusedSnapshotPOD as SP
from genericROM.Containers.OperatorCompressionData import OperatorCompressionDataMechanical as OCDM
from genericROM.Containers.OperatorCompressionData import OperatorPreCompressionDataMechanical as OPCDM
from genericROM.Containers.OnlineData import OnlineDataMechanical as ODM
from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature
secondOrderTensorOffDiagComponents = {1:[], 2:[(0,1)], 3:[(0,1), (1,2), (0,2)]}
[docs]
def PrepareOnline(onlineProblemData, operatorCompressionData):
"""
Prepares a online computation by constructing an onlineData and adding it to
the onlineProblemData
Parameters
----------
onlineProblemData : OnlineProblemData
containing the description of the problem for which a reduced solution is computed
operatorCompressionData : OperatorCompressionDataMecanical
containing the data structure generated by the data compression step for POD-ECM for a mechanical problem
"""
nReducedIntegrationPoints = operatorCompressionData.GetNumberOfReducedIntegrationPoints()
numberOfSigmaComponents = operatorCompressionData.GetNumberOfSigmaComponents()
onlineData = ODM.OnlineDataMechanical("U", nReducedIntegrationPoints, numberOfSigmaComponents)
constitutiveLawSets = onlineProblemData.GetSetsOfConstitutiveOfType("mechanical")
reducedListOTags = operatorCompressionData.GetReducedListOTags()
indicesOfReducedIntegPointsPerMaterial = FT.ComputeIndicesOfIntegPointsPerMaterial(reducedListOTags, constitutiveLawSets)
for tag, reducedIntegPoints in indicesOfReducedIntegPointsPerMaterial.items():
localNbReducedIntegPoints = len(reducedIntegPoints)
lawTag = ('mechanical', tag)
law = onlineProblemData.GetConstitutiveLaws()[lawTag]
var = law.GetOneConstitutiveLawVariable("var")
nstatv = law.GetOneConstitutiveLawVariable("nstatv")
onlineData.InitializeMaterial(tag, var, nstatv, localNbReducedIntegPoints)
onlineData.SetIndicesOfReducedIntegPointsPerMaterial(indicesOfReducedIntegPointsPerMaterial)
#onlineData.SetReducedData(operatorCompressionData)
onlineProblemData.AddOnlineData(onlineData)
[docs]
def ComputeOnline(onlineProblemData, timeSequence, operatorCompressionData, tolerance, onlineData = None, callback = None):
"""
Compute the online stage using the method POD and ECM for a mechanical problem
Parameters
----------
onlineProblemData : OnlineProblemData
containing the description of the problem for which a reduced solution is computed
operatorCompressionData : OperatorCompressionDataMecanical
containing the data structure generated by the data compression step for POD-ECM for a mechanical problem
timeSequence : 1D np.ndarray
time sequence where the reduced solution is computed
tolerance : float
stopping criterion for the Newton algorithm of the reduced problem
onlineData : OnlineDataMechanical, optional
used in the online stage to store quantities required for the tracking of certain quantites,
e.g. the internal state variable at the reduced integration points, required to compute the constitutive law.
callback : function, optional
used to override console output messages during the reduced problem iterations
Returns
-------
dict
onlineCompressedSolution, with time steps as keys and 1D np.ndarray of size (numberOfModes,)
"""
currentFolder = os.getcwd()
folder = currentFolder + os.sep + onlineProblemData.GetDataFolder()
os.chdir(folder)
if onlineData is None and operatorCompressionData is not None:
PrepareOnline(onlineProblemData, operatorCompressionData)
onlineData = onlineProblemData.GetOnlineData("U")
initialCondition = onlineProblemData.GetInitialCondition()
onlineCompressedSolution = {}
onlineCompressedSolution[timeSequence[0]] = initialCondition.GetReducedInitialSnapshot("U")
onlineData.UpdateInternalStateAtReducedIntegrationPoints(timeSequence[0])
for timeStep in range(1, len(timeSequence)):
previousTime = timeSequence[timeStep-1]
time = timeSequence[timeStep]
dtime = time - previousTime
if callback == None:
print("time =", time); sys.stdout.flush()
else:
callback.CurrentTime(timeStep, time)
reducedExternalForcesTemp = PrepareNewtonIterations(onlineProblemData, time, dtime)
reducedExternalForces = np.zeros(reducedExternalForcesTemp.shape)
MPI.COMM_WORLD.Allreduce([reducedExternalForcesTemp, MPI.DOUBLE], [reducedExternalForces, MPI.DOUBLE])
normExt = np.linalg.norm(reducedExternalForces)
before = onlineCompressedSolution[previousTime]# np.copy in niROM
after = before# np.copy in niROM
onlineCompressedSolution[time] = np.copy(before)
reducedInternalForcesTemp, reducedTangentMatrixTemp = ComputeReducedInternalForcesAndTangentMatrix(onlineProblemData, operatorCompressionData, before, after, time)
reducedInternalForces = np.zeros(reducedInternalForcesTemp.shape)
MPI.COMM_WORLD.Allreduce([reducedInternalForcesTemp, MPI.DOUBLE], [reducedInternalForces, MPI.DOUBLE])
if normExt>0:
normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces)/normExt
else:
normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces) # pragma: no cover
count = 0
while normRes > tolerance:
reducedTangentMatrix = np.zeros(reducedTangentMatrixTemp.shape)
MPI.COMM_WORLD.Allreduce([reducedTangentMatrixTemp, MPI.DOUBLE], [reducedTangentMatrix, MPI.DOUBLE])
sol = np.linalg.solve(reducedTangentMatrix, reducedExternalForces-reducedInternalForces)
onlineCompressedSolution[time] += sol
after = onlineCompressedSolution[time]# np.copy in niROM
reducedInternalForcesTemp, reducedTangentMatrixTemp = ComputeReducedInternalForcesAndTangentMatrix(onlineProblemData, operatorCompressionData, before, after, time)
reducedInternalForces = np.zeros(reducedInternalForcesTemp.shape)
MPI.COMM_WORLD.Allreduce([reducedInternalForcesTemp, MPI.DOUBLE], [reducedInternalForces, MPI.DOUBLE])
if normExt>0:
normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces)/normExt
else:
normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces)# pragma: no cover
if callback == None:
print("normRes =", normRes); sys.stdout.flush()
else:
callback.CurrentNormRes(normRes)
count += 1
if count == 200:
raise RuntimeError("problem could not converge after 200 iterations") # pragma: no cover
#solution is set: now update Internal Variables:
onlineData.UpdateInternalStateAtReducedIntegrationPoints(time)
constLaws = onlineProblemData.GetConstitutiveLaws()
for tag, intPoints in onlineData.GetIndicesOfReducedIntegPointsPerMaterial().items():
lawTag = ('mechanical', tag)
constLaws[lawTag].UpdateInternalState()
if callback == None:
print("=== Newton iterations:", count); sys.stdout.flush()
else:
callback.CurrentNewtonIterations(count)
os.chdir(currentFolder)
onlineProblemData.AddOnlineData(onlineData)
return onlineCompressedSolution
[docs]
def PrepareNewtonIterations(onlineProblemData, time, dtime):
"""
Prepares the Newton iteration by computing the reduced external forces
vector (depending on the loading) and updating the temperature loading to the onlineData.
Parameters
----------
onlineProblemData : OnlineProblemData
containing the description of the problem for which a reduced solution is computed
time : float
previous time where the reduced solution hasbeen computed
dtime : float
time increment
Returns
-------
1D np.ndarray
reducedExternalForces, of size (numberOfModes,)
"""
onlineData = onlineProblemData.GetOnlineData("U")
for law in onlineProblemData.GetConstitutiveLawsOfType("mechanical"):
law.SetOneConstitutiveLawVariable('dtime', dtime)
if ('U', 'temperature', 'ALLNODE') in onlineProblemData.loadings:
temperatureAtReducedIntegrationPoints0 = onlineProblemData.loadings[('U', 'temperature', 'ALLNODE')].GetTemperatureAtReducedIntegrationPointsAtTime(time-dtime)
temperatureAtReducedIntegrationPoints1 = onlineProblemData.loadings[('U', 'temperature', 'ALLNODE')].GetTemperatureAtReducedIntegrationPointsAtTime(time)
onlineData.UpdateTemperatureAtReducedIntegrationPoints(temperatureAtReducedIntegrationPoints0, temperatureAtReducedIntegrationPoints1)
reducedExternalForces = 0.
for loading in onlineProblemData.GetLoadings().values():
if loading.GetSolutionName() == "U":
reducedExternalForces += loading.ComputeContributionToReducedExternalForces(time)
return reducedExternalForces
[docs]
def ComputeReducedInternalForcesAndTangentMatrix(onlineProblemData, operatorCompressionData, before, after, time):
"""
Computes the reduced internal forces vector and tange
Parameters
----------
onlineProblemData : OnlineProblemData
containing the description of the problem for which a reduced solution is computed
operatorCompressionData : OperatorCompressionDataMechanical
used to store data during the operator compression step of the POD-ECM method.
before : 1D np.ndarray
of size (numberOfModes,), coefficients of the reduced solution at the previous time step
after : 1D np.ndarray
of size (numberOfModes,), coefficients of the reduced solution at the current time step,
for which the residual will be computed during the reduced Newton algorithm
Returns
-------
1D np.ndarray
reducedInternalForces, of size (numberOfModes,)
np.ndarray
reducedTangentMatrix, of size (numberOfModes,numberOfModes)
"""
onlineData = onlineProblemData.GetOnlineData("U")
indicesOfReducedIntegPointsPerMaterial = onlineData.GetIndicesOfReducedIntegPointsPerMaterial()
numberOfSigmaComponents = onlineData.GetNumberOfSigmaComponents()
nReducedIntegrationPoints = onlineData.GetNReducedIntegrationPoints()
reducedIntegrationWeights = operatorCompressionData.GetReducedIntegrationWeights()
reducedEpsilonAtReducedIntegPoints = operatorCompressionData.GetReducedEpsilonAtReducedIntegPoints()
#onlineCompressionData['sigIntForces'] = np.zeros((nReducedIntegrationPoints,numberOfSigmaComponents))
onlineData.SetStrainAtReducedIntegrationPoints0(np.dot(reducedEpsilonAtReducedIntegPoints, before).T)
onlineData.SetStrainAtReducedIntegrationPoints1(np.dot(reducedEpsilonAtReducedIntegPoints, after).T)
constLaws = onlineProblemData.GetConstitutiveLaws()
sigma = np.empty((nReducedIntegrationPoints,numberOfSigmaComponents))
localTgtMat = np.empty((nReducedIntegrationPoints,numberOfSigmaComponents,numberOfSigmaComponents))
for tag, intPoints in indicesOfReducedIntegPointsPerMaterial.items():
if ('U', 'temperature', 'ALLNODE') in onlineProblemData.loadings:
temperature = onlineData.GetTemperatureAtReducedIntegrationPoints0()[intPoints]
dtemp = onlineData.GetTemperatureAtReducedIntegrationPoints1()[intPoints] - onlineData.GetTemperatureAtReducedIntegrationPoints0()[intPoints]
else:
temperature = 20.+np.zeros(intPoints.shape[0]) #pragma: no cover
dtemp = np.zeros(intPoints.shape[0]) #pragma: no cover
stran = onlineData.GetStrainAtReducedIntegrationPoints0()[intPoints]
dstran = onlineData.GetStrainAtReducedIntegrationPoints1()[intPoints] - onlineData.GetStrainAtReducedIntegrationPoints0()[intPoints]
statev = np.copy(onlineData.GetStateVarAtReducedIntegrationPoints0(tag))
lawTag = ('mechanical', tag)
ddsdde, stress, statev = constLaws[lawTag].ComputeConstitutiveLaw(temperature, dtemp, stran, dstran, statev)
sigma[intPoints,:] = stress
localTgtMat[intPoints,:,:] = ddsdde
#Set dual state output values (overwritten at each Newton iteration, kept at convergence)
stran1 = onlineData.GetStrainAtReducedIntegrationPoints1()[intPoints]
onlineData.SetDualStateAndVarOutput(tag, time, stran1, stress, statev)
reducedInternalForces = np.einsum('l,mlk,lm->k', reducedIntegrationWeights, reducedEpsilonAtReducedIntegPoints, sigma, optimize = True)
reducedTangentMatrix = np.einsum('l,mlk,lmn,nlo->ko', reducedIntegrationWeights, reducedEpsilonAtReducedIntegPoints, localTgtMat, reducedEpsilonAtReducedIntegPoints, optimize = True)
return reducedInternalForces, reducedTangentMatrix
[docs]
def PreCompressOperator(mesh, integrationRule = LagrangeIsoParamQuadrature):
"""
Preliminary operator compression step for the POD-ECM method for a
mechanical problem, precomputing quantities that only depend on the mesh
Requires naming the displacement solution "U" and the stress solution
"sigma"
Parameters
----------
mesh: MuscatMesh
high-dimensional mesh
Returns
-------
OperatorPreCompressionDataMechanical
data structure used in a precomputation step
of the operator compression step of the POD-ECM method
"""
listOfTags = FT.ComputeIntegrationPointsTags(mesh, mesh.GetDimensionality(), integrationRule = integrationRule)
integrationWeights, gradPhiAtIntegPoint = FT.ComputeGradPhiAtIntegPoint(mesh, integrationRule = integrationRule)
operatorPreCompressionData = OPCDM.OperatorPreCompressionDataMechanical("U", gradPhiAtIntegPoint, integrationWeights, listOfTags)
return operatorPreCompressionData
[docs]
def CompressOperator(
collectionProblemData, operatorPreCompressionData, mesh, tolerance,
listNameDualVarOutput = None, listNameDualVarGappyIndicesforECM = None,
toleranceCompressSnapshotsForRedQuad = 0., methodDualReconstruction = "GappyPOD",
timeSequenceForDualReconstruction = None):
"""
Operator compression step for the POD-ECM method for a mechanical problem
Requires naming the displacement solution "U" and the stress solution
"sigma"
Parameters
----------
collectionProblemData : CollectionProblemData
definition of the training data in a CollectionProblemData object
operatorPreCompressionData : OperatorPreCompressionDataMechanical
ata structure used in a precomputation step
of the operator compression step of the POD-ECM method
mesh: MuscatMesh
high-dimensional mesh
tolerance : float
tolerance for the Empirical Cubature Method (ECM)
listNameDualVarOutput : list of strings, optional
names of dual quantities to reconstruct on complete mesh
listNameDualVarGappyIndicesforECM : list of strings, optional
names of dual quantities for which the indices of the POD are added to
the reduced integration points list
toleranceCompressSnapshotsForRedQuad : float, optional
if > 0., sigma is compressed using snapshots POD before applying ECM
methodDualReconstruction : str, optional
method used to reconstruct the dual quantities from the reduced
integration points to the complete mesh, "GappyPOD" or "MetaModel"
timeSequenceForDualReconstruction : list or 1D np.ndarray, optional
time sequence used to train the dual quantities reconstruction
algorithm
"""
print("CompressOperator starting..."); sys.stdout.flush()
if toleranceCompressSnapshotsForRedQuad > 0:
collectionProblemData.DefineQuantity("SigmaECM")
operatorCompressionData = collectionProblemData.GetOperatorCompressionData("U")
if operatorCompressionData == None:
operatorCompressionData = OCDM.OperatorCompressionDataMechanical("U")
operatorCompressionData.SetOperatorPreCompressionData(operatorPreCompressionData)
collectionProblemData.AddOperatorCompressionData(operatorCompressionData)
listOfTags = operatorCompressionData.GetListOfTags()
integrationWeights = operatorCompressionData.GetIntegrationWeights()
#gradPhiAtIntegPoint = operatorCompressionData.GetGradPhiAtIntegPoint()
#numberOfIntegrationPoints = operatorCompressionData.GetNumberOfIntegrationPoints()
#reducedOrderBasis = collectionProblemData.GetReducedOrderBasis("U")
#numberOfModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("U")
#numberOfSigmaComponents = collectionProblemData.GetSolutionsNumberOfComponents("sigma")
import time
start = time.time()
reducedEpsilonAtIntegPoints = ReduceIntegrator(collectionProblemData, mesh)
#if listNameDualVarGappyIndicesforECM is None:
# listNameDualVarGappyIndicesforECM = []# pragma: no cover
#
#if not operatorCompressionData:
# operatorCompressionData["reducedIntegrationPoints"] = []
sigmaEpsilon = ComputeSigmaEpsilon(collectionProblemData, reducedEpsilonAtIntegPoints, tolerance, toleranceCompressSnapshotsForRedQuad)
imposedIndices = []
if listNameDualVarGappyIndicesforECM is not None:
for name in listNameDualVarGappyIndicesforECM:
imposedIndices += list(EIM.QDEIM(collectionProblemData.GetReducedOrderBasis(name)))
imposedIndices = list(set(imposedIndices))
reducedIntegrationPointsInitSet = operatorCompressionData.GetReducedIntegrationPoints()
print("Prepare ECM duration = "+str(time.time()-start)+" s"); sys.stdout.flush()
reducedIntegrationPoints, reducedIntegrationWeights = RQP.ComputeReducedIntegrationScheme(integrationWeights, sigmaEpsilon, tolerance, imposedIndices = imposedIndices, reducedIntegrationPointsInitSet = reducedIntegrationPointsInitSet)
#reducedIntegrationPoints, reducedIntegrationWeights = np.arange(integrationWeights.shape[0]), integrationWeights
#hyperreduced operator
reducedEpsilonAtReducedIntegPoints = reducedEpsilonAtIntegPoints[:,reducedIntegrationPoints,:]
reducedListOTags = [listOfTags[intPoint] for intPoint in reducedIntegrationPoints]
for i, listOfTags in enumerate(reducedListOTags):
reducedListOTags[i].append("ALLELEMENT")
#hyperreduce boundary conditions
#reducedOrderBases = collectionProblemData.GetReducedOrderBases
#integrationWeightsRadiation, phiAtIntegPointRadiation = FT.ComputePhiAtIntegPoint(mesh)
#for _, problemData in collectionProblemData.GetProblemDatas().items():
# centrifugalLoading = problemData.GetLoadingsOfType('centrifugal')
# unAssembledReducedUnitCentrifugalVector, _ = centrifugalLoading.ReduceLoading(mesh, problemData, reducedOrderBases, integrationWeights = integrationWeightsRadiation, phiAtIntegPoint = phiAtIntegPointRadiation)"""
dualReconstructionData = LearnDualReconstruction(collectionProblemData, listNameDualVarOutput, reducedIntegrationPoints, methodDualReconstruction, timeSequenceForDualReconstruction)
operatorCompressionData.SetReducedIntegrationPoints(reducedIntegrationPoints)
operatorCompressionData.SetReducedIntegrationWeights(reducedIntegrationWeights)
operatorCompressionData.SetReducedListOTags(reducedListOTags)
operatorCompressionData.SetReducedEpsilonAtReducedIntegPoints(reducedEpsilonAtReducedIntegPoints)
operatorCompressionData.SetDualReconstructionData(dualReconstructionData)
[docs]
def ComputeSigmaEpsilon(collectionProblemData, reducedEpsilonAtIntegPoints, tolerance, toleranceCompressSnapshotsForRedQuad):
r"""
Computes sigma double contracted with epsilon at the reduced integration points:
:math:`\sigma(u_i):\epsilon(\Psi)(x_k)` where :math:`\sigma(u_i)` are the stress snapshots,
:math:`\Psi` is a POD mode and :math:`x_k` are the integration points
Parameters
----------
collectionProblemData : CollectionProblemData
definition of the training data in a CollectionProblemData object
reducedEpsilonAtIntegPoints: np.ndarray
of size (numberOfSigmaComponents, numberOfIntegrationPoints, numberOfModes),
the reduced integrator
tolerance : float
tolerance for the Empirical Cubature Method (ECM)
toleranceCompressSnapshotsForRedQuad : float
if >0, the snigma snapshots are compressed before the ECM is applied, and
this parameters defines the compression error
Returns
-------
np.ndarray
sigmaEpsilon, of size (numberOfModes*(numberOfSnapshots-1),numberOfIntegrationPoints),
the first sigma snapshots at the initial time is ignored
"""
numberOfSigmaComponents = collectionProblemData.GetSolutionsNumberOfComponents("sigma")
numberOfModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("U")
numberOfIntegrationPoints = collectionProblemData.GetOperatorCompressionData("U").GetNumberOfIntegrationPoints()
snapshotsSigma = collectionProblemData.GetSnapshots("sigma", skipFirst = True)
if toleranceCompressSnapshotsForRedQuad > 0.:
SP.CompressData(collectionProblemData, "SigmaECM", tolerance, snapshots = snapshotsSigma)
reducedOrderBasisSigmaEspilon = collectionProblemData.GetReducedOrderBasis("SigmaECM")
reducedOrderBasisSigmaEspilonShape = reducedOrderBasisSigmaEspilon.shape
numberOfSigmaModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("SigmaECM")
reducedOrderBasisSigmaEspilon.shape = (numberOfSigmaModes,numberOfSigmaComponents,numberOfIntegrationPoints)
sigmaEpsilon = np.einsum('mlk,pml->pkl', reducedEpsilonAtIntegPoints, reducedOrderBasisSigmaEspilon, optimize = True).reshape(numberOfModes*numberOfSigmaModes,numberOfIntegrationPoints)
reducedOrderBasisSigmaEspilon.shape = reducedOrderBasisSigmaEspilonShape
else:
snapshotsSigmaShape = snapshotsSigma.shape
numberOfSigmaSnapshots = collectionProblemData.GetGlobalNumberOfSnapshots("sigma", skipFirst = True)
snapshotsSigma.shape = (numberOfSigmaSnapshots,numberOfSigmaComponents,numberOfIntegrationPoints)
sigmaEpsilon = np.einsum('mlk,pml->pkl', reducedEpsilonAtIntegPoints, snapshotsSigma, optimize = True).reshape(numberOfModes*numberOfSigmaSnapshots,numberOfIntegrationPoints)
snapshotsSigma.shape = snapshotsSigmaShape
return sigmaEpsilon
[docs]
def ReduceIntegrator(collectionProblemData, mesh):
r"""
Computes the reduced integrator, named reducedEpsilonAtIntegPoints.
:math:`\epsilon(u_t)(x_k)`, where u_t is a test displacement and x_k are the
integration points is called integrator, since the internal forces vector
is obtained using :math:`\sigma:\epsilon(u_t)(x_k)`. reducedEpsilonAtIntegPoints
denotes :math:`\epsilon(\Psi)(x_k)`, where :math:`\Psi` is a POD mode and
:math:`x_k` are the integration points.
Parameters
----------
collectionProblemData : CollectionProblemData
definition of the training data in a CollectionProblemData object
mesh: MuscatMesh
high-dimensional mesh
Returns
-------
np.ndarray
reducedEpsilonAtIntegPoints, of size (numberOfSigmaComponents,numberOfIntegrationPoints,numberOfModes)
"""
operatorCompressionData = collectionProblemData.GetOperatorCompressionData("U")
numberOfIntegrationPoints = operatorCompressionData.GetNumberOfIntegrationPoints()
gradPhiAtIntegPoint = operatorCompressionData.GetGradPhiAtIntegPoint()
uNumberOfComponents = collectionProblemData.GetSolutionsNumberOfComponents("U")
numberOfSigmaComponents = collectionProblemData.GetSolutionsNumberOfComponents("sigma")
numberOfNodes = mesh.GetNumberOfNodes()
numberOfModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("U")
reducedOrderBasis = collectionProblemData.GetReducedOrderBasis("U")
componentReducedOrderBasis = []
for i in range(uNumberOfComponents):
componentReducedOrderBasis.append(reducedOrderBasis[:,i*numberOfNodes:(i+1)*numberOfNodes].T)
reducedEpsilonAtIntegPoints = np.empty((numberOfSigmaComponents, numberOfIntegrationPoints, numberOfModes))
count = 0
for i in range(uNumberOfComponents):
reducedEpsilonAtIntegPoints[count,:,:] = gradPhiAtIntegPoint[i].dot(componentReducedOrderBasis[i])
count += 1
for ind in secondOrderTensorOffDiagComponents[uNumberOfComponents]:
i = ind[0]
j = ind[1]
reducedEpsilonAtIntegPoints[count,:,:] = gradPhiAtIntegPoint[i].dot(componentReducedOrderBasis[j]) + gradPhiAtIntegPoint[j].dot(componentReducedOrderBasis[i])
count += 1
return reducedEpsilonAtIntegPoints
[docs]
def LearnDualReconstruction(collectionProblemData, listNameDualVarOutput, reducedIntegrationPoints, methodDualReconstruction, timeSequenceForDualReconstruction = None, snapshotsAtReducedIntegrationPoints = None, regressor = None, paramGrid = None):
"""
Train the agorithm of reconstruction of the dual quantities from the
reduced integration points to the complete mesh: "GappyPOD" or "MetaModel"
Parameters
----------
collectionProblemData : CollectionProblemData
definition of the training data in a CollectionProblemData object
listNameDualVarOutput : list of strings, optional
names of dual quantities to reconstruct on complete mesh
reducedIntegrationPoints : np.ndarray
of size (nReducedIntegrationPoints,), dtype = int
indices of the reduced integration points
methodDualReconstruction : str
method used to reconstruct the dual quantities from the reduced
integration points to the complete mesh, "GappyPOD" or "MetaModel"
timeSequenceForDualReconstruction : list or 1D np.ndarray, optional
time sequence used to train the dual quantities reconstruction
algorithm
snapshotsAtReducedIntegrationPoints : dict, optional
dual quantity at reduced integrationPoints, for which reconstruction is
trained
regressor : object satisfying the scikit-learn regressors API, optional
regressor used for the method "MetaModel"
paramGrid : dict, optional
of lists (of floats) containing hyperparameter values of the considered
regressor
Returns
-------
dict
dictionary containing data used for reconstructing dual quantities in
the online stage, with key:values:
"methodDualReconstruction : str
"GappyPOD", "MetaModel" or "None"
name of dual quantities (e.g. "evrcum"):
- if "MetaModel" : tuple
model: sklearn.model_selection._search.GridSearchCV
scalerX: sklearn.preprocessing._data.StandardScaler
scalery: sklearn.preprocessing._data.StandardScaler
- if "GappyPOD" : tuple
reducedOrderBasisAtReducedIntegrationPoints: np.ndarray
of size (numberOfModes, nReducedIntegrationPoints)
- if "None" : empty dict
Notes
-----
Regressor and paramGrid mush be both None or both specified
"""
if listNameDualVarOutput is None:
listNameDualVarOutput = []# pragma: no cover
dualReconstructionData = {}
if methodDualReconstruction == "GappyPOD":
dualReconstructionData["methodDualReconstruction"] = "GappyPOD"
for name in listNameDualVarOutput:
dualReconstructionData[name] = collectionProblemData.GetReducedOrderBasis(name)[:,reducedIntegrationPoints]
elif methodDualReconstruction == "MetaModel":
dualReconstructionData["methodDualReconstruction"] = "MetaModel"
for name in listNameDualVarOutput:
print("Running MetaModel for solutionName =", name)
if timeSequenceForDualReconstruction is None:
solutionTimeSteps = collectionProblemData.GetSolutionTimeSteps(name, skipFirst = True)[:,np.newaxis]
if snapshotsAtReducedIntegrationPoints is None:
localSnapshotsAtReducedIntegrationPoints = collectionProblemData.GetSnapshots(name, skipFirst = True)[:,reducedIntegrationPoints]
else:# pragma: no cover
raise RuntimeError("cannot specify snapshotsAtReducedIntegrationPoints when timeSequenceForDualReconstruction is None")
yTrain = collectionProblemData.GetReducedCoordinates(name, skipFirst = True)
else:
solutionTimeSteps = np.array(timeSequenceForDualReconstruction)[:,np.newaxis]
if snapshotsAtReducedIntegrationPoints is None:
localSnapshotsAtReducedIntegrationPoints = collectionProblemData.GetSnapshotsAtTimes(name, timeSequenceForDualReconstruction)[:,reducedIntegrationPoints]
else:
localSnapshotsAtReducedIntegrationPoints = snapshotsAtReducedIntegrationPoints[name]
yTrain = collectionProblemData.GetReducedCoordinatesAtTimes(name, timeSequenceForDualReconstruction)
solutionTimeSteps = np.tile(solutionTimeSteps, (collectionProblemData.GetNumberOfProblemDatas(),1))
#print("solutionTimeSteps =", solutionTimeSteps)
#print("localSnapshotsAtReducedIntegrationPoints =", localSnapshotsAtReducedIntegrationPoints)
assert solutionTimeSteps.shape[0] == localSnapshotsAtReducedIntegrationPoints.shape[0], "number of time steps different from number of snapshots at reduced integration points"
#XTrain = np.hstack((solutionTimeSteps, localSnapshotsAtReducedIntegrationPoints))
XTrain = localSnapshotsAtReducedIntegrationPoints
from Mordicus.BasicAlgorithms import ScikitLearnRegressor as SLR
"""
from sklearn.gaussian_process.kernels import WhiteKernel, ConstantKernel, Matern
#from sklearn.gaussian_process import GaussianProcessRegressor
kernel = Matern(length_scale=1., nu=2.5) + ConstantKernel(constant_value=1.0, constant_value_bounds=(0.001, 1.0)) * WhiteKernel(noise_level=1, noise_level_bounds=(1e-10, 1e0))
#regressor = GaussianProcessRegressor(kernel=kernel)
regressor = SLR.MyGPR(kernel=kernel)
paramGrid = {'kernel__k1__length_scale': [0.1, 1., 10.], 'kernel__k1__nu':[1.5, 2.5], 'kernel__k2__k1__constant_value':[0.001, 0.01]}#, 'kernel__k2__k2__noise_level':[1, 2]}
"""
#from pprint import pprint
#pprint(vars(regressor))
#print("regressor.get_params().keys() =", regressor.get_params().keys())
if regressor is None and paramGrid is None:
from sklearn import linear_model
regressor = linear_model.Lasso(alpha=0.1, max_iter=int(1e8), tol=1e-5)
paramGrid = {'alpha': [0.1, 1.]}
if (regressor is None and paramGrid is not None) or (regressor is not None and paramGrid is None):#pragma: no cover
raise ValueError('regressor and paramGrid mush be both None or both specified')
model, scalerX, scalery = SLR.GridSearchCVRegression(regressor, paramGrid, XTrain, yTrain)
dualReconstructionData[name] = (model, scalerX, scalery)
else:# pragma: no cover
print(">> Not learning how to reconstructing dual variables")
return dualReconstructionData
[docs]
def ReconstructDualQuantity(nameDualQuantity, operatorCompressionData, onlineData, timeSequence):
r"""
Reconstruct a dual quantitie using a trained algorithm
Parameters
----------
nameDualQuantity : str
name of the dual quantity to reconstruct
operatorCompressionData : OperatorCompressionDataMechanical
used in the operator compression step of the POD-ECM method.
onlineData : OnlineDataMechanical
used in the online stage to store quantities required for the tracking
of certain quantites, e.g. the internal state variable at the reduced
integration points, required to compute the constitutive law.
timeSequence : 1D np.ndarray, optional
time sequence used to train the dual quantities reconstruction
algorithm
Returns
-------
dict
with float as keys (time steps) and np.ndarray of size (numberOfModes)
as values (reducedCoordinates of the reconstructed dual quantity)
list
of floats, contains the resitual of the reconstruction if "GappyPOD" is
used, empty list otherwise
"""
onlineDualCompressedSolution = {}
#nTimeSteps = np.array(timeSequence).shape[0]
#nReducedIntegrationPoints = operatorCompressionData["reducedEpsilonAtReducedIntegPoints"].shape[1]
#fieldAtMask = np.zeros((nTimeSteps, nReducedIntegrationPoints))
#localIndex = {}
#for tag in onlineCompressionData['indicesOfReducedIntegPointsPerMaterial'].keys():
# if nameDualQuantity in onlineCompressionData['dualVarOutputNames'][tag]:
# localIndex[tag] = onlineCompressionData['dualVarOutputNames'][tag].index(nameDualQuantity)
#for i, time in enumerate(timeSequence):
# for tag, intPoints in onlineCompressionData['indicesOfReducedIntegPointsPerMaterial'].items():
# if tag in localIndex:
# fieldAtMask[i, intPoints] = onlineCompressionData['dualVarOutput'][tag][time][:,localIndex[tag]]
fieldAtMask = GetOnlineDualQuantityAtReducedIntegrationPoints(nameDualQuantity, onlineData, timeSequence)
#print(nameDualQuantity, fieldAtMask)
reconstructionResidual = []
methodDualReconstruction = operatorCompressionData.GetDualReconstructionData()["methodDualReconstruction"]
if methodDualReconstruction == "GappyPOD":
from genericROM.BasicAlgorithms import GappyPOD as GP
ModesAtMask = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity]
for i, time in enumerate(timeSequence):
onlineDualCompressedSolution[time], error = GP.FitAndCost(ModesAtMask, fieldAtMask[i])
reconstructionResidual.append(error)
elif methodDualReconstruction == "MetaModel":
from Mordicus.BasicAlgorithms import ScikitLearnRegressor as SLR
model = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity][0]
scalerX = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity][1]
scalery = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity][2]
#xTest = np.hstack((np.array(timeSequence)[:,np.newaxis], fieldAtMask))
xTest = fieldAtMask
y = SLR.ComputeRegressionApproximation(model, scalerX, scalery, xTest)
for i, time in enumerate(timeSequence):
onlineDualCompressedSolution[time] = y[i]
else:# pragma: no cover
raise NameError(methodDualReconstruction+" not available")
return onlineDualCompressedSolution, reconstructionResidual
[docs]
def GetOnlineDualQuantityAtReducedIntegrationPoints(nameDualQuantity, onlineData, timeSequence):
r"""
Reconstruct a dual quantitie using a trained algorithm
Parameters
----------
nameDualQuantity : str
name of the dual quantity to reconstruct
onlineData : OnlineDataMechanical
used in the online stage to store quantities required for the tracking
of certain quantites, e.g. the internal state variable at the reduced
integration points, required to compute the constitutive law.
timeSequence : 1D np.ndarray, optional
time sequence used to train the dual quantities reconstruction
algorithm
Returns
-------
np.ndarray
of size(nTimeSteps, nReducedIntegrationPoints), contains the online
dual quantity of name "nameDualQuantity" at reduced integration points
"""
nTimeSteps = np.array(timeSequence).shape[0]
nReducedIntegrationPoints = 0
localIndex = {}
for tag, intPoints in onlineData.GetIndicesOfReducedIntegPointsPerMaterial().items():
nReducedIntegrationPoints += intPoints.shape[0]
if nameDualQuantity in onlineData.GetDualVarOutputNames(tag):
localIndex[tag] = onlineData.GetDualVarOutputNames(tag).index(nameDualQuantity)
fieldAtMask = np.zeros((nTimeSteps, nReducedIntegrationPoints))
for i, time in enumerate(timeSequence):
for tag, intPoints in onlineData.GetIndicesOfReducedIntegPointsPerMaterial().items():
if tag in localIndex:
fieldAtMask[i, intPoints] = onlineData.GetDualVarOutput(tag)[time][:,localIndex[tag]]
return fieldAtMask
if __name__ == "__main__":# pragma: no cover
from genericROM import RunTestFile
RunTestFile(__file__)