a little progress on serveGet hang
Now it gets to the validity checker, but it seems it never runs it.
This commit is contained in:
parent
8cb1332407
commit
80fb5445b5
3 changed files with 43 additions and 17 deletions
13
P2P/Annex.hs
13
P2P/Annex.hs
|
@ -107,16 +107,23 @@ runLocal runst runner a = case a of
|
||||||
ProtoFailureMessage "Transfer failed"
|
ProtoFailureMessage "Transfer failed"
|
||||||
let consumer' b ti = do
|
let consumer' b ti = do
|
||||||
validator <- consumer b
|
validator <- consumer b
|
||||||
|
liftIO $ print "got validator"
|
||||||
indicatetransferred ti
|
indicatetransferred ti
|
||||||
|
liftIO $ print "indicatetransferred ti done"
|
||||||
return validator
|
return validator
|
||||||
runner getb >>= \case
|
runner getb >>= \case
|
||||||
Left e -> giveup $ describeProtoFailure e
|
Left e -> giveup $ describeProtoFailure e
|
||||||
Right b -> checktransfer (\ti -> Right <$> consumer' b ti) fallback >>= \case
|
Right b -> checktransfer (\ti -> Right <$> consumer' b ti) fallback >>= \case
|
||||||
Left e -> return (Left e)
|
Left e -> return (Left e)
|
||||||
Right validator ->
|
Right validator -> do
|
||||||
|
liftIO $ print "running validity check"
|
||||||
runner validitycheck >>= \case
|
runner validitycheck >>= \case
|
||||||
Right v -> Right <$> validator v
|
Right v -> do
|
||||||
_ -> Right <$> validator Nothing
|
liftIO $ print ("calling validator 1", v)
|
||||||
|
Right <$> validator v
|
||||||
|
_ -> do
|
||||||
|
liftIO $ print "calling validator nothing"
|
||||||
|
Right <$> validator Nothing
|
||||||
case v of
|
case v of
|
||||||
Left e -> return $ Left $ ProtoFailureException e
|
Left e -> return $ Left $ ProtoFailureException e
|
||||||
Right (Left e) -> return $ Left e
|
Right (Left e) -> return $ Left e
|
||||||
|
|
23
P2P/Http.hs
23
P2P/Http.hs
|
@ -158,8 +158,11 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
|
||||||
aid <- liftIO $ async $ inAnnexWorker st $ do
|
aid <- liftIO $ async $ inAnnexWorker st $ do
|
||||||
let consumer bs = do
|
let consumer bs = do
|
||||||
liftIO $ atomically $ putTMVar bsv bs
|
liftIO $ atomically $ putTMVar bsv bs
|
||||||
|
liftIO $ print "consumer waiting for endv"
|
||||||
liftIO $ atomically $ takeTMVar endv
|
liftIO $ atomically $ takeTMVar endv
|
||||||
|
liftIO $ print "consumer took endv"
|
||||||
return $ \v -> do
|
return $ \v -> do
|
||||||
|
liftIO $ print "consumer put validityv"
|
||||||
liftIO $ atomically $
|
liftIO $ atomically $
|
||||||
putTMVar validityv v
|
putTMVar validityv v
|
||||||
return True
|
return True
|
||||||
|
@ -178,17 +181,22 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
|
||||||
stream (releaseconn, bv, endv, validityv, aid) =
|
stream (releaseconn, bv, endv, validityv, aid) =
|
||||||
S.fromActionStep B.null $ do
|
S.fromActionStep B.null $ do
|
||||||
print "chunk"
|
print "chunk"
|
||||||
modifyMVar bv $ \case
|
modifyMVar bv $ nextchunk $
|
||||||
(b:bs) -> return (bs, b)
|
cleanup (releaseconn, endv, validityv, aid)
|
||||||
[] -> do
|
|
||||||
endbit <- cleanup (releaseconn, endv, validityv, aid)
|
|
||||||
return ([], endbit)
|
|
||||||
|
|
||||||
|
nextchunk atend (b:bs)
|
||||||
|
| not (B.null b) = return (bs, b)
|
||||||
|
| otherwise = nextchunk atend bs
|
||||||
|
nextchunk atend [] = do
|
||||||
|
endbit <- atend
|
||||||
|
return ([], endbit)
|
||||||
|
|
||||||
cleanup (releaseconn, endv, validityv, aid) =
|
cleanup (releaseconn, endv, validityv, aid) =
|
||||||
ifM (atomically $ isEmptyTMVar endv)
|
ifM (atomically $ isEmptyTMVar endv)
|
||||||
( pure mempty
|
( do
|
||||||
, do
|
print "at end"
|
||||||
atomically $ putTMVar endv ()
|
atomically $ putTMVar endv ()
|
||||||
|
print "signaled end"
|
||||||
validity <- atomically $ takeTMVar validityv
|
validity <- atomically $ takeTMVar validityv
|
||||||
print ("got validity", validity)
|
print ("got validity", validity)
|
||||||
wait aid >>= \case
|
wait aid >>= \case
|
||||||
|
@ -207,6 +215,7 @@ serveGet st apiver (B64Key k) cu su bypass baf startat sec auth = do
|
||||||
Just Invalid -> "XXXXXXX"
|
Just Invalid -> "XXXXXXX"
|
||||||
-- FIXME: need to count bytes and emit
|
-- FIXME: need to count bytes and emit
|
||||||
-- something to make it invalid
|
-- something to make it invalid
|
||||||
|
, pure mempty
|
||||||
)
|
)
|
||||||
|
|
||||||
sizer = pure $ Len $ case startat of
|
sizer = pure $ Len $ case startat of
|
||||||
|
|
24
servant.hs
24
servant.hs
|
@ -32,23 +32,33 @@ import System.IO.Unsafe
|
||||||
import qualified Network.Wai.Handler.Warp as Warp
|
import qualified Network.Wai.Handler.Warp as Warp
|
||||||
|
|
||||||
type API = "readme" :> StreamGet NoFraming OctetStream (SourceIO BS.ByteString)
|
type API = "readme" :> StreamGet NoFraming OctetStream (SourceIO BS.ByteString)
|
||||||
|
:<|> "writeme" :> StreamBody NoFraming OctetStream (SourceIO BS.ByteString) :> Post '[JSON] Bool
|
||||||
|
|
||||||
api :: Proxy API
|
api :: Proxy API
|
||||||
api = Proxy
|
api = Proxy
|
||||||
|
|
||||||
server :: Server API
|
server :: Server API
|
||||||
server = readme where
|
server = readme :<|> writeme where
|
||||||
readme = liftIO $ do
|
readme = liftIO $ do
|
||||||
putStrLn "/proxy"
|
putStrLn "/proxy"
|
||||||
return $ S.SourceT $ \k -> do
|
return $ S.SourceT $ \k -> do
|
||||||
k =<< readfilelazy "README.md"
|
k =<< readfilelazy "README.md"
|
||||||
k =<< readfilelazy "another"
|
k =<< readfilelazy "another"
|
||||||
|
|
||||||
|
writeme :: SourceIO BS.ByteString -> Handler Bool
|
||||||
|
writeme src = do
|
||||||
|
liftIO $ print "gathering lazy bytestring"
|
||||||
|
b <- liftIO $ S.unSourceT src gatherbytestring
|
||||||
|
liftIO $ print "got lazy bytestring, writing to file"
|
||||||
|
liftIO$ BL.writeFile "writem" b
|
||||||
|
liftIO$ print "write complete"
|
||||||
|
return True
|
||||||
|
|
||||||
app :: Application
|
app :: Application
|
||||||
app = serve api server
|
app = serve api server
|
||||||
|
|
||||||
cli :: ClientM (S.SourceT IO BS.ByteString)
|
cli :: ClientM (S.SourceT IO BS.ByteString)
|
||||||
cli = client api
|
cli :<|> writecli = client api
|
||||||
|
|
||||||
main :: IO ()
|
main :: IO ()
|
||||||
main = do
|
main = do
|
||||||
|
@ -61,17 +71,17 @@ main = do
|
||||||
("client":ns:_) -> do
|
("client":ns:_) -> do
|
||||||
mgr <- newManager defaultManagerSettings
|
mgr <- newManager defaultManagerSettings
|
||||||
burl <- parseBaseUrl "http://localhost:8000/"
|
burl <- parseBaseUrl "http://localhost:8000/"
|
||||||
withClientM cli (mkClientEnv mgr burl) $ \me -> case me of
|
withClientM (writecli getit) (mkClientEnv mgr burl) $ \me -> case me of
|
||||||
Left err -> print err
|
Left err -> print err
|
||||||
Right src -> do
|
Right src -> print src
|
||||||
b <- S.unSourceT src gatherbytestring
|
|
||||||
liftIO $ print "got it all, writing"
|
|
||||||
BL.writeFile "got" (BL.init b)
|
|
||||||
_ -> do
|
_ -> do
|
||||||
putStrLn "Try:"
|
putStrLn "Try:"
|
||||||
putStrLn "cabal new-run cookbook-basic-streaming server"
|
putStrLn "cabal new-run cookbook-basic-streaming server"
|
||||||
putStrLn "cabal new-run cookbook-basic-streaming client 10"
|
putStrLn "cabal new-run cookbook-basic-streaming client 10"
|
||||||
putStrLn "time curl -H 'Accept: application/json' localhost:8000/slow/5"
|
putStrLn "time curl -H 'Accept: application/json' localhost:8000/slow/5"
|
||||||
|
where
|
||||||
|
getit = S.SourceT $ \k -> do
|
||||||
|
k =<< readfilelazy "/home/joey/README.md"
|
||||||
|
|
||||||
readfilelazy :: FilePath -> IO (S.StepT IO BS.ByteString)
|
readfilelazy :: FilePath -> IO (S.StepT IO BS.ByteString)
|
||||||
readfilelazy file = do
|
readfilelazy file = do
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue