Skip to content
39 changes: 37 additions & 2 deletions dirac-common/src/DIRACCommon/Core/Utilities/ReturnValues.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from __future__ import annotations

import functools
import linecache
import sys
import traceback
from types import TracebackType
Expand Down Expand Up @@ -41,6 +42,41 @@ class DErrorReturnType(TypedDict):
DReturnType = Union[DOKReturnType[T], DErrorReturnType]


def _formatStackCached(skip: int = 1) -> list[str]:
"""Like ``traceback.format_stack()`` but without the per-frame ``stat()``.

Source files are read into ``linecache`` once and reused; cached lines may
therefore lag behind a mid-process source edit.
"""
frame = sys._getframe(skip)
frames = []
while frame is not None:
code = frame.f_code
frames.append((code.co_filename, frame.f_lineno, code.co_name))
frame = frame.f_back
frames.reverse()

cache = linecache.cache
out = []
for filename, lineno, name in frames:
if filename not in cache:
try:
linecache.updatecache(filename)
except OSError:
pass
cached = cache.get(filename)
line_text = ""
if cached is not None:
file_lines = cached[2]
if 0 < lineno <= len(file_lines):
line_text = file_lines[lineno - 1].strip()
formatted = f' File "{filename}", line {lineno}, in {name}\n'
if line_text:
formatted += f" {line_text}\n"
out.append(formatted)
return out


def S_ERROR(*args: Any, **kwargs: Any) -> DErrorReturnType:
"""return value on error condition

Expand Down Expand Up @@ -69,8 +105,7 @@ def S_ERROR(*args: Any, **kwargs: Any) -> DErrorReturnType:

if callStack is None:
try:
callStack = traceback.format_stack()
callStack.pop()
callStack = _formatStackCached(skip=2)
except Exception:
callStack = []

Expand Down
33 changes: 15 additions & 18 deletions src/DIRAC/AccountingSystem/DB/AccountingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,24 +632,21 @@ def insertRecordDirectly(self, typeName, startTime, endTime, valuesList):
if not retVal["OK"]:
return retVal
connObj = retVal["Value"]
try:
retVal = self.insertFields(
_getTableName("type", typeName), self.dbCatalog[typeName]["typeFields"], insertList, conn=connObj
)
if not retVal["OK"]:
return retVal
# HACK: One more record to split in the buckets to be able to count total entries
valuesList.append(1)
retVal = self.__startTransaction(connObj)
if not retVal["OK"]:
return retVal
retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList, connObj=connObj)
if not retVal["OK"]:
self.__rollbackTransaction(connObj)
return retVal
return self.__commitTransaction(connObj)
finally:
connObj.close()
retVal = self.insertFields(
_getTableName("type", typeName), self.dbCatalog[typeName]["typeFields"], insertList, conn=connObj
)
if not retVal["OK"]:
return retVal
# HACK: One more record to split in the buckets to be able to count total entries
valuesList.append(1)
retVal = self.__startTransaction(connObj)
if not retVal["OK"]:
return retVal
retVal = self.__splitInBuckets(typeName, startTime, endTime, valuesList, connObj=connObj)
if not retVal["OK"]:
self.__rollbackTransaction(connObj)
return retVal
return self.__commitTransaction(connObj)

def deleteRecord(self, typeName, startTime, endTime, valuesList):
"""
Expand Down
39 changes: 38 additions & 1 deletion src/DIRAC/Core/Security/m2crypto/X509Chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
In particular, limited proxy: https://tools.ietf.org/html/rfc3820#section-3.8

"""
from __future__ import annotations

import copy
import hashlib
import os
import re
from collections import OrderedDict

import M2Crypto.X509

Expand All @@ -23,6 +27,10 @@
# Decorator to check that the PKey has been loaded
needPKey = executeOnlyIf("_keyObj", S_ERROR(DErrno.ENOPKEY))

# Cache of parsed proxy files keyed by (path, mtime_ns, size, inode).
_PROXY_LOAD_CACHE: OrderedDict[tuple, dict] = OrderedDict()
_PROXY_LOAD_CACHE_MAX = 8


class X509Chain:
"""
Expand Down Expand Up @@ -289,12 +297,41 @@ def loadProxyFromFile(self, chainLocation):

:returns: S_OK / S_ERROR
"""
try:
st = os.stat(chainLocation)
except OSError as e:
return S_ERROR(DErrno.EOF, f"{chainLocation}: {repr(e).replace(',)', ')')}")

cacheKey = (chainLocation, st.st_mtime_ns, st.st_size, st.st_ino)
cached = _PROXY_LOAD_CACHE.get(cacheKey)
if cached is not None:
_PROXY_LOAD_CACHE.move_to_end(cacheKey)
# Shallow copy so callers mutating self._certList don't poison the cache
self._certList = list(cached["certList"])
self._keyObj = cached["keyObj"]
self.__isProxy = cached["isProxy"]
self.__isLimitedProxy = cached["isLimitedProxy"]
self.__firstProxyStep = cached["firstProxyStep"]
self.__hash = False
return S_OK()

try:
with open(chainLocation) as fd:
pemData = fd.read()
except Exception as e:
return S_ERROR(DErrno.EOF, f"{chainLocation}: {repr(e).replace(',)', ')')}")
return self.loadProxyFromString(pemData)
result = self.loadProxyFromString(pemData)
if result["OK"]:
_PROXY_LOAD_CACHE[cacheKey] = {
"certList": list(self._certList),
"keyObj": self._keyObj,
"isProxy": self.__isProxy,
"isLimitedProxy": self.__isLimitedProxy,
"firstProxyStep": self.__firstProxyStep,
}
while len(_PROXY_LOAD_CACHE) > _PROXY_LOAD_CACHE_MAX:
_PROXY_LOAD_CACHE.popitem(last=False)
return result

def loadProxyFromString(self, pemData):
"""
Expand Down
20 changes: 10 additions & 10 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def getDoc(self, index: str, docID: str) -> dict:
"""
sLog.debug(f"Retrieving document {docID} in index {index}")
try:
return S_OK(self.client.get(index, docID)["_source"])
return S_OK(self.client.get(index=index, id=docID)["_source"])
except NotFoundError:
sLog.warn("Could not find the document in index", index)
return S_OK({})
Expand All @@ -282,7 +282,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
sLog.debug(f"Retrieving documents {docIDs}")
docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
try:
response = self.client.mget({"docs": docs})
response = self.client.mget(body={"docs": docs})
except RequestError as re:
return S_ERROR(re)
else:
Expand All @@ -300,12 +300,12 @@ def updateDoc(self, index: str, docID: str, body) -> dict:
"""
sLog.debug(f"Updating document {docID} in index {index}")
try:
self.client.update(index, docID, body)
self.client.update(index=index, id=docID, body=body)
except ConflictError:
# updates are rather "heavy" operations from ES point of view, needing seqNo to be updated.
# Not ideal, but we just wait and retry.
time.sleep(1)
self.client.update(index, docID, body, params={"retry_on_conflict": 3})
self.client.update(index=index, id=docID, body=body, params={"retry_on_conflict": 3})
except RequestError as re:
return S_ERROR(re)
return S_OK()
Expand All @@ -319,7 +319,7 @@ def deleteDoc(self, index: str, docID: str):
"""
sLog.debug(f"Deleting document {docID} in index {index}")
try:
return S_OK(self.client.delete(index, docID))
return S_OK(self.client.delete(index=index, id=docID))
except RequestError as re:
return S_ERROR(re)
except NotFoundError:
Expand All @@ -334,7 +334,7 @@ def existsDoc(self, index: str, docID: str) -> bool:
:param docID: document ID
"""
sLog.debug(f"Checking if document {docID} in index {index} exists")
return self.client.exists(index, docID)
return self.client.exists(index=index, id=docID)

@ifConnected
def _Search(self, indexname):
Expand Down Expand Up @@ -365,7 +365,7 @@ def getIndexes(self, indexName=None):
indexName = ""
sLog.debug(f"Getting indices alias of {indexName}")
# we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
return list(self.client.indices.get_alias(f"{indexName}*"))
return list(self.client.indices.get_alias(index=f"{indexName}*"))

@ifConnected
def getDocTypes(self, indexName):
Expand All @@ -378,7 +378,7 @@ def getDocTypes(self, indexName):
result = []
try:
sLog.debug("Getting mappings for ", indexName)
result = self.client.indices.get_mapping(indexName)
result = self.client.indices.get_mapping(index=indexName)
except Exception as e: # pylint: disable=broad-except
sLog.exception()
return S_ERROR(e)
Expand Down Expand Up @@ -409,7 +409,7 @@ def existingIndex(self, indexName):
"""
sLog.debug(f"Checking existance of index {indexName}")
try:
return S_OK(self.client.indices.exists(indexName))
return S_OK(self.client.indices.exists(index=indexName))
except TransportError as e:
sLog.exception()
return S_ERROR(e)
Expand Down Expand Up @@ -446,7 +446,7 @@ def deleteIndex(self, indexName):
"""
sLog.info("Deleting index", indexName)
try:
retVal = self.client.indices.delete(indexName)
retVal = self.client.indices.delete(index=indexName)
except NotFoundError:
sLog.warn("Index does not exist", indexName)
return S_OK("Nothing to delete")
Expand Down
2 changes: 2 additions & 0 deletions src/DIRAC/Core/Utilities/Extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def entrypointToExtension(entrypoint):
return module.split(".")[0]


