diff --git a/docs/content/en/open_source/upgrading/2.54.md b/docs/content/en/open_source/upgrading/2.54.md index 625198c3162..c6843812cb7 100644 --- a/docs/content/en/open_source/upgrading/2.54.md +++ b/docs/content/en/open_source/upgrading/2.54.md @@ -1,8 +1,8 @@ --- title: 'Upgrading to DefectDojo Version 2.54.x' toc_hide: true -weight: -20251201 -description: Removal of django-auditlog and exclusive use of django-pghistory for audit logging & Dropped support for DD_PARSER_EXCLUDE +weight: -20250804 +description: Removal of django-auditlog & Dropped support for DD_PARSER_EXCLUDE & Reimport performance improvements --- ## Breaking Change: Removal of django-auditlog @@ -44,4 +44,16 @@ The backfill migration is not mandatory to succeed. If it fails for some reason, To simplify the management of the DefectDojo application, parser exclusions are no longer controlled via the environment variable DD_PARSER_EXCLUDE or application settings. This variable is now unsupported. From now on, you should use the active flag in the Test_Type model to enable or disable parsers. Only parsers associated with active Test_Type entries will be available for use. -Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.54.0) for the contents of the release. \ No newline at end of file +## Import/reimport performance improvements + +DefectDojo 2.54.x includes performance improvements for reimporting scan results, especially for large scans: + +- **Faster reimports** due to fewer database queries and more bulk operations. +- **Reduced database load** during reimport matching and post-processing (helps avoid slowdowns/timeouts under heavy scan volume). +- **More efficient endpoint status updates** during reimport of dynamic findings. +- **Less churn when updating vulnerability IDs**, avoiding unnecessary deletes/writes when nothing changed. + +No action is required after upgrading. (Optional tuning knobs exist via `DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE` and `DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE`.) + +There are other instructions for upgrading to 2.54.x. Check the Release Notes for the contents of the release: `https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.54.0` +Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.54.0) for the contents of the release. diff --git a/dojo/finding/deduplication.py b/dojo/finding/deduplication.py index 7297e55fef4..16dffc100f5 100644 --- a/dojo/finding/deduplication.py +++ b/dojo/finding/deduplication.py @@ -232,59 +232,123 @@ def are_endpoints_duplicates(new_finding, to_duplicate_finding): return False -def build_dedupe_scope_queryset(test): - scope_on_engagement = test.engagement.deduplication_on_engagement - if scope_on_engagement: - scope_q = Q(test__engagement=test.engagement) - else: - # Product scope limited to current product, but exclude engagements that opted into engagement-scoped dedupe - scope_q = Q(test__engagement__product=test.engagement.product) & ( - Q(test__engagement=test.engagement) - | Q(test__engagement__deduplication_on_engagement=False) - ) +def build_candidate_scope_queryset(test, mode="deduplication", service=None): + """ + Build a queryset for candidate finding. + + Args: + test: The test to scope from + mode: "deduplication" (can match across tests) or "reimport" (same test only) + service: Optional service filter (for deduplication mode, not used for reimport since service is in hash) + + """ + if mode == "reimport": + # For reimport, only filter by test. 
Service filtering is not needed because + # service is included in hash_code calculation (HASH_CODE_FIELDS_ALWAYS = ["service"]), + # so matching by hash_code automatically ensures correct service match. + queryset = Finding.objects.filter(test=test) + else: # deduplication mode + scope_on_engagement = test.engagement.deduplication_on_engagement + if scope_on_engagement: + scope_q = Q(test__engagement=test.engagement) + else: + # Product scope limited to current product, but exclude engagements that opted into engagement-scoped dedupe + scope_q = Q(test__engagement__product=test.engagement.product) & ( + Q(test__engagement=test.engagement) + | Q(test__engagement__deduplication_on_engagement=False) + ) + queryset = Finding.objects.filter(scope_q) + + # Base prefetches for both modes + prefetch_list = ["endpoints", "vulnerability_id_set", "found_by"] + + # Additional prefetches for reimport mode + if mode == "reimport": + prefetch_list.extend([ + "status_finding", + "status_finding__endpoint", + ]) return ( - Finding.objects.filter(scope_q) + queryset .select_related("test", "test__engagement", "test__test_type") - .prefetch_related("endpoints", "found_by") + .prefetch_related(*prefetch_list) ) -def find_candidates_for_deduplication_hash(test, findings): - base_queryset = build_dedupe_scope_queryset(test) +def find_candidates_for_deduplication_hash(test, findings, mode="deduplication", service=None): + """ + Find candidates by hash_code. Works for both deduplication and reimport. + + Args: + test: The test to scope from + findings: List of findings to find candidates for + mode: "deduplication" or "reimport" + service: Optional service filter (for deduplication mode, not used for reimport since service is in hash) + + """ + base_queryset = build_candidate_scope_queryset(test, mode=mode, service=service) hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None) is not None} if not hash_codes: return {} - existing_qs = ( - base_queryset.filter(hash_code__in=hash_codes) - .exclude(hash_code=None) - .exclude(duplicate=True) - .order_by("id") - ) + + existing_qs = base_queryset.filter(hash_code__in=hash_codes).exclude(hash_code=None) + if mode == "deduplication": + existing_qs = existing_qs.exclude(duplicate=True) + existing_qs = existing_qs.order_by("id") + existing_by_hash = {} for ef in existing_qs: existing_by_hash.setdefault(ef.hash_code, []).append(ef) - deduplicationLogger.debug(f"Found {len(existing_by_hash)} existing findings by hash codes") + + log_msg = "for reimport" if mode == "reimport" else "" + deduplicationLogger.debug(f"Found {len(existing_by_hash)} existing findings by hash codes {log_msg}") return existing_by_hash -def find_candidates_for_deduplication_unique_id(test, findings): - base_queryset = build_dedupe_scope_queryset(test) +def find_candidates_for_deduplication_unique_id(test, findings, mode="deduplication", service=None): + """ + Find candidates by unique_id_from_tool. Works for both deduplication and reimport. 
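For context, a rough sketch of how these batched candidate lookups are meant to be consumed: candidates are fetched once per batch and then resolved with in-memory dictionary lookups instead of one query per finding. The `batch` list and the loop are hypothetical; only the lookup function itself comes from this change.

    # batch: prepared, unsaved findings with hash_code already calculated
    candidates_by_hash = find_candidates_for_deduplication_hash(test, batch, mode="reimport")
    for new_finding in batch:
        # one dict lookup per finding instead of one SQL query per finding
        matches = candidates_by_hash.get(new_finding.hash_code, [])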
+ + Args: + test: The test to scope from + findings: List of findings to find candidates for + mode: "deduplication" or "reimport" + service: Optional service filter (for deduplication mode, not used for reimport since service is in hash) + + """ + base_queryset = build_candidate_scope_queryset(test, mode=mode, service=service) unique_ids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None) is not None} if not unique_ids: return {} - existing_qs = base_queryset.filter(unique_id_from_tool__in=unique_ids).exclude(unique_id_from_tool=None).exclude(duplicate=True).order_by("id") + + existing_qs = base_queryset.filter(unique_id_from_tool__in=unique_ids).exclude(unique_id_from_tool=None) + if mode == "deduplication": + existing_qs = existing_qs.exclude(duplicate=True) # unique_id_from_tool can only apply to the same test_type because it is parser dependent - existing_qs = existing_qs.filter(test__test_type=test.test_type) + existing_qs = existing_qs.filter(test__test_type=test.test_type).order_by("id") + existing_by_uid = {} for ef in existing_qs: existing_by_uid.setdefault(ef.unique_id_from_tool, []).append(ef) - deduplicationLogger.debug(f"Found {len(existing_by_uid)} existing findings by unique IDs") + + log_msg = "for reimport" if mode == "reimport" else "" + deduplicationLogger.debug(f"Found {len(existing_by_uid)} existing findings by unique IDs {log_msg}") return existing_by_uid -def find_candidates_for_deduplication_uid_or_hash(test, findings): - base_queryset = build_dedupe_scope_queryset(test) +def find_candidates_for_deduplication_uid_or_hash(test, findings, mode="deduplication", service=None): + """ + Find candidates by unique_id_from_tool or hash_code. Works for both deduplication and reimport. + + Args: + test: The test to scope from + findings: List of findings to find candidates for + mode: "deduplication" or "reimport" + service: Optional service filter (for deduplication mode, not used for reimport since service is in hash) + + """ + base_queryset = build_candidate_scope_queryset(test, mode=mode, service=service) hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None) is not None} unique_ids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None) is not None} if not hash_codes and not unique_ids: @@ -298,7 +362,11 @@ def find_candidates_for_deduplication_uid_or_hash(test, findings): uid_q = Q(unique_id_from_tool__isnull=False, unique_id_from_tool__in=unique_ids) & Q(test__test_type=test.test_type) cond |= uid_q - existing_qs = base_queryset.filter(cond).exclude(duplicate=True).order_by("id") + existing_qs = base_queryset.filter(cond) + if mode == "deduplication": + # reimport matching will match against duplicates, import/deduplication doesn't. 
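A sketch of how a caller can merge the two dictionaries returned by the uid-or-hash variant (the loop and `batch` are hypothetical; the shipped merge logic lives in match_finding_to_candidate_reimport further down in this diff):

    by_uid, by_hash = find_candidates_for_deduplication_uid_or_hash(test, batch, mode="reimport")
    for f in batch:
        merged = {m.id: m for m in by_hash.get(f.hash_code, [])}
        merged.update({m.id: m for m in by_uid.get(f.unique_id_from_tool, [])})
        matches = sorted(merged.values(), key=lambda m: m.id)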
+ existing_qs = existing_qs.exclude(duplicate=True) + existing_qs = existing_qs.order_by("id") existing_by_hash = {} existing_by_uid = {} @@ -307,13 +375,15 @@ def find_candidates_for_deduplication_uid_or_hash(test, findings): existing_by_hash.setdefault(ef.hash_code, []).append(ef) if ef.unique_id_from_tool is not None: existing_by_uid.setdefault(ef.unique_id_from_tool, []).append(ef) - deduplicationLogger.debug(f"Found {len(existing_by_uid)} existing findings by unique IDs") - deduplicationLogger.debug(f"Found {len(existing_by_hash)} existing findings by hash codes") + + log_msg = "for reimport" if mode == "reimport" else "" + deduplicationLogger.debug(f"Found {len(existing_by_uid)} existing findings by unique IDs {log_msg}") + deduplicationLogger.debug(f"Found {len(existing_by_hash)} existing findings by hash codes {log_msg}") return existing_by_uid, existing_by_hash def find_candidates_for_deduplication_legacy(test, findings): - base_queryset = build_dedupe_scope_queryset(test) + base_queryset = build_candidate_scope_queryset(test, mode="deduplication") titles = {f.title for f in findings if getattr(f, "title", None)} cwes = {f.cwe for f in findings if getattr(f, "cwe", 0)} cwes.discard(0) @@ -335,6 +405,52 @@ def find_candidates_for_deduplication_legacy(test, findings): return by_title, by_cwe +# TODO: should we align this with deduplication? +def find_candidates_for_reimport_legacy(test, findings, service=None): + """ + Find all existing findings in the test that match any of the given findings by title and severity. + Used for batch reimport to avoid 1+N query problem. + Legacy reimport matches by title (case-insensitive), severity, and numerical_severity. + Note: This function is kept separate because legacy reimport has fundamentally different matching logic + than legacy deduplication (title+severity vs title+CWE). + Note: service parameter is kept for backward compatibility but not used since service is in hash_code. 
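A minimal usage sketch for the legacy candidate dictionary built below; the (title_lower, severity) key mirrors what the matching code looks up, while numerical_severity only participates in the SQL filter. The loop and `batch` are hypothetical.

    existing_by_key = find_candidates_for_reimport_legacy(test, batch)
    for f in batch:
        matches = existing_by_key.get((f.title.lower(), f.severity), [])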
+ """ + base_queryset = build_candidate_scope_queryset(test, mode="reimport", service=None) + + # Collect all unique title/severity combinations + title_severity_pairs = set() + for finding in findings: + if finding.title: + title_severity_pairs.add(( + finding.title.lower(), # Case-insensitive matching + finding.severity, + Finding.get_numerical_severity(finding.severity), + )) + + if not title_severity_pairs: + return {} + + # Build query to find all matching findings + conditions = Q() + for title_lower, severity, numerical_severity in title_severity_pairs: + conditions |= ( + Q(title__iexact=title_lower) & + Q(severity=severity) & + Q(numerical_severity=numerical_severity) + ) + + existing_qs = base_queryset.filter(conditions).order_by("id") + + # Build dictionary keyed by (title_lower, severity) for quick lookup + existing_by_key = {} + for ef in existing_qs: + key = (ef.title.lower(), ef.severity) + existing_by_key.setdefault(key, []).append(ef) + + deduplicationLogger.debug(f"Found {sum(len(v) for v in existing_by_key.values())} existing findings by legacy matching for reimport") + return existing_by_key + + def _is_candidate_older(new_finding, candidate): # Ensure the newer finding is marked as duplicate of the older finding is_older = candidate.id < new_finding.id diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index 19bf9ee6d99..8689212a835 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -765,12 +765,15 @@ def add_endpoints(new_finding, form): endpoint=endpoint, defaults={"date": form.cleaned_data["date"] or timezone.now()}) -def save_vulnerability_ids(finding, vulnerability_ids): +def save_vulnerability_ids(finding, vulnerability_ids, *, delete_existing: bool = True): # Remove duplicates vulnerability_ids = list(dict.fromkeys(vulnerability_ids)) - # Remove old vulnerability ids - Vulnerability_Id.objects.filter(finding=finding).delete() + # Remove old vulnerability ids if requested + # Callers can set delete_existing=False when they know there are no existing IDs + # to avoid an unnecessary delete query (e.g., for new findings) + if delete_existing: + Vulnerability_Id.objects.filter(finding=finding).delete() # Save new vulnerability ids # Using bulk create throws Django 50 warnings about unsaved models... diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index 14e5b567712..cac02755022 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -32,7 +32,6 @@ Test_Import, Test_Import_Finding_Action, Test_Type, - Vulnerability_Id, ) from dojo.notifications.helper import create_notification from dojo.tag_utils import bulk_add_tags_to_instances @@ -278,6 +277,7 @@ def determine_process_method( def determine_deduplication_algorithm(self) -> str: """ Determines what dedupe algorithm to use for the Test being processed. + Overridden in Pro. :return: A string representing the dedupe algorithm to use. """ return self.test.deduplication_algorithm @@ -793,21 +793,23 @@ def process_cve( return finding - def process_vulnerability_ids( + def store_vulnerability_ids( self, finding: Finding, ) -> Finding: """ - Parse the `unsaved_vulnerability_ids` field from findings after they are parsed - to create `Vulnerability_Id` objects with the finding associated correctly - """ - if finding.unsaved_vulnerability_ids: - # Remove old vulnerability ids - keeping this call only because of flake8 - Vulnerability_Id.objects.filter(finding=finding).delete() + Store vulnerability IDs for a finding. 
+ Reads from finding.unsaved_vulnerability_ids and saves them overwriting existing ones. + + Args: + finding: The finding to store vulnerability IDs for - # user the helper function - finding_helper.save_vulnerability_ids(finding, finding.unsaved_vulnerability_ids) + Returns: + The finding object + """ + vulnerability_ids_to_process = finding.unsaved_vulnerability_ids or [] + finding_helper.save_vulnerability_ids(finding, vulnerability_ids_to_process, delete_existing=False) return finding def process_files( diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index 63f41b8f744..e1e3f6f4556 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -234,7 +234,7 @@ def process_findings( # Process any files self.process_files(finding) # Process vulnerability IDs - finding = self.process_vulnerability_ids(finding) + finding = self.store_vulnerability_ids(finding) # Categorize this finding as a new one new_findings.append(finding) # all data is already saved on the finding, we only need to trigger post processing in batches diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index 10b3ac7148a..14a7da7e21d 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -1,5 +1,6 @@ import logging +from django.conf import settings from django.core.files.uploadedfile import TemporaryUploadedFile from django.core.serializers import serialize from django.db.models.query_utils import Q @@ -7,6 +8,12 @@ import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper from dojo.decorators import we_want_async +from dojo.finding.deduplication import ( + find_candidates_for_deduplication_hash, + find_candidates_for_deduplication_uid_or_hash, + find_candidates_for_deduplication_unique_id, + find_candidates_for_reimport_legacy, +) from dojo.importers.base_importer import BaseImporter, Parser from dojo.importers.options import ImporterOptions from dojo.models import ( @@ -152,6 +159,93 @@ def process_scan( test_import_history, ) + def get_reimport_match_candidates_for_batch( + self, + batch_findings: list[Finding], + ) -> tuple[dict, dict, dict]: + """ + Fetch candidate matches for a batch of *unsaved* findings during reimport. + + This is intentionally a separate method so downstream editions (e.g. Dojo Pro) + can override candidate retrieval without copying the full `process_findings()` + implementation. + + Is overridden in Pro. 
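Since this is explicitly an override point, a minimal subclass sketch (the subclass name is made up, and it assumes the base class is the DefaultReImporter defined in this module):

    class CustomReImporter(DefaultReImporter):
        def get_reimport_match_candidates_for_batch(self, batch_findings):
            by_hash, by_uid, by_key = super().get_reimport_match_candidates_for_batch(batch_findings)
            # downstream editions can filter or augment the candidate dictionaries here
            return by_hash, by_uid, by_key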
+ + Returns: + (candidates_by_hash, candidates_by_uid, candidates_by_key) + + """ + candidates_by_hash: dict = {} + candidates_by_uid: dict = {} + candidates_by_key: dict = {} + + if self.deduplication_algorithm == "hash_code": + candidates_by_hash = find_candidates_for_deduplication_hash( + self.test, + batch_findings, + mode="reimport", + ) + elif self.deduplication_algorithm == "unique_id_from_tool": + candidates_by_uid = find_candidates_for_deduplication_unique_id( + self.test, + batch_findings, + mode="reimport", + ) + elif self.deduplication_algorithm == "unique_id_from_tool_or_hash_code": + candidates_by_uid, candidates_by_hash = find_candidates_for_deduplication_uid_or_hash( + self.test, + batch_findings, + mode="reimport", + ) + elif self.deduplication_algorithm == "legacy": + candidates_by_key = find_candidates_for_reimport_legacy(self.test, batch_findings) + + return candidates_by_hash, candidates_by_uid, candidates_by_key + + def add_new_finding_to_candidates( + self, + finding: Finding, + candidates_by_hash: dict, + candidates_by_uid: dict, + candidates_by_key: dict, + ) -> None: + """ + Add a newly created finding to candidate dictionaries for subsequent findings in the same batch. + + This allows duplicates within the same scan report to be detected even when they're processed + in the same batch. When a new finding is created (no match found), it is added to the candidate + dictionaries so that subsequent findings in the same batch can match against it. + + Is overriden in Pro + + Args: + finding: The newly created finding to add to candidates + candidates_by_hash: Dictionary mapping hash_code to list of findings (modified in-place) + candidates_by_uid: Dictionary mapping unique_id_from_tool to list of findings (modified in-place) + candidates_by_key: Dictionary mapping (title_lower, severity) to list of findings (modified in-place) + + """ + if not finding: + return + + if finding.hash_code: + candidates_by_hash.setdefault(finding.hash_code, []).append(finding) + deduplicationLogger.debug( + f"Added finding {finding.id} (hash_code: {finding.hash_code}) to candidates for next findings in this report", + ) + if finding.unique_id_from_tool: + candidates_by_uid.setdefault(finding.unique_id_from_tool, []).append(finding) + deduplicationLogger.debug( + f"Added finding {finding.id} (unique_id_from_tool: {finding.unique_id_from_tool}) to candidates for next findings in this report", + ) + if finding.title: + legacy_key = (finding.title.lower(), finding.severity) + candidates_by_key.setdefault(legacy_key, []).append(finding) + deduplicationLogger.debug( + f"Added finding {finding.id} (title: {finding.title}, severity: {finding.severity}) to candidates for next findings in this report", + ) + def process_findings( self, parsed_findings: list[Finding], @@ -182,8 +276,6 @@ def process_findings( self.reactivated_items = [] self.unchanged_items = [] self.group_names_to_findings_dict = {} - # Progressive batching for chord execution - # No chord: we dispatch per 1000 findings or on the final finding logger.debug(f"starting reimport of {len(parsed_findings) if parsed_findings else 0} items.") logger.debug("STEP 1: looping over findings from the reimported report and trying to match them to existing findings") @@ -204,86 +296,136 @@ def process_findings( cleaned_findings.append(sanitized) batch_finding_ids: list[int] = [] - batch_max_size = 1000 - - for idx, unsaved_finding in enumerate(cleaned_findings): - is_final = idx == len(cleaned_findings) - 1 - # Some parsers provide "mitigated" 
field but do not set timezone (because they are probably not available in the report) - # Finding.mitigated is DateTimeField and it requires timezone - if unsaved_finding.mitigated and not unsaved_finding.mitigated.tzinfo: - unsaved_finding.mitigated = unsaved_finding.mitigated.replace(tzinfo=self.now.tzinfo) - # Override the test if needed - if not hasattr(unsaved_finding, "test"): - unsaved_finding.test = self.test - # Set the service supplied at import time - if self.service is not None: - unsaved_finding.service = self.service - # Clean any endpoints that are on the finding - self.endpoint_manager.clean_unsaved_endpoints(unsaved_finding.unsaved_endpoints) - # Calculate the hash code to be used to identify duplicates - unsaved_finding.hash_code = self.calculate_unsaved_finding_hash_code(unsaved_finding) - deduplicationLogger.debug(f"unsaved finding's hash_code: {unsaved_finding.hash_code}") - # Match any findings to this new one coming in - matched_findings = self.match_new_finding_to_existing_finding(unsaved_finding) - deduplicationLogger.debug(f"found {len(matched_findings)} findings matching with current new finding") - # Determine how to proceed based on whether matches were found or not - if matched_findings: - existing_finding = matched_findings[0] - finding, force_continue = self.process_matched_finding( + # Batch size for deduplication/post-processing (only new findings) + dedupe_batch_max_size = getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000) + # Batch size for candidate matching (all findings, before matching) + match_batch_max_size = getattr(settings, "IMPORT_REIMPORT_MATCH_BATCH_SIZE", 1000) + + # Process findings in batches to enable batch candidate fetching + # This avoids the 1+N query problem by fetching all candidates for a batch at once + for batch_start in range(0, len(cleaned_findings), match_batch_max_size): + batch_end = min(batch_start + match_batch_max_size, len(cleaned_findings)) + batch_findings = cleaned_findings[batch_start:batch_end] + is_final_batch = batch_end == len(cleaned_findings) + + logger.debug(f"Processing reimport batch {batch_start}-{batch_end} of {len(cleaned_findings)} findings") + + # Prepare findings in batch: set test, service, calculate hash codes + for unsaved_finding in batch_findings: + # Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report) + # Finding.mitigated is DateTimeField and it requires timezone + if unsaved_finding.mitigated and not unsaved_finding.mitigated.tzinfo: + unsaved_finding.mitigated = unsaved_finding.mitigated.replace(tzinfo=self.now.tzinfo) + # Override the test if needed + if not hasattr(unsaved_finding, "test"): + unsaved_finding.test = self.test + # Set the service supplied at import time + if self.service is not None: + unsaved_finding.service = self.service + # Clean any endpoints that are on the finding + self.endpoint_manager.clean_unsaved_endpoints(unsaved_finding.unsaved_endpoints) + # Calculate the hash code to be used to identify duplicates + unsaved_finding.hash_code = self.calculate_unsaved_finding_hash_code(unsaved_finding) + deduplicationLogger.debug(f"unsaved finding's hash_code: {unsaved_finding.hash_code}") + + # Fetch all candidates for this batch at once (batch candidate finding) + candidates_by_hash, candidates_by_uid, candidates_by_key = self.get_reimport_match_candidates_for_batch( + batch_findings, + ) + + # Process each finding in the batch using pre-fetched candidates + for idx, unsaved_finding in 
enumerate(batch_findings): + is_final = is_final_batch and idx == len(batch_findings) - 1 + + # Match any findings to this new one coming in using pre-fetched candidates + matched_findings = self.match_finding_to_candidate_reimport( unsaved_finding, - existing_finding, + candidates_by_hash=candidates_by_hash, + candidates_by_uid=candidates_by_uid, + candidates_by_key=candidates_by_key, ) - # Determine if we should skip the rest of the loop - if force_continue: - continue - # Update endpoints on the existing finding with those on the new finding - if finding.dynamic_finding: - logger.debug( - "Re-import found an existing dynamic finding for this new " - "finding. Checking the status of endpoints", - ) - self.endpoint_manager.update_endpoint_status( - existing_finding, + deduplicationLogger.debug(f"found {len(matched_findings)} findings matching with current new finding") + # Determine how to proceed based on whether matches were found or not + if matched_findings: + existing_finding = matched_findings[0] + finding, force_continue = self.process_matched_finding( unsaved_finding, - self.user, + existing_finding, ) - else: - finding = self.process_finding_that_was_not_matched(unsaved_finding) - # This condition __appears__ to always be true, but am afraid to remove it - if finding: - # Process the rest of the items on the finding - finding = self.finding_post_processing( - finding, - unsaved_finding, - ) - # all data is already saved on the finding, we only need to trigger post processing in batches - push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by) - batch_finding_ids.append(finding.id) - - # If batch is full or we're at the end, dispatch one batched task - if len(batch_finding_ids) >= batch_max_size or is_final: - finding_ids_batch = list(batch_finding_ids) - batch_finding_ids.clear() - if we_want_async(async_user=self.user): - finding_helper.post_process_findings_batch_signature( - finding_ids_batch, - dedupe_option=True, - rules_option=True, - product_grading_option=True, - issue_updater_option=True, - push_to_jira=push_to_jira, - )() - else: - finding_helper.post_process_findings_batch( - finding_ids_batch, - dedupe_option=True, - rules_option=True, - product_grading_option=True, - issue_updater_option=True, - push_to_jira=push_to_jira, + # Determine if we should skip the rest of the loop + if force_continue: + continue + # Update endpoints on the existing finding with those on the new finding + if finding.dynamic_finding: + logger.debug( + "Re-import found an existing dynamic finding for this new " + "finding. 
Checking the status of endpoints", + ) + self.endpoint_manager.update_endpoint_status( + existing_finding, + unsaved_finding, + self.user, ) + else: + finding = self.process_finding_that_was_not_matched(unsaved_finding) + + # Add newly created finding to candidates for subsequent findings in this batch + self.add_new_finding_to_candidates( + finding, + candidates_by_hash, + candidates_by_uid, + candidates_by_key, + ) - # No chord: tasks are dispatched immediately above per batch + # This condition __appears__ to always be true, but am afraid to remove it + if finding: + # Process the rest of the items on the finding + finding = self.finding_post_processing( + finding, + unsaved_finding, + ) + # all data is already saved on the finding, we only need to trigger post processing in batches + push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by) + batch_finding_ids.append(finding.id) + + # Post-processing batches (deduplication, rules, etc.) are separate from matching batches. + # These batches only contain "new" findings that were saved (not matched to existing findings). + # In reimport scenarios, typically most findings match existing ones, so only a small fraction + # of findings in each matching batch become new findings that need deduplication. + # + # We accumulate finding IDs across matching batches rather than dispatching at the end of each + # matching batch. This ensures deduplication batches stay close to the intended batch size + # (e.g., 1000 findings) for optimal bulk operation efficiency, even when only ~10% of findings + # in matching batches are new. If we dispatched at the end of each matching batch, we would + # end up with many small deduplication batches (e.g., ~100 findings each), reducing efficiency. + # + # The two batch types serve different purposes: + # - Matching batches: optimize candidate fetching (solve 1+N query problem) + # - Deduplication batches: optimize bulk operations (larger batches = fewer queries) + # They don't need to be aligned since they optimize different operations. + if len(batch_finding_ids) >= dedupe_batch_max_size or is_final: + finding_ids_batch = list(batch_finding_ids) + batch_finding_ids.clear() + if we_want_async(async_user=self.user): + finding_helper.post_process_findings_batch_signature( + finding_ids_batch, + dedupe_option=True, + rules_option=True, + product_grading_option=True, + issue_updater_option=True, + push_to_jira=push_to_jira, + )() + else: + finding_helper.post_process_findings_batch( + finding_ids_batch, + dedupe_option=True, + rules_option=True, + product_grading_option=True, + issue_updater_option=True, + push_to_jira=push_to_jira, + ) + + # No chord: tasks are dispatched immediately above per batch self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items)) # due to #3958 we can have duplicates inside the same report @@ -375,48 +517,73 @@ def parse_findings_dynamic_test_type( logger.debug("REIMPORT_SCAN parser v2: Create parse findings") return super().parse_findings_dynamic_test_type(scan, parser) - def match_new_finding_to_existing_finding( + def match_finding_to_candidate_reimport( self, unsaved_finding: Finding, + candidates_by_hash: dict | None = None, + candidates_by_uid: dict | None = None, + candidates_by_key: dict | None = None, ) -> list[Finding]: - """Matches a single new finding to N existing findings and then returns those matches""" - # This code should match the logic used for deduplication out of the re-import feature. 
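To make the batch-size reasoning above concrete, a small illustrative calculation (all numbers are made up):

    import math

    total = 10_500          # findings in the reimported report
    match_batch = 1_000     # DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE
    dedupe_batch = 1_000    # DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE
    new_ratio = 0.10        # typical reimport: roughly 10% of findings are new

    matching_batches = math.ceil(total / match_batch)         # 11 candidate-fetch rounds
    new_findings = int(total * new_ratio)                      # ~1,050 findings needing post-processing
    dedupe_batches = math.ceil(new_findings / dedupe_batch)    # 2 post-processing tasks instead of ~11 tiny ones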
- # See utils.py deduplicate_* functions - deduplicationLogger.debug("return findings bases on algorithm: %s", self.deduplication_algorithm) + """ + Matches a single new finding to existing findings using pre-fetched candidate dictionaries. + This avoids individual database queries by using batch-fetched candidates. + + Args: + unsaved_finding: The finding to match + candidates_by_hash: Dictionary mapping hash_code to list of findings (for hash_code algorithm) + candidates_by_uid: Dictionary mapping unique_id_from_tool to list of findings (for unique_id algorithms) + candidates_by_key: Dictionary mapping (title_lower, severity) to list of findings (for legacy algorithm) + + Returns: + List of matching findings, ordered by id + + """ + deduplicationLogger.debug("matching finding for reimport using algorithm: %s", self.deduplication_algorithm) + if self.deduplication_algorithm == "hash_code": - return Finding.objects.filter( - test=self.test, - hash_code=unsaved_finding.hash_code, - ).exclude(hash_code=None).order_by("id") + if candidates_by_hash is None or unsaved_finding.hash_code is None: + return [] + matches = candidates_by_hash.get(unsaved_finding.hash_code, []) + return sorted(matches, key=lambda f: f.id) + if self.deduplication_algorithm == "unique_id_from_tool": - deduplicationLogger.debug(f"unique_id_from_tool: {unsaved_finding.unique_id_from_tool}") - return Finding.objects.filter( - test=self.test, - unique_id_from_tool=unsaved_finding.unique_id_from_tool, - ).exclude(unique_id_from_tool=None).order_by("id") + if candidates_by_uid is None or unsaved_finding.unique_id_from_tool is None: + return [] + matches = candidates_by_uid.get(unsaved_finding.unique_id_from_tool, []) + return sorted(matches, key=lambda f: f.id) + if self.deduplication_algorithm == "unique_id_from_tool_or_hash_code": - deduplicationLogger.debug(f"unique_id_from_tool: {unsaved_finding.unique_id_from_tool}") - deduplicationLogger.debug(f"hash_code: {unsaved_finding.hash_code}") - query = Finding.objects.filter( - Q(test=self.test), - (Q(hash_code__isnull=False) & Q(hash_code=unsaved_finding.hash_code)) - | (Q(unique_id_from_tool__isnull=False) & Q(unique_id_from_tool=unsaved_finding.unique_id_from_tool)), - ).order_by("id") - deduplicationLogger.debug(query.query) - return query + if candidates_by_hash is None and candidates_by_uid is None: + return [] + + if unsaved_finding.hash_code is None and unsaved_finding.unique_id_from_tool is None: + return [] + + # Collect matches from both hash_code and unique_id_from_tool + matches_by_id = {} + + if unsaved_finding.hash_code is not None: + hash_matches = candidates_by_hash.get(unsaved_finding.hash_code, []) + for match in hash_matches: + matches_by_id[match.id] = match + + if unsaved_finding.unique_id_from_tool is not None: + uid_matches = candidates_by_uid.get(unsaved_finding.unique_id_from_tool, []) + for match in uid_matches: + matches_by_id[match.id] = match + + matches = list(matches_by_id.values()) + return sorted(matches, key=lambda f: f.id) + if self.deduplication_algorithm == "legacy": - # This is the legacy reimport behavior. Although it's pretty flawed and doesn't match the legacy algorithm for deduplication, - # this is left as is for simplicity. - # Re-writing the legacy deduplication here would be complicated and counter-productive. - # If you have use cases going through this section, you're advised to create a deduplication configuration for your parser - logger.warning("Legacy reimport. 
In case of issue, you're advised to create a deduplication configuration in order not to go through this section") - return Finding.objects.filter( - title__iexact=unsaved_finding.title, - test=self.test, - severity=unsaved_finding.severity, - numerical_severity=Finding.get_numerical_severity(unsaved_finding.severity)).order_by("id") + if candidates_by_key is None or not unsaved_finding.title: + return [] + key = (unsaved_finding.title.lower(), unsaved_finding.severity) + matches = candidates_by_key.get(key, []) + return sorted(matches, key=lambda f: f.id) + logger.error(f'Internal error: unexpected deduplication_algorithm: "{self.deduplication_algorithm}"') - return None + return [] def process_matched_finding( self, @@ -691,11 +858,46 @@ def process_finding_that_was_not_matched( self.process_request_response_pairs(unsaved_finding) return unsaved_finding + def reconcile_vulnerability_ids( + self, + finding: Finding, + ) -> Finding: + """ + Reconcile vulnerability IDs for an existing finding. + Checks if IDs have changed before updating to avoid unnecessary database operations. + Uses prefetched data if available, otherwise fetches efficiently. + + Args: + finding: The existing finding to reconcile vulnerability IDs for. + Must have unsaved_vulnerability_ids set. + + Returns: + The finding object + + """ + vulnerability_ids_to_process = finding.unsaved_vulnerability_ids or [] + + # Use prefetched data directly without triggering queries + existing_vuln_ids = {v.vulnerability_id for v in finding.vulnerability_id_set.all()} + new_vuln_ids = set(vulnerability_ids_to_process) + + # Early exit if unchanged + if existing_vuln_ids == new_vuln_ids: + logger.debug( + f"Skipping vulnerability_ids update for finding {finding.id} - " + f"vulnerability_ids unchanged: {sorted(existing_vuln_ids)}", + ) + return finding + + # Update if changed + finding_helper.save_vulnerability_ids(finding, vulnerability_ids_to_process, delete_existing=True) + return finding + def finding_post_processing( self, finding: Finding, finding_from_report: Finding, - ) -> None: + ) -> Finding: """ Save all associated objects to the finding after it has been saved for the purpose of foreign key restrictions @@ -715,10 +917,19 @@ def finding_post_processing( finding.unsaved_files = finding_from_report.unsaved_files self.process_files(finding) # Process vulnerability IDs - if finding_from_report.unsaved_vulnerability_ids: - finding.unsaved_vulnerability_ids = finding_from_report.unsaved_vulnerability_ids + # Copy unsaved_vulnerability_ids from the report finding to the existing finding + # so reconcile_vulnerability_ids can process them + # Always set it (even if empty list) so we can clear existing IDs when report has none + finding.unsaved_vulnerability_ids = finding_from_report.unsaved_vulnerability_ids or [] + # Store the current cve value to check if it changes + old_cve = finding.cve # legacy cve field has already been processed/set earlier - return self.process_vulnerability_ids(finding) + finding = self.reconcile_vulnerability_ids(finding) + # Save the finding only if the cve field was changed by save_vulnerability_ids + # This is temporary as the cve field will be phased out + if finding.cve != old_cve: + finding.save() + return finding def process_groups_for_all_findings( self, diff --git a/dojo/importers/endpoint_manager.py b/dojo/importers/endpoint_manager.py index ccfff345c40..f4b277d49fa 100644 --- a/dojo/importers/endpoint_manager.py +++ b/dojo/importers/endpoint_manager.py @@ -68,6 +68,7 @@ def 
mitigate_endpoint_status( ) -> None: """Mitigates all endpoint status objects that are supplied""" now = timezone.now() + to_update = [] for endpoint_status in endpoint_status_list: # Only mitigate endpoints that are actually active if endpoint_status.mitigated is False: @@ -75,7 +76,14 @@ def mitigate_endpoint_status( endpoint_status.last_modified = now endpoint_status.mitigated_by = user endpoint_status.mitigated = True - endpoint_status.save() + to_update.append(endpoint_status) + + if to_update: + Endpoint_Status.objects.bulk_update( + to_update, + ["mitigated", "mitigated_time", "last_modified", "mitigated_by"], + batch_size=1000, + ) @dojo_async_task @app.task() @@ -85,6 +93,8 @@ def reactivate_endpoint_status( **kwargs: dict, ) -> None: """Reactivate all endpoint status objects that are supplied""" + now = timezone.now() + to_update = [] for endpoint_status in endpoint_status_list: # Only reactivate endpoints that are actually mitigated if endpoint_status.mitigated: @@ -92,8 +102,15 @@ def reactivate_endpoint_status( endpoint_status.mitigated_by = None endpoint_status.mitigated_time = None endpoint_status.mitigated = False - endpoint_status.last_modified = timezone.now() - endpoint_status.save() + endpoint_status.last_modified = now + to_update.append(endpoint_status) + + if to_update: + Endpoint_Status.objects.bulk_update( + to_update, + ["mitigated", "mitigated_time", "mitigated_by", "last_modified"], + batch_size=1000, + ) def chunk_endpoints_and_disperse( self, @@ -149,17 +166,17 @@ def update_endpoint_status( # New finding is mitigated, so mitigate all old endpoints endpoint_status_to_mitigate = existing_finding_endpoint_status_list else: + # Convert to set for O(1) lookups instead of O(n) linear search + new_finding_endpoints_set = set(new_finding_endpoints_list) # Mitigate any endpoints in the old finding not found in the new finding - endpoint_status_to_mitigate = list( - filter( - lambda existing_finding_endpoint_status: existing_finding_endpoint_status.endpoint not in new_finding_endpoints_list, - existing_finding_endpoint_status_list), - ) + endpoint_status_to_mitigate = [ + eps for eps in existing_finding_endpoint_status_list + if eps.endpoint not in new_finding_endpoints_set + ] # Re-activate any endpoints in the old finding that are in the new finding - endpoint_status_to_reactivate = list( - filter( - lambda existing_finding_endpoint_status: existing_finding_endpoint_status.endpoint in new_finding_endpoints_list, - existing_finding_endpoint_status_list), - ) + endpoint_status_to_reactivate = [ + eps for eps in existing_finding_endpoint_status_list + if eps.endpoint in new_finding_endpoints_set + ] self.chunk_endpoints_and_reactivate(endpoint_status_to_reactivate) self.chunk_endpoints_and_mitigate(endpoint_status_to_mitigate, user) diff --git a/dojo/management/commands/list_top_tests.py b/dojo/management/commands/list_top_tests.py new file mode 100644 index 00000000000..7666d707101 --- /dev/null +++ b/dojo/management/commands/list_top_tests.py @@ -0,0 +1,119 @@ +from django.core.management.base import BaseCommand +from django.db.models import Count, Q + +from dojo.models import Test + + +class Command(BaseCommand): + help = "List the top 25 tests with the most findings" + + def add_arguments(self, parser): + parser.add_argument( + "--limit", + type=int, + default=25, + help="Number of tests to display (default: 25)", + ) + + def handle(self, *args, **options): + limit = options["limit"] + + # Annotate tests with finding counts + tests = ( + Test.objects.annotate( + 
total_findings=Count("finding", distinct=True), + active_findings=Count("finding", filter=Q(finding__active=True), distinct=True), + duplicate_findings=Count("finding", filter=Q(finding__duplicate=True), distinct=True), + ) + .filter(total_findings__gt=0) + .select_related("engagement", "engagement__product", "test_type") + .order_by("-total_findings")[:limit] + ) + + if not tests: + self.stdout.write(self.style.WARNING("No tests with findings found.")) + return + + # Calculate column widths + max_test_id_len = max( + (len(str(test.id)) for test in tests), + default=8, + ) + max_product_len = max( + (len(str(test.engagement.product.name)) for test in tests), + default=20, + ) + max_engagement_len = max( + (len(str(test.engagement.name)) for test in tests), + default=20, + ) + max_test_len = max( + (len(str(test.title or test.id)) for test in tests), + default=20, + ) + max_test_type_len = max( + (len(str(test.test_type.name)) for test in tests), + default=20, + ) + max_dedup_algo_len = max( + (len(str(test.deduplication_algorithm)) for test in tests), + default=20, + ) + + # Ensure minimum widths for readability + max_test_id_len = max(max_test_id_len, 8) + max_product_len = max(max_product_len, 20) + max_engagement_len = max(max_engagement_len, 20) + max_test_len = max(max_test_len, 20) + max_test_type_len = max(max_test_type_len, 20) + max_dedup_algo_len = max(max_dedup_algo_len, 20) + + # Header + header = ( + f"{'Test ID':<{max_test_id_len}} | " + f"{'Product':<{max_product_len}} | " + f"{'Engagement':<{max_engagement_len}} | " + f"{'Test':<{max_test_len}} | " + f"{'Test Type':<{max_test_type_len}} | " + f"{'Dedup Algorithm':<{max_dedup_algo_len}} | " + f"{'Total':>8} | " + f"{'Active':>8} | " + f"{'Duplicate':>10}" + ) + separator = "-" * len(header) + + self.stdout.write(self.style.SUCCESS(header)) + self.stdout.write(separator) + + # Data rows + for test in tests: + test_id = str(test.id) + product_name = str(test.engagement.product.name) + engagement_name = str(test.engagement.name) + test_name = str(test.title or f"Test #{test.id}") + test_type_name = str(test.test_type.name) + dedup_algo = str(test.deduplication_algorithm) + total = test.total_findings + active = test.active_findings + duplicate = test.duplicate_findings + + row = ( + f"{test_id:<{max_test_id_len}} | " + f"{product_name:<{max_product_len}} | " + f"{engagement_name:<{max_engagement_len}} | " + f"{test_name:<{max_test_len}} | " + f"{test_type_name:<{max_test_type_len}} | " + f"{dedup_algo:<{max_dedup_algo_len}} | " + f"{total:>8} | " + f"{active:>8} | " + f"{duplicate:>10}" + ) + self.stdout.write(row) + + # Summary + self.stdout.write(separator) + self.stdout.write( + self.style.SUCCESS( + f"\nDisplayed top {len(tests)} tests with findings.", + ), + ) diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index 1f24bd653f1..e57e5dcdbee 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -283,6 +283,8 @@ DD_EDITABLE_MITIGATED_DATA=(bool, False), # new feature that tracks history across multiple reimports for the same test DD_TRACK_IMPORT_HISTORY=(bool, True), + # Batch size for reimport candidate matching (finding existing findings) + DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE=(int, 1000), # Batch size for import/reimport deduplication processing DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=(int, 1000), # Delete Auditlogs older than x month; -1 to keep all logs @@ -1716,6 +1718,7 @@ def saml2_attrib_map_format(din): DISABLE_FINDING_MERGE = env("DD_DISABLE_FINDING_MERGE") 
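The list_top_tests command added above can be run from the shell as `python manage.py list_top_tests --limit 10`, or invoked programmatically, as sketched here:

    from django.core.management import call_command

    # equivalent to: python manage.py list_top_tests --limit 10
    call_command("list_top_tests", limit=10)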
TRACK_IMPORT_HISTORY = env("DD_TRACK_IMPORT_HISTORY") +IMPORT_REIMPORT_MATCH_BATCH_SIZE = env("DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE") IMPORT_REIMPORT_DEDUPE_BATCH_SIZE = env("DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE") # ------------------------------------------------------------------------------ diff --git a/ruff.toml b/ruff.toml index 670a95b7b99..942bd47a00a 100644 --- a/ruff.toml +++ b/ruff.toml @@ -84,7 +84,7 @@ select = [ "PGH", "PLC", "PLE", - "PLR01", "PLR02", "PLR04", "PLR0915", "PLR1711", "PLR1704", "PLR1714", "PLR1716", "PLR172", "PLR173", "PLR2044", "PLR5", "PLR6104", "PLR6201", + "PLR01", "PLR02", "PLR04", "PLR0915", "PLR1711", "PLR1704", "PLR1714", "PLR1716", "PLR172", "PLR173", "PLR2044", "PLR5", "PLR6104", "PLR6201", "PLW", "UP", "FURB", @@ -118,6 +118,15 @@ preview = true "dojo/filters.py" = [ "A003", # ruff upgrade to 0.13.3 ] +"scripts/update_performance_test_counts.py" = [ + "S603", # subprocess.run without shell=True is safe for this script + "S604", # subprocess.run without shell=True is safe for this script + "S605", # subprocess.run without shell=True is safe for this script + "S606", # subprocess.run without shell=True is safe for this script + "S607", # subprocess.run without shell=True is safe for this script + "T201", # print statements are fine for console output scripts + "EXE001", # git won't commit the executable flag I don't know why +] [lint.flake8-boolean-trap] extend-allowed-calls = ["dojo.utils.get_system_setting"] diff --git a/scripts/update_performance_test_counts.py b/scripts/update_performance_test_counts.py new file mode 100644 index 00000000000..f7cfaae2859 --- /dev/null +++ b/scripts/update_performance_test_counts.py @@ -0,0 +1,667 @@ +#!/usr/bin/env python3 +""" +Script to update performance test query counts. +I tried to implement this as management command, but it would become very slow on parsing the test output. + +This script: +1. Runs all performance tests and captures actual query counts +2. Compares them with expected counts +3. Generates a report and optionally updates the test file +4. Provides a verification run + +How to run: + + # Default: Update the test file (uses TestDojoImporterPerformanceSmall by default) + python3 scripts/update_performance_test_counts.py + + # Or specify a different test class: + python3 scripts/update_performance_test_counts.py --test-class TestDojoImporterPerformanceSmall + + # Step 1: Run tests and generate report only (without updating) + python3 scripts/update_performance_test_counts.py --report-only + + # Step 2: Verify all tests pass + python3 scripts/update_performance_test_counts.py --verify + +The script defaults to TestDojoImporterPerformanceSmall if --test-class is not provided. +The script defaults to --update behavior if no action flag is provided. 
+""" + +import argparse +import re +import subprocess +import sys +from pathlib import Path + +# Path to the test file +TEST_FILE = Path(__file__).parent.parent / "unittests" / "test_importers_performance.py" + + +class TestCount: + + """Represents a test's expected and actual counts.""" + + def __init__(self, test_name: str, step: str, metric: str): + self.test_name = test_name + self.step = step + self.metric = metric + self.expected = None + self.actual = None + self.difference = None + + def __repr__(self): + return ( + f"TestCount({self.test_name}, {self.step}, {self.metric}, " + f"expected={self.expected}, actual={self.actual})" + ) + + +def extract_test_methods(test_class: str) -> list[str]: + """Extract all test method names from the test class.""" + if not TEST_FILE.exists(): + msg = f"Test file not found: {TEST_FILE}" + raise FileNotFoundError(msg) + + content = TEST_FILE.read_text() + + # Find the test class definition + class_pattern = re.compile( + rf"class {re.escape(test_class)}.*?(?=class |\Z)", + re.DOTALL, + ) + class_match = class_pattern.search(content) + if not class_match: + return [] + + class_content = class_match.group(0) + + # Find all test methods in this class + test_method_pattern = re.compile(r"def (test_\w+)\(") + return test_method_pattern.findall(class_content) + + +def run_test_method(test_class: str, test_method: str) -> tuple[str, int]: + """Run a specific test method and return the output and return code.""" + print(f"Running {test_class}.{test_method}...") + cmd = [ + "./run-unittest.sh", + "--test-case", + f"unittests.test_importers_performance.{test_class}.{test_method}", + ] + + # Run with real-time output streaming + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + cwd=Path(__file__).parent.parent, + ) + + output_lines = [] + for line in process.stdout: + print(line, end="") # Print in real-time + output_lines.append(line) + + process.wait() + output = "".join(output_lines) + + return output, process.returncode + + +def run_tests(test_class: str) -> tuple[str, int]: + """Run all tests in a test class and return the output and return code.""" + print(f"Running tests for {test_class}...") + cmd = [ + "./run-unittest.sh", + "--test-case", + f"unittests.test_importers_performance.{test_class}", + ] + + # Run with real-time output streaming + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + cwd=Path(__file__).parent.parent, + ) + + output_lines = [] + for line in process.stdout: + print(line, end="") # Print in real-time + output_lines.append(line) + + process.wait() + output = "".join(output_lines) + + return output, process.returncode + + +def check_test_execution_success(output: str, return_code: int) -> tuple[bool, str]: + """Check if tests executed successfully or failed due to other reasons.""" + # Check for migration errors + migration_error_patterns = [ + r"django\.db\.migrations\.exceptions\.", + r"Migration.*failed", + r"django\.core\.management\.base\.CommandError", + r"OperationalError", + r"ProgrammingError", + r"relation.*does not exist", + r"no such table", + ] + + for pattern in migration_error_patterns: + if re.search(pattern, output, re.IGNORECASE): + return False, f"Migration or database error detected: {pattern}" + + # Check if any tests actually ran + test_run_patterns = [ + r"Ran \d+ test", + r"OK", + r"FAILED", + r"FAIL:", + r"test_\w+", + ] + + tests_ran = any(re.search(pattern, output) for 
pattern in test_run_patterns) + + if not tests_ran and return_code != 0: + return False, "Tests did not run successfully. Check the output above for errors." + + # Check for other critical errors + critical_error_patterns = [ + r"ImportError", + r"ModuleNotFoundError", + r"SyntaxError", + r"IndentationError", + ] + + for pattern in critical_error_patterns: + if re.search(pattern, output): + return False, f"Critical error detected: {pattern}" + + return True, "" + + +def parse_test_output(output: str) -> list[TestCount]: + """Parse test output to extract actual vs expected counts.""" + counts = [] + + # Debug: Save a sample of the output to help diagnose parsing issues + if "FAIL:" in output: + # Extract failure sections for debugging + fail_sections = [] + lines = output.split("\n") + in_fail_section = False + fail_section = [] + for line in lines: + if "FAIL:" in line: + if fail_section: + fail_sections.append("\n".join(fail_section)) + fail_section = [line] + in_fail_section = True + elif in_fail_section: + fail_section.append(line) + # Stop collecting after AssertionError line or after 5 more lines + if "AssertionError:" in line or len(fail_section) > 6: + fail_sections.append("\n".join(fail_section)) + fail_section = [] + in_fail_section = False + if fail_section: + fail_sections.append("\n".join(fail_section)) + + if fail_sections: + print(f"\nšŸ” Found {len(fail_sections)} failure section(s) in output") + + # The test output format is: + # FAIL: test_name (step='import1', metric='queries') + # AssertionError: 118 != 120 : 118 queries executed, 120 expected + # OR for async tasks: + # FAIL: test_name (step='import1', metric='async_tasks') + # AssertionError: 7 != 8 : 7 async tasks executed, 8 expected + + # Pattern to match the full failure block: + # FAIL: test_name (full.path.to.test) (step='...', metric='...') + # AssertionError: actual != expected : actual ... 
executed, expected expected + # The test name may include the full path in parentheses, so we extract just the method name + failure_pattern = re.compile( + r"FAIL:\s+(test_\w+)\s+\([^)]+\)\s+\(step=['\"](\w+)['\"],\s*metric=['\"](\w+)['\"]\)\s*\n" + r".*?AssertionError:\s+(\d+)\s+!=\s+(\d+)\s+:\s+\d+\s+(?:queries|async tasks?)\s+executed,\s+\d+\s+expected", + re.MULTILINE | re.DOTALL, + ) + + for match in failure_pattern.finditer(output): + test_name = match.group(1) + step = match.group(2) + metric = match.group(3) + actual = int(match.group(4)) + expected = int(match.group(5)) + + count = TestCount(test_name, step, metric) + count.actual = actual + count.expected = expected + count.difference = expected - actual + counts.append(count) + + # Also try a simpler pattern in case the format is slightly different + if not counts: + # Look for lines with step/metric followed by AssertionError on nearby lines + lines = output.split("\n") + i = 0 + while i < len(lines): + line = lines[i] + + # Look for FAIL: test_name (may include full path in parentheses) + # Format: FAIL: test_name (full.path) (step='...', metric='...') + fail_match = re.search(r"FAIL:\s+(test_\w+)\s+\([^)]+\)\s+\(step=['\"](\w+)['\"],\s*metric=['\"](\w+)['\"]\)", line) + if fail_match: + test_name = fail_match.group(1) + step = fail_match.group(2) + metric = fail_match.group(3) + # Look ahead for AssertionError + for j in range(i, min(i + 15, len(lines))): + assertion_match = re.search( + r"AssertionError:\s+(\d+)\s+!=\s+(\d+)\s+:\s+\d+\s+(?:queries|async tasks?)\s+executed,\s+\d+\s+expected", + lines[j], + ) + + if assertion_match: + actual = int(assertion_match.group(1)) + expected = int(assertion_match.group(2)) + + count = TestCount(test_name, step, metric) + count.actual = actual + count.expected = expected + count.difference = expected - actual + counts.append(count) + break + i += 1 + + if counts: + print(f"\nšŸ“Š Parsed {len(counts)} count mismatch(es) from test output:") + for count in counts: + print(f" {count.test_name} - {count.step} {count.metric}: {count.actual} != {count.expected}") + elif "FAIL:" in output: + print("\nāš ļø WARNING: Found FAIL in output but couldn't parse any count mismatches!") + print("This might indicate a parsing issue. Check the output above.") + + return counts + + +def extract_expected_counts_from_file(test_class: str) -> dict[str, dict[str, int]]: + """Extract expected counts from the test file.""" + if not TEST_FILE.exists(): + msg = f"Test file not found: {TEST_FILE}" + raise FileNotFoundError(msg) + + content = TEST_FILE.read_text() + + # Pattern to match test method calls with expected counts + # Format: self._import_reimport_performance( + # expected_num_queries1=340, + # expected_num_async_tasks1=7, + # expected_num_queries2=238, + # expected_num_async_tasks2=18, + # expected_num_queries3=120, + # expected_num_async_tasks3=17, + # ) + # More flexible pattern that handles whitespace variations + pattern = re.compile( + r"def (test_\w+)\([^)]*\):.*?" 
+ r"self\._import_reimport_performance\(\s*" + r"expected_num_queries1\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks1\s*=\s*(\d+)\s*,\s*" + r"expected_num_queries2\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks2\s*=\s*(\d+)\s*,\s*" + r"expected_num_queries3\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks3\s*=\s*(\d+)\s*,", + re.DOTALL, + ) + + expected_counts = {} + for match in pattern.finditer(content): + test_name = match.group(1) + expected_counts[test_name] = { + "import1_queries": int(match.group(2)), + "import1_async_tasks": int(match.group(3)), + "reimport1_queries": int(match.group(4)), + "reimport1_async_tasks": int(match.group(5)), + "reimport2_queries": int(match.group(6)), + "reimport2_async_tasks": int(match.group(7)), + } + + return expected_counts + + +def generate_report(counts: list[TestCount], expected_counts: dict[str, dict[str, int]]): + """Generate a report of differences.""" + if not counts: + print("āœ… All tests passed! No count differences found.") + return + + print("\n" + "=" * 80) + print("PERFORMANCE TEST COUNT DIFFERENCES REPORT") + print("=" * 80 + "\n") + + # Group by test name + by_test = {} + for count in counts: + if count.test_name not in by_test: + by_test[count.test_name] = [] + by_test[count.test_name].append(count) + + for test_name, test_counts in sorted(by_test.items()): + print(f"Test: {test_name}") + print("-" * 80) + for count in sorted(test_counts, key=lambda x: (x.step, x.metric)): + print( + f" {count.step:12} {count.metric:15} " + f"Expected: {count.expected:4} → Actual: {count.actual:4} " + f"(Difference: {count.difference:+3})", + ) + print() + + print("=" * 80) + print("\nTo update the test file, run:") + print(f" python scripts/update_performance_test_counts.py --test-class {test_name.split('_')[0]} --update") + print() + + +def update_test_file(counts: list[TestCount]): + """Update the test file with new expected counts.""" + if not counts: + print("No counts to update.") + return + + content = TEST_FILE.read_text() + + # Create a mapping of test_name -> step_metric -> new_value + updates = {} + for count in counts: + if count.test_name not in updates: + updates[count.test_name] = {} + step_metric = f"{count.step}_{count.metric}" + updates[count.test_name][step_metric] = count.actual + + # Map step_metric to parameter name for different methods + param_map_import_reimport = { + "import1_queries": "expected_num_queries1", + "import1_async_tasks": "expected_num_async_tasks1", + "reimport1_queries": "expected_num_queries2", + "reimport1_async_tasks": "expected_num_async_tasks2", + "reimport2_queries": "expected_num_queries3", + "reimport2_async_tasks": "expected_num_async_tasks3", + } + param_map_deduplication = { + "first_import_queries": "expected_num_queries1", + "first_import_async_tasks": "expected_num_async_tasks1", + "second_import_queries": "expected_num_queries2", + "second_import_async_tasks": "expected_num_async_tasks2", + } + + # Update each test method + for test_name, test_updates in updates.items(): + print(f" Updating {test_name}...") + # Find the test method boundaries + test_method_pattern = re.compile( + rf"(def {re.escape(test_name)}\([^)]*\):.*?)(?=def test_|\Z)", + re.DOTALL, + ) + test_match = test_method_pattern.search(content) + if not test_match: + print(f"āš ļø Warning: Could not find test method {test_name}") + continue + + test_method_content = test_match.group(1) + test_method_start = test_match.start() + test_method_end = test_match.end() + + # Try to find _import_reimport_performance call first + 
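The simplified check below illustrates the call shape these patterns target (the sample text is made up in the documented format; the real patterns additionally anchor on the parameter names and their order):

    import re

    sample = (
        "self._import_reimport_performance(\n"
        "    expected_num_queries1=340,\n"
        "    expected_num_async_tasks1=7,\n"
        "    expected_num_queries2=238,\n"
        "    expected_num_async_tasks2=18,\n"
        "    expected_num_queries3=120,\n"
        "    expected_num_async_tasks3=17,\n"
        ")"
    )
    print(re.findall(r"=\s*(\d+)\s*,", sample))  # ['340', '7', '238', '18', '120', '17']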
perf_call_pattern_import_reimport = re.compile( + r"(self\._import_reimport_performance\s*\(\s*)" + r"expected_num_queries1\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks1\s*=\s*(\d+)\s*,\s*" + r"expected_num_queries2\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks2\s*=\s*(\d+)\s*,\s*" + r"expected_num_queries3\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks3\s*=\s*(\d+)\s*," + r"(\s*\))", + re.DOTALL, + ) + + # Try to find _deduplication_performance call + perf_call_pattern_deduplication = re.compile( + r"(self\._deduplication_performance\s*\(\s*)" + r"expected_num_queries1\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks1\s*=\s*(\d+)\s*,\s*" + r"expected_num_queries2\s*=\s*(\d+)\s*,\s*" + r"expected_num_async_tasks2\s*=\s*(\d+)\s*," + r"(\s*\))", + re.DOTALL, + ) + + perf_match = perf_call_pattern_import_reimport.search(test_method_content) + method_type = "import_reimport" + param_map = param_map_import_reimport + param_order = [ + "import1_queries", + "import1_async_tasks", + "reimport1_queries", + "reimport1_async_tasks", + "reimport2_queries", + "reimport2_async_tasks", + ] + + if not perf_match: + perf_match = perf_call_pattern_deduplication.search(test_method_content) + if perf_match: + method_type = "deduplication" + param_map = param_map_deduplication + param_order = [ + "first_import_queries", + "first_import_async_tasks", + "second_import_queries", + "second_import_async_tasks", + ] + else: + print(f"āš ļø Warning: Could not find _import_reimport_performance or _deduplication_performance call in {test_name}") + continue + + # Get the indentation from the original call (first line after opening paren) + call_lines = test_method_content[perf_match.start():perf_match.end()].split("\n") + indent = "" + for line in call_lines: + if "expected_num_queries1" in line: + # Extract indentation (spaces before the parameter) + indent_match = re.match(r"(\s*)expected_num_queries1", line) + if indent_match: + indent = indent_match.group(1) + break + + # If we couldn't find indentation, use a default + if not indent: + indent = " " # 12 spaces default + + replacement_parts = [perf_match.group(1)] # Opening: "self._import_reimport_performance(" + updated_params = [] + for i, step_metric in enumerate(param_order): + param_name = param_map[step_metric] + old_value = int(perf_match.group(i + 2)) # +2 because group 1 is the opening + if step_metric in test_updates: + new_value = test_updates[step_metric] + if old_value != new_value: + updated_params.append(f"{param_name}: {old_value} → {new_value}") + else: + # Keep the existing value + new_value = old_value + + replacement_parts.append(f"{indent}{param_name}={new_value},") + + # Closing parenthesis - group number depends on method type + closing_group = 8 if method_type == "import_reimport" else 6 + replacement_parts.append(perf_match.group(closing_group)) # Closing parenthesis + replacement = "\n".join(replacement_parts) + + if updated_params: + print(f" Updated: {', '.join(updated_params)}") + + # Replace the method call within the test method content + updated_method_content = ( + test_method_content[: perf_match.start()] + + replacement + + test_method_content[perf_match.end() :] + ) + + # Replace the entire test method in the original content + content = content[:test_method_start] + updated_method_content + content[test_method_end:] + + # Write back to file + TEST_FILE.write_text(content) + print(f"āœ… Updated {TEST_FILE}") + print(f" Updated {len(counts)} count(s) across {len(updates)} test(s)") + + +def verify_tests(test_class: str) 
-> bool: + """Run tests to verify they all pass.""" + print(f"Verifying tests for {test_class}...") + output, return_code = run_tests(test_class) + + success, error_msg = check_test_execution_success(output, return_code) + if not success: + print(f"\nāŒ Test execution failed: {error_msg}") + return False + + counts = parse_test_output(output) + + if counts: + print("\nāŒ Some tests still have count mismatches:") + for count in counts: + print(f" {count.test_name} - {count.step} {count.metric}: " + f"expected {count.expected}, got {count.actual}") + return False + else: # noqa: RET505 + print("\nāœ… All tests pass!") + return True + + +def main(): + parser = argparse.ArgumentParser( + description="Update performance test query counts", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "--test-class", + required=False, + default="TestDojoImporterPerformanceSmall", + help="Test class name (e.g., TestDojoImporterPerformanceSmall). Defaults to TestDojoImporterPerformanceSmall if not provided.", + ) + parser.add_argument( + "--report-only", + action="store_true", + help="Only generate a report, don't update the file", + ) + parser.add_argument( + "--update", + action="store_true", + help="Update the test file with new counts (default behavior if no action flag is provided)", + ) + parser.add_argument( + "--verify", + action="store_true", + help="Run tests to verify they pass", + ) + + args = parser.parse_args() + + if args.report_only: + # Step 1: Run tests and generate report + # Run each test method individually + test_methods = extract_test_methods(args.test_class) + if not test_methods: + print(f"āš ļø No test methods found in {args.test_class}") + sys.exit(1) + + print(f"\nFound {len(test_methods)} test method(s) in {args.test_class}") + print("=" * 80) + + all_counts = [] + for test_method in test_methods: + print(f"\n{'=' * 80}") + output, return_code = run_test_method(args.test_class, test_method) + success, error_msg = check_test_execution_success(output, return_code) + if not success: + print(f"\nāš ļø Test execution failed for {test_method}: {error_msg}") + print("Skipping this test method...") + continue + + counts = parse_test_output(output) + if counts: + all_counts.extend(counts) + + expected_counts = extract_expected_counts_from_file(args.test_class) + generate_report(all_counts, expected_counts) + + elif args.verify: + # Step 3: Verify + success = verify_tests(args.test_class) + sys.exit(0 if success else 1) + + else: + # Default: Update the file (--update is the default behavior) + # Run each test method individually + test_methods = extract_test_methods(args.test_class) + if not test_methods: + print(f"āš ļø No test methods found in {args.test_class}") + sys.exit(1) + + print(f"\nFound {len(test_methods)} test method(s) in {args.test_class}") + print("=" * 80) + + all_counts = [] + for test_method in test_methods: + print(f"\n{'=' * 80}") + output, return_code = run_test_method(args.test_class, test_method) + success, error_msg = check_test_execution_success(output, return_code) + if not success: + print(f"\nāš ļø Test execution failed for {test_method}: {error_msg}") + print("Skipping this test method...") + continue + + counts = parse_test_output(output) + + # Check if test actually passed + test_passed = "OK" in output or ("Ran" in output and "FAILED" not in output and return_code == 0) + + if counts: + all_counts.extend(counts) + # Update immediately after each test + update_test_file(counts) + print(f"āš ļø 
{test_method}: Found {len(counts)} count mismatch(es) - updated file") + elif test_passed: + print(f"āœ… {test_method}: Test passed, all counts match") + elif return_code != 0: + # Test might have failed for other reasons + print(f"āš ļø {test_method}: Test failed (exit code {return_code}) but no count mismatches parsed") + print(" This might indicate a parsing issue or a different type of failure") + # Show a snippet of the output to help debug + fail_lines = [line for line in output.split("\n") if "FAIL" in line or "Error" in line or "Exception" in line] + if fail_lines: + print(" Relevant error lines:") + for line in fail_lines[:5]: + print(f" {line}") + + if all_counts: + print(f"\n{'=' * 80}") + print(f"āœ… Updated {len(all_counts)} count(s) across {len({c.test_name for c in all_counts})} test(s)") + print("\nNext step: Run --verify to ensure all tests pass") + else: + print(f"\n{'=' * 80}") + print("\nāœ… No differences found. All tests are already up to date.") + + +if __name__ == "__main__": + main() diff --git a/unittests/scans/anchore_grype/check_all_fields_different_ids_fabricated.json b/unittests/scans/anchore_grype/check_all_fields_different_ids_fabricated.json new file mode 100644 index 00000000000..8520e8828dc --- /dev/null +++ b/unittests/scans/anchore_grype/check_all_fields_different_ids_fabricated.json @@ -0,0 +1,167 @@ +{ + "matches": [ + { + "vulnerability": { + "id": "GHSA-v6rh-hp5x-86rv", + "dataSource": "https://github.com/advisories/GHSA-v6rh-hp5x-86rv", + "namespace": "github:python", + "severity": "High", + "urls": [ + "https://github.com/advisories/GHSA-v6rh-hp5x-86rv" + ], + "description": "Potential bypass of an upstream access control based on URL paths in Django", + "cvss": [], + "fix": { + "versions": [ + "3.2.10" + ], + "state": "fixed" + }, + "advisories": [] + }, + "relatedVulnerabilities": [ + { + "id": "CVE-2021-1234", + "dataSource": "https://nvd.nist.gov/vuln/detail/CVE-2021-1234", + "namespace": "nvd", + "severity": "High", + "urls": [ + "https://example.com/cve-2021-1234" + ], + "description": "A different CVE for testing vulnerability ID changes", + "cvss": [ + { + "version": "3.1", + "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + "metrics": { + "baseScore": 9.8, + "exploitabilityScore": 3.9, + "impactScore": 5.9 + }, + "vendorMetadata": {} + } + ] + }, + { + "id": "CVE-2021-5678", + "dataSource": "https://nvd.nist.gov/vuln/detail/CVE-2021-5678", + "namespace": "nvd", + "severity": "Medium", + "urls": [ + "https://example.com/cve-2021-5678" + ], + "description": "Another different CVE for testing vulnerability ID changes", + "cvss": [ + { + "version": "3.1", + "vector": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:L", + "metrics": { + "baseScore": 6.3, + "exploitabilityScore": 3.9, + "impactScore": 2.4 + }, + "vendorMetadata": {} + } + ] + } + ], + "matchDetails": [ + { + "matcher": "python-matcher", + "searchedBy": { + "language": "python", + "namespace": "github:python" + }, + "found": { + "versionConstraint": ">=3.2,<3.2.10 (python)" + } + } + ], + "artifact": { + "name": "Django", + "version": "3.2.9", + "type": "python", + "locations": [ + { + "path": "/usr/local/lib/python3.8/site-packages/Django-3.2.9.dist-info/METADATA", + "layerID": "sha256:b1d4455cf82b15a50b006fe87bd29f694c8f9155456253eb67fdd155b5edcf4a" + } + ], + "language": "python", + "licenses": [ + "BSD-3-Clause" + ], + "cpes": [ + "cpe:2.3:a:django_software_foundation:Django:3.2.9:*:*:*:*:*:*:*" + ], + "purl": "pkg:pypi/Django@3.2.9", + "metadata": null + } + } + ], + 
"source": { + "type": "image", + "target": { + "userInput": "vulnerable-image:latest", + "imageID": "sha256:ce9898fd214aef9c994a42624b09056bdce3ff4a8e3f68dc242d967b80fcbeee", + "manifestDigest": "sha256:9d8825ab20ac86b40eb71495bece1608a302fb180384740697a28c2b0a5a0fc6", + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "tags": [ + "vulnerable-image:latest" + ], + "imageSize": 707381791, + "layers": [] + } + }, + "distro": { + "name": "debian", + "version": "10", + "idLike": "" + }, + "descriptor": { + "name": "grype", + "version": "0.28.0", + "configuration": { + "configPath": "", + "output": "json", + "file": "", + "output-template-file": "", + "quiet": false, + "check-for-app-update": true, + "only-fixed": false, + "scope": "Squashed", + "log": { + "structured": false, + "level": "", + "file": "" + }, + "db": { + "cache-dir": "/home/user/.cache/grype/db", + "update-url": "https://toolbox-data.anchore.io/grype/databases/listing.json", + "ca-cert": "", + "auto-update": true, + "validate-by-hash-on-start": false + }, + "dev": { + "profile-cpu": false, + "profile-mem": false + }, + "fail-on-severity": "", + "registry": { + "insecure-skip-tls-verify": false, + "insecure-use-http": false, + "auth": [] + }, + "ignore": null, + "exclude": [] + }, + "db": { + "built": "2021-12-24T08:14:02Z", + "schemaVersion": 3, + "location": "/home/user/.cache/grype/db/3", + "checksum": "sha256:6c4777e1acea787e5335ccee6b5e4562cd1767b9cca138c07e0802efb2a74162", + "error": null + } + } + } + diff --git a/unittests/scans/anchore_grype/check_all_fields_no_ids_fabricated.json b/unittests/scans/anchore_grype/check_all_fields_no_ids_fabricated.json new file mode 100644 index 00000000000..d4371d3a478 --- /dev/null +++ b/unittests/scans/anchore_grype/check_all_fields_no_ids_fabricated.json @@ -0,0 +1,122 @@ +{ + "matches": [ + { + "vulnerability": { + "id": "GHSA-v6rh-hp5x-86rv", + "dataSource": "https://github.com/advisories/GHSA-v6rh-hp5x-86rv", + "namespace": "github:python", + "severity": "High", + "urls": [ + "https://github.com/advisories/GHSA-v6rh-hp5x-86rv" + ], + "description": "Potential bypass of an upstream access control based on URL paths in Django", + "cvss": [], + "fix": { + "versions": [ + "3.2.10" + ], + "state": "fixed" + }, + "advisories": [] + }, + "relatedVulnerabilities": [], + "matchDetails": [ + { + "matcher": "python-matcher", + "searchedBy": { + "language": "python", + "namespace": "github:python" + }, + "found": { + "versionConstraint": ">=3.2,<3.2.10 (python)" + } + } + ], + "artifact": { + "name": "Django", + "version": "3.2.9", + "type": "python", + "locations": [ + { + "path": "/usr/local/lib/python3.8/site-packages/Django-3.2.9.dist-info/METADATA", + "layerID": "sha256:b1d4455cf82b15a50b006fe87bd29f694c8f9155456253eb67fdd155b5edcf4a" + } + ], + "language": "python", + "licenses": [ + "BSD-3-Clause" + ], + "cpes": [ + "cpe:2.3:a:django_software_foundation:Django:3.2.9:*:*:*:*:*:*:*" + ], + "purl": "pkg:pypi/Django@3.2.9", + "metadata": null + } + } + ], + "source": { + "type": "image", + "target": { + "userInput": "vulnerable-image:latest", + "imageID": "sha256:ce9898fd214aef9c994a42624b09056bdce3ff4a8e3f68dc242d967b80fcbeee", + "manifestDigest": "sha256:9d8825ab20ac86b40eb71495bece1608a302fb180384740697a28c2b0a5a0fc6", + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "tags": [ + "vulnerable-image:latest" + ], + "imageSize": 707381791, + "layers": [] + } + }, + "distro": { + "name": "debian", + "version": "10", + "idLike": "" + }, + 
"descriptor": { + "name": "grype", + "version": "0.28.0", + "configuration": { + "configPath": "", + "output": "json", + "file": "", + "output-template-file": "", + "quiet": false, + "check-for-app-update": true, + "only-fixed": false, + "scope": "Squashed", + "log": { + "structured": false, + "level": "", + "file": "" + }, + "db": { + "cache-dir": "/home/user/.cache/grype/db", + "update-url": "https://toolbox-data.anchore.io/grype/databases/listing.json", + "ca-cert": "", + "auto-update": true, + "validate-by-hash-on-start": false + }, + "dev": { + "profile-cpu": false, + "profile-mem": false + }, + "fail-on-severity": "", + "registry": { + "insecure-skip-tls-verify": false, + "insecure-use-http": false, + "auth": [] + }, + "ignore": null, + "exclude": [] + }, + "db": { + "built": "2021-12-24T08:14:02Z", + "schemaVersion": 3, + "location": "/home/user/.cache/grype/db/3", + "checksum": "sha256:6c4777e1acea787e5335ccee6b5e4562cd1767b9cca138c07e0802efb2a74162", + "error": null + } + } + } + diff --git a/unittests/scans/anchore_grype/check_all_fields_with_ids_fabricated.json b/unittests/scans/anchore_grype/check_all_fields_with_ids_fabricated.json new file mode 100644 index 00000000000..028bd2c7b7b --- /dev/null +++ b/unittests/scans/anchore_grype/check_all_fields_with_ids_fabricated.json @@ -0,0 +1,158 @@ +{ + "matches": [ + { + "vulnerability": { + "id": "GHSA-v6rh-hp5x-86rv", + "dataSource": "https://github.com/advisories/GHSA-v6rh-hp5x-86rv", + "namespace": "github:python", + "severity": "High", + "urls": [ + "https://github.com/advisories/GHSA-v6rh-hp5x-86rv" + ], + "description": "Potential bypass of an upstream access control based on URL paths in Django", + "cvss": [], + "fix": { + "versions": [ + "3.2.10" + ], + "state": "fixed" + }, + "advisories": [] + }, + "relatedVulnerabilities": [ + { + "id": "CVE-2021-44420", + "dataSource": "https://nvd.nist.gov/vuln/detail/CVE-2021-44420", + "namespace": "nvd", + "severity": "High", + "urls": [ + "https://docs.djangoproject.com/en/3.2/releases/security/", + "https://www.openwall.com/lists/oss-security/2021/12/07/1", + "https://www.djangoproject.com/weblog/2021/dec/07/security-releases/", + "https://groups.google.com/forum/#!forum/django-announce" + ], + "description": "In Django 2.2 before 2.2.25, 3.1 before 3.1.14, and 3.2 before 3.2.10, HTTP requests for URLs with trailing newlines could bypass upstream access control based on URL paths.", + "cvss": [ + { + "version": "2.0", + "vector": "AV:N/AC:L/Au:N/C:P/I:P/A:P", + "metrics": { + "baseScore": 7.5, + "exploitabilityScore": 10, + "impactScore": 6.4 + }, + "vendorMetadata": {} + }, + { + "version": "3.1", + "vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:L/A:L", + "metrics": { + "baseScore": 7.3, + "exploitabilityScore": 3.9, + "impactScore": 3.4 + }, + "vendorMetadata": {} + } + ] + } + ], + "matchDetails": [ + { + "matcher": "python-matcher", + "searchedBy": { + "language": "python", + "namespace": "github:python" + }, + "found": { + "versionConstraint": ">=3.2,<3.2.10 (python)" + } + } + ], + "artifact": { + "name": "Django", + "version": "3.2.9", + "type": "python", + "locations": [ + { + "path": "/usr/local/lib/python3.8/site-packages/Django-3.2.9.dist-info/METADATA", + "layerID": "sha256:b1d4455cf82b15a50b006fe87bd29f694c8f9155456253eb67fdd155b5edcf4a" + } + ], + "language": "python", + "licenses": [ + "BSD-3-Clause" + ], + "cpes": [ + "cpe:2.3:a:django_software_foundation:Django:3.2.9:*:*:*:*:*:*:*" + ], + "purl": "pkg:pypi/Django@3.2.9", + "metadata": null + } + } + ], + 
"source": { + "type": "image", + "target": { + "userInput": "vulnerable-image:latest", + "imageID": "sha256:ce9898fd214aef9c994a42624b09056bdce3ff4a8e3f68dc242d967b80fcbeee", + "manifestDigest": "sha256:9d8825ab20ac86b40eb71495bece1608a302fb180384740697a28c2b0a5a0fc6", + "mediaType": "application/vnd.docker.distribution.manifest.v2+json", + "tags": [ + "vulnerable-image:latest" + ], + "imageSize": 707381791, + "layers": [] + } + }, + "distro": { + "name": "debian", + "version": "10", + "idLike": "" + }, + "descriptor": { + "name": "grype", + "version": "0.28.0", + "configuration": { + "configPath": "", + "output": "json", + "file": "", + "output-template-file": "", + "quiet": false, + "check-for-app-update": true, + "only-fixed": false, + "scope": "Squashed", + "log": { + "structured": false, + "level": "", + "file": "" + }, + "db": { + "cache-dir": "/home/user/.cache/grype/db", + "update-url": "https://toolbox-data.anchore.io/grype/databases/listing.json", + "ca-cert": "", + "auto-update": true, + "validate-by-hash-on-start": false + }, + "dev": { + "profile-cpu": false, + "profile-mem": false + }, + "fail-on-severity": "", + "registry": { + "insecure-skip-tls-verify": false, + "insecure-use-http": false, + "auth": [] + }, + "ignore": null, + "exclude": [] + }, + "db": { + "built": "2021-12-24T08:14:02Z", + "schemaVersion": 3, + "location": "/home/user/.cache/grype/db/3", + "checksum": "sha256:6c4777e1acea787e5335ccee6b5e4562cd1767b9cca138c07e0802efb2a74162", + "error": null + } + } + } + diff --git a/unittests/test_import_reimport.py b/unittests/test_import_reimport.py index f51a462045a..ed00e015189 100644 --- a/unittests/test_import_reimport.py +++ b/unittests/test_import_reimport.py @@ -102,12 +102,18 @@ def __init__(self, *args, **kwargs): self.anchore_grype_file_name = get_unit_tests_scans_path("anchore_grype") / "check_all_fields.json" self.anchore_grype_file_name_fix_not_available = get_unit_tests_scans_path("anchore_grype") / "fix_not_available.json" self.anchore_grype_file_name_fix_available = get_unit_tests_scans_path("anchore_grype") / "fix_available.json" + self.anchore_grype_file_name_with_ids_fabricated = get_unit_tests_scans_path("anchore_grype") / "check_all_fields_with_ids_fabricated.json" + self.anchore_grype_file_name_no_ids_fabricated = get_unit_tests_scans_path("anchore_grype") / "check_all_fields_no_ids_fabricated.json" + self.anchore_grype_file_name_different_ids_fabricated = get_unit_tests_scans_path("anchore_grype") / "check_all_fields_different_ids_fabricated.json" self.anchore_grype_scan_type = "Anchore Grype" self.checkmarx_one_open_and_false_positive = get_unit_tests_scans_path("checkmarx_one") / "one-open-one-false-positive.json" self.checkmarx_one_two_false_positive = get_unit_tests_scans_path("checkmarx_one") / "two-false-positive.json" self.scan_type_checkmarx_one = "Checkmarx One Scan" + self.bandit_large_file = get_unit_tests_scans_path("bandit") / "many_vulns.json" + self.scan_type_bandit = "Bandit Scan" + # import zap scan, testing: # - import # - active/verifed = True @@ -1693,6 +1699,78 @@ def test_import_reimport_vulnerability_ids(self): self.assertEqual("GHSA-v6rh-hp5x-86rv", findings[3].vulnerability_ids[0]) self.assertEqual("CVE-2021-44420", findings[3].vulnerability_ids[1]) + def test_import_reimport_clear_vulnerability_ids(self): + """Test that vulnerability IDs are cleared when reimporting with no IDs""" + # Import scan with vulnerability IDs + import0 = self.import_scan_with_params(self.anchore_grype_file_name_with_ids_fabricated, 
scan_type=self.anchore_grype_scan_type) + + test_id = import0["test"] + test = Test.objects.get(id=test_id) + findings = Finding.objects.filter(test=test) + self.assertEqual(1, len(findings)) + finding = findings[0] + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.cve) + self.assertEqual(2, len(finding.vulnerability_ids)) + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.vulnerability_ids[0]) + self.assertEqual("CVE-2021-44420", finding.vulnerability_ids[1]) + + # Reimport with no vulnerability IDs - should clear existing IDs + test_type = Test_Type.objects.get(name=self.anchore_grype_scan_type) + reimport_test = Test( + engagement=test.engagement, + test_type=test_type, + scan_type=self.anchore_grype_scan_type, + target_start=datetime.now(timezone.get_current_timezone()), + target_end=datetime.now(timezone.get_current_timezone()), + ) + reimport_test.save() + + self.reimport_scan_with_params(reimport_test.id, self.anchore_grype_file_name_no_ids_fabricated, scan_type=self.anchore_grype_scan_type) + findings = Finding.objects.filter(test=reimport_test) + self.assertEqual(1, len(findings)) + finding = findings[0] + # After clearing, only the primary vuln_id should remain (GHSA-v6rh-hp5x-86rv) + # because the parser always includes the primary vulnerability.id + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.cve) + self.assertEqual(1, len(finding.vulnerability_ids)) + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.vulnerability_ids[0]) + + def test_import_reimport_change_vulnerability_ids(self): + """Test that vulnerability IDs are updated when reimporting with different IDs""" + # Import scan with initial vulnerability IDs + import0 = self.import_scan_with_params(self.anchore_grype_file_name_with_ids_fabricated, scan_type=self.anchore_grype_scan_type) + + test_id = import0["test"] + test = Test.objects.get(id=test_id) + findings = Finding.objects.filter(test=test) + self.assertEqual(1, len(findings)) + finding = findings[0] + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.cve) + self.assertEqual(2, len(finding.vulnerability_ids)) + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.vulnerability_ids[0]) + self.assertEqual("CVE-2021-44420", finding.vulnerability_ids[1]) + + # Reimport with different vulnerability IDs - should update existing IDs + test_type = Test_Type.objects.get(name=self.anchore_grype_scan_type) + reimport_test = Test( + engagement=test.engagement, + test_type=test_type, + scan_type=self.anchore_grype_scan_type, + target_start=datetime.now(timezone.get_current_timezone()), + target_end=datetime.now(timezone.get_current_timezone()), + ) + reimport_test.save() + + self.reimport_scan_with_params(reimport_test.id, self.anchore_grype_file_name_different_ids_fabricated, scan_type=self.anchore_grype_scan_type) + findings = Finding.objects.filter(test=reimport_test) + self.assertEqual(1, len(findings)) + finding = findings[0] + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.cve) + self.assertEqual(3, len(finding.vulnerability_ids)) + self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.vulnerability_ids[0]) + self.assertEqual("CVE-2021-1234", finding.vulnerability_ids[1]) + self.assertEqual("CVE-2021-5678", finding.vulnerability_ids[2]) + def test_import_reimport_fix_available(self): import0 = self.import_scan_with_params(self.anchore_grype_file_name_fix_not_available, scan_type=self.anchore_grype_scan_type) test_id = import0["test"] @@ -2033,6 +2111,348 @@ def test_reimport_set_scan_date_parser_sets_date(self): date = findings["results"][0]["date"] self.assertEqual(date, 
"2006-12-26") + @override_settings( + IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=200, + IMPORT_REIMPORT_MATCH_BATCH_SIZE=200, + ) + def test_batch_reimport_large_bandit_file(self): + """ + Test that batch reimport produces identical results to non-batch mode (which we simulate via large max batch size setting). + + Step 1: Import scan (baseline), assess active count, duplicate count + Step 2: Reimport scan (same test), assess active count, duplicate count + Step 3: Import scan in NEW engagement with batch_size=50, assess active and duplicate equal to step 1 + Step 4: Reimport scan in same new engagement (batch_size=50), assess active and duplicate equal to step 2 + """ + # Step 1: Baseline import (default batch size) + # Create engagement first and set deduplication_on_engagement before import + product_type1, _ = Product_Type.objects.get_or_create(name="PT Bandit Baseline") + product1, _ = Product.objects.get_or_create(name="P Bandit Baseline", prod_type=product_type1) + engagement1 = Engagement.objects.create( + name="E Bandit Baseline", + product=product1, + target_start=timezone.now(), + target_end=timezone.now(), + ) + engagement1.deduplication_on_engagement = True + engagement1.save() + engagement1_id = engagement1.id + + import1 = self.import_scan_with_params( + self.bandit_large_file, + scan_type=self.scan_type_bandit, + engagement=engagement1_id, + product_type_name=None, + product_name=None, + engagement_name=None, + auto_create_context=False, + ) + test1_id = import1["test"] + + # Assess step 1: active count, duplicate count + step1_active = Finding.objects.filter( + test__engagement_id=engagement1_id, + active=True, + duplicate=False, + ).count() + step1_duplicate = Finding.objects.filter( + test__engagement_id=engagement1_id, + duplicate=True, + ).count() + + # Assert step 1 specific counts + # Each assertion is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + with self.subTest(step=1, metric="active"): + self.assertEqual(step1_active, 213, "Step 1 active count") + with self.subTest(step=1, metric="duplicate"): + self.assertEqual(step1_duplicate, 1, "Step 1 duplicate count") + + # Step 2: Reimport scan (same test) + self.reimport_scan_with_params( + test1_id, + self.bandit_large_file, + scan_type=self.scan_type_bandit, + ) + + # Assess step 2: active count, duplicate count + step2_active = Finding.objects.filter( + test__engagement_id=engagement1_id, + active=True, + duplicate=False, + ).count() + step2_duplicate = Finding.objects.filter( + test__engagement_id=engagement1_id, + duplicate=True, + ).count() + + # Assert step 2 specific counts + # Each assertion is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. 
+ with self.subTest(step=2, metric="active"): + self.assertEqual(step2_active, 213, "Step 2 active count") + with self.subTest(step=2, metric="duplicate"): + self.assertEqual(step2_duplicate, 1, "Step 2 duplicate count") + + # Step 3: Import scan in NEW engagement with batch_size=50 + with override_settings( + IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=50, + IMPORT_REIMPORT_MATCH_BATCH_SIZE=50, + ): + # Create engagement first and set deduplication_on_engagement before import + product_type2, _ = Product_Type.objects.get_or_create(name="PT Bandit Batch") + product2, _ = Product.objects.get_or_create(name="P Bandit Batch", prod_type=product_type2) + engagement2 = Engagement.objects.create( + name="E Bandit Batch", + product=product2, + target_start=timezone.now(), + target_end=timezone.now(), + ) + engagement2.deduplication_on_engagement = True + engagement2.save() + engagement2_id = engagement2.id + + import2 = self.import_scan_with_params( + self.bandit_large_file, + scan_type=self.scan_type_bandit, + engagement=engagement2_id, + product_type_name=None, + product_name=None, + engagement_name=None, + auto_create_context=False, + ) + test2_id = import2["test"] + + # Assess step 3: active and duplicate should equal step 1 + step3_active = Finding.objects.filter( + test__engagement_id=engagement2_id, + active=True, + duplicate=False, + ).count() + step3_duplicate = Finding.objects.filter( + test__engagement_id=engagement2_id, + duplicate=True, + ).count() + + # Each assertion is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + with self.subTest(step=3, metric="active"): + self.assertEqual( + step3_active, + step1_active, + "Step 3 active count should equal step 1 (baseline import)", + ) + with self.subTest(step=3, metric="duplicate"): + self.assertEqual( + step3_duplicate, + step1_duplicate, + "Step 3 duplicate count should equal step 1 (baseline import)", + ) + + # Step 4: Reimport scan in same new engagement (batch_size=50) + self.reimport_scan_with_params( + test2_id, + self.bandit_large_file, + scan_type=self.scan_type_bandit, + ) + + # Assess step 4: active and duplicate should equal step 2 + step4_active = Finding.objects.filter( + test__engagement_id=engagement2_id, + active=True, + duplicate=False, + ).count() + step4_duplicate = Finding.objects.filter( + test__engagement_id=engagement2_id, + duplicate=True, + ).count() + + # Each assertion is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + with self.subTest(step=4, metric="active"): + self.assertEqual( + step4_active, + step2_active, + "Step 4 active count should equal step 2 (baseline reimport)", + ) + with self.subTest(step=4, metric="duplicate"): + self.assertEqual( + step4_duplicate, + step2_duplicate, + "Step 4 duplicate count should equal step 2 (baseline reimport)", + ) + + def test_batch_deduplication_large_bandit_file(self): + """ + Test that batch deduplication produces identical results to non-batch mode (which we simulate via large max batch size setting). 
+ + Step 1: Import scan (baseline), assess active count, duplicate count + Step 2: Import scan again (same engagement), assess active count, duplicate count + Step 3: Import scan in NEW engagement with batch_size=50, assess active and duplicate equal to step 1 + Step 4: Import scan again in same new engagement (batch_size=50), assess active and duplicate equal to step 2 + """ + # Step 1: Baseline import (default batch size) + # Create engagement first and set deduplication_on_engagement before import + product_type1, _ = Product_Type.objects.get_or_create(name="PT Bandit Baseline Dedupe") + product1, _ = Product.objects.get_or_create(name="P Bandit Baseline Dedupe", prod_type=product_type1) + engagement1 = Engagement.objects.create( + name="E Bandit Baseline Dedupe", + product=product1, + target_start=timezone.now(), + target_end=timezone.now(), + ) + engagement1.deduplication_on_engagement = True + engagement1.save() + engagement1_id = engagement1.id + + self.import_scan_with_params( + self.bandit_large_file, + scan_type=self.scan_type_bandit, + engagement=engagement1_id, + product_type_name=None, + product_name=None, + engagement_name=None, + auto_create_context=False, + ) + + # Assess step 1: active count, duplicate count + step1_active = Finding.objects.filter( + test__engagement_id=engagement1_id, + active=True, + duplicate=False, + ).count() + step1_duplicate = Finding.objects.filter( + test__engagement_id=engagement1_id, + duplicate=True, + ).count() + + # Assert step 1 specific counts + self.assertEqual(step1_active, 213, "Step 1 active count") + self.assertEqual(step1_duplicate, 1, "Step 1 duplicate count") + + # Step 2: Import scan again (same engagement) + self.import_scan_with_params( + self.bandit_large_file, + scan_type=self.scan_type_bandit, + engagement=engagement1_id, + product_type_name=None, + product_name=None, + engagement_name=None, + auto_create_context=False, + ) + + # Assess step 2: active count, duplicate count + step2_active = Finding.objects.filter( + test__engagement_id=engagement1_id, + active=True, + duplicate=False, + ).count() + step2_duplicate = Finding.objects.filter( + test__engagement_id=engagement1_id, + duplicate=True, + ).count() + + # Assert step 2 specific counts + self.assertEqual(step2_active, 213, "Step 2 active count") + self.assertEqual(step2_duplicate, 215, "Step 2 duplicate count") + + # Step 3: Import scan in NEW engagement with batch_size=50 + with override_settings( + IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=50, + IMPORT_REIMPORT_MATCH_BATCH_SIZE=50, + ): + # Create engagement first and set deduplication_on_engagement before import + product_type2, _ = Product_Type.objects.get_or_create(name="PT Bandit Batch Dedupe") + product2, _ = Product.objects.get_or_create(name="P Bandit Batch Dedupe", prod_type=product_type2) + engagement2 = Engagement.objects.create( + name="E Bandit Batch Dedupe", + product=product2, + target_start=timezone.now(), + target_end=timezone.now(), + ) + engagement2.deduplication_on_engagement = True + engagement2.save() + engagement2_id = engagement2.id + + self.import_scan_with_params( + self.bandit_large_file, + scan_type=self.scan_type_bandit, + engagement=engagement2_id, + product_type_name=None, + product_name=None, + engagement_name=None, + auto_create_context=False, + ) + + # Assess step 3: active and duplicate should equal step 1 + step3_active = Finding.objects.filter( + test__engagement_id=engagement2_id, + active=True, + duplicate=False, + ).count() + step3_duplicate = Finding.objects.filter( + 
test__engagement_id=engagement2_id, + duplicate=True, + ).count() + + # Each assertion is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + with self.subTest(step=3, metric="active"): + self.assertEqual( + step3_active, + step1_active, + "Step 3 active count should equal step 1 (baseline import)", + ) + with self.subTest(step=3, metric="duplicate"): + self.assertEqual( + step3_duplicate, + step1_duplicate, + "Step 3 duplicate count should equal step 1 (baseline import)", + ) + + # Step 4: Import scan again in same new engagement (batch_size=50) + self.import_scan_with_params( + self.bandit_large_file, + scan_type=self.scan_type_bandit, + engagement=engagement2_id, + product_type_name=None, + product_name=None, + engagement_name=None, + auto_create_context=False, + ) + + # Assess step 4: active and duplicate should equal step 2 + step4_active = Finding.objects.filter( + test__engagement_id=engagement2_id, + active=True, + duplicate=False, + ).count() + step4_duplicate = Finding.objects.filter( + test__engagement_id=engagement2_id, + duplicate=True, + ).count() + + # Each assertion is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + with self.subTest(step=4, metric="active"): + self.assertEqual( + step4_active, + step2_active, + "Step 4 active count should equal step 2 (baseline second import)", + ) + with self.subTest(step=4, metric="duplicate"): + self.assertEqual( + step4_duplicate, + step2_duplicate, + "Step 4 duplicate count should equal step 2 (baseline second import)", + ) + @override_settings( IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=50, IMPORT_REIMPORT_MATCH_BATCH_SIZE=50, diff --git a/unittests/test_importers_importer.py b/unittests/test_importers_importer.py index cc5fb342df7..6e526ec0d92 100644 --- a/unittests/test_importers_importer.py +++ b/unittests/test_importers_importer.py @@ -7,7 +7,17 @@ from rest_framework.test import APIClient from dojo.importers.default_importer import DefaultImporter -from dojo.models import Development_Environment, Engagement, Finding, Product, Product_Type, Test, User +from dojo.importers.default_reimporter import DefaultReImporter +from dojo.models import ( + Development_Environment, + Engagement, + Finding, + Product, + Product_Type, + Test, + User, + Vulnerability_Id, +) from dojo.tools.gitlab_sast.parser import GitlabSastParser from dojo.tools.sarif.parser import SarifParser from dojo.utils import get_object_or_none @@ -553,8 +563,7 @@ def create_default_data(self): "scan_type": NPM_AUDIT_SCAN_TYPE, } - @patch("dojo.importers.base_importer.Vulnerability_Id", autospec=True) - def test_handle_vulnerability_ids_references_and_cve(self, mock): + def test_handle_vulnerability_ids_references_and_cve(self): # Why doesn't this test use the test db and query for one? 
vulnerability_ids = ["CVE", "REF-1", "REF-2"] finding = Finding() @@ -562,7 +571,7 @@ def test_handle_vulnerability_ids_references_and_cve(self, mock): finding.test = self.test finding.reporter = self.testuser finding.save() - DefaultImporter(**self.importer_data).process_vulnerability_ids(finding) + DefaultImporter(**self.importer_data).store_vulnerability_ids(finding) self.assertEqual("CVE", finding.vulnerability_ids[0]) self.assertEqual("CVE", finding.cve) @@ -571,8 +580,7 @@ def test_handle_vulnerability_ids_references_and_cve(self, mock): self.assertEqual("REF-2", finding.vulnerability_ids[2]) finding.delete() - @patch("dojo.importers.base_importer.Vulnerability_Id", autospec=True) - def test_handle_no_vulnerability_ids_references_and_cve(self, mock): + def test_handle_no_vulnerability_ids_references_and_cve(self): vulnerability_ids = ["CVE"] finding = Finding() finding.test = self.test @@ -580,22 +588,21 @@ def test_handle_no_vulnerability_ids_references_and_cve(self, mock): finding.save() finding.unsaved_vulnerability_ids = vulnerability_ids - DefaultImporter(**self.importer_data).process_vulnerability_ids(finding) + DefaultImporter(**self.importer_data).store_vulnerability_ids(finding) self.assertEqual("CVE", finding.vulnerability_ids[0]) self.assertEqual("CVE", finding.cve) self.assertEqual(vulnerability_ids, finding.unsaved_vulnerability_ids) finding.delete() - @patch("dojo.importers.base_importer.Vulnerability_Id", autospec=True) - def test_handle_vulnerability_ids_references_and_no_cve(self, mock): + def test_handle_vulnerability_ids_references_and_no_cve(self): vulnerability_ids = ["REF-1", "REF-2"] finding = Finding() finding.test = self.test finding.reporter = self.testuser finding.save() finding.unsaved_vulnerability_ids = vulnerability_ids - DefaultImporter(**self.importer_data).process_vulnerability_ids(finding) + DefaultImporter(**self.importer_data).store_vulnerability_ids(finding) self.assertEqual("REF-1", finding.vulnerability_ids[0]) self.assertEqual("REF-1", finding.cve) @@ -603,14 +610,87 @@ def test_handle_vulnerability_ids_references_and_no_cve(self, mock): self.assertEqual("REF-2", finding.vulnerability_ids[1]) finding.delete() - @patch("dojo.importers.base_importer.Vulnerability_Id", autospec=True) - def test_no_handle_vulnerability_ids_references_and_no_cve(self, mock): + def test_no_handle_vulnerability_ids_references_and_no_cve(self): finding = Finding() finding.test = self.test finding.reporter = self.testuser finding.save() - DefaultImporter(**self.importer_data).process_vulnerability_ids(finding) + DefaultImporter(**self.importer_data).store_vulnerability_ids(finding) self.assertEqual(finding.cve, None) self.assertEqual(finding.unsaved_vulnerability_ids, None) self.assertEqual(finding.vulnerability_ids, []) finding.delete() + + def test_clear_vulnerability_ids_on_empty_list(self): + """Test that vulnerability IDs are cleared when an empty list is provided""" + # Create a finding with existing vulnerability IDs + finding = Finding() + finding.test = self.test + finding.reporter = self.testuser + finding.save() + + # Add some vulnerability IDs + Vulnerability_Id.objects.create(finding=finding, vulnerability_id="CVE-2020-1234") + Vulnerability_Id.objects.create(finding=finding, vulnerability_id="CVE-2020-5678") + finding.cve = "CVE-2020-1234" + finding.save() + + # Verify initial state + self.assertEqual(2, len(finding.vulnerability_ids)) + self.assertEqual("CVE-2020-1234", finding.cve) + + # Process with empty list - should clear all IDs + 
finding.unsaved_vulnerability_ids = [] + DefaultReImporter(test=self.test, environment=self.importer_data["environment"], scan_type=self.importer_data["scan_type"]).reconcile_vulnerability_ids(finding) + # Save the finding to persist the cve=None change + finding.save() + + # Get fresh finding from database to avoid cached property issues + finding = Finding.objects.get(pk=finding.pk) + + # Verify IDs are cleared + self.assertEqual(0, len(finding.vulnerability_ids)) + self.assertEqual(None, finding.cve) + # Verify no Vulnerability_Id objects exist for this finding + self.assertEqual(0, Vulnerability_Id.objects.filter(finding=finding).count()) + finding.delete() + + def test_change_vulnerability_ids_on_reimport(self): + """Test that vulnerability IDs are updated when different IDs are provided""" + # Create a finding with existing vulnerability IDs + finding = Finding() + finding.test = self.test + finding.reporter = self.testuser + finding.save() + + # Add initial vulnerability IDs + Vulnerability_Id.objects.create(finding=finding, vulnerability_id="CVE-2020-1234") + Vulnerability_Id.objects.create(finding=finding, vulnerability_id="CVE-2020-5678") + finding.cve = "CVE-2020-1234" + finding.save() + + # Verify initial state + self.assertEqual(2, len(finding.vulnerability_ids)) + self.assertEqual("CVE-2020-1234", finding.vulnerability_ids[0]) + self.assertEqual("CVE-2020-5678", finding.vulnerability_ids[1]) + self.assertEqual("CVE-2020-1234", finding.cve) + + # Process with different IDs - should replace old IDs + new_vulnerability_ids = ["CVE-2021-9999", "GHSA-xxxx-yyyy"] + finding.unsaved_vulnerability_ids = new_vulnerability_ids + DefaultReImporter(test=self.test, environment=self.importer_data["environment"], scan_type=self.importer_data["scan_type"]).reconcile_vulnerability_ids(finding) + # Save the finding to persist the cve change + finding.save() + + # Get fresh finding from database to avoid cached property issues + finding = Finding.objects.get(pk=finding.pk) + + # Verify old IDs are removed and new IDs are present + self.assertEqual(2, len(finding.vulnerability_ids)) + self.assertEqual("CVE-2021-9999", finding.vulnerability_ids[0]) + self.assertEqual("GHSA-xxxx-yyyy", finding.vulnerability_ids[1]) + self.assertEqual("CVE-2021-9999", finding.cve) + # Verify only new Vulnerability_Id objects exist + vuln_ids = list(Vulnerability_Id.objects.filter(finding=finding).values_list("vulnerability_id", flat=True)) + self.assertEqual(set(new_vulnerability_ids), set(vuln_ids)) + finding.delete() diff --git a/unittests/test_importers_performance.py b/unittests/test_importers_performance.py index 1963c522f7e..1e7b05d8fe5 100644 --- a/unittests/test_importers_performance.py +++ b/unittests/test_importers_performance.py @@ -1,3 +1,23 @@ +""" +Performance tests for importers. + +These tests verify that import and reimport operations maintain acceptable query counts +and async task counts to prevent performance regressions. + +Counts can be updated via the Python script at scripts/update_performance_test_counts.py. +However, counts must be verified to ensure no implicit performance regressions are introduced. +When counts change, review the differences carefully to determine if they represent: +- Legitimate optimizations (counts decreasing) +- Acceptable changes due to feature additions (counts increasing with justification) +- Unintended performance regressions (counts increasing without clear reason) + +Always verify updated counts by: +1. Running the update script to see the differences +2. 
Reviewing the changes to understand why counts changed +3. Running the verification step to ensure all tests pass +4. Investigating any unexpected increases in query or task counts +""" + import logging from contextlib import contextmanager @@ -33,7 +53,9 @@ STACK_HAWK_SCAN_TYPE = "StackHawk HawkScan" -class TestDojoImporterPerformance(DojoTestCase): +class TestDojoImporterPerformanceBase(DojoTestCase): + + """Base class for performance tests with shared setup and helper methods.""" def setUp(self): super().setUp() @@ -77,96 +99,161 @@ def _assertNumAsyncTask(self, num): ) logger.debug(msg) - def _import_reimport_performance(self, expected_num_queries1, expected_num_async_tasks1, expected_num_queries2, expected_num_async_tasks2, expected_num_queries3, expected_num_async_tasks3): - """ - Log output can be quite large as when the assertNumQueries fails, all queries are printed. - It could be usefule to capture the output in `less`: - ./run-unittest.sh --test-case unittests.test_importers_performance.TestDojoImporterPerformance 2>&1 | less - Then search for `expected` to find the lines where the expected number of queries is printed. - Or you can use `grep` to filter the output: - ./run-unittest.sh --test-case unittests.test_importers_performance.TestDojoImporterPerformance 2>&1 | grep expected -B 10 - """ + def _create_test_objects(self, product_name, engagement_name): + """Helper method to create test product, engagement, lead user, and environment.""" product_type, _created = Product_Type.objects.get_or_create(name="test") product, _created = Product.objects.get_or_create( - name="TestDojoDefaultImporter", + name=product_name, prod_type=product_type, ) engagement, _created = Engagement.objects.get_or_create( - name="Test Create Engagement", + name=engagement_name, product=product, target_start=timezone.now(), target_end=timezone.now(), ) lead, _ = User.objects.get_or_create(username="admin") environment, _ = Development_Environment.objects.get_or_create(name="Development") + return product, engagement, lead, environment + + def _import_reimport_performance( + self, + expected_num_queries1, + expected_num_async_tasks1, + expected_num_queries2, + expected_num_async_tasks2, + expected_num_queries3, + expected_num_async_tasks3, + scan_file1, + scan_file2, + scan_file3, + scan_type, + product_name, + engagement_name, + ): + """ + Test import/reimport/reimport performance with specified scan files and scan type. + Log output can be quite large as when the assertNumQueries fails, all queries are printed. + """ + _, engagement, lead, environment = self._create_test_objects( + product_name, + engagement_name, + ) - # first import the subset which missed one finding and a couple of endpoints on some of the findings - with ( + # First import + # Each assertion context manager is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + # Nested with statements are intentional - each assertion needs its own subTest wrapper. 
+ with ( # noqa: SIM117 self.subTest("import1"), impersonate(Dojo_User.objects.get(username="admin")), - self.assertNumQueries(expected_num_queries1), - self._assertNumAsyncTask(expected_num_async_tasks1), - STACK_HAWK_SUBSET_FILENAME.open(encoding="utf-8") as scan, + scan_file1.open(encoding="utf-8") as scan, ): - import_options = { - "user": lead, - "lead": lead, - "scan_date": None, - "environment": environment, - "minimum_severity": "Info", - "active": True, - "verified": True, - "sync": True, - "scan_type": STACK_HAWK_SCAN_TYPE, - "engagement": engagement, - "tags": ["performance-test", "tag-in-param", "go-faster"], - "apply_tags_to_findings": True, - } - importer = DefaultImporter(**import_options) - test, _, _len_new_findings, _len_closed_findings, _, _, _ = importer.process_scan(scan) - - # use reimport with the full report so it add a finding and some endpoints - with ( + with self.subTest(step="import1", metric="queries"): + with self.assertNumQueries(expected_num_queries1): + with self.subTest(step="import1", metric="async_tasks"): + with self._assertNumAsyncTask(expected_num_async_tasks1): + import_options = { + "user": lead, + "lead": lead, + "scan_date": None, + "environment": environment, + "minimum_severity": "Info", + "active": True, + "verified": True, + "sync": True, + "scan_type": scan_type, + "engagement": engagement, + "tags": ["performance-test", "tag-in-param", "go-faster"], + "apply_tags_to_findings": True, + } + importer = DefaultImporter(**import_options) + test, _, _len_new_findings, _len_closed_findings, _, _, _ = importer.process_scan(scan) + + # Second import (reimport) + # Each assertion context manager is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + # Nested with statements are intentional - each assertion needs its own subTest wrapper. 
+ with ( # noqa: SIM117 self.subTest("reimport1"), impersonate(Dojo_User.objects.get(username="admin")), - self.assertNumQueries(expected_num_queries2), - self._assertNumAsyncTask(expected_num_async_tasks2), - STACK_HAWK_FILENAME.open(encoding="utf-8") as scan, + scan_file2.open(encoding="utf-8") as scan, ): - reimport_options = { - "test": test, - "user": lead, - "lead": lead, - "scan_date": None, - "minimum_severity": "Info", - "active": True, - "verified": True, - "sync": True, - "scan_type": STACK_HAWK_SCAN_TYPE, - "tags": ["performance-test-reimport", "reimport-tag-in-param", "reimport-go-faster"], - "apply_tags_to_findings": True, - } - reimporter = DefaultReImporter(**reimport_options) - test, _, _len_new_findings, _len_closed_findings, _, _, _ = reimporter.process_scan(scan) - - # use reimport with the subset again to close a finding and mitigate some endpoints - with ( + with self.subTest(step="reimport1", metric="queries"): + with self.assertNumQueries(expected_num_queries2): + with self.subTest(step="reimport1", metric="async_tasks"): + with self._assertNumAsyncTask(expected_num_async_tasks2): + reimport_options = { + "test": test, + "user": lead, + "lead": lead, + "scan_date": None, + "minimum_severity": "Info", + "active": True, + "verified": True, + "sync": True, + "scan_type": scan_type, + "tags": ["performance-test-reimport", "reimport-tag-in-param", "reimport-go-faster"], + "apply_tags_to_findings": True, + } + reimporter = DefaultReImporter(**reimport_options) + test, _, _len_new_findings, _len_closed_findings, _, _, _ = reimporter.process_scan(scan) + + # Third import (reimport again) + # Each assertion context manager is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + # Nested with statements are intentional - each assertion needs its own subTest wrapper. 
+ with ( # noqa: SIM117 self.subTest("reimport2"), impersonate(Dojo_User.objects.get(username="admin")), - self.assertNumQueries(expected_num_queries3), - self._assertNumAsyncTask(expected_num_async_tasks3), - STACK_HAWK_SUBSET_FILENAME.open(encoding="utf-8") as scan, + scan_file3.open(encoding="utf-8") as scan, ): - reimport_options = { - "test": test, - "user": lead, - "lead": lead, - "scan_date": None, - "minimum_severity": "Info", - "active": True, - "verified": True, - "sync": True, - "scan_type": STACK_HAWK_SCAN_TYPE, - } - reimporter = DefaultReImporter(**reimport_options) - test, _, _len_new_findings, _len_closed_findings, _, _, _ = reimporter.process_scan(scan) + with self.subTest(step="reimport2", metric="queries"): + with self.assertNumQueries(expected_num_queries3): + with self.subTest(step="reimport2", metric="async_tasks"): + with self._assertNumAsyncTask(expected_num_async_tasks3): + reimport_options = { + "test": test, + "user": lead, + "lead": lead, + "scan_date": None, + "minimum_severity": "Info", + "active": True, + "verified": True, + "sync": True, + "scan_type": scan_type, + } + reimporter = DefaultReImporter(**reimport_options) + test, _, _len_new_findings, _len_closed_findings, _, _, _ = reimporter.process_scan(scan) + + +class TestDojoImporterPerformanceSmall(TestDojoImporterPerformanceBase): + + """Performance tests using small sample files (StackHawk, ~6 findings).""" + + def _import_reimport_performance(self, expected_num_queries1, expected_num_async_tasks1, expected_num_queries2, expected_num_async_tasks2, expected_num_queries3, expected_num_async_tasks3): + """ + Log output can be quite large as when the assertNumQueries fails, all queries are printed. + It could be usefule to capture the output in `less`: + ./run-unittest.sh --test-case unittests.test_importers_performance.TestDojoImporterPerformanceSmall 2>&1 | less + Then search for `expected` to find the lines where the expected number of queries is printed. 
+ Or you can use `grep` to filter the output: + ./run-unittest.sh --test-case unittests.test_importers_performance.TestDojoImporterPerformanceSmall 2>&1 | grep expected -B 10 + """ + return super()._import_reimport_performance( + expected_num_queries1, + expected_num_async_tasks1, + expected_num_queries2, + expected_num_async_tasks2, + expected_num_queries3, + expected_num_async_tasks3, + scan_file1=STACK_HAWK_SUBSET_FILENAME, + scan_file2=STACK_HAWK_FILENAME, + scan_file3=STACK_HAWK_SUBSET_FILENAME, + scan_type=STACK_HAWK_SCAN_TYPE, + product_name="TestDojoDefaultImporter", + engagement_name="Test Create Engagement", + ) @override_settings(ENABLE_AUDITLOG=True) def test_import_reimport_reimport_performance_pghistory_async(self): @@ -180,9 +267,9 @@ def test_import_reimport_reimport_performance_pghistory_async(self): self._import_reimport_performance( expected_num_queries1=306, expected_num_async_tasks1=7, - expected_num_queries2=281, + expected_num_queries2=232, expected_num_async_tasks2=18, - expected_num_queries3=170, + expected_num_queries3=114, expected_num_async_tasks3=17, ) @@ -200,11 +287,11 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._import_reimport_performance( - expected_num_queries1=312, + expected_num_queries1=313, expected_num_async_tasks1=6, - expected_num_queries2=287, + expected_num_queries2=239, expected_num_async_tasks2=17, - expected_num_queries3=176, + expected_num_queries3=121, expected_num_async_tasks3=16, ) @@ -223,11 +310,11 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr self.system_settings(enable_product_grade=True) self._import_reimport_performance( - expected_num_queries1=314, + expected_num_queries1=315, expected_num_async_tasks1=8, - expected_num_queries2=289, + expected_num_queries2=241, expected_num_async_tasks2=19, - expected_num_queries3=178, + expected_num_queries3=123, expected_num_async_tasks3=18, ) @@ -238,61 +325,64 @@ def _deduplication_performance(self, expected_num_queries1, expected_num_async_t The second import should result in all findings being marked as duplicates. This is different from reimport as we create a new test each time. """ - product_type, _created = Product_Type.objects.get_or_create(name="test") - product, _created = Product.objects.get_or_create( - name="TestDojoDeduplicationPerformance", - prod_type=product_type, + _, engagement, lead, environment = self._create_test_objects( + "TestDojoDeduplicationPerformance", + "Test Deduplication Performance Engagement", ) - engagement, _created = Engagement.objects.get_or_create( - name="Test Deduplication Performance Engagement", - product=product, - target_start=timezone.now(), - target_end=timezone.now(), - ) - lead, _ = User.objects.get_or_create(username="admin") - environment, _ = Development_Environment.objects.get_or_create(name="Development") # First import - all findings should be new - with ( + # Each assertion context manager is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + # Nested with statements are intentional - each assertion needs its own subTest wrapper. 
+ with ( # noqa: SIM117 self.subTest("first_import"), impersonate(Dojo_User.objects.get(username="admin")), - self.assertNumQueries(expected_num_queries1), - self._assertNumAsyncTask(expected_num_async_tasks1), STACK_HAWK_FILENAME.open(encoding="utf-8") as scan, ): - import_options = { - "user": lead, - "lead": lead, - "scan_date": None, - "environment": environment, - "minimum_severity": "Info", - "active": True, - "verified": True, - "scan_type": STACK_HAWK_SCAN_TYPE, - "engagement": engagement, - } - importer = DefaultImporter(**import_options) - _, _, len_new_findings1, len_closed_findings1, _, _, _ = importer.process_scan(scan) + with self.subTest(step="first_import", metric="queries"): + with self.assertNumQueries(expected_num_queries1): + with self.subTest(step="first_import", metric="async_tasks"): + with self._assertNumAsyncTask(expected_num_async_tasks1): + import_options = { + "user": lead, + "lead": lead, + "scan_date": None, + "environment": environment, + "minimum_severity": "Info", + "active": True, + "verified": True, + "scan_type": STACK_HAWK_SCAN_TYPE, + "engagement": engagement, + } + importer = DefaultImporter(**import_options) + _, _, len_new_findings1, len_closed_findings1, _, _, _ = importer.process_scan(scan) # Second import - all findings should be duplicates - with ( + # Each assertion context manager is wrapped in its own subTest so that if one fails, the others still run. + # This allows us to see all count mismatches in a single test run, making it easier to fix + # all incorrect expected values at once rather than fixing them one at a time. + # Nested with statements are intentional - each assertion needs its own subTest wrapper. + with ( # noqa: SIM117 self.subTest("second_import"), impersonate(Dojo_User.objects.get(username="admin")), - self.assertNumQueries(expected_num_queries2), - self._assertNumAsyncTask(expected_num_async_tasks2), STACK_HAWK_FILENAME.open(encoding="utf-8") as scan, ): - import_options = { - "user": lead, - "lead": lead, - "scan_date": None, - "environment": environment, - "minimum_severity": "Info", - "active": True, - "verified": True, - "scan_type": STACK_HAWK_SCAN_TYPE, - "engagement": engagement, - } - importer = DefaultImporter(**import_options) - _, _, len_new_findings2, len_closed_findings2, _, _, _ = importer.process_scan(scan) + with self.subTest(step="second_import", metric="queries"): + with self.assertNumQueries(expected_num_queries2): + with self.subTest(step="second_import", metric="async_tasks"): + with self._assertNumAsyncTask(expected_num_async_tasks2): + import_options = { + "user": lead, + "lead": lead, + "scan_date": None, + "environment": environment, + "minimum_severity": "Info", + "active": True, + "verified": True, + "scan_type": STACK_HAWK_SCAN_TYPE, + "engagement": engagement, + } + importer = DefaultImporter(**import_options) + _, _, len_new_findings2, len_closed_findings2, _, _, _ = importer.process_scan(scan) # Log the results for analysis logger.debug(f"First import: {len_new_findings1} new findings, {len_closed_findings1} closed findings") @@ -364,8 +454,9 @@ def test_deduplication_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._deduplication_performance( - expected_num_queries1=281, + expected_num_queries1=282, expected_num_async_tasks1=7, - expected_num_queries2=245, + expected_num_queries2=246, expected_num_async_tasks2=7, + )
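
A minimal sketch of the count-update workflow referenced in the test_importers_performance.py module docstring, using the argparse flags defined in scripts/update_performance_test_counts.py earlier in this diff (--test-class, --report-only, --update, --verify). The class name shown is the script's documented default and is illustrative only:

    # 1. Report count mismatches without modifying the test file
    python scripts/update_performance_test_counts.py --test-class TestDojoImporterPerformanceSmall --report-only
    # 2. Write the observed counts back into the performance test file (default action)
    python scripts/update_performance_test_counts.py --test-class TestDojoImporterPerformanceSmall --update
    # 3. Re-run the tests to confirm the updated counts pass
    python scripts/update_performance_test_counts.py --test-class TestDojoImporterPerformanceSmall --verify

Per the module docstring, any counts that increase after running the update step should still be reviewed manually to rule out an unintended performance regression before committing the new values.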