diff --git a/docs/source/AdministratorGuide/ExternalsSupport/index.rst b/docs/source/AdministratorGuide/ExternalsSupport/index.rst index 2cdf39fb09d..0f206bb1ac4 100644 --- a/docs/source/AdministratorGuide/ExternalsSupport/index.rst +++ b/docs/source/AdministratorGuide/ExternalsSupport/index.rst @@ -55,6 +55,7 @@ You can run your OpenSearch cluster without authentication, or using User name a - ``ca_certs`` (default:``None``) - ``client_key`` (default:``None``) - ``client_cert`` (default:``None``) + - ``IndexPrefix`` (default:``''``). Prefix prepended to all DIRAC-created OpenSearch indexes. The prefix is converted to lower case before use. to the location:: diff --git a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst index 531c04e06be..6e85e39933c 100644 --- a/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst +++ b/docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst @@ -50,13 +50,14 @@ For example:: { User = test Password = password + IndexPrefix = mydirac- } } -The following option can be set in `Systems/Monitoring/Databases/MonitoringDB`: +The following global option can be set in `Systems/NoSQLDatabases`: - *IndexPrefix*: Prefix used to prepend to indexes created in the ES instance. + *IndexPrefix*: Prefix prepended to all indexes created in the OpenSearch instance. 
For each monitoring types managed, the Period (how often a new index is created) can be defined with:: diff --git a/src/DIRAC/ConfigurationSystem/Client/Utilities.py b/src/DIRAC/ConfigurationSystem/Client/Utilities.py index 63c38da2354..de5c59bb591 100644 --- a/src/DIRAC/ConfigurationSystem/Client/Utilities.py +++ b/src/DIRAC/ConfigurationSystem/Client/Utilities.py @@ -532,6 +532,13 @@ def _getCACerts(cs_path): if ca_certs: parameters["ca_certs"] = ca_certs + # Global index prefix for all OpenSearch databases + result = gConfig.getOption("/Systems/NoSQLDatabases/IndexPrefix") + if result["OK"]: + parameters["IndexPrefix"] = str(result["Value"]).strip().lower() + else: + parameters["IndexPrefix"] = "" + # Check optional parameters: Host, Port, SSL result = gConfig.getOption(cs_path + "/Host") if not result["OK"]: diff --git a/src/DIRAC/Core/Base/ElasticDB.py b/src/DIRAC/Core/Base/ElasticDB.py index 7f73806bbbc..9b26178f72a 100644 --- a/src/DIRAC/Core/Base/ElasticDB.py +++ b/src/DIRAC/Core/Base/ElasticDB.py @@ -34,6 +34,7 @@ def __init__(self, fullName, indexPrefix="", parentLogger=None): self.__ca_certs = dbParameters.get("ca_certs", None) self.__client_key = dbParameters.get("client_key", None) self.__client_cert = dbParameters.get("client_cert", None) + self.__globalIndexPrefix = dbParameters.get("IndexPrefix", "") super().__init__( host=self._dbHost, @@ -41,6 +42,7 @@ def __init__(self, fullName, indexPrefix="", parentLogger=None): user=self.__user, password=self.__dbPassword, indexPrefix=indexPrefix, + globalIndexPrefix=self.__globalIndexPrefix, useSSL=self.__useSSL, useCRT=self.__useCRT, ca_certs=self.__ca_certs, diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 46322129abd..786d4e994e7 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -103,6 +103,7 @@ def __init__( user=None, password=None, indexPrefix="", + globalIndexPrefix="", useSSL=True, 
useCRT=False, ca_certs=None, @@ -117,6 +118,7 @@ def __init__( :param str user: user name to access the db :param str password: if the db is password protected we need to provide a password :param str indexPrefix: it is the indexPrefix used to get all indexes + :param str globalIndexPrefix: prefix prepended to all index names and patterns :param bool useSSL: We can disable using secure connection. By default we use secure connection. :param bool useCRT: Use certificates. :param str ca_certs: CA certificates bundle. @@ -125,6 +127,7 @@ def __init__( """ self._connected = False + self.globalIndexPrefix = globalIndexPrefix if user and password: sLog.debug("Specified username and password") password = urlparse.quote_plus(password) @@ -191,6 +194,43 @@ def __init__( except ElasticConnectionError as e: sLog.error(repr(e)) + @property + def globalIndexPrefix(self) -> str: + """Global prefix prepended to all index names and patterns.""" + return self._globalIndexPrefix + + @globalIndexPrefix.setter + def globalIndexPrefix(self, value: str): + self._globalIndexPrefix = (value or "").strip().lower() + + def _withGlobalPrefix(self, indexName): + """Prepend the global index prefix to an index name or pattern.""" + if not self._globalIndexPrefix: + return indexName + + prefixedTokens = [] + for token in indexName.split(","): + strippedToken = token.strip() + if not strippedToken: + prefixedTokens.append(strippedToken) + continue + + excluded = strippedToken.startswith("-") + if excluded: + strippedToken = strippedToken[1:] + + if strippedToken == "_all": + strippedToken = "*" + + if not strippedToken.startswith(self._globalIndexPrefix): + strippedToken = f"{self._globalIndexPrefix}{strippedToken}" + + if excluded: + strippedToken = f"-{strippedToken}" + prefixedTokens.append(strippedToken) + + return ",".join(prefixedTokens) + @ifConnected def addIndexTemplate( self, name: str, index_patterns: list, mapping: dict, priority: int = 1, settings: dict = None @@ -204,6 +244,7 @@ def 
addIndexTemplate( """ if settings is None: settings = {"index": {"number_of_shards": 1, "number_of_replicas": 1}} + index_patterns = [self._withGlobalPrefix(pattern) for pattern in index_patterns] body = { "index_patterns": index_patterns, "priority": priority, @@ -225,6 +266,7 @@ def query(self, index: str, query): :param dict query: It is the query in OpenSearch DSL language """ + index = self._withGlobalPrefix(index) try: esDSLQueryResult = self.client.search(index=index, body=query) return S_OK(esDSLQueryResult) @@ -247,6 +289,7 @@ def update(self, index: str, query=None, updateByQuery: bool = True, docID: str if not index or not query: return S_ERROR("Missing index or query") + index = self._withGlobalPrefix(index) try: if updateByQuery: esDSLQueryResult = self.client.update_by_query(index=index, body=query) @@ -263,6 +306,7 @@ def getDoc(self, index: str, docID: str) -> dict: :param index: name of the index :param docID: document ID """ + index = self._withGlobalPrefix(index) sLog.debug(f"Retrieving document {docID} in index {index}") try: return S_OK(self.client.get(index, docID)["_source"]) @@ -280,7 +324,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]: :param docIDs: document IDs """ sLog.debug(f"Retrieving documents {docIDs}") - docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs] + docs = [{"_index": self._withGlobalPrefix(indexFunc(docID, vo)), "_id": docID} for docID in docIDs] try: response = self.client.mget({"docs": docs}) except RequestError as re: @@ -298,6 +342,7 @@ def updateDoc(self, index: str, docID: str, body) -> dict: :param body: The request definition requires either `script` or partial `doc` """ + index = self._withGlobalPrefix(index) sLog.debug(f"Updating document {docID} in index {index}") try: self.client.update(index, docID, body) @@ -317,6 +362,7 @@ def deleteDoc(self, index: str, docID: str): :param index: name of the index :param docID: document ID """ + index = 
self._withGlobalPrefix(index) sLog.debug(f"Deleting document {docID} in index {index}") try: return S_OK(self.client.delete(index, docID)) @@ -333,6 +379,7 @@ def existsDoc(self, index: str, docID: str) -> bool: :param index: name of the index :param docID: document ID """ + index = self._withGlobalPrefix(index) sLog.debug(f"Checking if document {docID} in index {index} exists") return self.client.exists(index, docID) @@ -341,6 +388,7 @@ def _Search(self, indexname): """ it returns the object which can be used for retreiving certain value from the DB """ + indexname = self._withGlobalPrefix(indexname) return Search(using=self.client, index=indexname) def _Q(self, name_or_query="match", **params): @@ -363,6 +411,7 @@ def getIndexes(self, indexName=None): """ if not indexName: indexName = "" + indexName = self._withGlobalPrefix(indexName) sLog.debug(f"Getting indices alias of {indexName}") # we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc. 
return list(self.client.indices.get_alias(f"{indexName}*")) @@ -376,6 +425,7 @@ def getDocTypes(self, indexName): :return: S_OK or S_ERROR """ result = [] + indexName = self._withGlobalPrefix(indexName) try: sLog.debug("Getting mappings for ", indexName) result = self.client.indices.get_mapping(indexName) @@ -407,6 +457,7 @@ def existingIndex(self, indexName): :param str indexName: the name of the index :returns: S_OK/S_ERROR if the request is successful """ + indexName = self._withGlobalPrefix(indexName) sLog.debug(f"Checking existance of index {indexName}") try: return S_OK(self.client.indices.exists(indexName)) @@ -428,6 +479,7 @@ def createIndex(self, indexPrefix, mapping=None, period="day"): else: sLog.warn("The period is not provided, so using non-periodic indexes names") fullIndex = indexPrefix + fullIndex = self._withGlobalPrefix(fullIndex) try: if not mapping: @@ -444,6 +496,7 @@ def deleteIndex(self, indexName): """ :param str indexName: the name of the index to be deleted... """ + indexName = self._withGlobalPrefix(indexName) sLog.info("Deleting index", indexName) try: retVal = self.client.indices.delete(indexName) @@ -474,6 +527,7 @@ def index(self, indexName, body=None, docID=None, op_type="index"): if not indexName or not body: return S_ERROR("Missing index or body") + indexName = self._withGlobalPrefix(indexName) try: res = self.client.index(index=indexName, body=body, id=docID, params={"op_type": op_type}) except (RequestError, TransportError) as e: @@ -505,8 +559,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim indexName = self.generateFullIndexName(indexPrefix, period) else: indexName = indexPrefix - sLog.debug(f"Bulk indexing into {indexName} of {len(data)}") + sLog.debug(f"Bulk indexing into {self._withGlobalPrefix(indexName)} of {len(data)}") + # Keep existence/creation checks on the raw name path; methods apply global prefix internally. 
res = self.existingIndex(indexName) if not res["OK"]: return res @@ -515,6 +570,9 @@ def bulk_index(self, indexPrefix, data=None, mapping=None, period="day", withTim if not retVal["OK"]: return retVal + # Prefix exactly once for the direct bulk API call. + indexName = self._withGlobalPrefix(indexName) + try: res = bulk(client=self.client, index=indexName, actions=generateDocs(data, withTimeStamp)) except (BulkIndexError, RequestError) as e: @@ -534,6 +592,7 @@ def getUniqueValue(self, indexName, key, orderBy=False): :param dict orderBy: it is a dictionary in case we want to order the result {key:'desc'} or {key:'asc'} :returns: a list of unique value for a certain key from the dictionary. """ + indexName = self._withGlobalPrefix(indexName) query = self._Search(indexName) @@ -592,6 +651,7 @@ def deleteByQuery(self, indexName, query): :param str indexName: the name of the index :param str query: the JSON-formatted query for which we want to issue the delete """ + indexName = self._withGlobalPrefix(indexName) try: self.client.delete_by_query(index=indexName, body=query) except Exception as inst: diff --git a/src/DIRAC/Core/Utilities/test/Test_ElasticSearchDB.py b/src/DIRAC/Core/Utilities/test/Test_ElasticSearchDB.py new file mode 100644 index 00000000000..7bc443ab6d4 --- /dev/null +++ b/src/DIRAC/Core/Utilities/test/Test_ElasticSearchDB.py @@ -0,0 +1,207 @@ +from types import SimpleNamespace +from unittest.mock import MagicMock + +import DIRAC.Core.Utilities.ElasticSearchDB as elastic_module +from DIRAC.Core.Utilities.ElasticSearchDB import ElasticSearchDB + + +def _get_db(prefix="dirac-"): + db = ElasticSearchDB.__new__(ElasticSearchDB) + db._connected = True + db.client = MagicMock() + db.globalIndexPrefix = prefix + return db + + +def test_global_prefix_normalization_and_token_handling(): + db = _get_db() + db.globalIndexPrefix = " LHCB- " + assert db.globalIndexPrefix == "lhcb-" + + assert db._withGlobalPrefix("jobs,-logs,_all,lhcb-ready") == 
"lhcb-jobs,-lhcb-logs,lhcb-*,lhcb-ready" + + +def test_query_and_document_operations_use_prefixed_index_names(): + db = _get_db("prefix-") + db.client.search.return_value = {} + db.client.update_by_query.return_value = {} + db.client.index.return_value = {"result": "updated"} + db.client.get.return_value = {"_source": {"a": 1}} + db.client.delete.return_value = {"result": "deleted"} + db.client.exists.return_value = True + + db.query("myindex", {"query": {"match_all": {}}}) + assert db.client.search.call_args.kwargs["index"] == "prefix-myindex" + + db.update("myindex", {"script": {}}, updateByQuery=True) + assert db.client.update_by_query.call_args.kwargs["index"] == "prefix-myindex" + + db.update("myindex", {"a": 1}, updateByQuery=False, docID="42") + assert db.client.index.call_args.kwargs["index"] == "prefix-myindex" + + db.getDoc("myindex", "1") + assert db.client.get.call_args.args[0] == "prefix-myindex" + + db.updateDoc("myindex", "1", {"doc": {"a": 2}}) + assert db.client.update.call_args.args[0] == "prefix-myindex" + + db.deleteDoc("myindex", "1") + assert db.client.delete.call_args.args[0] == "prefix-myindex" + + db.existsDoc("myindex", "1") + assert db.client.exists.call_args.args[0] == "prefix-myindex" + + db.index("myindex", {"a": 1}, docID="5") + assert db.client.index.call_args.kwargs["index"] == "prefix-myindex" + + db.deleteByQuery("myindex", {"query": {"match_all": {}}}) + assert db.client.delete_by_query.call_args.kwargs["index"] == "prefix-myindex" + + +def test_index_management_and_template_operations_use_prefixed_index_names(): + db = _get_db("prefix-") + db.client.indices.put_index_template.return_value = {"acknowledged": True} + db.client.indices.get_alias.return_value = {"prefix-myindex": {}} + db.client.indices.get_mapping.return_value = { + "prefix-myindex": {"mappings": {"properties": {"a": {"type": "keyword"}}}} + } + db.client.indices.exists.return_value = True + db.client.indices.create.return_value = {"acknowledged": True} + 
db.client.indices.delete.return_value = {"acknowledged": True} + + db.addIndexTemplate("my-template", ["myindex-*", "prefix-already-*"], mapping={}) + body = db.client.indices.put_index_template.call_args.kwargs["body"] + assert body["index_patterns"] == ["prefix-myindex-*", "prefix-already-*"] + + db.getIndexes("myindex") + assert db.client.indices.get_alias.call_args.args[0] == "prefix-myindex*" + + db.getDocTypes("myindex") + assert db.client.indices.get_mapping.call_args.args[0] == "prefix-myindex" + + db.existingIndex("myindex") + assert db.client.indices.exists.call_args.args[0] == "prefix-myindex" + + db.createIndex("myindex", mapping={}, period=None) + assert db.client.indices.create.call_args.kwargs["index"] == "prefix-myindex" + + db.deleteIndex("myindex") + assert db.client.indices.delete.call_args.args[0] == "prefix-myindex" + + +def test_get_docs_and_search_builder_use_prefixed_index_names(monkeypatch): + db = _get_db("prefix-") + db.client.mget.return_value = {"docs": [{"_id": "7", "found": True, "_source": {"v": 1}}]} + + db.getDocs(lambda _doc_id, _vo: "logs", ["7"], "lhcb") + mget_body = db.client.mget.call_args.args[0] + assert mget_body["docs"][0]["_index"] == "prefix-logs" + + captured = {} + + def _fake_search(*, using, index): + captured["using"] = using + captured["index"] = index + return "search-object" + + monkeypatch.setattr(elastic_module, "Search", _fake_search) + assert db._Search("myindex") == "search-object" + assert captured["using"] is db.client + assert captured["index"] == "prefix-myindex" + + +def test_bulk_index_prefixes_once_but_keeps_internal_checks_unprefixed(monkeypatch): + db = _get_db("prefix-") + db.existingIndex = MagicMock(return_value={"OK": True, "Value": False}) + db.createIndex = MagicMock(return_value={"OK": True, "Value": "created"}) + + seen = {} + + def _fake_bulk(*, client, index, actions): + seen["client"] = client + seen["index"] = index + seen["actions"] = list(actions) + return (len(seen["actions"]), []) + 
+ monkeypatch.setattr(elastic_module, "bulk", _fake_bulk) + res = db.bulk_index("myindex", data=[{"a": 1}, {"a": 2}], mapping=None, period=None, withTimeStamp=False) + + assert res["OK"] + assert res["Value"] == 2 + db.existingIndex.assert_called_once_with("myindex") + db.createIndex.assert_called_once_with("myindex", {}, None) + assert seen["client"] is db.client + assert seen["index"] == "prefix-myindex" + + +def test_get_unique_value_uses_prefixed_index_name(): + db = _get_db("prefix-") + + class _FakeAggs: + def bucket(self, *_args, **_kwargs): + return self + + def metric(self, *_args, **_kwargs): + return self + + class _FakeQuery: + def __init__(self): + self.aggs = _FakeAggs() + + def filter(self, *_args, **_kwargs): + return self + + def extra(self, **_kwargs): + return self + + def to_dict(self): + return {} + + def execute(self): + return SimpleNamespace(aggregations={"site": SimpleNamespace(buckets=[{"key": "A"}, {"key": "B"}])}) + + db._Search = MagicMock(return_value=_FakeQuery()) + res = db.getUniqueValue("myindex", "site") + + assert res["OK"] + assert res["Value"] == ["A", "B"] + db._Search.assert_called_once_with("prefix-myindex") + + +def test_create_index_with_period_uses_generated_name_and_global_prefix(): + db = _get_db("prefix-") + db.client.indices.create.return_value = {"acknowledged": True} + db.generateFullIndexName = MagicMock(return_value="myindex-2026-04-25") + + res = db.createIndex("myindex", mapping={"a": {"type": "keyword"}}, period="day") + + assert res["OK"] + assert res["Value"] == "prefix-myindex-2026-04-25" + db.generateFullIndexName.assert_called_once_with("myindex", "day") + assert db.client.indices.create.call_args.kwargs["index"] == "prefix-myindex-2026-04-25" + + +def test_bulk_index_with_period_uses_prefixed_generated_name_once(monkeypatch): + db = _get_db("prefix-") + db.generateFullIndexName = MagicMock(return_value="myindex-2026-04-25") + db.existingIndex = MagicMock(return_value={"OK": True, "Value": False}) + 
db.createIndex = MagicMock(return_value={"OK": True, "Value": "created"}) + + seen = {} + + def _fake_bulk(*, client, index, actions): + seen["client"] = client + seen["index"] = index + seen["actions"] = list(actions) + return (len(seen["actions"]), []) + + monkeypatch.setattr(elastic_module, "bulk", _fake_bulk) + res = db.bulk_index("myindex", data=[{"a": 1}], mapping={}, period="day", withTimeStamp=False) + + assert res["OK"] + assert res["Value"] == 1 + db.generateFullIndexName.assert_called_once_with("myindex", "day") + db.existingIndex.assert_called_once_with("myindex-2026-04-25") + db.createIndex.assert_called_once_with("myindex", {}, "day") + assert seen["client"] is db.client + assert seen["index"] == "prefix-myindex-2026-04-25" diff --git a/src/DIRAC/Core/scripts/install_full.cfg b/src/DIRAC/Core/scripts/install_full.cfg index 4ea5e518ed2..233dfb60144 100755 --- a/src/DIRAC/Core/scripts/install_full.cfg +++ b/src/DIRAC/Core/scripts/install_full.cfg @@ -142,5 +142,7 @@ LocalInstallation Password = Host = Port = + # Optional global prefix prepended to all DIRAC-created OpenSearch indexes + # IndexPrefix = } } diff --git a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py index 4c09ef93c30..37112bf3d07 100644 --- a/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py +++ b/src/DIRAC/MonitoringSystem/DB/MonitoringDB.py @@ -3,9 +3,8 @@ **Configuration Parameters**: -The following option can be set in `Systems/Monitoring/Databases/MonitoringDB` - -* *IndexPrefix*: Prefix used to prepend to indexes created in the OpenSearch instance. +The global OpenSearch index prefix can be set in +`Systems/NoSQLDatabases/IndexPrefix`. 
For each monitoring types managed, the Period (how often a new index is created) can be defined with:: @@ -31,8 +30,6 @@ import time from DIRAC import S_ERROR, S_OK -from DIRAC.ConfigurationSystem.Client.Config import gConfig -from DIRAC.ConfigurationSystem.Client.PathFinder import getDatabaseSection from DIRAC.Core.Base.ElasticDB import ElasticDB from DIRAC.Core.Utilities.Plotting.TypeLoader import TypeLoader @@ -45,10 +42,8 @@ def __init__(self, name="Monitoring/MonitoringDB"): """Standard constructor""" try: - section = getDatabaseSection("Monitoring/MonitoringDB") - indexPrefix = gConfig.getValue(f"{section}/IndexPrefix", "").lower() # Connecting to the ES cluster - super().__init__(fullName=name, indexPrefix=indexPrefix) + super().__init__(fullName=name) except RuntimeError as ex: self.log.error("Can't connect to MonitoringDB", repr(ex)) raise ex