Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2753,15 +2753,14 @@ subscriber c@AgentClient {msgQ} = forever $ do

cleanupManager :: AgentClient -> AM' ()
cleanupManager c@AgentClient {subQ} = do
delay <- asks (initialCleanupDelay . config)
liftIO $ threadDelay' delay
int <- asks (cleanupInterval . config)
ttl <- asks $ storedMsgDataTTL . config
AgentConfig {initialCleanupDelay, cleanupInterval = int, storedMsgDataTTL = ttl, cleanupBatchSize = limit} <-
asks config
liftIO $ threadDelay' initialCleanupDelay
forever $ waitActive $ do
run ERR deleteConns
run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl)
run ERR $ withStore' c (`deleteSndMsgsExpired` ttl)
run ERR $ withStore' c (`deleteRatchetKeyHashesExpired` ttl)
run ERR $ withStore' c $ \db -> deleteRcvMsgHashesExpired db ttl limit
run ERR $ withStore' c $ \db -> deleteSndMsgsExpired db ttl limit
run ERR $ withStore' c $ \db -> deleteRatchetKeyHashesExpired db ttl limit
run ERR $ withStore' c (`deleteExpiredNtfTokensToDelete` ttl)
run RFERR deleteRcvFilesExpired
run RFERR deleteRcvFilesDeleted
Expand Down
4 changes: 3 additions & 1 deletion src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ data AgentConfig = AgentConfig
persistErrorInterval :: NominalDiffTime,
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
cleanupBatchSize :: Int,
initialLogStatsDelay :: Int64,
logStatsInterval :: Int64,
cleanupStepInterval :: Int,
Expand Down Expand Up @@ -224,7 +225,8 @@ defaultAgentConfig =
quotaExceededTimeout = 7 * nominalDay,
persistErrorInterval = 3, -- seconds
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
cleanupInterval = 5 * 60 * 1000000, -- 5 minutes
cleanupBatchSize = 10000,
initialLogStatsDelay = 10 * 1000000, -- 10 seconds
logStatsInterval = 10 * 1000000, -- 10 seconds
cleanupStepInterval = 200000, -- 200ms
Expand Down
53 changes: 43 additions & 10 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1192,18 +1192,39 @@ countPendingSndDeliveries_ db connId msgId = do
(Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ? AND failed = 0" (connId, msgId)
pure cnt

deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteRcvMsgHashesExpired db ttl = do
deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> Int -> IO ()
deleteRcvMsgHashesExpired db ttl limit = do
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
DB.execute db "DELETE FROM encrypted_rcv_message_hashes WHERE created_at < ?" (Only cutoffTs)
DB.execute
db
[sql|
DELETE FROM encrypted_rcv_message_hashes
WHERE encrypted_rcv_message_hash_id IN (
SELECT encrypted_rcv_message_hash_id
FROM encrypted_rcv_message_hashes
WHERE created_at < ?
ORDER BY created_at ASC
LIMIT ?
)
|]
(cutoffTs, limit)

deleteSndMsgsExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteSndMsgsExpired db ttl = do
deleteSndMsgsExpired :: DB.Connection -> NominalDiffTime -> Int -> IO ()
deleteSndMsgsExpired db ttl limit = do
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
DB.execute
db
"DELETE FROM messages WHERE internal_ts < ? AND internal_snd_id IS NOT NULL"
(Only cutoffTs)
[sql|
DELETE FROM messages
WHERE (conn_id, internal_id) IN (
SELECT conn_id, internal_id
FROM messages
WHERE internal_ts < ? AND internal_snd_id IS NOT NULL
ORDER BY internal_ts ASC
LIMIT ?
)
|]
(cutoffTs, limit)

createRatchetX3dhKeys :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> Maybe CR.RcvPrivRKEMParams -> IO ()
createRatchetX3dhKeys db connId x3dhPrivKey1 x3dhPrivKey2 pqPrivKem =
Expand Down Expand Up @@ -2411,10 +2432,22 @@ checkRatchetKeyHashExists db connId hash =
)
(connId, Binary hash)

deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteRatchetKeyHashesExpired db ttl = do
deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> Int -> IO ()
deleteRatchetKeyHashesExpired db ttl limit = do
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
DB.execute db "DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs)
DB.execute
db
[sql|
DELETE FROM processed_ratchet_key_hashes
WHERE processed_ratchet_key_hash_id IN (
SELECT processed_ratchet_key_hash_id
FROM processed_ratchet_key_hashes
WHERE created_at < ?
ORDER BY created_at ASC
LIMIT ?
)
|]
(cutoffTs, limit)

-- | returns all connection queues, the first queue is the primary one
getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue))
Expand Down
4 changes: 2 additions & 2 deletions tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ functionalAPITests ps = do
testWaitDelivery ps
it "should delete connection if message can'ps be delivered due to AUTH error" $
testWaitDeliveryAUTHErr ps
it "should delete connection by timeout even if message wasn'ps delivered" $
it "should delete connection by timeout even if message wasn't delivered" $
testWaitDeliveryTimeout ps
it "should delete connection by timeout, message in progress can be delivered" $
testWaitDeliveryTimeout2 ps
Expand Down Expand Up @@ -814,7 +814,7 @@ runAgentClientStressTestConc n pqSupport sqSecured viaProxy alice bob baseId = r
receive a bId mIdVar acc' = loop acc' >> liftIO drain
where
drain =
timeout 50000 (get a)
timeout 100000 (get a)
>>= mapM_ (\case ("", _, QCONT) -> drain; r -> expectationFailure $ "unexpected: " <> show r)
loop (0, 0, 0, 0) = pure ()
loop acc@(!s, !m, !r, !o) =
Expand Down
2 changes: 1 addition & 1 deletion tests/SMPClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ cfgMS msType = withStoreCfg (testServerStoreConfig msType) $ \serverStoreCfg ->
ServerConfig
{ transports = [],
smpHandshakeTimeout = 60000000,
tbqSize = 1,
tbqSize = 4,
msgQueueQuota = 4,
maxJournalMsgCount = 5,
maxJournalStateLines = 2,
Expand Down
Loading