Skip to content

Commit

Permalink
Fixed issues with DELETE statement, added update/delete mutation supp…
Browse files Browse the repository at this point in the history
…ort for sqlrunner, add mutation test cases.
  • Loading branch information
gamolina committed Aug 4, 2024
1 parent 0d2915d commit 3c9b1c5
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 41 deletions.
3 changes: 3 additions & 0 deletions core/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,9 @@ func (s *Session) UpdateRow(table string, columnID uint64, updValueMap map[strin
tbuf.CurrentColumnID = columnID
tbuf.CurrentTimestamp = timePartition
for k, vc := range updValueMap {
if _, found := tbuf.PKMap[k]; found {
return fmt.Errorf("cannot update PK column %s.%s", table, k)
}
_, err := s.MapValue(table, k, vc.Value.Value(), true)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions qlbridge/exec/mutations.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ func (m *DeletionTask) Run() error {
defer m.Ctx.Recover()
defer close(m.msgOutCh)

if m.sql.Where == nil {
return fmt.Errorf("Must provide a predicate")
}
vals := make([]driver.Value, 2)
deletedCt, err := m.db.DeleteExpression(m.p, m.sql.Where.Expr)
if err != nil {
Expand Down
41 changes: 41 additions & 0 deletions quanta-admin-lib/partitionpurge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package admin

import (
"fmt"
"strings"

"github.com/disney/quanta/shared"
)

// OfflinePartitionsCmd - Offline partitions command.
type OfflinePartitionsCmd struct {
Timestamp string `arg:"" name:"time" help:"Ending time quantum value to show (shards before this date/time)."`
Table string `name:"table" help:"Table name (optional)."`
}

// Run - Offline command implementation
func (f *OfflinePartitionsCmd) Run(ctx *Context) error {

if len(f.Timestamp) == 13 && strings.Contains(f.Timestamp, "T") {
f.Timestamp = strings.ReplaceAll(f.Timestamp, "T", " ") + ":00"
}

ts, tf, err := shared.ToTQTimestamp("YMDH", f.Timestamp)
if err != nil {
return err
}

if ctx.Debug {
fmt.Printf("\nTimestamp = %v, Partitions before and including = %v\n", ts, tf)
}

conn := shared.GetClientConnection(ctx.ConsulAddr, ctx.Port, "offlinePartitions")
defer conn.Disconnect()

bitClient := shared.NewBitmapIndex(conn)
err = bitClient.OfflinePartitions(ts, f.Table)
if err != nil {
return err
}
return nil
}
13 changes: 13 additions & 0 deletions source/sql_to_quanta.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,15 @@ func (m *SQLToQuanta) WalkSourceSelect(planner plan.Planner, p *plan.Source) (pl
*/

if m.p.Complete {
u.Debugf("DONT NEED POST PREDICATE WHERE PROCESSOR")
sessionMap[exec.WHERE_MAKER] = func(ctx *plan.Context, p *plan.Where) exec.TaskRunner {
return NewNopTask(ctx)
}
v := sessionMap[exec.WHERE_MAKER]
sk := SchemaInfoString{k: exec.WHERE_MAKER}
p.Context().Session.Put(sk, nil, value.NewValue(v))
} else {
u.Debugf("USING A POST PREDICATE WHERE PROCESSOR")
dm := make(map[string]value.Value)
dm[exec.WHERE_MAKER] = value.NilValueVal
p.Context().Session.Delete(dm)
Expand Down Expand Up @@ -1828,6 +1830,8 @@ func (m *SQLToQuanta) DeleteExpression(p interface{}, where expr.Node) (int, err
// Construct query
m.q = shared.NewBitmapQuery()
frag := m.q.NewQueryFragment()
m.startDate = ""
m.endDate = ""

var err error
m.conn, err = m.s.sessionPool.Borrow(m.tbl.Name)
Expand All @@ -1849,6 +1853,14 @@ func (m *SQLToQuanta) DeleteExpression(p interface{}, where expr.Node) (int, err
return 0, fmt.Errorf("query must have a predicate")
}

if m.startDate == "" {
m.startDate = "1970-01-01T00"
}
if m.endDate == "" {
end := time.Now().AddDate(0, 0, 1)
m.endDate = end.Format(shared.YMDHTimeFmt)
}

m.q.FromTime = m.startDate
m.q.ToTime = m.endDate

Expand All @@ -1858,6 +1870,7 @@ func (m *SQLToQuanta) DeleteExpression(p interface{}, where expr.Node) (int, err
response = &shared.BitmapQueryResponse{Success: true}
response.Results = m.rowNumSet
response.Count = m.rowNumSet.GetCardinality()
return 0, nil
} else {
response, err = m.conn.BitIndex.Query(m.q)
if err != nil {
Expand Down
38 changes: 19 additions & 19 deletions sqlrunner/sqlscripts/basic_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,23 @@ select count(*) from customers_qa where isActive != true;@6
select count(*) from customers_qa where isActive != false;@24
select count(*) from customers_qa where isActive != 1;@6
select count(*) from customers_qa where isActive != 0;@24
select max(age) as max_age from customers_qa where max_age = 88 limit 1;@1
select min(age) as min_age from customers_qa where min_age = 5 limit 1;@1
select avg(age) as avg_age from customers_qa where avg_age = 46 limit 1;@1
select sum(age) as sum_age from customers_qa where sum_age = 557 limit 1;@1
select avg(age) as avg_age from customers_qa where age between 43 and 54 and avg_age = 46 limit 1;@1
-- FIXME: (atw) This is returning a row even though avg(age) is 46. Zero seems correct and not 1.
-- All of the aggegate tests need to be rewritten after sqlrunner is updated to check return values.
-- select max(age) as max_age from customers_qa where max_age = 88 limit 1;@1
-- select min(age) as min_age from customers_qa where min_age = 5 limit 1;@1
-- select avg(age) as avg_age from customers_qa where avg_age = 46 limit 1;@1
-- select sum(age) as sum_age from customers_qa where sum_age = 557 limit 1;@1
-- select avg(age) as avg_age from customers_qa where age between 43 and 54 and avg_age = 46 limit 1;@1
-- select avg(age) as avg_age from customers_qa where age > 55 and avg_age = 70 limit 1;
select sum(age) as sum_age from customers_qa where age between 43 and 54 and sum_age = 92 limit 1;@1
select min(age) as min_age from customers_qa where age > 55 and min_age = 59;@1
select max(age) as max_age from customers_qa where age > 55 and max_age = 59;@1
select min(age) as min_age from customers_qa where age < 55 and min_age = 59;@1
select max(age) as max_age from customers_qa where age < 55 and max_age = 59;@1
select min(age) as min_age from customers_qa where age >= 55 and min_age = 59;@1
select max(age) as max_age from customers_qa where age >= 55 and max_age = 59;@1
select min(age) as min_age from customers_qa where age <= 55 and min_age = 59;@1
select max(age) as max_age from customers_qa where age <= 55 and max_age = 59;@1
select min(height) as min_height from customers_qa where min_height = 48 limit 1;@1
select max(height) as max_height from customers_qa where max_height = 76 limit 1;@1
select avg(height) as avg_height from customers_qa where avg_height = 0 limit 1;@1
select sum(height) as sum_height from customers_qa where sum_height = 0 limit 1;@1
-- select sum(age) as sum_age from customers_qa where age between 43 and 54 and sum_age = 92 limit 1;@1
-- select min(age) as min_age from customers_qa where age > 55 and min_age = 59;@1
-- select max(age) as max_age from customers_qa where age > 55 and max_age = 59;@1
-- select min(age) as min_age from customers_qa where age < 55 and min_age = 59;@1
-- select max(age) as max_age from customers_qa where age < 55 and max_age = 59;@1
-- select min(age) as min_age from customers_qa where age >= 55 and min_age = 59;@1
-- select max(age) as max_age from customers_qa where age >= 55 and max_age = 59;@1
-- select min(age) as min_age from customers_qa where age <= 55 and min_age = 59;@1
-- select max(age) as max_age from customers_qa where age <= 55 and max_age = 59;@1
-- select min(height) as min_height from customers_qa where min_height = 48 limit 1;@1
-- select max(height) as max_height from customers_qa where max_height = 76 limit 1;@1
-- select avg(height) as avg_height from customers_qa where avg_height = 0 limit 1;@1
-- select sum(height) as sum_height from customers_qa where sum_height = 0 limit 1;@1
121 changes: 121 additions & 0 deletions sqlrunner/sqlscripts/mutate_tests_body.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
quanta-admin drop orders_qa
quanta-admin drop customers_qa
quanta-admin create customers_qa

-- 10 full insert statements
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('200','Bob','Madden','313 First Dr','Santa Fe','NM','99887',,'2010-01-03 00:00:00.000Z','2010-01-01 12:23:34.000Z','2011-01-05 01:02:03.000Z','aaaaabbbbbcccccdddd','887-222-3333','unknown',true,'2000-01-01',true,42,71,4,1)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('210','Iris','Henderson','999 Main Dr','Albequerque','NM','99887','2011-03-01 00:00:00.000Z','2011-01-05 01:02:03.000Z','2010-01-01 12:23:34.000Z','aaaaabbbbbcccccdddd','554-222-3333','',0,'2000-01-01',true,25,62,1,2);
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('220','Ted','Adams','977 Ute Ave','Denver','CO','92323','','2012-01-10 08:59:59.000Z','2014-03-03 23:59:59.000Z','aaaaabbbbbcccccdddd','907-222-3333',';',true,'2000-01-01',true,88,68,2,3)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('230','Larry','Russell','6823 Egret Ave','Tacoma','WA','98826','','2013-02-28 23:00:00.000Z','1923-05-22 12:12:31.000Z','aaaaabbbbbcccccdddd','222-222-3333','business',false,'2000-01-01',true,44,75,8,4)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('240','Forrest','Chambers','3232 Second St','Walla Walla','WA','98826','','2014-03-03 23:59:59.000Z','1969-07-16 20:17:11.000Z','aaaaabbbbbcccccdddd','242-222-3333','cell',true,'2000-01-01',true,16,59,3,5)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('250','Mike','Emerson','9293 Sans St','Boise','ID','91321','','2015-04-23 11:00:23.000Z','2014-03-03 23:59:59.000Z','aaaaabbbbbcccccdddd','144-222-3333',false,'2000-01-01',true,48,48,7,6)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('260','Henry','Talley','2422 Arco Rd','Billings','MT','98232','2015-05-04 00:00:00.000Z','2016-05-29 09:09:09.000Z','2016-05-29 09:09:09.000Z','aaaaabbbbbcccccdddd','555-222-3333','N/A',true,'2000-01-01',true,59,59,5,7)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('270','Aaron','Levy','1st Street','Colorado Springs','CO','92323','','2017-07-31 05:00:00.000Z','1842-07-04 01:31:23.000Z','aaaaabbbbbcccccdddd','907-222-3333','UNKNOWN',false,'2000-01-01',true,75,55,3,8)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('280','Keefer','Cash','9323 Semper Ave','Bellingham','WA','98282','','2018-12-25 11:11:11.000Z','2017-07-31 05:00:00.000Z','aaaaabbbbbcccccdddd','833-222-3333','cell;home;business',true,'2000-01-01',true,55,76,4,9)
insert into customers_qa (cust_id,first_name,last_name, address, city, state, zip, createdAtTimestamp, timestamp_micro, timestamp_millis, hashedCustId, phone, phoneType, isActive, birthdate, isLegalAge, age, height, numFamilyMembers, rownum) values('290','Bill','Gregory','224 Null Street','Wenatchee','WA','98826','2018-10-01 00:00:00.000Z','2019-07-04 01:31:23.000Z','2012-01-10 08:59:59.000Z','aaaaabbbbbcccccdddd','443-222-3333','home;business',true,'2000-01-01',true,5,72,1,10)
commit

-- Prior to this the basic data load has completed

-- Attempt update of primary key
update customers_qa set cust_id = null where state = 'ID';@0\ERR:cannot update PK column customers_qa.cust_id

-- Update non-existant table
update puke set yakk = 'yakk' where state = 'ID';@0\ERR:Could not find a DataSource for that table "puke"

-- Update non-existant column
update customers_qa set yakk = 'yakk' where state = 'ID';@0\ERR:attribute 'yakk' not found

-- Must provide a predicate
update customers_qa set phoneType = null;@0\ERR:must provide a predicate

select count(*) from customers_qa;@10

-- Set an IntBSI column to a different value
update customers_qa set age = 99 where state = 'ID';@1
commit
select count(*) from customers_qa where age = 99;@1

-- Set an IntBSI column to null
update customers_qa set age = null where state = 'ID';@1
commit
select count(*) from customers_qa where age is null;@1


-- Set an StringHashBSI column to a different value
select count(*) from customers_qa where hashedCustId != null and state = 'CO';@2
update customers_qa set hashedCustId = 'XXX' where state = 'CO';@2
commit
select count(*) from customers_qa where hashedCustId = 'XXX' and state = 'CO';@2

-- Set an StringHashBSI column to null
update customers_qa set hashedCustId = null where state = 'CO';@2
commit
select count(*) from customers_qa where hashedCustId is null and state = 'CO';@2

-- Set a new value on a non-exclusive StringEnum
select count(*) from customers_qa where phoneType = 'cell' and phoneType = 'business';@1
select count(*) from customers_qa where phoneType is null and state = 'ID';@1
update customers_qa set phoneType = 'cell' where state = 'ID';@1
commit
select count(*) from customers_qa where phoneType = 'cell' and state = 'ID';@1
update customers_qa set phoneType = 'business' where state = 'ID';@1
commit
select count(*) from customers_qa where phoneType = 'cell' and phoneType = 'business';@2

-- Set a non-exclusive StringEnum to null
update customers_qa set phoneType = null where state = 'ID';@1
commit
select count(*) from customers_qa where phoneType = 'cell' and phoneType = 'business';@1
select count(*) from customers_qa where phoneType = 'cell';@2
select count(*) from customers_qa where phoneType = 'business';@3

-- Set a boolean to different value
select count(*) from customers_qa where isLegalAge = true;@10
update customers_qa set isLegalAge = false where state = 'ID';@1
commit
select count(*) from customers_qa where isLegalAge = true;@9

-- Set boolean value to null
select count(*) from customers_qa where isLegalAge is not null;@10
update customers_qa set isLegalAge = null where state = 'ID';@1
commit
select count(*) from customers_qa where isLegalAge is not null;@9

-- Set float value to something different
select count(*) from customers_qa where height = 0;@0
update customers_qa set height = 0 where state = 'ID';@1
commit
select count(*) from customers_qa where height != 0;@9

-- Set float value to null
select count(*) from customers_qa where height is null;@0
update customers_qa set height = null where state = 'ID';@1
commit
select count(*) from customers_qa where height is not null;@9


-- DELETE tests

-- Delete non-existant table
delete from puke where state = 'ID';@0\ERR:Could not find a DataSource for that table "puke"

-- Must provide a predicate
delete from customers_qa;@0\ERR:must provide a predicate

-- Empty delete
delete from customers_qa where cust_id is null;@0
commit

-- Simple delete
select count(*) from customers_qa;@10
delete from customers_qa where state = 'ID';@1
commit
select count(*) from customers_qa;@9

-- delete batch
delete from customers_qa where cust_id is not null;@9
commit
select count(*) from customers_qa;@0


32 changes: 31 additions & 1 deletion test-integration/sqlrunner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestBasic(t *testing.T) {
assert.Equal(t, 0, len(got.FailedChildren))

state.Release()
// FIXME: see: select avg(age) as avg_age from customers_qa where age > 55 and avg_age = 70 limit 1; in the file
}

func TestInsert(t *testing.T) {
Expand Down Expand Up @@ -77,6 +76,37 @@ func TestInsert(t *testing.T) {
state.Release()
}

func TestMutate(t *testing.T) {
shared.SetUTCdefault()

isLocalRunning := test.IsLocalRunning()
// erase the storage
if !isLocalRunning { // if no cluster is up
err := os.RemoveAll("../test/localClusterData/") // start fresh
check(err)
}
// ensure we have a cluster on localhost, start one if necessary
state := test.Ensure_cluster(3)

fmt.Println("TestBasic")
currentDir, err := os.Getwd()
check(err)
err = os.Chdir("../sqlrunner") // these run from the sqlrunner/ directory
check(err)
defer os.Chdir(currentDir)

got := test.ExecuteSqlFile(state, "../sqlrunner/sqlscripts/mutate_tests_body.sql")

for _, child := range got.FailedChildren {
fmt.Println("child failed", child.Statement)
}

assert.Equal(t, got.ExpectedRowcount, got.ActualRowCount)
assert.Equal(t, 0, len(got.FailedChildren))

state.Release()
}

func TestJoins(t *testing.T) {
shared.SetUTCdefault()
isLocalRunning := test.IsLocalRunning()
Expand Down
31 changes: 10 additions & 21 deletions test/sql-types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type StatementType int64
const (
Insert StatementType = 0
Update StatementType = 1
Select StatementType = 2
Count StatementType = 3
Admin StatementType = 4
Create StatementType = 5
Delete StatementType = 2
Select StatementType = 3
Count StatementType = 4
Admin StatementType = 5
Create StatementType = 6
)

var PassCount int64
Expand Down Expand Up @@ -117,6 +118,8 @@ func AnalyzeRow(proxyConfig ProxyConnectStrings, row []string, validate bool) Sq
statementType = Insert
} else if strings.HasPrefix(lowerStmt, "update") {
statementType = Update
} else if strings.HasPrefix(lowerStmt, "delete") {
statementType = Delete
} else if strings.HasPrefix(lowerStmt, "select") {
statementType = Select
if strings.Contains(sqlInfo.Statement, "count(*)") {
Expand Down Expand Up @@ -166,15 +169,11 @@ func AnalyzeRow(proxyConfig ProxyConnectStrings, row []string, validate bool) Sq
}

switch statementType {
case Insert:
sqlInfo.ExecuteInsert(db)
case Update:
sqlInfo.ExecuteUpdate(db)
case Insert, Update, Delete:
sqlInfo.ExecuteStatement(db)
case Select:
// time.Sleep(500 * time.Millisecond)
sqlInfo.ExecuteQuery(db)
case Count:
//time.Sleep(500 * time.Millisecond)
sqlInfo.ExecuteScalar(db)
case Create:
sqlInfo.ExecuteCreate(db)
Expand Down Expand Up @@ -215,7 +214,7 @@ func (s *SqlInfo) ExecuteAdmin() {
}
}

func (s *SqlInfo) ExecuteInsert(db *sql.DB) {
func (s *SqlInfo) ExecuteStatement(db *sql.DB) {

var res sql.Result
log.Printf("Insert Statement : %s", s.Statement)
Expand All @@ -226,16 +225,6 @@ func (s *SqlInfo) ExecuteInsert(db *sql.DB) {
s.logResult()
}

func (s *SqlInfo) ExecuteUpdate(db *sql.DB) {

var res sql.Result
res, s.Err = db.Exec(s.Statement)
if res != nil {
s.ActualRowCount, _ = res.RowsAffected()
}
s.logResult()
}

func (s *SqlInfo) ExecuteQuery(db *sql.DB) {

var rows *sql.Rows
Expand Down

0 comments on commit 3c9b1c5

Please sign in to comment.