Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2754,37 +2754,53 @@ subscriber c@AgentClient {msgQ} = forever $ do
cleanupManager :: AgentClient -> AM' ()
cleanupManager c@AgentClient {subQ} = do
delay <- asks (initialCleanupDelay . config)
liftIO $ threadDelay' delay
int <- asks (cleanupInterval . config)
ttl <- asks $ storedMsgDataTTL . config
forever $ waitActive $ do
run ERR deleteConns
run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl)
run ERR $ withStore' c (`deleteSndMsgsExpired` ttl)
run ERR $ withStore' c (`deleteRatchetKeyHashesExpired` ttl)
run ERR $ withStore' c (`deleteExpiredNtfTokensToDelete` ttl)
run RFERR deleteRcvFilesExpired
run RFERR deleteRcvFilesDeleted
run RFERR deleteRcvFilesTmpPaths
run SFERR deleteSndFilesExpired
run SFERR deleteSndFilesDeleted
run SFERR deleteSndFilesPrefixPaths
run SFERR deleteExpiredReplicasForDeletion
liftIO $ putStrLn $ "cleanupManager: starting, initial delay " <> show delay <> ", interval " <> show int <> ", TTL " <> show ttl
liftIO $ threadDelay' delay
liftIO $ putStrLn "cleanupManager: initial delay passed, starting cleanup loop"
forever $ waitActive "cleanup loop" $ do
liftIO $ putStrLn "cleanupManager: cleanup loop iteration started"
run ERR "deleteConns" deleteConns
run ERR "deleteRcvMsgHashesExpired" $ withStore' c (`deleteRcvMsgHashesExpired` ttl)
run ERR "deleteSndMsgsExpired" $ withStore' c (`deleteSndMsgsExpired` ttl)
run ERR "deleteRatchetKeyHashesExpired" $ withStore' c (`deleteRatchetKeyHashesExpired` ttl)
run ERR "deleteExpiredNtfTokensToDelete" $ withStore' c (`deleteExpiredNtfTokensToDelete` ttl)
run RFERR "deleteRcvFilesExpired" deleteRcvFilesExpired
run RFERR "deleteRcvFilesDeleted" deleteRcvFilesDeleted
run RFERR "deleteRcvFilesTmpPaths" deleteRcvFilesTmpPaths
run SFERR "deleteSndFilesExpired" deleteSndFilesExpired
run SFERR "deleteSndFilesDeleted" deleteSndFilesDeleted
run SFERR "deleteSndFilesPrefixPaths" deleteSndFilesPrefixPaths
run SFERR "deleteExpiredReplicasForDeletion" deleteExpiredReplicasForDeletion
liftIO $ putStrLn "cleanupManager: cleanup loop iteration completed, waiting for next interval"
liftIO $ threadDelay' int
where
run :: forall e. AEntityI e => (AgentErrorType -> AEvent e) -> AM () -> AM' ()
run err a = do
waitActive . runExceptT $ a `catchAllErrors` (notify "" . err)
run :: forall e. AEntityI e => (AgentErrorType -> AEvent e) -> String -> AM () -> AM' ()
run err name a = do
liftIO $ putStrLn $ "cleanupManager: running " <> name
waitActive name . runExceptT $ a `catchAllErrors` \e -> do
liftIO $ putStrLn $ "cleanupManager: " <> name <> " error: " <> show e
notify "" $ err e
liftIO $ putStrLn $ "cleanupManager: " <> name <> " completed"
step <- asks $ cleanupStepInterval . config
liftIO $ threadDelay step
-- we are catching it to avoid CRITICAL errors in tests when this is the only remaining handle to active
waitActive :: ReaderT Env IO a -> AM' ()
waitActive a = liftIO (E.tryAny $ waitUntilActive c) >>= either (\_ -> pure ()) (\_ -> void a)
waitActive :: String -> ReaderT Env IO a -> AM' ()
waitActive name a = liftIO (E.tryAny $ waitUntilActive c) >>= either (\e -> liftIO $ putStrLn $ "cleanupManager: waitActive skipped " <> name <> ": " <> show e) (\_ -> void a)
deleteConns =
withLock (deleteLock c) "cleanupManager" $ do
void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c
void $ withStore' c getDeletedWaitingDeliveryConnIds >>= deleteDeletedWaitingDeliveryConns c
liftIO $ putStrLn "deleteConns: getting deleted conn IDs"
deletedConnIds <- withStore' c getDeletedConnIds
liftIO $ putStrLn $ "deleteConns: found " <> show (length deletedConnIds) <> " deleted conns"
void $ deleteDeletedConns c deletedConnIds
liftIO $ putStrLn "deleteConns: getting deleted waiting delivery conn IDs"
deletedWaitingIds <- withStore' c getDeletedWaitingDeliveryConnIds
liftIO $ putStrLn $ "deleteConns: found " <> show (length deletedWaitingIds) <> " deleted waiting delivery conns"
void $ deleteDeletedWaitingDeliveryConns c deletedWaitingIds
liftIO $ putStrLn "deleteConns: deleting users without conns"
withStore' c deleteUsersWithoutConns >>= mapM_ (notify "" . DEL_USER)
liftIO $ putStrLn "deleteConns: done"
deleteRcvFilesExpired = do
rcvFilesTTL <- asks $ rcvFilesTTL . config
rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL)
Expand Down
9 changes: 8 additions & 1 deletion src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1195,15 +1195,20 @@ countPendingSndDeliveries_ db connId msgId = do
deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteRcvMsgHashesExpired db ttl = do
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
putStrLn $ "deleteRcvMsgHashesExpired: cutoffTs = " <> show cutoffTs
DB.execute db "DELETE FROM encrypted_rcv_message_hashes WHERE created_at < ?" (Only cutoffTs)
putStrLn "deleteRcvMsgHashesExpired: done"

deleteSndMsgsExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteSndMsgsExpired db ttl = do
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
ts <- getCurrentTime
let cutoffTs = addUTCTime (-ttl) ts
putStrLn $ "deleteSndMsgsExpired: currentTime = " <> show ts <> ", ttl = " <> show ttl <> ", cutoffTs = " <> show cutoffTs
DB.execute
db
"DELETE FROM messages WHERE internal_ts < ? AND internal_snd_id IS NOT NULL"
(Only cutoffTs)
putStrLn "deleteSndMsgsExpired: done"

createRatchetX3dhKeys :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> Maybe CR.RcvPrivRKEMParams -> IO ()
createRatchetX3dhKeys db connId x3dhPrivKey1 x3dhPrivKey2 pqPrivKem =
Expand Down Expand Up @@ -2414,7 +2419,9 @@ checkRatchetKeyHashExists db connId hash =
deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteRatchetKeyHashesExpired db ttl = do
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
putStrLn $ "deleteRatchetKeyHashesExpired: cutoffTs = " <> show cutoffTs
DB.execute db "DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs)
putStrLn "deleteRatchetKeyHashesExpired: done"

-- | returns all connection queues, the first queue is the primary one
getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue))
Expand Down
Loading