diff --git a/Command/Export.hs b/Command/Export.hs index 4e1880c134..cb3943e2e1 100644 --- a/Command/Export.hs +++ b/Command/Export.hs @@ -220,7 +220,7 @@ performExport r ea db ek af contentsha loc = do let rollback = void $ performUnexport r ea db [ek] loc sendAnnex k rollback $ \f -> - metered Nothing k (return $ Just f) $ \m -> do + metered Nothing k (return $ Just f) $ \_ m -> do let m' = combineMeterUpdate pm m storer f k loc m' , do @@ -228,7 +228,7 @@ performExport r ea db ek af contentsha loc = do return False ) -- Sending a non-annexed file. - GitKey sha1k -> metered Nothing sha1k (return Nothing) $ \m -> + GitKey sha1k -> metered Nothing sha1k (return Nothing) $ \_ m -> withTmpFile "export" $ \tmp h -> do b <- catObject contentsha liftIO $ L.hPut h b diff --git a/Messages/Progress.hs b/Messages/Progress.hs index dcb7541017..2cdad16eda 100644 --- a/Messages/Progress.hs +++ b/Messages/Progress.hs @@ -24,46 +24,52 @@ import qualified System.Console.Concurrent as Console #endif {- Shows a progress meter while performing a transfer of a key. - - The action is passed a callback to use to update the meter. + - The action is passed the meter and a callback to use to update the meter. - - When the key's size is not known, the srcfile is statted to get the size. - This allows uploads of keys without size to still have progress - displayed. --} -metered :: Maybe MeterUpdate -> Key -> Annex (Maybe FilePath) -> (MeterUpdate -> Annex a) -> Annex a +metered :: Maybe MeterUpdate -> Key -> Annex (Maybe FilePath) -> (Meter -> MeterUpdate -> Annex a) -> Annex a metered othermeter key getsrcfile a = withMessageState $ \st -> flip go st =<< getsz where go _ (MessageState { outputType = QuietOutput }) = nometer go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = False }) = do showOutput - meter <- liftIO $ mkMeter msize bandwidthMeter $ - displayMeterHandle stdout - m <- liftIO $ rateLimitMeterUpdate 0.1 msize $ + meter <- liftIO $ mkMeter msize $ + displayMeterHandle stdout bandwidthMeter + m <- liftIO $ rateLimitMeterUpdate 0.1 meter $ updateMeter meter - r <- a (combinemeter m) + r <- a meter (combinemeter m) liftIO $ clearMeterHandle meter stdout return r go msize (MessageState { outputType = NormalOutput, concurrentOutputEnabled = True }) = #if WITH_CONCURRENTOUTPUT withProgressRegion $ \r -> do - meter <- liftIO $ mkMeter msize bandwidthMeter $ \_ s -> - Regions.setConsoleRegion r ('\n' : s) - m <- liftIO $ rateLimitMeterUpdate 0.1 msize $ + meter <- liftIO $ mkMeter msize $ \_ msize' old new -> + let s = bandwidthMeter msize' old new + in Regions.setConsoleRegion r ('\n' : s) + m <- liftIO $ rateLimitMeterUpdate 0.1 meter $ updateMeter meter - a (combinemeter m) + a meter (combinemeter m) #else nometer #endif go msize (MessageState { outputType = JSONOutput jsonoptions }) | jsonProgress jsonoptions = do buf <- withMessageState $ return . jsonBuffer - m <- liftIO $ rateLimitMeterUpdate 0.1 msize $ - JSON.progress buf msize - a (combinemeter m) + meter <- liftIO $ mkMeter msize $ \_ msize' _old (new, _now) -> + JSON.progress buf msize' new + m <- liftIO $ rateLimitMeterUpdate 0.1 meter $ + updateMeter meter + a meter (combinemeter m) | otherwise = nometer - nometer = a $ combinemeter (const noop) + nometer = do + dummymeter <- liftIO $ mkMeter Nothing $ + \_ _ _ _ -> return () + a dummymeter (combinemeter (const noop)) combinemeter m = case othermeter of Nothing -> m @@ -82,7 +88,7 @@ metered othermeter key getsrcfile a = withMessageState $ \st -> meteredFile :: FilePath -> Maybe MeterUpdate -> Key -> Annex a -> Annex a meteredFile file combinemeterupdate key a = withMessageState $ \s -> if needOutputMeter s - then metered combinemeterupdate key (return Nothing) $ \p -> + then metered combinemeterupdate key (return Nothing) $ \_ p -> watchFileSize file p a else a where diff --git a/P2P/Annex.hs b/P2P/Annex.hs index 2bac7e6e4e..249bf58d82 100644 --- a/P2P/Annex.hs +++ b/P2P/Annex.hs @@ -117,6 +117,9 @@ runLocal runst runner a = case a of Left e -> return (Left (show e)) Right changedrefs -> runner (next changedrefs) _ -> return $ Left "change notification not available" + UpdateMeterTotalSize m sz next -> do + liftIO $ setMeterTotalSize m sz + runner next where transfer mk k af ta = case runst of -- Update transfer logs when serving. diff --git a/P2P/Protocol.hs b/P2P/Protocol.hs index 24b51cf0f5..e8a5eddb28 100644 --- a/P2P/Protocol.hs +++ b/P2P/Protocol.hs @@ -257,7 +257,10 @@ data LocalF c -- action. If unable to lock the content, or the content is not -- present, runs the protocol action with False. | WaitRefChange (ChangedRefs -> c) - -- ^ Waits for one or more git refs to change and returns them. + -- ^ Waits for one or more git refs to change and returns them.a + | UpdateMeterTotalSize Meter Integer c + -- ^ Updates the total size of a Meter, for cases where the size is + -- not known until the data is being received. deriving (Functor) type Local = Free LocalF @@ -323,8 +326,9 @@ remove key = do net $ sendMessage (REMOVE key) checkSuccess -get :: FilePath -> Key -> AssociatedFile -> MeterUpdate -> Proto Bool -get dest key af p = receiveContent p sizer storer (\offset -> GET offset af key) +get :: FilePath -> Key -> AssociatedFile -> Meter -> MeterUpdate -> Proto Bool +get dest key af m p = + receiveContent (Just m) p sizer storer (\offset -> GET offset af key) where sizer = fileSize dest storer = storeContentTo dest @@ -433,7 +437,7 @@ serveAuthed servermode myuuid = void $ serverLoop handler else do let sizer = tmpContentSize key let storer = storeContent key af - ok <- receiveContent nullMeterUpdate sizer storer PUT_FROM + ok <- receiveContent Nothing nullMeterUpdate sizer storer PUT_FROM when ok $ local $ setPresent key myuuid return ServerContinue @@ -477,15 +481,18 @@ sendContent key af offset@(Offset n) p = go =<< local (contentSize key) net $ sendBytes len content p' checkSuccess -receiveContent :: MeterUpdate -> Local Len -> (Offset -> Len -> Proto L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool -receiveContent p sizer storer mkmsg = do +receiveContent :: Maybe Meter -> MeterUpdate -> Local Len -> (Offset -> Len -> Proto L.ByteString -> Local Bool) -> (Offset -> Message) -> Proto Bool +receiveContent mm p sizer storer mkmsg = do Len n <- local sizer let p' = offsetMeterUpdate p (toBytesProcessed n) let offset = Offset n net $ sendMessage (mkmsg offset) r <- net receiveMessage case r of - Just (DATA len) -> do + Just (DATA len@(Len l)) -> do + local $ case mm of + Nothing -> return () + Just m -> updateMeterTotalSize m (n+l) ok <- local $ storer offset len (net (receiveBytes len p')) sendSuccess ok diff --git a/Remote/Helper/P2P.hs b/Remote/Helper/P2P.hs index f609f97d44..c83c38f88b 100644 --- a/Remote/Helper/P2P.hs +++ b/Remote/Helper/P2P.hs @@ -33,14 +33,14 @@ type WithConn a c = (ClosableConnection c -> Annex (ClosableConnection c, a)) -> store :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> MeterUpdate -> Annex Bool store runner k af p = do let getsrcfile = fmap fst <$> prepSendAnnex k - metered (Just p) k getsrcfile $ \p' -> + metered (Just p) k getsrcfile $ \_ p' -> fromMaybe False <$> runner p' (P2P.put k af p') retrieve :: (MeterUpdate -> ProtoRunner Bool) -> Key -> AssociatedFile -> FilePath -> MeterUpdate -> Annex (Bool, Verification) retrieve runner k af dest p = unVerified $ - metered (Just p) k (return Nothing) $ \p' -> fromMaybe False - <$> runner p (P2P.get dest k af p') + metered (Just p) k (return Nothing) $ \m p' -> fromMaybe False + <$> runner p' (P2P.get dest k af m p') remove :: ProtoRunner Bool -> Key -> Annex Bool remove runner k = fromMaybe False <$> runner (P2P.remove k) diff --git a/Remote/Helper/Special.hs b/Remote/Helper/Special.hs index 83e08c5aac..446bd369c7 100644 --- a/Remote/Helper/Special.hs +++ b/Remote/Helper/Special.hs @@ -228,7 +228,7 @@ specialRemote' cfg c preparestorer prepareretriever prepareremover preparecheckp chunkconfig = chunkConfig cfg displayprogress p k srcfile a - | displayProgress cfg = metered (Just p) k (return srcfile) a + | displayProgress cfg = metered (Just p) k (return srcfile) (const a) | otherwise = a p {- Sink callback for retrieveChunks. Stores the file content into the diff --git a/Utility/Metered.hs b/Utility/Metered.hs index a5dda5413f..2fdb700948 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -1,6 +1,6 @@ {- Metered IO and actions - - - Copyright 2012-2016 Joey Hess + - Copyright 2012-2018 Joey Hess - - License: BSD-2-clause -} @@ -288,14 +288,14 @@ outputFilter cmd params environ outfilter errfilter = catchBoolIO $ do -- | Limit a meter to only update once per unit of time. -- -- It's nice to display the final update to 100%, even if it comes soon --- after a previous update. To make that happen, a total size has to be --- provided. -rateLimitMeterUpdate :: NominalDiffTime -> Maybe Integer -> MeterUpdate -> IO MeterUpdate -rateLimitMeterUpdate delta totalsize meterupdate = do +-- after a previous update. To make that happen, the Meter has to know +-- its total size. +rateLimitMeterUpdate :: NominalDiffTime -> Meter -> MeterUpdate -> IO MeterUpdate +rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do lastupdate <- newMVar (toEnum 0 :: POSIXTime) return $ mu lastupdate where - mu lastupdate n@(BytesProcessed i) = case totalsize of + mu lastupdate n@(BytesProcessed i) = tryReadMVar totalsizev >>= \case Just t | i >= t -> meterupdate n _ -> do now <- getPOSIXTime @@ -306,35 +306,40 @@ rateLimitMeterUpdate delta totalsize meterupdate = do meterupdate n else putMVar lastupdate prev -data Meter = Meter (Maybe Integer) (MVar MeterState) (MVar String) RenderMeter DisplayMeter +data Meter = Meter (MVar Integer) (MVar MeterState) (MVar String) DisplayMeter type MeterState = (BytesProcessed, POSIXTime) -type DisplayMeter = MVar String -> String -> IO () +type DisplayMeter = MVar String -> Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> IO () type RenderMeter = Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> String -- | Make a meter. Pass the total size, if it's known. -mkMeter :: Maybe Integer -> RenderMeter -> DisplayMeter -> IO Meter -mkMeter totalsize rendermeter displaymeter = Meter - <$> pure totalsize +mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter +mkMeter totalsize displaymeter = Meter + <$> maybe newEmptyMVar newMVar totalsize <*> ((\t -> newMVar (zeroBytesProcessed, t)) =<< getPOSIXTime) <*> newMVar "" - <*> pure rendermeter <*> pure displaymeter +setMeterTotalSize :: Meter -> Integer -> IO () +setMeterTotalSize (Meter totalsizev _ _ _) totalsize = do + void $ tryTakeMVar totalsizev + putMVar totalsizev totalsize + -- | Updates the meter, displaying it if necessary. updateMeter :: Meter -> BytesProcessed -> IO () -updateMeter (Meter totalsize sv bv rendermeter displaymeter) new = do +updateMeter (Meter totalsizev sv bv displaymeter) new = do now <- getPOSIXTime (old, before) <- swapMVar sv (new, now) - when (old /= new) $ - displaymeter bv $ - rendermeter totalsize (old, before) (new, now) + when (old /= new) $ do + totalsize <- tryReadMVar totalsizev + displaymeter bv totalsize (old, before) (new, now) -- | Display meter to a Handle. -displayMeterHandle :: Handle -> DisplayMeter -displayMeterHandle h v s = do +displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter +displayMeterHandle h rendermeter v msize old new = do + let s = rendermeter msize old new olds <- swapMVar v s -- Avoid writing when the rendered meter has not changed. when (olds /= s) $ do @@ -344,7 +349,7 @@ displayMeterHandle h v s = do -- | Clear meter displayed by displayMeterHandle. clearMeterHandle :: Meter -> Handle -> IO () -clearMeterHandle (Meter _ _ v _ _) h = do +clearMeterHandle (Meter _ _ v _) h = do olds <- readMVar v hPutStr h $ '\r' : replicate (length olds) ' ' ++ "\r" hFlush h