starting work on import from S3
Not in a usable state yet.
This commit is contained in:
parent
352612618c
commit
bf6c7ea6b6
1 changed files with 167 additions and 17 deletions
184
Remote/S3.hs
184
Remote/S3.hs
|
@ -40,12 +40,14 @@ import Annex.Common
|
|||
import Types.Remote
|
||||
import Types.Export
|
||||
import qualified Git
|
||||
import qualified Annex
|
||||
import Config
|
||||
import Config.Cost
|
||||
import Remote.Helper.Special
|
||||
import Remote.Helper.Http
|
||||
import Remote.Helper.Messages
|
||||
import Remote.Helper.ExportImport
|
||||
import Types.Import
|
||||
import qualified Remote.Helper.AWS as AWS
|
||||
import Creds
|
||||
import Annex.UUID
|
||||
|
@ -71,7 +73,7 @@ remote = RemoteType
|
|||
, generate = gen
|
||||
, setup = s3Setup
|
||||
, exportSupported = exportIsSupported
|
||||
, importSupported = importUnsupported
|
||||
, importSupported = importIsSupported
|
||||
}
|
||||
|
||||
gen :: Git.Repo -> UUID -> RemoteConfig -> RemoteGitConfig -> Annex (Maybe Remote)
|
||||
|
@ -112,7 +114,14 @@ gen r u c gc = do
|
|||
, removeExportDirectory = Nothing
|
||||
, renameExport = renameExportS3 hdl this info
|
||||
}
|
||||
, importActions = importUnsupported
|
||||
, importActions = ImportActions
|
||||
{ listImportableContents = listImportableContentsS3 hdl this info
|
||||
, retrieveExportWithContentIdentifier = retrieveExportWithContentIdentifierS3 hdl info
|
||||
, storeExportWithContentIdentifier = storeExportWithContentIdentifierS3 hdl this info magic
|
||||
, removeExportWithContentIdentifier = removeExportWithContentIdentifierS3 hdl info
|
||||
, removeExportDirectoryWhenEmpty = Nothing
|
||||
, checkPresentExportWithContentIdentifier = checkPresentExportWithContentIdentifierS3 hdl info
|
||||
}
|
||||
, whereisKey = Just (getPublicWebUrls u info c)
|
||||
, remoteFsck = Nothing
|
||||
, repairRepo = Nothing
|
||||
|
@ -150,8 +159,8 @@ s3Setup' ss u mcreds c gc
|
|||
, ("bucket", defbucket)
|
||||
]
|
||||
|
||||
use fullconfig = do
|
||||
enableBucketVersioning ss fullconfig gc u
|
||||
use fullconfig info = do
|
||||
enableBucketVersioning ss info fullconfig gc u
|
||||
gitConfigSpecialRemote u fullconfig [("s3", "true")]
|
||||
return (fullconfig, u)
|
||||
|
||||
|
@ -159,10 +168,12 @@ s3Setup' ss u mcreds c gc
|
|||
(c', encsetup) <- encryptionSetup c gc
|
||||
c'' <- setRemoteCredPair encsetup c' gc (AWS.creds u) mcreds
|
||||
let fullconfig = c'' `M.union` defaults
|
||||
info <- extractS3Info fullconfig
|
||||
checkexportimportsafe fullconfig info
|
||||
case ss of
|
||||
Init -> genBucket fullconfig gc u
|
||||
_ -> return ()
|
||||
use fullconfig
|
||||
use fullconfig info
|
||||
|
||||
archiveorg = do
|
||||
showNote "Internet Archive mode"
|
||||
|
@ -182,10 +193,26 @@ s3Setup' ss u mcreds c gc
|
|||
-- special constraints on key names
|
||||
M.insert "mungekeys" "ia" defaults
|
||||
info <- extractS3Info archiveconfig
|
||||
checkexportimportsafe archiveconfig info
|
||||
hdl <- mkS3HandleVar archiveconfig gc u
|
||||
withS3HandleOrFail u hdl $
|
||||
writeUUIDFile archiveconfig u info
|
||||
use archiveconfig
|
||||
use archiveconfig info
|
||||
|
||||
-- Refuses to set up a remote that combines exporttree=yes and
-- importtree=yes on an unversioned bucket, since exporting could then
-- overwrite modifications that were made to files in the bucket.
-- The check is bypassed when the Annex.force state (--force) is set.
checkexportimportsafe c' info =
    checkexportimportsafe' c' info
        =<< Annex.getState Annex.force

checkexportimportsafe' c' info force
    | force = return ()
    | versioning info = return ()
    | exportTree c' && importTree c' = giveup $ unwords
        [ "Combining exporttree=yes and importtree=yes"
        , "with an unversioned S3 bucket is not safe;"
        , "exporting can overwrite other modifications"
        , "to files in the bucket."
        , "Recommend you add versioning=yes."
        , "(Or use --force if you don't mind possibly losing data.)"
        ]
    -- Every remaining configuration is allowed. Without this fallthrough
    -- the guards are not exhaustive, and setting up an unversioned
    -- bucket that does not use both exporttree and importtree fails
    -- with a pattern match error.
    | otherwise = return ()
|
||||
|
||||
store :: S3HandleVar -> Remote -> S3Info -> Maybe Magic -> Storer
|
||||
store mh r info magic = fileStorer $ \k f p -> withS3HandleOrFail (uuid r) mh $ \h -> do
|
||||
|
@ -346,17 +373,21 @@ checkKeyHelper info h loc = liftIO $ runResourceT $ do
|
|||
#endif
|
||||
|
||||
storeExportS3 :: S3HandleVar -> Remote -> S3Info -> Maybe Magic -> FilePath -> Key -> ExportLocation -> MeterUpdate -> Annex Bool
|
||||
storeExportS3 hv r info magic f k loc p = withS3Handle hv $ \case
|
||||
Just h -> catchNonAsync (go h) (\e -> warning (show e) >> return False)
|
||||
storeExportS3 hv r info magic f k loc p = fst
|
||||
<$> storeExportS3' hv r info magic f k loc p
|
||||
|
||||
storeExportS3' :: S3HandleVar -> Remote -> S3Info -> Maybe Magic -> FilePath -> Key -> ExportLocation -> MeterUpdate -> Annex (Bool, Maybe S3VersionID)
|
||||
storeExportS3' hv r info magic f k loc p = withS3Handle hv $ \case
|
||||
Just h -> catchNonAsync (go h) (\e -> warning (show e) >> return (False, Nothing))
|
||||
Nothing -> do
|
||||
warning $ needS3Creds (uuid r)
|
||||
return False
|
||||
return (False, Nothing)
|
||||
where
|
||||
go h = do
|
||||
let o = T.pack $ bucketExportLocation info loc
|
||||
storeHelper info h magic f o p
|
||||
>>= setS3VersionID info (uuid r) k
|
||||
return True
|
||||
mvid <- storeHelper info h magic f o p
|
||||
setS3VersionID info (uuid r) k mvid
|
||||
return (True, mvid)
|
||||
|
||||
retrieveExportS3 :: S3HandleVar -> Remote -> S3Info -> Key -> ExportLocation -> FilePath -> MeterUpdate -> Annex Bool
|
||||
retrieveExportS3 hv r info _k loc f p =
|
||||
|
@ -420,6 +451,93 @@ renameExportS3 hv r info k src dest = Just <$> go
|
|||
srcobject = T.pack $ bucketExportLocation info src
|
||||
dstobject = T.pack $ bucketExportLocation info dest
|
||||
|
||||
-- Lists the contents of the bucket that can be imported.
--
-- When no S3 handle is available, warns that creds are needed and
-- returns Nothing. The listing request is wrapped in catchMaybeIO, so
-- presumably a failed request also yields Nothing rather than an
-- exception (confirm against catchMaybeIO).
--
-- The bucket is listed page by page; each response is accumulated until
-- one arrives that is not truncated.
listImportableContentsS3 :: S3HandleVar -> Remote -> S3Info -> Annex (Maybe (ImportableContents (ContentIdentifier, ByteSize)))
listImportableContentsS3 hv r info =
    withS3Handle hv $ \case
        Nothing -> do
            warning $ needS3Creds (uuid r)
            return Nothing
        Just h -> catchMaybeIO $ liftIO $ runResourceT $ startlist h
  where
    -- Sends the first request; which request is used depends on
    -- whether versioning is enabled.
    startlist h
        | versioning info = do
            rsp <- sendS3Handle h $
                S3.getBucketObjectVersions (bucket info)
            continuelistversioned h [] rsp
        | otherwise = do
            rsp <- sendS3Handle h $
                S3.getBucket (bucket info)
            continuelistunversioned h [] rsp

    -- l holds the earlier responses, most recent first.
    continuelistunversioned h l rsp
        | S3.gbrIsTruncated rsp = do
            rsp' <- sendS3Handle h $
                (S3.getBucket (bucket info))
                    { S3.gbMarker = S3.gbrNextMarker rsp
                    }
            continuelistunversioned h (rsp:l) rsp'
        | otherwise = return (mklistunversioned (reverse (rsp:l)))

    continuelistversioned h l rsp
        | S3.gbovrIsTruncated rsp = do
            rsp' <- sendS3Handle h $
                (S3.getBucketObjectVersions (bucket info))
                    { S3.gbovKeyMarker = S3.gbovrNextKeyMarker rsp
                    , S3.gbovVersionIdMarker = S3.gbovrNextVersionIdMarker rsp
                    }
            continuelistversioned h (rsp:l) rsp'
        | otherwise = return (mklistversioned (reverse (rsp:l)))

    -- An unversioned bucket has no history. Objects that
    -- bucketImportLocation maps to Nothing are left out.
    mklistunversioned l = ImportableContents
        { importableContents =
            concatMap (mapMaybe go . S3.gbrContents) l
        , importableHistory = []
        }
      where
        go oi = do
            loc <- bucketImportLocation info $
                T.unpack $ S3.objectKey oi
            let sz = S3.objectSize oi
            let cid = mkS3UnversionedContentIdentifier $
                    S3.objectETag oi
            return (loc, (cid, sz))

    -- FIXME: listing a versioned bucket is not implemented yet; the
    -- responses are discarded and an empty listing is returned.
    mklistversioned _l = ImportableContents [] []
|
||||
|
||||
-- Not implemented yet. Fails with a message naming this function, rather
-- than with an anonymous Prelude.undefined.
retrieveExportWithContentIdentifierS3 :: S3HandleVar -> S3Info -> ExportLocation -> ContentIdentifier -> FilePath -> Annex (Maybe Key) -> MeterUpdate -> Annex (Maybe Key)
retrieveExportWithContentIdentifierS3 _hv _info _loc _cid _dest _mkkey _p =
    error "retrieveExportWithContentIdentifierS3: not yet implemented"
|
||||
|
||||
-- Stores a file to the bucket and returns the ContentIdentifier of what
-- was stored, or Nothing when the store failed.
--
-- No check is made that the content on S3 is safe to overwrite, because
-- S3 offers no atomic way to do that. With a versioned bucket this is
-- acceptable: listImportableContentsS3 will find the versions of files
-- that were overwritten, so no data is lost.
--
-- With an unversioned bucket, data loss can result, which is why
-- enabling that configuration requires --force.
storeExportWithContentIdentifierS3 :: S3HandleVar -> Remote -> S3Info -> Maybe Magic -> FilePath -> Key -> ExportLocation -> [ContentIdentifier] -> MeterUpdate -> Annex (Maybe ContentIdentifier)
storeExportWithContentIdentifierS3 hv r info magic src k loc _overwritablecids p =
    tocid <$> storeExportS3' hv r info magic src k loc p
  where
    tocid (False, _) = Nothing
    tocid (True, mvid) = Just $
        maybe unversionedcid mkS3VersionedContentIdentifier mvid

    -- FIXME for an unversioned bucket, should use the etag
    -- of the file, which is its md5sum, as the ContentIdentifier
    -- NOT mempty!
    -- This is blocked by
    -- https://github.com/aristidb/aws/issues/258
    unversionedcid = mkS3UnversionedContentIdentifier mempty
|
||||
|
||||
-- Not implemented yet. Fails with a message naming this function, rather
-- than with an anonymous Prelude.undefined.
removeExportWithContentIdentifierS3 :: S3HandleVar -> S3Info -> Key -> ExportLocation -> [ContentIdentifier] -> Annex Bool
removeExportWithContentIdentifierS3 _hv _info _k _loc _removeablecids =
    error "removeExportWithContentIdentifierS3: not yet implemented"
|
||||
|
||||
-- Not implemented yet. Fails with a message naming this function, rather
-- than with an anonymous Prelude.undefined.
checkPresentExportWithContentIdentifierS3 :: S3HandleVar -> S3Info -> Key -> ExportLocation -> [ContentIdentifier] -> Annex Bool
checkPresentExportWithContentIdentifierS3 _hv _info _k _loc _knowncids =
    error "checkPresentExportWithContentIdentifierS3: not yet implemented"
|
||||
|
||||
{- Generate the bucket if it does not already exist, including creating the
|
||||
- UUID file within the bucket.
|
||||
-
|
||||
|
@ -611,6 +729,7 @@ data S3Info = S3Info
|
|||
, storageClass :: S3.StorageClass
|
||||
, bucketObject :: Key -> BucketObject
|
||||
, bucketExportLocation :: ExportLocation -> BucketObject
|
||||
, bucketImportLocation :: BucketObject -> Maybe ImportLocation
|
||||
, metaHeaders :: [(T.Text, T.Text)]
|
||||
, partSize :: Maybe Integer
|
||||
, isIA :: Bool
|
||||
|
@ -631,6 +750,7 @@ extractS3Info c = do
|
|||
, storageClass = getStorageClass c
|
||||
, bucketObject = getBucketObject c
|
||||
, bucketExportLocation = getBucketExportLocation c
|
||||
, bucketImportLocation = getBucketImportLocation c
|
||||
, metaHeaders = getMetaHeaders c
|
||||
, partSize = getPartSize c
|
||||
, isIA = configIA c
|
||||
|
@ -690,6 +810,19 @@ getBucketObject c = munge . serializeKey
|
|||
getBucketExportLocation :: RemoteConfig -> ExportLocation -> BucketObject
|
||||
getBucketExportLocation c loc = getFilePrefix c ++ fromExportLocation loc
|
||||
|
||||
-- Maps an object in the bucket to the location it should be imported
-- to, or Nothing when the object should not be imported.
getBucketImportLocation :: RemoteConfig -> BucketObject -> Maybe ImportLocation
getBucketImportLocation c obj
    | importable = Just (mkImportLocation (drop (length prefix) obj))
    | otherwise = Nothing
  where
    prefix = getFilePrefix c
    -- The uuidFile is never imported, and neither is anything that
    -- is not under the fileprefix, when one is configured.
    importable = obj /= uuidFile c && prefix `isPrefixOf` obj
|
||||
|
||||
{- Internet Archive documentation limits filenames to a subset of ascii.
|
||||
- While other characters seem to work now, this entity encodes everything
|
||||
- else to avoid problems. -}
|
||||
|
@ -804,7 +937,11 @@ getPublicUrlMaker info = case publicurl info of
|
|||
Just (iaPublicUrl info)
|
||||
_ -> Nothing
|
||||
|
||||
|
||||
-- Each object stored on S3 is given a unique version id.
--
-- The Object is kept alongside the version id, because retrieving a
-- particular version involves a request for an object, and this records
-- which object that is.
data S3VersionID = S3VersionID S3.Object T.Text
    deriving (Show)
|
||||
|
||||
|
@ -834,6 +971,20 @@ parseS3VersionID b = do
|
|||
o <- eitherToMaybe $ T.decodeUtf8' $ BS.drop 1 rest
|
||||
mkS3VersionID o (eitherToMaybe $ T.decodeUtf8' v)
|
||||
|
||||
-- In a versioned bucket, the version id part of the S3VersionID (UTF-8
-- encoded) serves as the ContentIdentifier. The object part is not
-- included.
mkS3VersionedContentIdentifier :: S3VersionID -> ContentIdentifier
mkS3VersionedContentIdentifier (S3VersionID _object vid) =
    ContentIdentifier (T.encodeUtf8 vid)
|
||||
|
||||
type S3Etag = T.Text

-- In an unversioned bucket there is no version id, so the UTF-8 encoded
-- S3Etag serves as the ContentIdentifier instead.
mkS3UnversionedContentIdentifier :: S3Etag -> ContentIdentifier
mkS3UnversionedContentIdentifier = ContentIdentifier . T.encodeUtf8
|
||||
|
||||
setS3VersionID :: S3Info -> UUID -> Key -> Maybe S3VersionID -> Annex ()
|
||||
setS3VersionID info u k vid
|
||||
| versioning info = maybe noop (setS3VersionID' u k) vid
|
||||
|
@ -881,13 +1032,12 @@ getS3VersionIDPublicUrls mk info u k =
|
|||
-- Enabling versioning on the bucket can only be done at init time;
|
||||
-- setting versioning in a bucket that git-annex has already exported
|
||||
-- files to risks losing the content of those un-versioned files.
|
||||
enableBucketVersioning :: SetupStage -> RemoteConfig -> RemoteGitConfig -> UUID -> Annex ()
|
||||
enableBucketVersioning :: SetupStage -> S3Info -> RemoteConfig -> RemoteGitConfig -> UUID -> Annex ()
|
||||
#if MIN_VERSION_aws(0,21,1)
|
||||
enableBucketVersioning ss c gc u = do
|
||||
enableBucketVersioning ss info c gc u = do
|
||||
#else
|
||||
enableBucketVersioning ss c _ _ = do
|
||||
enableBucketVersioning ss info _ _ _ = do
|
||||
#endif
|
||||
info <- extractS3Info c
|
||||
case ss of
|
||||
Init -> when (versioning info) $
|
||||
enableversioning (bucket info)
|
||||
|
|
Loading…
Reference in a new issue