diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index a32fce13b..f8312824e 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -2753,15 +2753,14 @@ subscriber c@AgentClient {msgQ} = forever $ do cleanupManager :: AgentClient -> AM' () cleanupManager c@AgentClient {subQ} = do - delay <- asks (initialCleanupDelay . config) - liftIO $ threadDelay' delay - int <- asks (cleanupInterval . config) - ttl <- asks $ storedMsgDataTTL . config + AgentConfig {initialCleanupDelay, cleanupInterval = int, storedMsgDataTTL = ttl, cleanupBatchSize = limit} <- + asks config + liftIO $ threadDelay' initialCleanupDelay forever $ waitActive $ do run ERR deleteConns - run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl) - run ERR $ withStore' c (`deleteSndMsgsExpired` ttl) - run ERR $ withStore' c (`deleteRatchetKeyHashesExpired` ttl) + run ERR $ withStore' c $ \db -> deleteRcvMsgHashesExpired db ttl limit + run ERR $ withStore' c $ \db -> deleteSndMsgsExpired db ttl limit + run ERR $ withStore' c $ \db -> deleteRatchetKeyHashesExpired db ttl limit run ERR $ withStore' c (`deleteExpiredNtfTokensToDelete` ttl) run RFERR deleteRcvFilesExpired run RFERR deleteRcvFilesDeleted diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 57bc11e3c..e81240cc5 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -153,6 +153,7 @@ data AgentConfig = AgentConfig persistErrorInterval :: NominalDiffTime, initialCleanupDelay :: Int64, cleanupInterval :: Int64, + cleanupBatchSize :: Int, initialLogStatsDelay :: Int64, logStatsInterval :: Int64, cleanupStepInterval :: Int, @@ -224,7 +225,8 @@ defaultAgentConfig = quotaExceededTimeout = 7 * nominalDay, persistErrorInterval = 3, -- seconds initialCleanupDelay = 30 * 1000000, -- 30 seconds - cleanupInterval = 30 * 60 * 1000000, -- 30 minutes + cleanupInterval = 5 * 60 * 1000000, -- 5 minutes + cleanupBatchSize = 10000, initialLogStatsDelay = 10 * 1000000, -- 10 seconds logStatsInterval = 10 * 1000000, -- 10 seconds cleanupStepInterval = 200000, -- 200ms diff --git a/src/Simplex/Messaging/Agent/Store/AgentStore.hs b/src/Simplex/Messaging/Agent/Store/AgentStore.hs index 0469f09dd..6153505a5 100644 --- a/src/Simplex/Messaging/Agent/Store/AgentStore.hs +++ b/src/Simplex/Messaging/Agent/Store/AgentStore.hs @@ -1192,18 +1192,39 @@ countPendingSndDeliveries_ db connId msgId = do (Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ? AND failed = 0" (connId, msgId) pure cnt -deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> IO () -deleteRcvMsgHashesExpired db ttl = do +deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> Int -> IO () +deleteRcvMsgHashesExpired db ttl limit = do cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime - DB.execute db "DELETE FROM encrypted_rcv_message_hashes WHERE created_at < ?" (Only cutoffTs) + DB.execute + db + [sql| + DELETE FROM encrypted_rcv_message_hashes + WHERE encrypted_rcv_message_hash_id IN ( + SELECT encrypted_rcv_message_hash_id + FROM encrypted_rcv_message_hashes + WHERE created_at < ? + ORDER BY created_at ASC + LIMIT ? + ) + |] + (cutoffTs, limit) -deleteSndMsgsExpired :: DB.Connection -> NominalDiffTime -> IO () -deleteSndMsgsExpired db ttl = do +deleteSndMsgsExpired :: DB.Connection -> NominalDiffTime -> Int -> IO () +deleteSndMsgsExpired db ttl limit = do cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime DB.execute db - "DELETE FROM messages WHERE internal_ts < ? AND internal_snd_id IS NOT NULL" - (Only cutoffTs) + [sql| + DELETE FROM messages + WHERE (conn_id, internal_id) IN ( + SELECT conn_id, internal_id + FROM messages + WHERE internal_ts < ? AND internal_snd_id IS NOT NULL + ORDER BY internal_ts ASC + LIMIT ? + ) + |] + (cutoffTs, limit) createRatchetX3dhKeys :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> Maybe CR.RcvPrivRKEMParams -> IO () createRatchetX3dhKeys db connId x3dhPrivKey1 x3dhPrivKey2 pqPrivKem = @@ -2411,10 +2432,22 @@ checkRatchetKeyHashExists db connId hash = ) (connId, Binary hash) -deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO () -deleteRatchetKeyHashesExpired db ttl = do +deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> Int -> IO () +deleteRatchetKeyHashesExpired db ttl limit = do cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime - DB.execute db "DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs) + DB.execute + db + [sql| + DELETE FROM processed_ratchet_key_hashes + WHERE processed_ratchet_key_hash_id IN ( + SELECT processed_ratchet_key_hash_id + FROM processed_ratchet_key_hashes + WHERE created_at < ? + ORDER BY created_at ASC + LIMIT ? + ) + |] + (cutoffTs, limit) -- | returns all connection queues, the first queue is the primary one getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue)) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 577f46fb0..51d9a8adf 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -475,7 +475,7 @@ functionalAPITests ps = do testWaitDelivery ps it "should delete connection if message can'ps be delivered due to AUTH error" $ testWaitDeliveryAUTHErr ps - it "should delete connection by timeout even if message wasn'ps delivered" $ + it "should delete connection by timeout even if message wasn't delivered" $ testWaitDeliveryTimeout ps it "should delete connection by timeout, message in progress can be delivered" $ testWaitDeliveryTimeout2 ps @@ -814,7 +814,7 @@ runAgentClientStressTestConc n pqSupport sqSecured viaProxy alice bob baseId = r receive a bId mIdVar acc' = loop acc' >> liftIO drain where drain = - timeout 50000 (get a) + timeout 100000 (get a) >>= mapM_ (\case ("", _, QCONT) -> drain; r -> expectationFailure $ "unexpected: " <> show r) loop (0, 0, 0, 0) = pure () loop acc@(!s, !m, !r, !o) = diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 9a88865e3..c51079d5e 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -214,7 +214,7 @@ cfgMS msType = withStoreCfg (testServerStoreConfig msType) $ \serverStoreCfg -> ServerConfig { transports = [], smpHandshakeTimeout = 60000000, - tbqSize = 1, + tbqSize = 4, msgQueueQuota = 4, maxJournalMsgCount = 5, maxJournalStateLines = 2,