Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tpch support to develop before query processing changes #145

Merged
merged 19 commits into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d287f56
Added sourceOrdinal to load 'csv' like files (tpc-h data).
gamolina Jun 12, 2024
95c9708
Added tpc-h data producer to generate kinesis data, fix bugs so that …
gamolina Jun 15, 2024
2109e57
Added tpc-h data producer to generate kinesis data, fix bugs so that …
gamolina Jun 16, 2024
c887cd9
Added ability to use directly mapped surrogate keys to column IDs and…
gamolina Jun 20, 2024
f14c153
Eliminate the 'commit interval' parameter from kinesis consumer as it…
gamolina Jun 28, 2024
ab35e53
Refactor the consumer to properly batch incoming data for improved bu…
gamolina Jul 8, 2024
4c9f4c9
Add TPC-H config files.
gamolina Jul 8, 2024
e49756e
Add TPC-H create and drop scripts.
gamolina Jul 8, 2024
66bbd65
More burst capacity fixes post load test.
gamolina Jul 14, 2024
5e343f1
Fix incorrect error counter increment for flushes.
gamolina Jul 14, 2024
30d893d
Fix tests,,,, maybe
gamolina Jul 15, 2024
2ab0151
Disable part of the test suite for now.
gamolina Jul 15, 2024
a3fdd7e
Don't trap SIGQUIT, because we can't get thread dumps.
gamolina Jul 17, 2024
171d931
Fix deadlock in node Sync() code. Admin tool verbosity changes.
gamolina Jul 18, 2024
b768144
Partition info report feature added to admin tool
gamolina Jul 26, 2024
1e1474d
Added partition offlining/purge feature to admin tool.
gamolina Jul 27, 2024
1793bfd
Allow for batch updates and properly implement 'exclusive' bitmap pro…
gamolina Aug 1, 2024
0d2915d
Null out non-exclusive fields bug.
gamolina Aug 1, 2024
3c9b1c5
Fixed issues with DELETE statement, added update/delete mutation supp…
gamolina Aug 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Docker/kinesis_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,4 @@ if [ -n "$SCAN_INTERVAL" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --scan-interval="${SCAN_INTERVAL}""
fi
if [ -n "$COMMIT_INTERVAL" ]
then
BOOL_FLAGS=${BOOL_FLAGS}" --commit-interval="${COMMIT_INTERVAL}""
fi
exec /usr/bin/quanta-kinesis-consumer ${STREAM} ${SCHEMA} ${SHARD_KEY} ${REGION} ${BOOL_FLAGS}
119 changes: 92 additions & 27 deletions core/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func NewStringHashBSIMapper(conf map[string]string) (Mapper, error) {
}

// MapValue - Map a string value to an int64
func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Session) (result uint64, err error) {
func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Session,
isUpdate bool) (result uint64, err error) {

var strVal string
switch val.(type) {
Expand Down Expand Up @@ -58,6 +59,15 @@ func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Sessi
strVal = string(b)
val = strVal
result = Get64BitHash(strVal)
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
if tbuf, ok := c.TableBuffers[attr.Parent.Name]; ok {
stringPath := indexPath(tbuf, attr.FieldName, "strings") // indexPath() is in core/session.go
c.BatchBuffer.SetPartitionedString(stringPath, tbuf.CurrentColumnID, strVal)
}
}
return
default:
err = fmt.Errorf("StringHashBSIMapper not expecting a '%T' for '%s'", val, attr.FieldName)
return
Expand All @@ -69,8 +79,8 @@ func (m StringHashBSIMapper) MapValue(attr *Attribute, val interface{}, c *Sessi
err = c.StringIndex.Index(strVal)
}
stringPath := indexPath(tbuf, attr.FieldName, "strings") // indexPath() is in core/session.go
c.BatchBuffer.SetPartitionedString(stringPath, tbuf.CurrentColumnID, val)
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
c.BatchBuffer.SetPartitionedString(stringPath, tbuf.CurrentColumnID, strVal)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
} else {
err = fmt.Errorf("table %s not open for this connection", attr.Parent.Name)
}
Expand All @@ -90,7 +100,7 @@ func NewBoolDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map boolean values true/false to rowid = 0 false rowid = 1 true
func (m BoolDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

result = uint64(0)
switch val.(type) {
Expand Down Expand Up @@ -125,12 +135,17 @@ func (m BoolDirectMapper) MapValue(attr *Attribute, val interface{},
if val.(int64) == 1 {
result = uint64(1)
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
return
}
default:
err = fmt.Errorf("%v: No handling for type '%T'", val, val)
return
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand All @@ -156,7 +171,7 @@ func NewIntDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row ID.
func (m IntDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case uint64:
Expand Down Expand Up @@ -185,11 +200,17 @@ func (m IntDirectMapper) MapValue(attr *Attribute, val interface{},
return
}
result = uint64(v)
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
return
}
default:
err = fmt.Errorf("%v: No handling for type '%T'", val, val)
return
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand All @@ -212,18 +233,22 @@ func NewStringToIntDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row ID.
func (m StringToIntDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

if val == nil && c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}

var v int64
v, err = strconv.ParseInt(strings.TrimSpace(val.(string)), 10, 64)
if err == nil && c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
}
if v <= 0 {
err = fmt.Errorf("cannot map %d as a positive non-zero value", v)
return
}
result = uint64(v)
if err == nil && c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}

Expand All @@ -239,7 +264,7 @@ func NewFloatScaleBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to an int64.
func (m FloatScaleBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

var floatVal float64
switch val.(type) {
Expand All @@ -260,6 +285,11 @@ func (m FloatScaleBSIMapper) MapValue(attr *Attribute, val interface{},
if err != nil {
return
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("type passed for '%s' is of type '%T' which in unsupported", attr.FieldName, val)
return
Expand All @@ -286,7 +316,7 @@ func (m FloatScaleBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(0)
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -303,7 +333,7 @@ func NewIntBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to an int64.
func (m IntBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case int64:
Expand All @@ -330,11 +360,16 @@ func (m IntBSIMapper) MapValue(attr *Attribute, val interface{},
return
}
result = uint64(v)
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -358,7 +393,7 @@ func NewStringEnumMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row id.
func (m StringEnumMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

var multi []string
switch val.(type) {
Expand All @@ -380,6 +415,11 @@ func (m StringEnumMapper) MapValue(attr *Attribute, val interface{},
case int64:
strVal := fmt.Sprintf("%d", val.(int64))
multi = []string{strVal}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}
return
default:
return 0, fmt.Errorf("cannot cast '%s' from '%T' to a string", attr.FieldName, val)
}
Expand All @@ -393,7 +433,7 @@ func (m StringEnumMapper) MapValue(attr *Attribute, val interface{},
if result, err = attr.GetValue(val); err != nil {
return
}
if err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries); err != nil {
if err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate); err != nil {
return
}
}
Expand Down Expand Up @@ -437,20 +477,25 @@ func NewBoolRegexMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row id.
func (m BoolRegexMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case bool:
result = uint64(0)
if val.(bool) {
result = uint64(1)
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}
return
default:
return 0, fmt.Errorf("cannot cast '%s' from '%T' to a string", attr.FieldName, val)
}

if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand Down Expand Up @@ -482,7 +527,7 @@ func NewSysMillisBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Maps a value to an int64
func (m SysMillisBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case string:
Expand All @@ -509,11 +554,16 @@ func (m SysMillisBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(val.(int64))
case float64:
result = uint64(val.(float64))
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -530,7 +580,7 @@ func NewSysMicroBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Maps a value to an int64.
func (m SysMicroBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case string:
Expand All @@ -557,11 +607,16 @@ func (m SysMicroBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(val.(int64))
case float64:
result = uint64(val.(float64))
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -578,7 +633,7 @@ func NewSysSecBSIMapper(conf map[string]string) (Mapper, error) {

// MapValue - Maps a value to an int64.
func (m SysSecBSIMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case string:
Expand All @@ -601,11 +656,16 @@ func (m SysSecBSIMapper) MapValue(attr *Attribute, val interface{},
result = uint64(val.(int64))
case int32:
result = uint64(val.(int32))
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, false)
}
return
default:
err = fmt.Errorf("%s: No handling for type '%T'", m.String(), val)
}
if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, false)
}
return
}
Expand All @@ -622,7 +682,7 @@ func NewIntToBoolDirectMapper(conf map[string]string) (Mapper, error) {

// MapValue - Map a value to a row id.
func (m IntToBoolDirectMapper) MapValue(attr *Attribute, val interface{},
c *Session) (result uint64, err error) {
c *Session, isUpdate bool) (result uint64, err error) {

switch val.(type) {
case int:
Expand All @@ -640,12 +700,17 @@ func (m IntToBoolDirectMapper) MapValue(attr *Attribute, val interface{},
if val.(string) == "true" {
result = uint64(1)
}
case nil:
if c != nil {
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, nil, isUpdate)
}
return
default:
return 0, fmt.Errorf("cannot cast '%s' from '%T' to a boolean", attr.FieldName, val)
}

if c != nil && err == nil {
err = m.UpdateBitmap(c, attr.Parent.Name, attr.FieldName, result, attr.IsTimeSeries)
err = m.MutateBitmap(c, attr.Parent.Name, attr.FieldName, result, isUpdate)
}
return
}
Expand Down
Loading
Loading