Source code for genericROM.OperatorCompressors.Mechanical

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
#

import os, sys
try:
    from mpi4py import MPI
except ImportError:# pragma: no cover
    print("MPI capabilities not available")
import numpy as np

from genericROM.FE import FETools as FT
from genericROM.OperatorCompressors import ReducedQuadratureProcedure as RQP
from genericROM.BasicAlgorithms import EIM
from genericROM.DataCompressors import FusedSnapshotPOD as SP

from genericROM.Containers.OperatorCompressionData import OperatorCompressionDataMechanical as OCDM
from genericROM.Containers.OperatorCompressionData import OperatorPreCompressionDataMechanical as OPCDM
from genericROM.Containers.OnlineData import OnlineDataMechanical as ODM
from Muscat.FE.IntegrationRules import LagrangeIsoParamQuadrature


secondOrderTensorOffDiagComponents = {1:[], 2:[(0,1)], 3:[(0,1), (1,2), (0,2)]}




[docs] def PrepareOnline(onlineProblemData, operatorCompressionData): """ Prepares a online computation by constructing an onlineData and adding it to the onlineProblemData Parameters ---------- onlineProblemData : OnlineProblemData containing the description of the problem for which a reduced solution is computed operatorCompressionData : OperatorCompressionDataMecanical containing the data structure generated by the data compression step for POD-ECM for a mechanical problem """ nReducedIntegrationPoints = operatorCompressionData.GetNumberOfReducedIntegrationPoints() numberOfSigmaComponents = operatorCompressionData.GetNumberOfSigmaComponents() onlineData = ODM.OnlineDataMechanical("U", nReducedIntegrationPoints, numberOfSigmaComponents) constitutiveLawSets = onlineProblemData.GetSetsOfConstitutiveOfType("mechanical") reducedListOTags = operatorCompressionData.GetReducedListOTags() indicesOfReducedIntegPointsPerMaterial = FT.ComputeIndicesOfIntegPointsPerMaterial(reducedListOTags, constitutiveLawSets) for tag, reducedIntegPoints in indicesOfReducedIntegPointsPerMaterial.items(): localNbReducedIntegPoints = len(reducedIntegPoints) lawTag = ('mechanical', tag) law = onlineProblemData.GetConstitutiveLaws()[lawTag] var = law.GetOneConstitutiveLawVariable("var") nstatv = law.GetOneConstitutiveLawVariable("nstatv") onlineData.InitializeMaterial(tag, var, nstatv, localNbReducedIntegPoints) onlineData.SetIndicesOfReducedIntegPointsPerMaterial(indicesOfReducedIntegPointsPerMaterial) #onlineData.SetReducedData(operatorCompressionData) onlineProblemData.AddOnlineData(onlineData)
[docs] def ComputeOnline(onlineProblemData, timeSequence, operatorCompressionData, tolerance, onlineData = None, callback = None): """ Compute the online stage using the method POD and ECM for a mechanical problem Parameters ---------- onlineProblemData : OnlineProblemData containing the description of the problem for which a reduced solution is computed operatorCompressionData : OperatorCompressionDataMecanical containing the data structure generated by the data compression step for POD-ECM for a mechanical problem timeSequence : 1D np.ndarray time sequence where the reduced solution is computed tolerance : float stopping criterion for the Newton algorithm of the reduced problem onlineData : OnlineDataMechanical, optional used in the online stage to store quantities required for the tracking of certain quantites, e.g. the internal state variable at the reduced integration points, required to compute the constitutive law. callback : function, optional used to override console output messages during the reduced problem iterations Returns ------- dict onlineCompressedSolution, with time steps as keys and 1D np.ndarray of size (numberOfModes,) """ currentFolder = os.getcwd() folder = currentFolder + os.sep + onlineProblemData.GetDataFolder() os.chdir(folder) if onlineData is None and operatorCompressionData is not None: PrepareOnline(onlineProblemData, operatorCompressionData) onlineData = onlineProblemData.GetOnlineData("U") initialCondition = onlineProblemData.GetInitialCondition() onlineCompressedSolution = {} onlineCompressedSolution[timeSequence[0]] = initialCondition.GetReducedInitialSnapshot("U") onlineData.UpdateInternalStateAtReducedIntegrationPoints(timeSequence[0]) for timeStep in range(1, len(timeSequence)): previousTime = timeSequence[timeStep-1] time = timeSequence[timeStep] dtime = time - previousTime if callback == None: print("time =", time); sys.stdout.flush() else: callback.CurrentTime(timeStep, time) reducedExternalForcesTemp = PrepareNewtonIterations(onlineProblemData, time, dtime) reducedExternalForces = np.zeros(reducedExternalForcesTemp.shape) MPI.COMM_WORLD.Allreduce([reducedExternalForcesTemp, MPI.DOUBLE], [reducedExternalForces, MPI.DOUBLE]) normExt = np.linalg.norm(reducedExternalForces) before = onlineCompressedSolution[previousTime]# np.copy in niROM after = before# np.copy in niROM onlineCompressedSolution[time] = np.copy(before) reducedInternalForcesTemp, reducedTangentMatrixTemp = ComputeReducedInternalForcesAndTangentMatrix(onlineProblemData, operatorCompressionData, before, after, time) reducedInternalForces = np.zeros(reducedInternalForcesTemp.shape) MPI.COMM_WORLD.Allreduce([reducedInternalForcesTemp, MPI.DOUBLE], [reducedInternalForces, MPI.DOUBLE]) if normExt>0: normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces)/normExt else: normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces) # pragma: no cover count = 0 while normRes > tolerance: reducedTangentMatrix = np.zeros(reducedTangentMatrixTemp.shape) MPI.COMM_WORLD.Allreduce([reducedTangentMatrixTemp, MPI.DOUBLE], [reducedTangentMatrix, MPI.DOUBLE]) sol = np.linalg.solve(reducedTangentMatrix, reducedExternalForces-reducedInternalForces) onlineCompressedSolution[time] += sol after = onlineCompressedSolution[time]# np.copy in niROM reducedInternalForcesTemp, reducedTangentMatrixTemp = ComputeReducedInternalForcesAndTangentMatrix(onlineProblemData, operatorCompressionData, before, after, time) reducedInternalForces = np.zeros(reducedInternalForcesTemp.shape) MPI.COMM_WORLD.Allreduce([reducedInternalForcesTemp, MPI.DOUBLE], [reducedInternalForces, MPI.DOUBLE]) if normExt>0: normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces)/normExt else: normRes = np.linalg.norm(reducedExternalForces-reducedInternalForces)# pragma: no cover if callback == None: print("normRes =", normRes); sys.stdout.flush() else: callback.CurrentNormRes(normRes) count += 1 if count == 200: raise RuntimeError("problem could not converge after 200 iterations") # pragma: no cover #solution is set: now update Internal Variables: onlineData.UpdateInternalStateAtReducedIntegrationPoints(time) constLaws = onlineProblemData.GetConstitutiveLaws() for tag, intPoints in onlineData.GetIndicesOfReducedIntegPointsPerMaterial().items(): lawTag = ('mechanical', tag) constLaws[lawTag].UpdateInternalState() if callback == None: print("=== Newton iterations:", count); sys.stdout.flush() else: callback.CurrentNewtonIterations(count) os.chdir(currentFolder) onlineProblemData.AddOnlineData(onlineData) return onlineCompressedSolution
[docs] def PrepareNewtonIterations(onlineProblemData, time, dtime): """ Prepares the Newton iteration by computing the reduced external forces vector (depending on the loading) and updating the temperature loading to the onlineData. Parameters ---------- onlineProblemData : OnlineProblemData containing the description of the problem for which a reduced solution is computed time : float previous time where the reduced solution hasbeen computed dtime : float time increment Returns ------- 1D np.ndarray reducedExternalForces, of size (numberOfModes,) """ onlineData = onlineProblemData.GetOnlineData("U") for law in onlineProblemData.GetConstitutiveLawsOfType("mechanical"): law.SetOneConstitutiveLawVariable('dtime', dtime) if ('U', 'temperature', 'ALLNODE') in onlineProblemData.loadings: temperatureAtReducedIntegrationPoints0 = onlineProblemData.loadings[('U', 'temperature', 'ALLNODE')].GetTemperatureAtReducedIntegrationPointsAtTime(time-dtime) temperatureAtReducedIntegrationPoints1 = onlineProblemData.loadings[('U', 'temperature', 'ALLNODE')].GetTemperatureAtReducedIntegrationPointsAtTime(time) onlineData.UpdateTemperatureAtReducedIntegrationPoints(temperatureAtReducedIntegrationPoints0, temperatureAtReducedIntegrationPoints1) reducedExternalForces = 0. for loading in onlineProblemData.GetLoadings().values(): if loading.GetSolutionName() == "U": reducedExternalForces += loading.ComputeContributionToReducedExternalForces(time) return reducedExternalForces
[docs] def ComputeReducedInternalForcesAndTangentMatrix(onlineProblemData, operatorCompressionData, before, after, time): """ Computes the reduced internal forces vector and tange Parameters ---------- onlineProblemData : OnlineProblemData containing the description of the problem for which a reduced solution is computed operatorCompressionData : OperatorCompressionDataMechanical used to store data during the operator compression step of the POD-ECM method. before : 1D np.ndarray of size (numberOfModes,), coefficients of the reduced solution at the previous time step after : 1D np.ndarray of size (numberOfModes,), coefficients of the reduced solution at the current time step, for which the residual will be computed during the reduced Newton algorithm Returns ------- 1D np.ndarray reducedInternalForces, of size (numberOfModes,) np.ndarray reducedTangentMatrix, of size (numberOfModes,numberOfModes) """ onlineData = onlineProblemData.GetOnlineData("U") indicesOfReducedIntegPointsPerMaterial = onlineData.GetIndicesOfReducedIntegPointsPerMaterial() numberOfSigmaComponents = onlineData.GetNumberOfSigmaComponents() nReducedIntegrationPoints = onlineData.GetNReducedIntegrationPoints() reducedIntegrationWeights = operatorCompressionData.GetReducedIntegrationWeights() reducedEpsilonAtReducedIntegPoints = operatorCompressionData.GetReducedEpsilonAtReducedIntegPoints() #onlineCompressionData['sigIntForces'] = np.zeros((nReducedIntegrationPoints,numberOfSigmaComponents)) onlineData.SetStrainAtReducedIntegrationPoints0(np.dot(reducedEpsilonAtReducedIntegPoints, before).T) onlineData.SetStrainAtReducedIntegrationPoints1(np.dot(reducedEpsilonAtReducedIntegPoints, after).T) constLaws = onlineProblemData.GetConstitutiveLaws() sigma = np.empty((nReducedIntegrationPoints,numberOfSigmaComponents)) localTgtMat = np.empty((nReducedIntegrationPoints,numberOfSigmaComponents,numberOfSigmaComponents)) for tag, intPoints in indicesOfReducedIntegPointsPerMaterial.items(): if ('U', 'temperature', 'ALLNODE') in onlineProblemData.loadings: temperature = onlineData.GetTemperatureAtReducedIntegrationPoints0()[intPoints] dtemp = onlineData.GetTemperatureAtReducedIntegrationPoints1()[intPoints] - onlineData.GetTemperatureAtReducedIntegrationPoints0()[intPoints] else: temperature = 20.+np.zeros(intPoints.shape[0]) #pragma: no cover dtemp = np.zeros(intPoints.shape[0]) #pragma: no cover stran = onlineData.GetStrainAtReducedIntegrationPoints0()[intPoints] dstran = onlineData.GetStrainAtReducedIntegrationPoints1()[intPoints] - onlineData.GetStrainAtReducedIntegrationPoints0()[intPoints] statev = np.copy(onlineData.GetStateVarAtReducedIntegrationPoints0(tag)) lawTag = ('mechanical', tag) ddsdde, stress, statev = constLaws[lawTag].ComputeConstitutiveLaw(temperature, dtemp, stran, dstran, statev) sigma[intPoints,:] = stress localTgtMat[intPoints,:,:] = ddsdde #Set dual state output values (overwritten at each Newton iteration, kept at convergence) stran1 = onlineData.GetStrainAtReducedIntegrationPoints1()[intPoints] onlineData.SetDualStateAndVarOutput(tag, time, stran1, stress, statev) reducedInternalForces = np.einsum('l,mlk,lm->k', reducedIntegrationWeights, reducedEpsilonAtReducedIntegPoints, sigma, optimize = True) reducedTangentMatrix = np.einsum('l,mlk,lmn,nlo->ko', reducedIntegrationWeights, reducedEpsilonAtReducedIntegPoints, localTgtMat, reducedEpsilonAtReducedIntegPoints, optimize = True) return reducedInternalForces, reducedTangentMatrix
[docs] def PreCompressOperator(mesh, integrationRule = LagrangeIsoParamQuadrature): """ Preliminary operator compression step for the POD-ECM method for a mechanical problem, precomputing quantities that only depend on the mesh Requires naming the displacement solution "U" and the stress solution "sigma" Parameters ---------- mesh: MuscatMesh high-dimensional mesh Returns ------- OperatorPreCompressionDataMechanical data structure used in a precomputation step of the operator compression step of the POD-ECM method """ listOfTags = FT.ComputeIntegrationPointsTags(mesh, mesh.GetDimensionality(), integrationRule = integrationRule) integrationWeights, gradPhiAtIntegPoint = FT.ComputeGradPhiAtIntegPoint(mesh, integrationRule = integrationRule) operatorPreCompressionData = OPCDM.OperatorPreCompressionDataMechanical("U", gradPhiAtIntegPoint, integrationWeights, listOfTags) return operatorPreCompressionData
[docs] def CompressOperator( collectionProblemData, operatorPreCompressionData, mesh, tolerance, listNameDualVarOutput = None, listNameDualVarGappyIndicesforECM = None, toleranceCompressSnapshotsForRedQuad = 0., methodDualReconstruction = "GappyPOD", timeSequenceForDualReconstruction = None): """ Operator compression step for the POD-ECM method for a mechanical problem Requires naming the displacement solution "U" and the stress solution "sigma" Parameters ---------- collectionProblemData : CollectionProblemData definition of the training data in a CollectionProblemData object operatorPreCompressionData : OperatorPreCompressionDataMechanical ata structure used in a precomputation step of the operator compression step of the POD-ECM method mesh: MuscatMesh high-dimensional mesh tolerance : float tolerance for the Empirical Cubature Method (ECM) listNameDualVarOutput : list of strings, optional names of dual quantities to reconstruct on complete mesh listNameDualVarGappyIndicesforECM : list of strings, optional names of dual quantities for which the indices of the POD are added to the reduced integration points list toleranceCompressSnapshotsForRedQuad : float, optional if > 0., sigma is compressed using snapshots POD before applying ECM methodDualReconstruction : str, optional method used to reconstruct the dual quantities from the reduced integration points to the complete mesh, "GappyPOD" or "MetaModel" timeSequenceForDualReconstruction : list or 1D np.ndarray, optional time sequence used to train the dual quantities reconstruction algorithm """ print("CompressOperator starting..."); sys.stdout.flush() if toleranceCompressSnapshotsForRedQuad > 0: collectionProblemData.DefineQuantity("SigmaECM") operatorCompressionData = collectionProblemData.GetOperatorCompressionData("U") if operatorCompressionData == None: operatorCompressionData = OCDM.OperatorCompressionDataMechanical("U") operatorCompressionData.SetOperatorPreCompressionData(operatorPreCompressionData) collectionProblemData.AddOperatorCompressionData(operatorCompressionData) listOfTags = operatorCompressionData.GetListOfTags() integrationWeights = operatorCompressionData.GetIntegrationWeights() #gradPhiAtIntegPoint = operatorCompressionData.GetGradPhiAtIntegPoint() #numberOfIntegrationPoints = operatorCompressionData.GetNumberOfIntegrationPoints() #reducedOrderBasis = collectionProblemData.GetReducedOrderBasis("U") #numberOfModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("U") #numberOfSigmaComponents = collectionProblemData.GetSolutionsNumberOfComponents("sigma") import time start = time.time() reducedEpsilonAtIntegPoints = ReduceIntegrator(collectionProblemData, mesh) #if listNameDualVarGappyIndicesforECM is None: # listNameDualVarGappyIndicesforECM = []# pragma: no cover # #if not operatorCompressionData: # operatorCompressionData["reducedIntegrationPoints"] = [] sigmaEpsilon = ComputeSigmaEpsilon(collectionProblemData, reducedEpsilonAtIntegPoints, tolerance, toleranceCompressSnapshotsForRedQuad) imposedIndices = [] if listNameDualVarGappyIndicesforECM is not None: for name in listNameDualVarGappyIndicesforECM: imposedIndices += list(EIM.QDEIM(collectionProblemData.GetReducedOrderBasis(name))) imposedIndices = list(set(imposedIndices)) reducedIntegrationPointsInitSet = operatorCompressionData.GetReducedIntegrationPoints() print("Prepare ECM duration = "+str(time.time()-start)+" s"); sys.stdout.flush() reducedIntegrationPoints, reducedIntegrationWeights = RQP.ComputeReducedIntegrationScheme(integrationWeights, sigmaEpsilon, tolerance, imposedIndices = imposedIndices, reducedIntegrationPointsInitSet = reducedIntegrationPointsInitSet) #reducedIntegrationPoints, reducedIntegrationWeights = np.arange(integrationWeights.shape[0]), integrationWeights #hyperreduced operator reducedEpsilonAtReducedIntegPoints = reducedEpsilonAtIntegPoints[:,reducedIntegrationPoints,:] reducedListOTags = [listOfTags[intPoint] for intPoint in reducedIntegrationPoints] for i, listOfTags in enumerate(reducedListOTags): reducedListOTags[i].append("ALLELEMENT") #hyperreduce boundary conditions #reducedOrderBases = collectionProblemData.GetReducedOrderBases #integrationWeightsRadiation, phiAtIntegPointRadiation = FT.ComputePhiAtIntegPoint(mesh) #for _, problemData in collectionProblemData.GetProblemDatas().items(): # centrifugalLoading = problemData.GetLoadingsOfType('centrifugal') # unAssembledReducedUnitCentrifugalVector, _ = centrifugalLoading.ReduceLoading(mesh, problemData, reducedOrderBases, integrationWeights = integrationWeightsRadiation, phiAtIntegPoint = phiAtIntegPointRadiation)""" dualReconstructionData = LearnDualReconstruction(collectionProblemData, listNameDualVarOutput, reducedIntegrationPoints, methodDualReconstruction, timeSequenceForDualReconstruction) operatorCompressionData.SetReducedIntegrationPoints(reducedIntegrationPoints) operatorCompressionData.SetReducedIntegrationWeights(reducedIntegrationWeights) operatorCompressionData.SetReducedListOTags(reducedListOTags) operatorCompressionData.SetReducedEpsilonAtReducedIntegPoints(reducedEpsilonAtReducedIntegPoints) operatorCompressionData.SetDualReconstructionData(dualReconstructionData)
[docs] def ComputeSigmaEpsilon(collectionProblemData, reducedEpsilonAtIntegPoints, tolerance, toleranceCompressSnapshotsForRedQuad): r""" Computes sigma double contracted with epsilon at the reduced integration points: :math:`\sigma(u_i):\epsilon(\Psi)(x_k)` where :math:`\sigma(u_i)` are the stress snapshots, :math:`\Psi` is a POD mode and :math:`x_k` are the integration points Parameters ---------- collectionProblemData : CollectionProblemData definition of the training data in a CollectionProblemData object reducedEpsilonAtIntegPoints: np.ndarray of size (numberOfSigmaComponents, numberOfIntegrationPoints, numberOfModes), the reduced integrator tolerance : float tolerance for the Empirical Cubature Method (ECM) toleranceCompressSnapshotsForRedQuad : float if >0, the snigma snapshots are compressed before the ECM is applied, and this parameters defines the compression error Returns ------- np.ndarray sigmaEpsilon, of size (numberOfModes*(numberOfSnapshots-1),numberOfIntegrationPoints), the first sigma snapshots at the initial time is ignored """ numberOfSigmaComponents = collectionProblemData.GetSolutionsNumberOfComponents("sigma") numberOfModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("U") numberOfIntegrationPoints = collectionProblemData.GetOperatorCompressionData("U").GetNumberOfIntegrationPoints() snapshotsSigma = collectionProblemData.GetSnapshots("sigma", skipFirst = True) if toleranceCompressSnapshotsForRedQuad > 0.: SP.CompressData(collectionProblemData, "SigmaECM", tolerance, snapshots = snapshotsSigma) reducedOrderBasisSigmaEspilon = collectionProblemData.GetReducedOrderBasis("SigmaECM") reducedOrderBasisSigmaEspilonShape = reducedOrderBasisSigmaEspilon.shape numberOfSigmaModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("SigmaECM") reducedOrderBasisSigmaEspilon.shape = (numberOfSigmaModes,numberOfSigmaComponents,numberOfIntegrationPoints) sigmaEpsilon = np.einsum('mlk,pml->pkl', reducedEpsilonAtIntegPoints, reducedOrderBasisSigmaEspilon, optimize = True).reshape(numberOfModes*numberOfSigmaModes,numberOfIntegrationPoints) reducedOrderBasisSigmaEspilon.shape = reducedOrderBasisSigmaEspilonShape else: snapshotsSigmaShape = snapshotsSigma.shape numberOfSigmaSnapshots = collectionProblemData.GetGlobalNumberOfSnapshots("sigma", skipFirst = True) snapshotsSigma.shape = (numberOfSigmaSnapshots,numberOfSigmaComponents,numberOfIntegrationPoints) sigmaEpsilon = np.einsum('mlk,pml->pkl', reducedEpsilonAtIntegPoints, snapshotsSigma, optimize = True).reshape(numberOfModes*numberOfSigmaSnapshots,numberOfIntegrationPoints) snapshotsSigma.shape = snapshotsSigmaShape return sigmaEpsilon
[docs] def ReduceIntegrator(collectionProblemData, mesh): r""" Computes the reduced integrator, named reducedEpsilonAtIntegPoints. :math:`\epsilon(u_t)(x_k)`, where u_t is a test displacement and x_k are the integration points is called integrator, since the internal forces vector is obtained using :math:`\sigma:\epsilon(u_t)(x_k)`. reducedEpsilonAtIntegPoints denotes :math:`\epsilon(\Psi)(x_k)`, where :math:`\Psi` is a POD mode and :math:`x_k` are the integration points. Parameters ---------- collectionProblemData : CollectionProblemData definition of the training data in a CollectionProblemData object mesh: MuscatMesh high-dimensional mesh Returns ------- np.ndarray reducedEpsilonAtIntegPoints, of size (numberOfSigmaComponents,numberOfIntegrationPoints,numberOfModes) """ operatorCompressionData = collectionProblemData.GetOperatorCompressionData("U") numberOfIntegrationPoints = operatorCompressionData.GetNumberOfIntegrationPoints() gradPhiAtIntegPoint = operatorCompressionData.GetGradPhiAtIntegPoint() uNumberOfComponents = collectionProblemData.GetSolutionsNumberOfComponents("U") numberOfSigmaComponents = collectionProblemData.GetSolutionsNumberOfComponents("sigma") numberOfNodes = mesh.GetNumberOfNodes() numberOfModes = collectionProblemData.GetReducedOrderBasisNumberOfModes("U") reducedOrderBasis = collectionProblemData.GetReducedOrderBasis("U") componentReducedOrderBasis = [] for i in range(uNumberOfComponents): componentReducedOrderBasis.append(reducedOrderBasis[:,i*numberOfNodes:(i+1)*numberOfNodes].T) reducedEpsilonAtIntegPoints = np.empty((numberOfSigmaComponents, numberOfIntegrationPoints, numberOfModes)) count = 0 for i in range(uNumberOfComponents): reducedEpsilonAtIntegPoints[count,:,:] = gradPhiAtIntegPoint[i].dot(componentReducedOrderBasis[i]) count += 1 for ind in secondOrderTensorOffDiagComponents[uNumberOfComponents]: i = ind[0] j = ind[1] reducedEpsilonAtIntegPoints[count,:,:] = gradPhiAtIntegPoint[i].dot(componentReducedOrderBasis[j]) + gradPhiAtIntegPoint[j].dot(componentReducedOrderBasis[i]) count += 1 return reducedEpsilonAtIntegPoints
[docs] def LearnDualReconstruction(collectionProblemData, listNameDualVarOutput, reducedIntegrationPoints, methodDualReconstruction, timeSequenceForDualReconstruction = None, snapshotsAtReducedIntegrationPoints = None, regressor = None, paramGrid = None): """ Train the agorithm of reconstruction of the dual quantities from the reduced integration points to the complete mesh: "GappyPOD" or "MetaModel" Parameters ---------- collectionProblemData : CollectionProblemData definition of the training data in a CollectionProblemData object listNameDualVarOutput : list of strings, optional names of dual quantities to reconstruct on complete mesh reducedIntegrationPoints : np.ndarray of size (nReducedIntegrationPoints,), dtype = int indices of the reduced integration points methodDualReconstruction : str method used to reconstruct the dual quantities from the reduced integration points to the complete mesh, "GappyPOD" or "MetaModel" timeSequenceForDualReconstruction : list or 1D np.ndarray, optional time sequence used to train the dual quantities reconstruction algorithm snapshotsAtReducedIntegrationPoints : dict, optional dual quantity at reduced integrationPoints, for which reconstruction is trained regressor : object satisfying the scikit-learn regressors API, optional regressor used for the method "MetaModel" paramGrid : dict, optional of lists (of floats) containing hyperparameter values of the considered regressor Returns ------- dict dictionary containing data used for reconstructing dual quantities in the online stage, with key:values: "methodDualReconstruction : str "GappyPOD", "MetaModel" or "None" name of dual quantities (e.g. "evrcum"): - if "MetaModel" : tuple model: sklearn.model_selection._search.GridSearchCV scalerX: sklearn.preprocessing._data.StandardScaler scalery: sklearn.preprocessing._data.StandardScaler - if "GappyPOD" : tuple reducedOrderBasisAtReducedIntegrationPoints: np.ndarray of size (numberOfModes, nReducedIntegrationPoints) - if "None" : empty dict Notes ----- Regressor and paramGrid mush be both None or both specified """ if listNameDualVarOutput is None: listNameDualVarOutput = []# pragma: no cover dualReconstructionData = {} if methodDualReconstruction == "GappyPOD": dualReconstructionData["methodDualReconstruction"] = "GappyPOD" for name in listNameDualVarOutput: dualReconstructionData[name] = collectionProblemData.GetReducedOrderBasis(name)[:,reducedIntegrationPoints] elif methodDualReconstruction == "MetaModel": dualReconstructionData["methodDualReconstruction"] = "MetaModel" for name in listNameDualVarOutput: print("Running MetaModel for solutionName =", name) if timeSequenceForDualReconstruction is None: solutionTimeSteps = collectionProblemData.GetSolutionTimeSteps(name, skipFirst = True)[:,np.newaxis] if snapshotsAtReducedIntegrationPoints is None: localSnapshotsAtReducedIntegrationPoints = collectionProblemData.GetSnapshots(name, skipFirst = True)[:,reducedIntegrationPoints] else:# pragma: no cover raise RuntimeError("cannot specify snapshotsAtReducedIntegrationPoints when timeSequenceForDualReconstruction is None") yTrain = collectionProblemData.GetReducedCoordinates(name, skipFirst = True) else: solutionTimeSteps = np.array(timeSequenceForDualReconstruction)[:,np.newaxis] if snapshotsAtReducedIntegrationPoints is None: localSnapshotsAtReducedIntegrationPoints = collectionProblemData.GetSnapshotsAtTimes(name, timeSequenceForDualReconstruction)[:,reducedIntegrationPoints] else: localSnapshotsAtReducedIntegrationPoints = snapshotsAtReducedIntegrationPoints[name] yTrain = collectionProblemData.GetReducedCoordinatesAtTimes(name, timeSequenceForDualReconstruction) solutionTimeSteps = np.tile(solutionTimeSteps, (collectionProblemData.GetNumberOfProblemDatas(),1)) #print("solutionTimeSteps =", solutionTimeSteps) #print("localSnapshotsAtReducedIntegrationPoints =", localSnapshotsAtReducedIntegrationPoints) assert solutionTimeSteps.shape[0] == localSnapshotsAtReducedIntegrationPoints.shape[0], "number of time steps different from number of snapshots at reduced integration points" #XTrain = np.hstack((solutionTimeSteps, localSnapshotsAtReducedIntegrationPoints)) XTrain = localSnapshotsAtReducedIntegrationPoints from Mordicus.BasicAlgorithms import ScikitLearnRegressor as SLR """ from sklearn.gaussian_process.kernels import WhiteKernel, ConstantKernel, Matern #from sklearn.gaussian_process import GaussianProcessRegressor kernel = Matern(length_scale=1., nu=2.5) + ConstantKernel(constant_value=1.0, constant_value_bounds=(0.001, 1.0)) * WhiteKernel(noise_level=1, noise_level_bounds=(1e-10, 1e0)) #regressor = GaussianProcessRegressor(kernel=kernel) regressor = SLR.MyGPR(kernel=kernel) paramGrid = {'kernel__k1__length_scale': [0.1, 1., 10.], 'kernel__k1__nu':[1.5, 2.5], 'kernel__k2__k1__constant_value':[0.001, 0.01]}#, 'kernel__k2__k2__noise_level':[1, 2]} """ #from pprint import pprint #pprint(vars(regressor)) #print("regressor.get_params().keys() =", regressor.get_params().keys()) if regressor is None and paramGrid is None: from sklearn import linear_model regressor = linear_model.Lasso(alpha=0.1, max_iter=int(1e8), tol=1e-5) paramGrid = {'alpha': [0.1, 1.]} if (regressor is None and paramGrid is not None) or (regressor is not None and paramGrid is None):#pragma: no cover raise ValueError('regressor and paramGrid mush be both None or both specified') model, scalerX, scalery = SLR.GridSearchCVRegression(regressor, paramGrid, XTrain, yTrain) dualReconstructionData[name] = (model, scalerX, scalery) else:# pragma: no cover print(">> Not learning how to reconstructing dual variables") return dualReconstructionData
[docs] def ReconstructDualQuantity(nameDualQuantity, operatorCompressionData, onlineData, timeSequence): r""" Reconstruct a dual quantitie using a trained algorithm Parameters ---------- nameDualQuantity : str name of the dual quantity to reconstruct operatorCompressionData : OperatorCompressionDataMechanical used in the operator compression step of the POD-ECM method. onlineData : OnlineDataMechanical used in the online stage to store quantities required for the tracking of certain quantites, e.g. the internal state variable at the reduced integration points, required to compute the constitutive law. timeSequence : 1D np.ndarray, optional time sequence used to train the dual quantities reconstruction algorithm Returns ------- dict with float as keys (time steps) and np.ndarray of size (numberOfModes) as values (reducedCoordinates of the reconstructed dual quantity) list of floats, contains the resitual of the reconstruction if "GappyPOD" is used, empty list otherwise """ onlineDualCompressedSolution = {} #nTimeSteps = np.array(timeSequence).shape[0] #nReducedIntegrationPoints = operatorCompressionData["reducedEpsilonAtReducedIntegPoints"].shape[1] #fieldAtMask = np.zeros((nTimeSteps, nReducedIntegrationPoints)) #localIndex = {} #for tag in onlineCompressionData['indicesOfReducedIntegPointsPerMaterial'].keys(): # if nameDualQuantity in onlineCompressionData['dualVarOutputNames'][tag]: # localIndex[tag] = onlineCompressionData['dualVarOutputNames'][tag].index(nameDualQuantity) #for i, time in enumerate(timeSequence): # for tag, intPoints in onlineCompressionData['indicesOfReducedIntegPointsPerMaterial'].items(): # if tag in localIndex: # fieldAtMask[i, intPoints] = onlineCompressionData['dualVarOutput'][tag][time][:,localIndex[tag]] fieldAtMask = GetOnlineDualQuantityAtReducedIntegrationPoints(nameDualQuantity, onlineData, timeSequence) #print(nameDualQuantity, fieldAtMask) reconstructionResidual = [] methodDualReconstruction = operatorCompressionData.GetDualReconstructionData()["methodDualReconstruction"] if methodDualReconstruction == "GappyPOD": from genericROM.BasicAlgorithms import GappyPOD as GP ModesAtMask = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity] for i, time in enumerate(timeSequence): onlineDualCompressedSolution[time], error = GP.FitAndCost(ModesAtMask, fieldAtMask[i]) reconstructionResidual.append(error) elif methodDualReconstruction == "MetaModel": from Mordicus.BasicAlgorithms import ScikitLearnRegressor as SLR model = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity][0] scalerX = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity][1] scalery = operatorCompressionData.GetDualReconstructionData()[nameDualQuantity][2] #xTest = np.hstack((np.array(timeSequence)[:,np.newaxis], fieldAtMask)) xTest = fieldAtMask y = SLR.ComputeRegressionApproximation(model, scalerX, scalery, xTest) for i, time in enumerate(timeSequence): onlineDualCompressedSolution[time] = y[i] else:# pragma: no cover raise NameError(methodDualReconstruction+" not available") return onlineDualCompressedSolution, reconstructionResidual
[docs] def GetOnlineDualQuantityAtReducedIntegrationPoints(nameDualQuantity, onlineData, timeSequence): r""" Reconstruct a dual quantitie using a trained algorithm Parameters ---------- nameDualQuantity : str name of the dual quantity to reconstruct onlineData : OnlineDataMechanical used in the online stage to store quantities required for the tracking of certain quantites, e.g. the internal state variable at the reduced integration points, required to compute the constitutive law. timeSequence : 1D np.ndarray, optional time sequence used to train the dual quantities reconstruction algorithm Returns ------- np.ndarray of size(nTimeSteps, nReducedIntegrationPoints), contains the online dual quantity of name "nameDualQuantity" at reduced integration points """ nTimeSteps = np.array(timeSequence).shape[0] nReducedIntegrationPoints = 0 localIndex = {} for tag, intPoints in onlineData.GetIndicesOfReducedIntegPointsPerMaterial().items(): nReducedIntegrationPoints += intPoints.shape[0] if nameDualQuantity in onlineData.GetDualVarOutputNames(tag): localIndex[tag] = onlineData.GetDualVarOutputNames(tag).index(nameDualQuantity) fieldAtMask = np.zeros((nTimeSteps, nReducedIntegrationPoints)) for i, time in enumerate(timeSequence): for tag, intPoints in onlineData.GetIndicesOfReducedIntegPointsPerMaterial().items(): if tag in localIndex: fieldAtMask[i, intPoints] = onlineData.GetDualVarOutput(tag)[time][:,localIndex[tag]] return fieldAtMask
if __name__ == "__main__":# pragma: no cover from genericROM import RunTestFile RunTestFile(__file__)