Skip to content

Commit

Permalink
handle dictionaries better
Browse files Browse the repository at this point in the history
  • Loading branch information
rpiazza committed Sep 16, 2024
1 parent 92231d9 commit f294727
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 44 deletions.
66 changes: 39 additions & 27 deletions stix2/datastore/relational_db/table_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,35 +343,44 @@ def generate_table_information(self, name, metadata, schema_name, table_name, is
nullable=False,
),
)
if len(self.valid_types) == 1:
if not isinstance(self.valid_types[0], ListProperty):
columns.append(
Column(
"value",
# its a class
determine_sql_type_from_class(self.valid_types[0]),
nullable=False,
),
)
if self.valid_types:
if len(self.valid_types) == 1:
if not isinstance(self.valid_types[0], ListProperty):
columns.append(
Column(
"value",
# its a class
determine_sql_type_from_class(self.valid_types[0]),
nullable=False,
),
)
else:
contained_class = self.valid_types[0].contained
columns.append(
Column(
"value",
# its an instance, not a class
ARRAY(contained_class.determine_sql_type()),
nullable=False,
),
)
else:
contained_class = self.valid_types[0].contained
columns.append(
Column(
"value",
# its an instance, not a class
ARRAY(contained_class.determine_sql_type()),
nullable=False,
),
)
for column_type in self.valid_types:
sql_type = determine_sql_type_from_class(column_type)
columns.append(
Column(
determine_column_name(column_type),
sql_type,
),
)
else:
for column_type in self.valid_types:
sql_type = determine_sql_type_from_class(column_type)
columns.append(
Column(
determine_column_name(column_type),
sql_type,
),
)
columns.append(
Column(
"value",
Text,
nullable=False,
),
)
return [
Table(
canonicalize_table_name(table_name + "_" + name),
Expand All @@ -383,6 +392,9 @@ def generate_table_information(self, name, metadata, schema_name, table_name, is
]





@add_method(EmbeddedObjectProperty)
def generate_table_information(self, name, metadata, schema_name, table_name, is_extension=False, is_list=False, **kwargs): # noqa: F811
level = kwargs.get("level")
Expand Down
46 changes: 29 additions & 17 deletions stix2/datastore/relational_db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,20 @@ def canonicalize_table_name(table_name, schema_name=None):
return inflection.underscore(full_name)


_IGNORE_OBJECTS = [ "language-content"]


def get_all_subclasses(cls):
all_subclasses = []

for subclass in cls.__subclasses__():
# This code might be useful if we decide that some objects just cannot have there tables
# automatically generated

# if hasattr(subclass, "_type") and subclass._type in _IGNORE_OBJECTS:
# print(f'It is currently not possible to create a table for {subclass._type}')
# return []
# else:
all_subclasses.append(subclass)
all_subclasses.extend(get_all_subclasses(subclass))
return all_subclasses
Expand Down Expand Up @@ -122,37 +132,39 @@ def flat_classes(class_or_classes):
yield class_or_classes


def determine_sql_type_from_class(cls): # noqa: F811
if cls == BinaryProperty:
def determine_sql_type_from_class(cls_or_inst): # noqa: F811
if cls_or_inst == BinaryProperty or isinstance(cls_or_inst, BinaryProperty):
return LargeBinary
elif cls == BooleanProperty:
elif cls_or_inst == BooleanProperty or isinstance(cls_or_inst, BooleanProperty):
return Boolean
elif cls == FloatProperty:
elif cls_or_inst == FloatProperty or isinstance(cls_or_inst, FloatProperty):
return Float
elif cls == HexProperty:
elif cls_or_inst == HexProperty or isinstance(cls_or_inst, HexProperty):
return LargeBinary
elif cls == IntegerProperty:
elif cls_or_inst == IntegerProperty or isinstance(cls_or_inst, IntegerProperty):
return Integer
elif cls == StringProperty or cls == ReferenceProperty:
elif (cls_or_inst == StringProperty or cls_or_inst == ReferenceProperty or
isinstance(cls_or_inst, StringProperty) or isinstance(cls_or_inst, ReferenceProperty)):
return Text
elif cls == TimestampProperty:
elif cls_or_inst == TimestampProperty or isinstance(cls_or_inst, TimestampProperty):
return TIMESTAMP(timezone=True)
elif cls == Property:
elif cls_or_inst == Property or isinstance(cls_or_inst, Property):
return Text


def determine_column_name(cls): # noqa: F811
if cls == BinaryProperty:
def determine_column_name(cls_or_inst): # noqa: F811
if cls_or_inst == BinaryProperty or isinstance(cls_or_inst, BinaryProperty):
return "binary_value"
elif cls == BooleanProperty:
elif cls_or_inst == BooleanProperty or isinstance(cls_or_inst, BooleanProperty):
return "boolean_value"
elif cls == FloatProperty:
elif cls_or_inst == FloatProperty or isinstance(cls_or_inst, FloatProperty):
return "float_value"
elif cls == HexProperty:
elif cls_or_inst == HexProperty or isinstance(cls_or_inst, HexProperty):
return "hex_value"
elif cls == IntegerProperty:
elif cls_or_inst == IntegerProperty or isinstance(cls_or_inst, IntegerProperty):
return "integer_value"
elif cls == StringProperty or cls == ReferenceProperty:
elif (cls_or_inst == StringProperty or cls_or_inst == ReferenceProperty or
isinstance(cls_or_inst, StringProperty) or isinstance(cls_or_inst, ReferenceProperty)):
return "string_value"
elif cls == TimestampProperty:
elif cls_or_inst == TimestampProperty or isinstance(cls_or_inst, TimestampProperty):
return "timestamp_value"

0 comments on commit f294727

Please sign in to comment.