diff --git a/dirac-common/src/DIRACCommon/Core/Utilities/ReturnValues.py b/dirac-common/src/DIRACCommon/Core/Utilities/ReturnValues.py
index ca32d393aad..dbaf10e453b 100644
--- a/dirac-common/src/DIRACCommon/Core/Utilities/ReturnValues.py
+++ b/dirac-common/src/DIRACCommon/Core/Utilities/ReturnValues.py
@@ -8,6 +8,7 @@
 from __future__ import annotations
 
 import functools
+import linecache
 import sys
 import traceback
 from types import TracebackType
@@ -41,6 +42,43 @@ class DErrorReturnType(TypedDict):
 DReturnType = Union[DOKReturnType[T], DErrorReturnType]
 
 
+def _formatStackCached(skip: int = 1) -> list[str]:
+    """Like ``traceback.format_stack()`` but without the per-frame ``stat()``.
+
+    Source files are read into ``linecache`` once and reused; cached lines may
+    therefore lag behind a mid-process source edit.
+    """
+    frame = sys._getframe(skip)
+    frames = []
+    while frame is not None:
+        code = frame.f_code
+        frames.append((code.co_filename, frame.f_lineno, code.co_name))
+        frame = frame.f_back
+    frames.reverse()
+
+    cache = linecache.cache
+    out = []
+    for filename, lineno, name in frames:
+        # Entries left behind by ``linecache.lazycache`` are 1-tuples, so check
+        # the entry shape rather than mere presence before trusting it
+        if len(cache.get(filename, ())) != 4:
+            try:
+                linecache.updatecache(filename)
+            except OSError:
+                pass
+        cached = cache.get(filename)
+        line_text = ""
+        if cached is not None and len(cached) == 4:
+            file_lines = cached[2]
+            if 0 < lineno <= len(file_lines):
+                line_text = file_lines[lineno - 1].strip()
+        formatted = f'  File "{filename}", line {lineno}, in {name}\n'
+        if line_text:
+            formatted += f"    {line_text}\n"
+        out.append(formatted)
+    return out
+
+
 def S_ERROR(*args: Any, **kwargs: Any) -> DErrorReturnType:
     """return value on error condition
 
@@ -69,8 +107,7 @@ def S_ERROR(*args: Any, **kwargs: Any) -> DErrorReturnType:
 
     if callStack is None:
         try:
-            callStack = traceback.format_stack()
-            callStack.pop()
+            callStack = _formatStackCached(skip=2)
         except Exception:
             callStack = []
 
diff --git a/src/DIRAC/AccountingSystem/DB/AccountingDB.py b/src/DIRAC/AccountingSystem/DB/AccountingDB.py
index a3f687fba8d..1662de1177d 100644
--- a/src/DIRAC/AccountingSystem/DB/AccountingDB.py
+++ b/src/DIRAC/AccountingSystem/DB/AccountingDB.py
@@ -632,24 +632,21 @@ def insertRecordDirectly(self, typeName, startTime, endTime, valuesList):
         if not retVal["OK"]:
             return retVal
         connObj = retVal["Value"]
-        try:
-            retVal = self.insertFields(
-                _getTableName("type", typeName), self.dbCatalog[typeName]["typeFields"], insertList, conn=connObj
-            )
-            if not retVal["OK"]:
-                return retVal
-            # HACK: One more record to split in the buckets to be able to count total entries
-            valuesList.append(1)
-            retVal = self.__startTransaction(connObj)
-            if not retVal["OK"]:
-                return retVal
-            retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList, connObj=connObj)
-            if not retVal["OK"]:
-                self.__rollbackTransaction(connObj)
-                return retVal
-            return self.__commitTransaction(connObj)
-        finally:
-            connObj.close()
+        retVal = self.insertFields(
+            _getTableName("type", typeName), self.dbCatalog[typeName]["typeFields"], insertList, conn=connObj
+        )
+        if not retVal["OK"]:
+            return retVal
+        # HACK: One more record to split in the buckets to be able to count total entries
+        valuesList.append(1)
+        retVal = self.__startTransaction(connObj)
+        if not retVal["OK"]:
+            return retVal
+        retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList, connObj=connObj)
+        if not retVal["OK"]:
+            self.__rollbackTransaction(connObj)
+            return retVal
+        return self.__commitTransaction(connObj)
 
     def deleteRecord(self, typeName, startTime, endTime, valuesList):
         """
diff --git a/src/DIRAC/Core/Security/m2crypto/X509Chain.py b/src/DIRAC/Core/Security/m2crypto/X509Chain.py
index 5aa2659071b..80facb01633 100644
--- a/src/DIRAC/Core/Security/m2crypto/X509Chain.py
+++ b/src/DIRAC/Core/Security/m2crypto/X509Chain.py
@@ -4,9 +4,13 @@
 In particular, limited proxy: https://tools.ietf.org/html/rfc3820#section-3.8
 """
 
+from __future__ import annotations
+
 import copy
 import hashlib
+import os
 import re
+from collections import OrderedDict
 
 import M2Crypto.X509
 
@@ -23,6 +27,10 @@
 # Decorator to check that the PKey has been loaded
 needPKey = executeOnlyIf("_keyObj", S_ERROR(DErrno.ENOPKEY))
 
+# Cache of parsed proxy files keyed by (path, mtime_ns, size, inode).
+_PROXY_LOAD_CACHE: OrderedDict[tuple, dict] = OrderedDict()
+_PROXY_LOAD_CACHE_MAX = 8
+
 
 class X509Chain:
     """
@@ -289,12 +297,41 @@ def loadProxyFromFile(self, chainLocation):
 
         :returns: S_OK / S_ERROR
         """
+        try:
+            st = os.stat(chainLocation)
+        except OSError as e:
+            return S_ERROR(DErrno.EOF, f"{chainLocation}: {repr(e).replace(',)', ')')}")
+
+        cacheKey = (chainLocation, st.st_mtime_ns, st.st_size, st.st_ino)
+        cached = _PROXY_LOAD_CACHE.get(cacheKey)
+        if cached is not None:
+            _PROXY_LOAD_CACHE.move_to_end(cacheKey)
+            # Shallow copy so callers mutating self._certList don't poison the cache
+            self._certList = list(cached["certList"])
+            self._keyObj = cached["keyObj"]
+            self.__isProxy = cached["isProxy"]
+            self.__isLimitedProxy = cached["isLimitedProxy"]
+            self.__firstProxyStep = cached["firstProxyStep"]
+            self.__hash = False
+            return S_OK()
+
         try:
             with open(chainLocation) as fd:
                 pemData = fd.read()
         except Exception as e:
             return S_ERROR(DErrno.EOF, f"{chainLocation}: {repr(e).replace(',)', ')')}")
-        return self.loadProxyFromString(pemData)
+        result = self.loadProxyFromString(pemData)
+        if result["OK"]:
+            _PROXY_LOAD_CACHE[cacheKey] = {
+                "certList": list(self._certList),
+                "keyObj": self._keyObj,
+                "isProxy": self.__isProxy,
+                "isLimitedProxy": self.__isLimitedProxy,
+                "firstProxyStep": self.__firstProxyStep,
+            }
+            while len(_PROXY_LOAD_CACHE) > _PROXY_LOAD_CACHE_MAX:
+                _PROXY_LOAD_CACHE.popitem(last=False)
+        return result
 
     def loadProxyFromString(self, pemData):
         """
diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py
index 46322129abd..2c01fc2fb63 100644
--- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py
+++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py
@@ -265,7 +265,7 @@ def getDoc(self, index: str, docID: str) -> dict:
         """
         sLog.debug(f"Retrieving document {docID} in index {index}")
         try:
-            return S_OK(self.client.get(index, docID)["_source"])
+            return S_OK(self.client.get(index=index, id=docID)["_source"])
         except NotFoundError:
             sLog.warn("Could not find the document in index", index)
             return S_OK({})
@@ -282,7 +282,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
         sLog.debug(f"Retrieving documents {docIDs}")
         docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
         try:
-            response = self.client.mget({"docs": docs})
+            response = self.client.mget(body={"docs": docs})
         except RequestError as re:
             return S_ERROR(re)
         else:
@@ -300,12 +300,12 @@ def updateDoc(self, index: str, docID: str, body) -> dict:
         """
         sLog.debug(f"Updating document {docID} in index {index}")
         try:
-            self.client.update(index, docID, body)
+            self.client.update(index=index, id=docID, body=body)
         except ConflictError:
             # updates are rather "heavy" operations from ES point of view, needing seqNo to be updated.
             # Not ideal, but we just wait and retry.
             time.sleep(1)
-            self.client.update(index, docID, body, params={"retry_on_conflict": 3})
+            self.client.update(index=index, id=docID, body=body, params={"retry_on_conflict": 3})
         except RequestError as re:
             return S_ERROR(re)
         return S_OK()
@@ -319,7 +319,7 @@ def deleteDoc(self, index: str, docID: str):
         """
         sLog.debug(f"Deleting document {docID} in index {index}")
         try:
-            return S_OK(self.client.delete(index, docID))
+            return S_OK(self.client.delete(index=index, id=docID))
         except RequestError as re:
             return S_ERROR(re)
         except NotFoundError:
@@ -334,7 +334,7 @@ def existsDoc(self, index: str, docID: str) -> bool:
         :param docID: document ID
         """
         sLog.debug(f"Checking if document {docID} in index {index} exists")
-        return self.client.exists(index, docID)
+        return self.client.exists(index=index, id=docID)
 
     @ifConnected
     def _Search(self, indexname):
@@ -365,7 +365,7 @@ def getIndexes(self, indexName=None):
             indexName = ""
         sLog.debug(f"Getting indices alias of {indexName}")
         # we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
-        return list(self.client.indices.get_alias(f"{indexName}*"))
+        return list(self.client.indices.get_alias(index=f"{indexName}*"))
@@ -378,7 +378,7 @@ def getDocTypes(self, indexName):
         result = []
         try:
             sLog.debug("Getting mappings for ", indexName)
-            result = self.client.indices.get_mapping(indexName)
+            result = self.client.indices.get_mapping(index=indexName)
         except Exception as e:  # pylint: disable=broad-except
             sLog.exception()
             return S_ERROR(e)
@@ -409,7 +409,7 @@ def existingIndex(self, indexName):
         """
         sLog.debug(f"Checking existance of index {indexName}")
         try:
-            return S_OK(self.client.indices.exists(indexName))
+            return S_OK(self.client.indices.exists(index=indexName))
         except TransportError as e:
             sLog.exception()
             return S_ERROR(e)
@@ -446,7 +446,7 @@ def deleteIndex(self, indexName):
         """
         sLog.info("Deleting index", indexName)
         try:
-            retVal = self.client.indices.delete(indexName)
+            retVal = self.client.indices.delete(index=indexName)
         except NotFoundError:
             sLog.warn("Index does not exist", indexName)
             return S_OK("Nothing to delete")
diff --git a/src/DIRAC/Core/Utilities/Extensions.py b/src/DIRAC/Core/Utilities/Extensions.py
index 774fe37da00..a56111ba821 100644
--- a/src/DIRAC/Core/Utilities/Extensions.py
+++ b/src/DIRAC/Core/Utilities/Extensions.py
@@ -118,6 +118,7 @@ def entrypointToExtension(entrypoint):
     return module.split(".")[0]
 
 
+@functools.cache
 def extensionsByPriority():
     """Discover extensions using the setuptools metadata"""
 
@@ -140,6 +141,7 @@ def extensionsByPriority():
     return extensions
 
 
+@functools.cache
 def getExtensionMetadata(extensionName):
     """Get the metadata for a given extension name"""
     for entrypoint in metadata.entry_points().select(group="dirac"):
diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py
index 323249bba16..d9568ab731c 100755
--- a/src/DIRAC/Core/Utilities/MySQL.py
+++ b/src/DIRAC/Core/Utilities/MySQL.py
@@ -359,6 +359,10 @@ def __execute(self, conn, cmd):
             cursor.close()
         return res
 
+    # Skip per-checkout ping when the same thread reused a connection within this
+    # window; kept well below any plausible MySQL ``wait_timeout``.
+    PING_IDLE_THRESHOLD = 5.0
+
     def get(self, dbName, retries=10):
         retries = max(0, min(MAXCONNECTRETRY, retries))
         self.clean()
@@ -369,13 +373,13 @@ def __getWithRetry(self, dbName, totalRetries, retriesLeft):
         if sleepTime > 0:
             time.sleep(sleepTime)
         try:
-            conn, lastName, thid = self.__innerGet()
+            conn, lastName, thid, needsPing = self.__innerGet()
         except MySQLdb.MySQLError as excp:
             if retriesLeft > 0:
                 return self.__getWithRetry(dbName, totalRetries, retriesLeft - 1)
             return S_ERROR(DErrno.EMYSQL, f"Could not connect: {excp}")
 
-        if not self.__ping(conn):
+        if needsPing and not self.__ping(conn):
             try:
                 self.__assigned.pop(thid)
             except KeyError:
@@ -411,18 +415,31 @@ def __innerGet(self):
         now = time.time()
         if thid in self.__assigned:
             data = self.__assigned[thid]
-            conn = data[0]
+            idle = now - data[2]
             data[2] = now
-            return data[0], data[1], thid
+            return data[0], data[1], thid, idle >= self.PING_IDLE_THRESHOLD
         # Not cached
         try:
            conn, dbName = self.__spares.pop()
+            needsPing = True
         except IndexError:
             conn = self.__newConn()
             dbName = ""
+            needsPing = False
         self.__assigned[thid] = [conn, dbName, now]
-        return conn, dbName, thid
+        return conn, dbName, thid, needsPing
+
+    def discardCurrentThreadConn(self):
+        """Drop the current thread's cached connection without returning it to the spare pool."""
+        thid = self.__thid
+        data = self.__assigned.pop(thid, None)
+        if data is None:
+            return
+        try:
+            data[0].close()
+        except Exception:
+            pass
 
     def __pop(self, thid):
         try:
@@ -530,6 +547,9 @@ def __del__(self):
         except Exception:
             pass
 
+    # MySQLdb error codes that mean the connection itself is dead and must be discarded.
+    __CONNECTION_LOST_ERRNOS = frozenset((2006, 2013, 2055, 4031))
+
     def _except(self, methodName, x, err, cmd="", debug=True):
         """
         print MySQL error or exception
@@ -538,6 +558,8 @@ def _except(self, methodName, x, err, cmd="", debug=True):
         try:
             raise x
         except MySQLdb.Error as e:
+            if e.args and e.args[0] in self.__CONNECTION_LOST_ERRNOS:
+                self.__connectionPool.discardCurrentThreadConn()
             if debug:
                 self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", "%d: %s" % (e.args[0], e.args[1]))
             return S_ERROR(DErrno.EMYSQL, "%s: ( %d: %s )" % (err, e.args[0], e.args[1]))
diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py
index 09cb78e2f14..e5be04b3e33 100755
--- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py
+++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py
@@ -804,12 +804,10 @@ def getDirectorySize(self, lfns, longOutput=False, rawFileTables=False, recursiv
         )
 
         if not resultLogical["OK"]:
-            connection.close()
             return resultLogical
 
         resultDict = resultLogical["Value"]
         if not resultDict["Successful"]:
-            connection.close()
             return resultLogical
 
         if longOutput:
@@ -826,11 +824,9 @@ def getDirectorySize(self, lfns, longOutput=False, rawFileTables=False, recursiv
                 resultDict["QueryTime"] = time.time() - start
                 result = S_OK(resultDict)
                 result["Message"] = "Failed to get the physical size on storage"
-                connection.close()
                 return result
             for lfn in resultPhysical["Value"]["Successful"]:
                 resultDict["Successful"][lfn]["PhysicalSize"] = resultPhysical["Value"]["Successful"][lfn]
-        connection.close()
 
         resultDict["QueryTime"] = time.time() - start
         return S_OK(resultDict)
diff --git a/src/DIRAC/DataManagementSystem/Utilities/DMSHelpers.py b/src/DIRAC/DataManagementSystem/Utilities/DMSHelpers.py
index 8a28c8437bc..7f4919666a3 100644
--- a/src/DIRAC/DataManagementSystem/Utilities/DMSHelpers.py
+++ b/src/DIRAC/DataManagementSystem/Utilities/DMSHelpers.py
@@ -2,9 +2,11 @@
 This module contains helper methods for accessing operational
 attributes or parameters of DMS objects
 """
+from __future__ import annotations
 
 from collections import defaultdict
 
 from DIRAC import gConfig, gLogger, S_OK, S_ERROR
+from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
 from DIRAC.ConfigurationSystem.Client.Helpers.Path import cfgPath
 from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
 
@@ -95,10 +97,28 @@ def _getConnectionIndex(connectionLevel, default=None):
 
 class DMSHelpers:
     """
-    This class is used to get information about sites, SEs and their interrelations
+    This class is used to get information about sites, SEs and their interrelations.
+
+    Instances are shared per VO and re-derived only when the configuration system
+    has been refreshed since the last initialisation.
     """
 
+    _instances: dict[object, DMSHelpers] = {}
+
+    def __new__(cls, vo=False):
+        inst = cls._instances.get(vo)
+        if inst is None:
+            inst = super().__new__(cls)
+            inst.__csVersion = None
+            cls._instances[vo] = inst
+        return inst
+
     def __init__(self, vo=False):
+        # We're a per-VO singleton (see __new__); skip re-init unless the CS
+        # has been refreshed since we last populated.
+        currentVersion = gConfigurationData.getVersion()
+        if self.__csVersion == currentVersion:
+            return
         self.siteSEMapping = {}
         self.storageElementSet = set()
         self.siteSet = set()
@@ -106,6 +126,7 @@ def __init__(self, vo=False):
         self.failoverSEs = None
         self.archiveSEs = None
         self.notForJobSEs = None
+        self.__csVersion = currentVersion
 
     def getSiteSEMapping(self):
         """Returns a dictionary of all sites and their localSEs as a list, e.g.
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PilotStatusAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PilotStatusAgent.py
index 582910b8595..4a6a67c99b6 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/PilotStatusAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/PilotStatusAgent.py
@@ -68,8 +68,6 @@ def execute(self):
         # Now handle pilots not updated in the last N days and declare them Deleted.
         result = self.handleOldPilots(connection)
 
-        connection.close()
-
         result = self.pilots.clearPilots(self.clearPilotsDelay, self.clearAbortedDelay)
         if not result["OK"]:
             self.log.warn("Failed to clear old pilots in the PilotAgentsDB")
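
Reviewer notes (illustrative sketches, not part of the diff):

On `_formatStackCached` in ReturnValues.py: the helper is meant to be output-compatible with `traceback.format_stack()` while skipping the per-frame `linecache.checkcache()` stat. A quick parity-and-timing check along these lines can confirm that; it assumes the module import path matches the file touched above, and that no source file changes on disk mid-run (the docstring's stated caveat):

```python
# Sketch: verify _formatStackCached() matches traceback.format_stack(), then time both.
import timeit
import traceback

from DIRACCommon.Core.Utilities.ReturnValues import _formatStackCached


def check_parity():
    # Drop each formatter's own frame entry: the two calls sit on different
    # source lines, so the last element legitimately differs.
    stdlib = traceback.format_stack()[:-1]
    cached = _formatStackCached(skip=1)[:-1]
    assert cached == stdlib, "formatter outputs diverged"


if __name__ == "__main__":
    check_parity()
    n = 1000
    print("stdlib:", timeit.timeit(traceback.format_stack, number=n))
    print("cached:", timeit.timeit(_formatStackCached, number=n))
```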
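The proxy cache added to X509Chain.py is an instance of a reusable pattern: key the parsed result on a stat signature so any rewrite of the file misses naturally, and evict in LRU order. A minimal standalone sketch of just that pattern; `load_cached` and the other names here are illustrative, not DIRAC APIs:

```python
# Sketch: a stat-keyed LRU cache for parsed files (the X509Chain pattern, generalised).
import os
from collections import OrderedDict
from typing import Any, Callable

_CACHE = OrderedDict()  # (path, mtime_ns, size, inode) -> parsed value
_CACHE_MAX = 8


def load_cached(path: str, parse: Callable[[bytes], Any]) -> Any:
    """Parse ``path`` at most once per on-disk version of the file."""
    st = os.stat(path)
    # A rewrite or atomic replace changes at least one of these fields in practice
    key = (path, st.st_mtime_ns, st.st_size, st.st_ino)
    if key in _CACHE:
        _CACHE.move_to_end(key)  # refresh the LRU position
        return _CACHE[key]
    with open(path, "rb") as fd:
        value = parse(fd.read())
    _CACHE[key] = value
    while len(_CACHE) > _CACHE_MAX:
        _CACHE.popitem(last=False)  # evict the least recently used entry
    return value
```

The caveat the diff handles with its shallow copies applies here too: cached values are shared between callers, so mutable results must either be copied on the way out (as done for `certList`) or treated as read-only (as assumed for `keyObj`).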
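`@functools.cache` on `extensionsByPriority()` and `getExtensionMetadata()` pins the installed-extension answer for the life of the process. That is presumably the point, but any test that installs or removes an extension at runtime now needs an explicit invalidation; since `functools.cache` is just `lru_cache(maxsize=None)`, the standard hooks apply:

```python
from DIRAC.Core.Utilities.Extensions import extensionsByPriority, getExtensionMetadata

# In a test fixture that changes the installed-extension set:
extensionsByPriority.cache_clear()
getExtensionMetadata.cache_clear()

# And for confirming the cache is actually being hit:
print(extensionsByPriority.cache_info())  # CacheInfo(hits=..., misses=..., maxsize=None, currsize=...)
```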
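The MySQL.py pool change splits connection checkout into three cases: a just-created connection needs no ping, a same-thread reuse within `PING_IDLE_THRESHOLD` seconds is assumed alive, and everything else (spares with unknown idle time, long-idle assignments) is pinged. The decision in isolation, with toy inputs rather than the DIRAC pool types:

```python
# Sketch: the ping-or-not decision from the connection pool, isolated.
import time
from typing import Optional

PING_IDLE_THRESHOLD = 5.0  # seconds; far below any sane MySQL wait_timeout


def needs_ping(source: str, last_used: Optional[float] = None) -> bool:
    """source: "new" (just connected), "assigned" (same-thread reuse), "spare"."""
    if source == "new":
        return False  # freshly opened, cannot have timed out yet
    if source == "assigned" and last_used is not None:
        return time.time() - last_used >= PING_IDLE_THRESHOLD
    return True  # spares have an unknown idle time, so always verify


assert needs_ping("new") is False
assert needs_ping("assigned", last_used=time.time()) is False
assert needs_ping("spare") is True
```

The `__CONNECTION_LOST_ERRNOS` hook covers what a skipped ping would previously have caught: if a later query fails with 2006/2013/2055/4031, the dead connection is discarded from the pool instead of being recycled.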
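DMSHelpers now acts as a per-VO singleton whose derived state is invalidated by the configuration version instead of a TTL. Python calls `__init__` on every construction even when `__new__` returns a cached instance, which is why the version check sits in `__init__`. The shape reduced to its essentials, with a module-level counter standing in for `gConfigurationData.getVersion()`:

```python
# Sketch: per-key singleton with version-gated re-initialisation.
_cs_version = 0  # stand-in for gConfigurationData.getVersion()


class PerKeyHelper:
    _instances = {}  # key -> cached instance

    def __new__(cls, key=None):
        inst = cls._instances.get(key)
        if inst is None:
            inst = super().__new__(cls)
            inst._seen_version = None  # force the first __init__ to populate
            cls._instances[key] = inst
        return inst

    def __init__(self, key=None):
        if self._seen_version == _cs_version:
            return  # config unchanged since last populate: keep derived state
        self.derived = f"expensive derivation for {key} at v{_cs_version}"
        self._seen_version = _cs_version


a = PerKeyHelper("lhcb")
b = PerKeyHelper("lhcb")
assert a is b and a.derived == b.derived  # shared instance, one derivation
```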