improve tailVerify
Wait for the file to get modified, not only opened. This way, if a remote does not support resuming, and opens a new file over top of the existing file, it will wait until that remote starts writing, and open the file it's writing to, not the old file. Sponsored-by: Dartmouth College's DANDI project
This commit is contained in:
parent
e46a7dff6f
commit
e0b7f391bd
1 changed files with 48 additions and 64 deletions
112
Annex/Verify.hs
112
Annex/Verify.hs
|
@ -172,73 +172,57 @@ tailVerify iv f finished =
|
||||||
failIncremental iv
|
failIncremental iv
|
||||||
return Nothing
|
return Nothing
|
||||||
where
|
where
|
||||||
f' = fromRawFilePath f
|
-- Watch the directory containing the file, and wait for
|
||||||
waitforfiletoexist i = tryNonAsync (openBinaryFile f' ReadMode) >>= \case
|
-- the file to be modified. It's possible that the file already
|
||||||
Right h -> return (Just h)
|
-- exists before the downloader starts, but it replaces it instead
|
||||||
Left _ -> do
|
-- of resuming, and waiting for modification deals with such
|
||||||
hv <- newEmptyTMVarIO
|
-- situations.
|
||||||
wd <- inotifycreate i $
|
inotifydirchange i cont =
|
||||||
tryNonAsync (openBinaryFile f' ReadMode) >>= \case
|
INotify.addWatch i [INotify.Modify] dir $ \case
|
||||||
Right h ->
|
-- Ignore changes to other files in the directory.
|
||||||
unlessM (atomically $ tryPutTMVar hv h) $
|
INotify.Modified { INotify.maybeFilePath = fn }
|
||||||
hClose h
|
| fn == Just basef -> cont
|
||||||
Left _ -> return ()
|
_ -> noop
|
||||||
-- Wait for the file to appear, or for a signal
|
|
||||||
-- that the file is finished being written.
|
|
||||||
--
|
|
||||||
-- The TMVar is left full to prevent the file
|
|
||||||
-- being opened again if the inotify event
|
|
||||||
-- fires more than once.
|
|
||||||
v <- atomically $
|
|
||||||
(Just <$> readTMVar hv)
|
|
||||||
`orElse`
|
|
||||||
((const Nothing) <$> readTMVar finished)
|
|
||||||
void $ tryNonAsync $ INotify.removeWatch wd
|
|
||||||
return v
|
|
||||||
|
|
||||||
inotifycreate i cont = INotify.addWatch i evs (P.takeDirectory f) $ \case
|
|
||||||
-- Ignore changes to other files in the directory.
|
|
||||||
INotify.Created { INotify.filePath = fn }
|
|
||||||
| fn /= basef -> noop
|
|
||||||
INotify.MovedIn { INotify.filePath = fn }
|
|
||||||
| fn /= basef -> noop
|
|
||||||
INotify.Opened { INotify.maybeFilePath = fn }
|
|
||||||
| fn /= Just basef -> noop
|
|
||||||
INotify.Modified { INotify.maybeFilePath = fn }
|
|
||||||
| fn /= Just basef -> noop
|
|
||||||
_ -> cont
|
|
||||||
where
|
where
|
||||||
evs =
|
(dir, basef) = P.splitFileName f
|
||||||
[ INotify.Create
|
|
||||||
, INotify.MoveIn
|
|
||||||
, INotify.Move
|
|
||||||
, INotify.Open
|
|
||||||
, INotify.Modify
|
|
||||||
]
|
|
||||||
basef = P.takeFileName f
|
|
||||||
|
|
||||||
go = INotify.withINotify $ \i -> do
|
inotifyfilechange i = INotify.addWatch i [INotify.Modify] f . const
|
||||||
h <- waitforfiletoexist i
|
|
||||||
tryNonAsync (go' i h) >>= \case
|
|
||||||
Right r -> return r
|
|
||||||
Left _ -> do
|
|
||||||
maybe noop hClose h
|
|
||||||
failIncremental iv
|
|
||||||
return Nothing
|
|
||||||
|
|
||||||
go' i (Just h) = do
|
|
||||||
modified <- newEmptyTMVarIO
|
|
||||||
wd <- INotify.addWatch i [INotify.Modify] f $ \_event ->
|
|
||||||
atomically $ void $ tryPutTMVar modified ()
|
|
||||||
r <- follow h modified
|
|
||||||
void $ tryNonAsync $ INotify.removeWatch wd
|
|
||||||
return r
|
|
||||||
|
|
||||||
-- File never showed up, but we've been told it's done being
|
go = INotify.withINotify $ \i -> do
|
||||||
-- written to.
|
modified <- newEmptyTMVarIO
|
||||||
go' _ Nothing = do
|
let signalmodified = atomically $ void $ tryPutTMVar modified ()
|
||||||
failIncremental iv
|
wd <- inotifydirchange i signalmodified
|
||||||
return Nothing
|
let cleanup = void . tryNonAsync . INotify.removeWatch
|
||||||
|
let stop w = do
|
||||||
|
cleanup w
|
||||||
|
failIncremental iv
|
||||||
|
return Nothing
|
||||||
|
waitopen modified >>= \case
|
||||||
|
Nothing -> stop wd
|
||||||
|
Just h -> do
|
||||||
|
cleanup wd
|
||||||
|
wf <- inotifyfilechange i signalmodified
|
||||||
|
tryNonAsync (follow h modified) >>= \case
|
||||||
|
Left _ -> do
|
||||||
|
hClose h
|
||||||
|
stop wf
|
||||||
|
Right r -> do
|
||||||
|
cleanup wf
|
||||||
|
return r
|
||||||
|
|
||||||
|
waitopen modified = do
|
||||||
|
v <- atomically $
|
||||||
|
(Just <$> takeTMVar modified)
|
||||||
|
`orElse`
|
||||||
|
((const Nothing) <$> takeTMVar finished)
|
||||||
|
case v of
|
||||||
|
Just () -> tryNonAsync (openBinaryFile (fromRawFilePath f) ReadMode) >>= \case
|
||||||
|
Right h -> return (Just h)
|
||||||
|
-- Failed to open, wait for next
|
||||||
|
-- modification and try again.
|
||||||
|
Left _ -> waitopen modified
|
||||||
|
-- finished without the file being modified
|
||||||
|
Nothing -> return Nothing
|
||||||
|
|
||||||
follow h modified = do
|
follow h modified = do
|
||||||
b <- S.hGetNonBlocking h chunk
|
b <- S.hGetNonBlocking h chunk
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue