Skip to content

Commit

Permalink
feat: add support for timestamp_ntz data type (#32)
Browse files Browse the repository at this point in the history
* add support for timestamp_ntz type

* add example for timestamp_ntz
  • Loading branch information
robin11 authored Oct 30, 2024
1 parent c7b6985 commit a02590d
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 32 deletions.
15 changes: 10 additions & 5 deletions examples/sdk/create_table_use_table_schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func main() {
}

c15 := tableschema.Column{
Name: "timestamp_ntz_type",
Type: datatype.TimestampNtzType,
}

c16 := tableschema.Column{
Name: "boolean_type",
Type: datatype.BooleanType,
}
Expand All @@ -101,22 +106,22 @@ func main() {
)
jsonType := datatype.NewJsonType()

c16 := tableschema.Column{
c17 := tableschema.Column{
Name: "map_type",
Type: mapType,
}

c17 := tableschema.Column{
c18 := tableschema.Column{
Name: "array_type",
Type: arrayType,
}

c18 := tableschema.Column{
c19 := tableschema.Column{
Name: "struct_type",
Type: structType,
}

c19 := tableschema.Column{
c20 := tableschema.Column{
Name: "json_type",
Type: jsonType,
}
Expand All @@ -133,7 +138,7 @@ func main() {

schemaBuilder := tableschema.NewSchemaBuilder()
schemaBuilder.Name("all_types_demo").
Columns(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19).
Columns(c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19, c20).
PartitionColumns(p1, p2).
Lifecycle(2) // 单位: 天

Expand Down
2 changes: 2 additions & 0 deletions examples/sdk/tunnel/upload_data_use_protoc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
date, _ := data.NewDate("2022-10-19")
datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")
timestampNtz, _ := data.NewTimestampNtz("2022-10-19 17:00:00.000")

mapType := schema.Columns[15].Type.(datatype.MapType)
mapData := data.NewMapWithType(mapType)
Expand Down Expand Up @@ -172,6 +173,7 @@ func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
date,
datetime,
timestamp,
timestampNtz,
data.Bool(true),
mapData,
arrayData,
Expand Down
8 changes: 5 additions & 3 deletions examples/sdk/tunnel/upload_data_use_stream_tunnel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
date, _ := data.NewDate("2022-10-19")
datetime, _ := data.NewDateTime("2022-10-19 17:00:00")
timestamp, _ := data.NewTimestamp("2022-10-19 17:00:00.000")
timestampNtz, _ := data.NewTimestampNtz("2022-10-19 17:00:00.000")

mapType := schema.Columns[15].Type.(datatype.MapType)
mapType := schema.Columns[17].Type.(datatype.MapType)
mapData := data.NewMapWithType(mapType)
err := mapData.Set("hello", 1)
if err != nil {
Expand All @@ -94,7 +95,7 @@ func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
return nil, err
}

arrayType := schema.Columns[16].Type.(datatype.ArrayType)
arrayType := schema.Columns[18].Type.(datatype.ArrayType)
arrayData := data.NewArrayWithType(arrayType)
err = arrayData.Append("a")
if err != nil {
Expand All @@ -106,7 +107,7 @@ func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
return nil, err
}

structType := schema.Columns[17].Type.(datatype.StructType)
structType := schema.Columns[19].Type.(datatype.StructType)
structData := data.NewStructWithTyp(structType)

arr := data.NewArrayWithType(structType.FieldType("arr").(datatype.ArrayType))
Expand Down Expand Up @@ -142,6 +143,7 @@ func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
date,
datetime,
timestamp,
timestampNtz,
data.Bool(true),
mapData,
arrayData,
Expand Down
1 change: 1 addition & 0 deletions examples/sql/create_table/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
" date_type date," +
" datetime_type datetime," +
" timestamp_type timestamp," +
" timestamp_ntz_type timestamp_ntz," +
" boolean_type boolean," +
" map_type map<string, bigint>," +
" array_type array< string>," +
Expand Down
3 changes: 2 additions & 1 deletion examples/sql/select_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func main() {
var date sqldriver.NullDate // if the column is not nullable, odps/data.Date is ok too
var dateTime sqldriver.NullDateTime // if the column is not nullable, odps/data.Datetime is ok too
var timestamp sqldriver.NullTimeStamp // if the column is not nullable, odps/data.TimeStamp is ok too
var timestampNtz sqldriver.NullTimeStampNtz // if the column is not nullable, odps/data.TimeStamp is ok too
var boolData sqldriver.NullBool // if the column is not nullable, bool is ok too
var mapData sqldriver.Map
var arrayData sqldriver.Array
Expand All @@ -56,7 +57,7 @@ func main() {

record := []interface{}{
&tinyInt, &smallInt, &intData, &bigInt, &binaryData, &floatData, &doubleData, &decimal, &varchar,
&char, &stringData, &date, &dateTime, &timestamp, &boolData, &mapData, &arrayData, &structType,
&char, &stringData, &date, &dateTime, &timestamp, &timestampNtz, &boolData, &mapData, &arrayData, &structType,
&p1, &p2,
}

Expand Down
2 changes: 2 additions & 0 deletions examples/sql/select_data_1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func main() {
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullTimeStamp:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullTimeStampNtz:
fmt.Printf("%s=%s", columns[i], r)
case *sqldriver.NullBool:
fmt.Printf("%s=%v", columns[i], r.(*sqldriver.NullBool).Bool)
case *sqldriver.Map:
Expand Down
30 changes: 30 additions & 0 deletions odps/data/datetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
type Date time.Time
type DateTime time.Time
type Timestamp time.Time
type TimestampNtz time.Time

func NewDate(s string) (Date, error) {
t, err := time.ParseInLocation(DateFormat, s, time.UTC)
Expand Down Expand Up @@ -129,3 +130,32 @@ func (t Timestamp) Sql() string {
func (t *Timestamp) Scan(value interface{}) error {
return errors.WithStack(tryConvertType(value, t))
}

func NewTimestampNtz(s string) (TimestampNtz, error) {
t, err := time.Parse(TimeStampFormat, s)
if err != nil {
return TimestampNtz(time.Time{}), err
}
return TimestampNtz(t), nil
}

func (t TimestampNtz) Type() datatype.DataType {
return datatype.TimestampNtzType
}

func (t TimestampNtz) Time() time.Time {
return time.Time(t)
}

func (t TimestampNtz) String() string {
ts := time.Time(t)
return ts.Format(TimeStampFormat)
}

func (t TimestampNtz) Sql() string {
return fmt.Sprintf("timestamp_ntz'%s'", t.String())
}

func (t *TimestampNtz) Scan(value interface{}) error {
return errors.WithStack(tryConvertType(value, t))
}
6 changes: 6 additions & 0 deletions odps/datatype/data_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
VARCHAR
DATE
TIMESTAMP
TIMESTAMP_NTZ
BINARY
IntervalDayTime
IntervalYearMonth
Expand Down Expand Up @@ -90,6 +91,8 @@ func TypeCodeFromStr(s string) TypeID {
return DATE
case "TIMESTAMP":
return TIMESTAMP
case "TIMESTAMP_NTZ":
return TIMESTAMP_NTZ
case "BINARY":
return BINARY
case "INTERVAL_DAY_TIME":
Expand Down Expand Up @@ -152,6 +155,8 @@ func (t TypeID) String() string {
return "DATE"
case TIMESTAMP:
return "TIMESTAMP"
case TIMESTAMP_NTZ:
return "TIMESTAMP_NTZ"
case BINARY:
return "BINARY"
case IntervalDayTime:
Expand Down Expand Up @@ -462,6 +467,7 @@ var BooleanType = PrimitiveType{BOOLEAN}
var DateType = PrimitiveType{DATE}
var DateTimeType = PrimitiveType{DATETIME}
var TimestampType = PrimitiveType{TIMESTAMP}
var TimestampNtzType = PrimitiveType{TIMESTAMP_NTZ}
var StringType = PrimitiveType{STRING}
var FloatType = PrimitiveType{FLOAT}
var BinaryType = PrimitiveType{BINARY}
Expand Down
3 changes: 2 additions & 1 deletion odps/sql_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"encoding/csv"
"encoding/json"
"encoding/xml"
"strings"

"github.com/aliyun/aliyun-odps-go-sdk/odps/common"
"github.com/pkg/errors"
"strings"
)

type SQLTask struct {
Expand Down
46 changes: 24 additions & 22 deletions odps/tableschema/arrow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ import (
)

// TypeToArrowType convert odps field type to arrow field type
//* Storage Type | Arrow Type
//* ----------------------+---------------------
//* boolean | boolean
//* tinyint | int8
//* smallint | int16
//* int | int32
//* bigint | int64
//* float | float32
//* double | float64
//* char | utf8
//* varchar | utf8
//* string | utf8
//* binary | binary
//* date | date32
//* datetime | timestamp(nano)
//* timestamp | timestamp(nano) 【注:精度选择功能开发中】
//* interval_day_time | day_time_interval
//* interval_year_month | month_interval
//* decimal | decimal
//* struct | struct
//* array | list
//* map | map
// * Storage Type | Arrow Type
// * ----------------------+---------------------
// * boolean | boolean
// * tinyint | int8
// * smallint | int16
// * int | int32
// * bigint | int64
// * float | float32
// * double | float64
// * char | utf8
// * varchar | utf8
// * string | utf8
// * binary | binary
// * date | date32
// * datetime | timestamp(nano)
// * timestamp | timestamp(nano) 【注:精度选择功能开发中】
// * interval_day_time | day_time_interval
// * interval_year_month | month_interval
// * decimal | decimal
// * struct | struct
// * array | list
// * map | map
func TypeToArrowType(odpsType datatype.DataType) (arrow.DataType, error) {
switch odpsType.ID() {
case datatype.BOOLEAN:
Expand Down Expand Up @@ -72,6 +72,8 @@ func TypeToArrowType(odpsType datatype.DataType) (arrow.DataType, error) {
//return &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}, nil
case datatype.TIMESTAMP:
return arrow.FixedWidthTypes.Timestamp_ns, nil
case datatype.TIMESTAMP_NTZ:
return arrow.FixedWidthTypes.Timestamp_ns, nil
//return &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "UTC"}, nil
case datatype.IntervalDayTime:
return arrow.FixedWidthTypes.DayTimeInterval, nil
Expand Down
12 changes: 12 additions & 0 deletions odps/tunnel/record_protoc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,18 @@ func (r *RecordProtocReader) readField(dt datatype.DataType) (data.Data, error)
r.recordCrc.Update(nanoSeconds)

fieldValue = data.Timestamp(time.Unix(seconds, int64(nanoSeconds)))
case datatype.TIMESTAMP_NTZ:
seconds, err := r.protocReader.ReadSInt64()
if err != nil {
return nil, errors.WithStack(err)
}
nanoSeconds, err := r.protocReader.ReadSInt32()
if err != nil {
return nil, errors.WithStack(err)
}
r.recordCrc.Update(seconds)
r.recordCrc.Update(nanoSeconds)
fieldValue = data.TimestampNtz(time.Unix(seconds, int64(nanoSeconds)).UTC())
case datatype.DECIMAL:
v, err := r.protocReader.ReadBytes()
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions odps/tunnel/record_protoc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (r *RecordProtocWriter) writeFieldTag(colIndex int, dt datatype.DataType) e
wireType = protowire.Fixed32Type
case datatype.IntervalDayTime,
datatype.TIMESTAMP,
datatype.TIMESTAMP_NTZ,
datatype.STRING,
datatype.CHAR,
datatype.VARCHAR,
Expand Down Expand Up @@ -261,6 +262,20 @@ func (r *RecordProtocWriter) writeField(val data.Data) error {
return errors.WithStack(err)
}

return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))
case data.TimestampNtz:
t := val.Time()
seconds := t.Unix()
nanoSeconds := int32(t.Nanosecond())

r.recordCrc.Update(seconds)
r.recordCrc.Update(nanoSeconds)

err := r.protocWriter.WriteSInt64(seconds)
if err != nil {
return errors.WithStack(err)
}

return errors.WithStack(r.protocWriter.WriteSInt32(nanoSeconds))
case data.Decimal:
b := []byte(val.Value())
Expand Down
13 changes: 13 additions & 0 deletions sqldriver/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type NullString sql.NullString
type NullDate sql.NullTime
type NullDateTime sql.NullTime
type NullTimeStamp sql.NullTime
type NullTimeStampNtz sql.NullTime
type Binary sql.RawBytes
type Decimal data.Decimal
type Map data.Map
Expand Down Expand Up @@ -64,6 +65,10 @@ func (n *NullTimeStamp) Scan(value interface{}) error {
return (*sql.NullTime)(n).Scan(value)
}

func (n *NullTimeStampNtz) Scan(value interface{}) error {
return (*sql.NullTime)(n).Scan(value)
}

func (n *Decimal) Scan(value interface{}) error {
return (*data.Decimal)(n).Scan(value)
}
Expand Down Expand Up @@ -196,6 +201,10 @@ func (n NullTimeStamp) IsNull() bool {
return !n.Valid
}

func (n NullTimeStampNtz) IsNull() bool {
return !n.Valid
}

func (n NullBool) IsNull() bool {
return !n.Valid
}
Expand Down Expand Up @@ -240,6 +249,10 @@ func (n NullTimeStamp) String() string {
return data.Timestamp(n.Time).String()
}

func (n NullTimeStampNtz) String() string {
return data.TimestampNtz(n.Time).String()
}

func (n Decimal) String() string {
return data.Decimal(n).String()
}
Expand Down
4 changes: 4 additions & 0 deletions sqldriver/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (rr *rowsReader) Next(dst []driver.Value) error {
dst[i] = time.Time(ri.(data.Date))
case datatype.TIMESTAMP:
dst[i] = time.Time(ri.(data.Timestamp))
case datatype.TIMESTAMP_NTZ:
dst[i] = time.Time(ri.(data.TimestampNtz))
//case datatype.DECIMAL:
// dst[i] = ri
//case datatype.MAP:
Expand Down Expand Up @@ -191,6 +193,8 @@ func (rr *rowsReader) ColumnTypeScanType(index int) reflect.Type {
return reflect.TypeOf(NullDate{})
case datatype.TIMESTAMP:
return reflect.TypeOf(NullTimeStamp{})
case datatype.TIMESTAMP_NTZ:
return reflect.TypeOf(NullTimeStampNtz{})
case datatype.DECIMAL:
return reflect.TypeOf(Decimal{})
case datatype.MAP:
Expand Down

0 comments on commit a02590d

Please sign in to comment.