@functools.cache
def extensionsByPriority():
"""Discover extensions using the setuptools metadata"""

Expand All @@ -140,6 +141,7 @@ def extensionsByPriority():
return extensions


@functools.cache
def getExtensionMetadata(extensionName):
"""Get the metadata for a given extension name"""
for entrypoint in metadata.entry_points().select(group="dirac"):
Expand Down
32 changes: 27 additions & 5 deletions src/DIRAC/Core/Utilities/MySQL.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ def __execute(self, conn, cmd):
cursor.close()
return res

# Skip per-checkout ping when the same thread reused a connection within this
# window; kept well below any plausible MySQL ``wait_timeout``.
PING_IDLE_THRESHOLD = 5.0

def get(self, dbName, retries=10):
retries = max(0, min(MAXCONNECTRETRY, retries))
self.clean()
Expand All @@ -369,13 +373,13 @@ def __getWithRetry(self, dbName, totalRetries, retriesLeft):
if sleepTime > 0:
time.sleep(sleepTime)
try:
conn, lastName, thid = self.__innerGet()
conn, lastName, thid, needsPing = self.__innerGet()
except MySQLdb.MySQLError as excp:
if retriesLeft > 0:
return self.__getWithRetry(dbName, totalRetries, retriesLeft - 1)
return S_ERROR(DErrno.EMYSQL, f"Could not connect: {excp}")

if not self.__ping(conn):
if needsPing and not self.__ping(conn):
try:
self.__assigned.pop(thid)
except KeyError:
Expand Down Expand Up @@ -411,18 +415,31 @@ def __innerGet(self):
now = time.time()
if thid in self.__assigned:
data = self.__assigned[thid]
conn = data[0]
idle = now - data[2]
data[2] = now
return data[0], data[1], thid
return data[0], data[1], thid, idle >= self.PING_IDLE_THRESHOLD
# Not cached
try:
conn, dbName = self.__spares.pop()
needsPing = True
except IndexError:
conn = self.__newConn()
dbName = ""
needsPing = False

self.__assigned[thid] = [conn, dbName, now]
return conn, dbName, thid
return conn, dbName, thid, needsPing

def discardCurrentThreadConn(self):
"""Drop the current thread's cached connection without returning it to the spare pool."""
thid = self.__thid
data = self.__assigned.pop(thid, None)
if data is None:
return
try:
data[0].close()
except Exception:
pass

def __pop(self, thid):
try:
Expand Down Expand Up @@ -530,6 +547,9 @@ def __del__(self):
except Exception:
pass

# MySQLdb error codes that mean the connection itself is dead and must be discarded.
__CONNECTION_LOST_ERRNOS = frozenset((2006, 2013, 2055, 4031))

def _except(self, methodName, x, err, cmd="", debug=True):
"""
print MySQL error or exception
Expand All @@ -538,6 +558,8 @@ def _except(self, methodName, x, err, cmd="", debug=True):
try:
raise x
except MySQLdb.Error as e:
if e.args and e.args[0] in self.__CONNECTION_LOST_ERRNOS:
self.__connectionPool.discardCurrentThreadConn()
if debug:
self.log.error(f"{methodName} ({self._safeCmd(cmd)}): {err}", "%d: %s" % (e.args[0], e.args[1]))
return S_ERROR(DErrno.EMYSQL, "%s: ( %d: %s )" % (err, e.args[0], e.args[1]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,10 @@ def getDirectorySize(self, lfns, longOutput=False, rawFileTables=False, recursiv
)

if not resultLogical["OK"]:
connection.close()
return resultLogical

resultDict = resultLogical["Value"]
if not resultDict["Successful"]:
connection.close()
return resultLogical

if longOutput:
Expand All @@ -826,11 +824,9 @@ def getDirectorySize(self, lfns, longOutput=False, rawFileTables=False, recursiv
resultDict["QueryTime"] = time.time() - start
result = S_OK(resultDict)
result["Message"] = "Failed to get the physical size on storage"
connection.close()
return result
for lfn in resultPhysical["Value"]["Successful"]:
resultDict["Successful"][lfn]["PhysicalSize"] = resultPhysical["Value"]["Successful"][lfn]
connection.close()
resultDict["QueryTime"] = time.time() - start

return S_OK(resultDict)
Expand Down
Loading
Loading