Skip to content

Commit

Permalink
Stage READ/WRITE support and restrict URL format (matrixorigin#17820)
Browse files Browse the repository at this point in the history
  • Loading branch information
cpegeric committed Aug 21, 2024
1 parent 80509e6 commit 07b25c1
Show file tree
Hide file tree
Showing 8 changed files with 719 additions and 237 deletions.
88 changes: 17 additions & 71 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3266,7 +3266,7 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
}
} else {
// format credentials and hash it
credentials = HashPassWord(formatCredentials(cs.Credentials))
credentials = formatCredentials(cs.Credentials)

if !cs.Status.Exist {
StageStatus = "disabled"
Expand All @@ -3278,6 +3278,11 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
comment = cs.Comment.Comment
}

if !(strings.HasPrefix(cs.Url, function.STAGE_PROTOCOL+"://") || strings.HasPrefix(cs.Url, function.S3_PROTOCOL+"://") ||
strings.HasPrefix(cs.Url, function.FILE_PROTOCOL+":///")) {
return moerr.NewBadConfig(ctx, "URL protocol only supports stage://, s3:// and file:///")
}

sql, err = getSqlForInsertIntoMoStages(ctx, string(cs.Name), cs.Url, credentials, StageStatus, types.CurrentTimestamp().String2(time.UTC, 0), comment)
if err != nil {
return err
Expand All @@ -3295,88 +3300,24 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
func doCheckFilePath(ctx context.Context, ses *Session, ep *tree.ExportParam) (err error) {
//var err error
var filePath string
var sql string
var erArray []ExecResult
var stageName string
var stageStatus string
var url string
if ep == nil {
return err
}

bh := ses.GetBackgroundExec(ctx)
defer bh.Close()

err = bh.Exec(ctx, "begin;")
defer func() {
err = finishTxn(ctx, bh, err)
}()
if err != nil {
return err
}

// detect filepath contain stage or not
filePath = ep.FilePath
if !strings.Contains(filePath, ":") {
// the filepath is the target path
sql = getSqlForCheckStageStatus(ctx, "enabled")
bh.ClearExecResultSet()
err = bh.Exec(ctx, sql)
if err != nil {
return err
}

erArray, err = getResultSet(ctx, bh)
if err != nil {
return err
}

// if have stage enabled
if execResultArrayHasData(erArray) {
return moerr.NewInternalError(ctx, "stage exists, please try to check and use a stage instead")
} else {
// use the filepath
return err
}
} else {
stageName = strings.Split(filePath, ":")[0]
// check the stage status
sql, err = getSqlForCheckStageStatusWithStageName(ctx, stageName)
if err != nil {
return err
}
bh.ClearExecResultSet()
err = bh.Exec(ctx, sql)
if strings.HasPrefix(filePath, function.STAGE_PROTOCOL+"://") {
// stage:// URL
s, err := function.UrlToStageDef(filePath, ses.proc)
if err != nil {
return err
}

erArray, err = getResultSet(ctx, bh)
// s.ToPath() returns the fileservice filepath, i.e. s3,...:/path for S3 or /path for local file
ses.ep.userConfig.StageFilePath, _, err = s.ToPath()
if err != nil {
return err
}
if execResultArrayHasData(erArray) {
stageStatus, err = erArray[0].GetString(ctx, 0, 1)
if err != nil {
return err
}

// is the stage staus is disabled
if stageStatus == tree.StageStatusDisabled.String() {
return moerr.NewInternalError(ctx, "stage '%s' is invalid, please check", stageName)
} else if stageStatus == tree.StageStatusEnabled.String() {
// replace the filepath using stage url
url, err = erArray[0].GetString(ctx, 0, 0)
if err != nil {
return err
}

filePath = strings.Replace(filePath, stageName+":", url, 1)
ses.ep.userConfig.StageFilePath = filePath
}
} else {
return moerr.NewInternalError(ctx, "stage '%s' is not exists, please check", stageName)
}
}
return err

Expand Down Expand Up @@ -3440,6 +3381,11 @@ func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err e
}
} else {
if as.UrlOption.Exist {
if !(strings.HasPrefix(as.UrlOption.Url, function.STAGE_PROTOCOL+"://") ||
strings.HasPrefix(as.UrlOption.Url, function.S3_PROTOCOL+"://") ||
strings.HasPrefix(as.UrlOption.Url, function.FILE_PROTOCOL+":///")) {
return moerr.NewBadConfig(ctx, "URL protocol only supports stage://, s3:// and file:///")
}
sql = getsqlForUpdateStageUrl(string(as.Name), as.UrlOption.Url)
err = bh.Exec(ctx, sql)
if err != nil {
Expand All @@ -3448,7 +3394,7 @@ func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err e
}

if as.CredentialsOption.Exist {
credentials = HashPassWord(formatCredentials(as.CredentialsOption))
credentials = formatCredentials(as.CredentialsOption)
sql = getsqlForUpdateStageCredentials(string(as.Name), credentials)
err = bh.Exec(ctx, sql)
if err != nil {
Expand Down
17 changes: 16 additions & 1 deletion pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,9 +1499,24 @@ func (c *Compile) compileExternScan(n *plan.Node) ([]*Scope, error) {
} else if param.ScanType == tree.INLINE {
return c.compileExternValueScan(n, param, strictSqlMode)
} else {
if err := plan2.InitInfileParam(param); err != nil {
if err := plan2.InitInfileOrStageParam(param, c.proc); err != nil {
return nil, err
}

// if filepath is stage URL, ScanType may change to tree.S3. check param.Parallel again
if param.ScanType == tree.S3 && param.Parallel {
mcpu = 0
ID2Addr = make(map[int]int, 0)
for i := 0; i < len(c.cnList); i++ {
tmp := mcpu
if c.cnList[i].Mcpu > external.S3ParallelMaxnum {
mcpu += external.S3ParallelMaxnum
} else {
mcpu += c.cnList[i].Mcpu
}
ID2Addr[i] = mcpu - tmp
}
}
}

t = time.Now()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/plan/build_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func checkFileExist(param *tree.ExternParam, ctx CompilerContext) (string, error
return "", err
}
} else {
if err := InitInfileParam(param); err != nil {
if err := InitInfileOrStageParam(param, ctx.GetProcess()); err != nil {
return "", err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/plan/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func getExternalStats(node *plan.Node, builder *QueryBuilder) *Stats {
return DefaultHugeStats()
}
} else {
if err = InitInfileParam(param); err != nil {
if err = InitInfileOrStageParam(param, builder.compCtx.GetProcess()); err != nil {
return DefaultHugeStats()
}
}
Expand Down
Loading

0 comments on commit 07b25c1

Please sign in to comment.