{- P2P protocol, Annex implementation
 -
 - Copyright 2016-2024 Joey Hess
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

-- NOTE(review): \case is used below, but LambdaCase is not listed in this
-- pragma; presumably it is enabled by the build configuration. TODO confirm.
{-# LANGUAGE RankNTypes, FlexibleContexts #-}

module P2P.Annex
    ( RunState(..)
    , mkRunState
    , P2PConnection(..)
    , runFullProto
    ) where

import Annex.Common
import Annex.Content
import Annex.Transfer
import Annex.ChangedRefs
import P2P.Protocol
import P2P.IO
import Logs.Location
import Types.NumCopies
import Utility.Metered
import Utility.MonotonicClock
import Annex.Verify

import Control.Monad.Free
import Control.Concurrent.STM
import Data.Time.Clock.POSIX

-- Full interpreter for Proto, that can receive and send objects.
--
-- Walks the free monad: a Pure value is the final result, Net actions
-- are handed to runNet, and Local actions are handed to runLocal. In both
-- cases the interpreter itself is passed along as the runner used for
-- continuations.
runFullProto :: RunState -> P2PConnection -> Proto a -> Annex (Either ProtoFailure a)
runFullProto runst conn = go
  where
    go :: RunProto Annex
    go (Pure v) = return (Right v)
    go (Free (Net n)) = runNet runst conn go n
    go (Free (Local l)) = runLocal runst go l

-- Interprets a single Local action of the protocol in the Annex monad.
--
-- The runner is used to run the continuation of the action, as well as
-- any nested protocol actions the action carries (such as getb and
-- validitycheck).
--
-- Most branches wrap their work in tryNonAsync and convert a caught
-- exception into Left (ProtoFailureException e), rather than letting
-- it propagate.
runLocal :: RunState -> RunProto Annex -> LocalF (Proto a) -> Annex (Either ProtoFailure a)
runLocal runst runner a = case a of
    -- Size of the temporary object file for the key, falling back
    -- to 0 via catchDefaultIO.
    TmpContentSize k next -> do
        tmp <- fromRepo $ gitAnnexTmpObjectLocation k
        size <- liftIO $ catchDefaultIO 0 $ getFileSize tmp
        runner (next (Len size))
    -- Size of an arbitrary file, falling back to 0 via catchDefaultIO.
    FileSize f next -> do
        size <- liftIO $ catchDefaultIO 0 $ getFileSize (toRawFilePath f)
        runner (next (Len size))
    -- Size of the key's content, as a Maybe; the continuation gets
    -- Nothing when no size was obtained.
    ContentSize k next -> do
        let getsize = liftIO . catchMaybeIO . getFileSize
        size <- inAnnex' isJust Nothing getsize k
        runner (next (Len <$> size))
    ReadContent k af o offset sender next -> do
        -- Runs an action, mapping an exception or a protocol failure
        -- to a Left result, and otherwise continuing with next.
        let proceed c = do
                r <- tryNonAsync c
                case r of
                    Left e -> return $ Left $ ProtoFailureException e
                    Right (Left e) -> return $ Left e
                    Right (Right ok) -> runner (next ok)
        -- If the content is not present, or the transfer doesn't
        -- run for any other reason, the sender action still must
        -- be run, so is given empty and Invalid data.
        let fallback = runner (sender mempty (return Invalid))
        v <- tryNonAsync $ prepSendAnnex k o
        case v of
            Right (Just (f, _sz, checkchanged)) -> proceed $ do
                -- alwaysUpload to allow multiple uploads of the same key.
                let runtransfer ti = transfer alwaysUpload k af Nothing $ \p ->
                        sinkfile f offset checkchanged sender p ti
                checktransfer runtransfer fallback
            Right Nothing -> proceed fallback
            Left e -> return $ Left $ ProtoFailureException e
    StoreContent k af o l getb validitycheck next -> do
        -- This is the same as the retrievalSecurityPolicy of
        -- Remote.P2P and Remote.Git.
        let rsp = RetrievalAllKeysSecure
        v <- tryNonAsync $ do
            iv <- startVerifyKeyContentIncrementally DefaultVerify k
            -- The content is received into the temp file provided by
            -- getViaTmp, using storefile.
            let runtransfer ti =
                    Right <$> transfer download' k af Nothing (\p ->
                        logStatusAfter k $ getViaTmp rsp DefaultVerify k af Nothing $ \tmp ->
                            storefile (fromRawFilePath tmp) o l getb iv validitycheck p ti)
            -- Used when the transfer action ended without storefile
            -- having indicated that data was transferred.
            let fallback = return $ Left $
                    ProtoFailureMessage "transfer already in progress, or unable to take transfer lock"
            checktransfer runtransfer fallback
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    -- Like StoreContent, but stores to the given destination file,
    -- with the incremental verifier supplied by the caller, and without
    -- going through transfer or getViaTmp.
    StoreContentTo dest iv o l getb validitycheck next -> do
        v <- tryNonAsync $ do
            let runtransfer ti = Right
                    <$> storefile dest o l getb iv validitycheck nullMeterUpdate ti
            let fallback = return $ Left $
                    ProtoFailureMessage "Transfer failed"
            checktransfer runtransfer fallback
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    SendContentWith consumer getb validitycheck next -> do
        v <- tryNonAsync $ do
            let fallback = return $ Left $
                    ProtoFailureMessage "Transfer failed"
            -- Feeds the received data to the consumer, and only once
            -- the consumer has returned, indicates that the transfer
            -- took place.
            let consumer' b ti = do
                    validator <- consumer b
                    indicatetransferred ti
                    return validator
            runner getb >>= \case
                Left e -> giveup $ describeProtoFailure e
                Right b -> checktransfer (\ti -> Right <$> consumer' b ti) fallback >>= \case
                    Left e -> return (Left e)
                    Right validator ->
                        -- When the validity check itself fails, the
                        -- validator is run with Nothing.
                        runner validitycheck >>= \case
                            Right v -> Right <$> validator v
                            _ -> Right <$> validator Nothing
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right (Left e) -> return $ Left e
            Right (Right ok) -> runner (next ok)
    -- Records in the location log that the key is present at the uuid.
    SetPresent k u next -> do
        v <- tryNonAsync $ logChange k u InfoPresent
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right () -> runner next
    CheckContentPresent k next -> do
        v <- tryNonAsync $ inAnnex k
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right result -> runner (next result)
    RemoveContent k mts next -> do
        let cleanup = do
                logStatus k InfoMissing
                return True
        -- When a timestamp is provided, the removal only goes ahead
        -- while the current monotonic timestamp is still before it.
        let checkts = case mts of
                Nothing -> return True
                Just ts -> do
                    now <- liftIO currentMonotonicTimestamp
                    return (now < ts)
        -- Content that is not in the annex yields True without
        -- anything being removed or logged here.
        v <- tryNonAsync $
            ifM (Annex.Content.inAnnex k)
                ( lockContentForRemoval k cleanup $ \contentlock ->
                    ifM checkts
                        ( do
                            removeAnnex contentlock
                            cleanup
                        , return False
                        )
                , return True
                )
        case v of
            Left e -> return $ Left $ ProtoFailureException e
            Right result -> runner (next result)
    TryLockContent k protoaction next -> do
        -- The protoaction is run while the shared lock is held, and is
        -- told whether a locked copy was obtained.
        v <- tryNonAsync $ lockContentShared k (Just p2pDefaultLockContentRetentionDuration) $ \verifiedcopy ->
            case verifiedcopy of
                LockedCopy _ -> runner (protoaction True)
                _ -> runner (protoaction False)
        -- If locking fails, lockContentShared throws an exception.
        -- Let the peer know it failed.
        case v of
            Left _ -> runner $ do
                protoaction False
                next
            Right _ -> runner next
    -- Only available when serving with a changed refs handle; in any
    -- other run state this fails with a message.
    WaitRefChange next -> case runst of
        Serving _ (Just h) _ -> do
            v <- tryNonAsync $ liftIO $ waitChangedRefs h
            case v of
                Left e -> return $ Left $ ProtoFailureException e
                Right changedrefs -> runner (next changedrefs)
        _ -> return $ Left $
            ProtoFailureMessage "change notification not available"
    UpdateMeterTotalSize m sz next -> do
        liftIO $ setMeterTotalSize m sz
        runner next
    -- Note that, unlike most branches above, the following two do not
    -- catch exceptions.
    RunValidityCheck checkaction next -> runner . next =<< checkaction
    GetLocalCurrentTime next -> runner . next =<< liftIO getPOSIXTime
  where
    -- Runs a transfer action ta, which is passed a meter update.
    -- mk is the transfer wrapper to use when serving (callers pass
    -- alwaysUpload or download').
    transfer mk k af sd ta = case runst of
        -- Update transfer logs when serving.
        -- Using noRetry because we're the sender.
        Serving theiruuid _ _ ->
            mk theiruuid k af sd noRetry ta noNotification
        -- Transfer logs are updated higher in the stack when
        -- a client.
        Client _ -> ta nullMeterUpdate

    -- Receives data by running getb, and writes it to dest, feeding it
    -- through the incremental verifier (when there is one) as it is
    -- written.
    --
    -- Returns a pair of whether the content was stored successfully,
    -- and the verification status of the stored content.
    --
    -- Throws an exception (via giveup) when getb fails.
    storefile dest (Offset o) (Len l) getb incrementalverifier validitycheck p ti = do
        v <- runner getb
        case v of
            Right b -> do
                liftIO $ withBinaryFile dest ReadWriteMode $ \h -> do
                    p' <- resumeVerifyFromOffset o incrementalverifier p h
                    meteredWrite p' (writeVerifyChunk incrementalverifier h) b
                -- All the data has been written, so the transfer
                -- did take place.
                indicatetransferred ti

                -- Whether the file now has the expected size, which
                -- is the offset plus the length that was to be sent.
                rightsize <- do
                    sz <- liftIO $ getFileSize (toRawFilePath dest)
                    return (toInteger sz == l + o)

                runner validitycheck >>= \case
                    Right (Just Valid) -> case incrementalverifier of
                        Just iv
                            | rightsize -> liftIO (finalizeIncrementalVerifier iv) >>= \case
                                Nothing -> return (True, UnVerified)
                                Just True -> return (True, Verified)
                                Just False -> do
                                    verificationOfContentFailed (toRawFilePath dest)
                                    return (False, UnVerified)
                            | otherwise -> return (False, UnVerified)
                        Nothing -> return (rightsize, UnVerified)
                    Right (Just Invalid) | l == 0 ->
                        -- Special case, for when
                        -- content was not
                        -- available to send,
                        -- which is indicated by
                        -- sending 0 bytes and
                        -- Invalid.
                        return (False, UnVerified)
                    _ -> do
                        -- Invalid, or old protocol
                        -- version. Validity is not
                        -- known. Force content
                        -- verification.
                        return (rightsize, MustVerify)
            Left e -> giveup $ describeProtoFailure e

    -- Sends the content of file f, starting at the offset, by running
    -- the sender on it. The sender is also given a validity check,
    -- which reports Valid or Invalid depending on the result of
    -- checkchanged.
    --
    -- The file handle is opened and closed by bracket, so it stays open
    -- while the sender runs, and is closed even if an exception is
    -- thrown.
    sinkfile f (Offset o) checkchanged sender p ti = bracket setup cleanup go
      where
        setup = liftIO $ openBinaryFile f ReadMode
        cleanup = liftIO . hClose
        go h = do
            -- The meter is offset, so that it also accounts for the
            -- part of the file that is skipped over.
            let p' = offsetMeterUpdate p (toBytesProcessed o)
            when (o /= 0) $
                liftIO $ hSeek h AbsoluteSeek o
            b <- liftIO $ hGetContentsMetered h p'
            let validitycheck = local $ runValidityCheck $
                    checkchanged >>= return . \case
                        False -> Invalid
                        True -> Valid
            r <- runner (sender b validitycheck)
            indicatetransferred ti
            return r

    -- This allows using actions like download and viaTmp
    -- that may abort a transfer, and clean up the protocol after them.
    --
    -- Runs an action that may make a transfer, passing a transfer
    -- indicator. The action should call indicatetransferred on it,
    -- only after it's actually sent/received all the data.
    --
    -- If the action ends without having called indicatetransferred,
    -- runs the fallback action, which can close the protocol
    -- connection or otherwise clean up after the transfer not having
    -- occurred.
    --
    -- If the action throws an exception, the fallback is not run.
    checktransfer ta fallback = do
        -- The transfer indicator is a TVar that starts out False.
        ti <- liftIO $ newTVarIO False
        r <- ta ti
        ifM (liftIO $ atomically $ readTVar ti)
            ( return r
            , fallback
            )

    -- Sets the transfer indicator, to record that the transfer
    -- actually took place.
    indicatetransferred ti = liftIO $ atomically $ writeTVar ti True