Skip to content

Commit f123348

Browse files
authored
sort by first values #439; add kb_type #442 (#448)
1 parent 6c53629 commit f123348

6 files changed

Lines changed: 208 additions & 139 deletions

File tree

obstracts/server/models.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ class ObjectValue(models.Model):
377377
"""
378378
stix_id = models.CharField(max_length=256, db_index=True)
379379
type = models.CharField(max_length=256, db_index=True)
380-
ttp_type = models.CharField(max_length=64, null=True, blank=True, db_index=True)
380+
knowledgebase = models.CharField(max_length=64, null=True, blank=True, db_index=True)
381381
values = models.JSONField()
382382
file = models.ForeignKey(File, on_delete=models.CASCADE, related_name='object_values')
383383
created = models.DateTimeField(default=None, null=True)
@@ -390,7 +390,7 @@ class Meta:
390390
unique_together = [['stix_id', 'file']]
391391

392392
def __str__(self):
393-
return f'ObjectValue(stix_id={self.stix_id}, ttp_type={self.ttp_type})'
393+
return f'ObjectValue(stix_id={self.stix_id}, knowledgebase={self.knowledgebase})'
394394

395395

396396
class Job(models.Model):

obstracts/server/values/filters.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11

22
from django.db.models import JSONField, Lookup
3+
from django.db.models import Func, CharField
4+
35

46

57
@JSONField.register_lookup
@@ -68,4 +70,13 @@ def as_sql(self, compiler, connection):
6870

6971
# Use jsonb_each_text to iterate over key-value pairs and check if any value matches exactly
7072
sql = f"EXISTS (SELECT 1 FROM jsonb_each_text({lhs}) WHERE value = %s)"
71-
return sql, lhs_params + rhs_params
73+
return sql, lhs_params + rhs_params
74+
75+
class NormalizeDict(Func):
76+
"""
77+
Returns the value for the first key (alphabetically) from a JSONB object.
78+
"""
79+
80+
template = "(SELECT kv.value FROM jsonb_each_text(%(expressions)s) AS kv ORDER BY kv.key LIMIT 1)"
81+
output_field = CharField()
82+

obstracts/server/values/serializers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class ObjectValueSerializer(serializers.Serializer):
66

77
id = serializers.CharField(source='stix_id')
88
type = serializers.CharField()
9-
ttp_type = serializers.CharField(required=False)
9+
kb_name = serializers.CharField(read_only=True, required=False, source='knowledgebase')
1010
values = serializers.JSONField(read_only=True)
1111
matched_posts = serializers.ListField(child=serializers.UUIDField())
1212
created = serializers.DateTimeField(required=False)

obstracts/server/values/values.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,27 @@ def external_id(obj):
2020
def hashes(obj):
2121
return obj.get("hashes", {})
2222

23+
KB_TYPES = {
24+
"Tactic": [dict(type='x-mitre-tactic')],
25+
"Analytic": [dict(type='x-mitre-analytic')],
26+
"Detection Strategy": [dict(type='x-mitre-detection-strategy')],
27+
"Technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=False), dict(type='attack-pattern', x_mitre_is_subtechnique=None)],
28+
"Sub-technique": [dict(type='attack-pattern', x_mitre_is_subtechnique=True)],
29+
"Mitigation": [dict(type='course-of-action')],
30+
"Group": [dict(type='intrusion-set')],
31+
"Software": [dict(type='malware'), dict(type='tool')],
32+
"Campaign": [dict(type='campaign')],
33+
"Data Source": [dict(type='x-mitre-data-source')],
34+
"Data Component": [dict(type='x-mitre-data-component')],
35+
"Asset": [dict(type='x-mitre-asset')],
36+
}
37+
38+
def get_kb_type(obj):
39+
for form, criteria_list in KB_TYPES.items():
40+
for criteria in criteria_list:
41+
if all(obj.get(k) == v for k, v in criteria.items()):
42+
return form
43+
return None
2344

2445
def get_file_values(obj):
2546
values = {}
@@ -133,36 +154,36 @@ def get_values(obj: dict, value_keys: list[str] | dict[str, str] | Callable):
133154
}
134155

135156

136-
def get_ttp_type(obj: dict) -> str | None:
157+
def guess_kb_data(obj: dict) -> str | None:
137158
"""
138-
Determine the TTP type of a STIX object based on its properties.
159+
Determine the KnowledgeBase type of a STIX object based on its properties.
139160
140161
Returns:
141162
- "cve" for vulnerability objects
142163
- "cwe" for weakness objects
143164
- "location" for location objects
144165
- "enterprise-attack", "mobile-attack", "ics-attack" based on x_mitre_domains
145166
- "capec", "atlas", "disarm", "sector" based on external_references source_name
146-
- None if not a TTP object
167+
- None if not a KnowledgeBase object
147168
"""
148169
obj_type = obj["type"]
149-
ttp_type = None
170+
kb_name = None
150171
extra = {}
151172

152173
# Check for CVE (vulnerability)
153174
match obj_type:
154175
case "vulnerability":
155-
ttp_type = "cve"
176+
kb_name = "cve"
156177
case "weakness":
157-
ttp_type = "cwe"
178+
kb_name = "cwe"
158179
case "location":
159-
ttp_type = "location"
180+
kb_name = "location"
160181
# Check for MITRE ATT&CK domains
161182
x_mitre_domains = obj.get("x_mitre_domains", [])
162183
if x_mitre_domains:
163184
domain = x_mitre_domains[0]
164185
if domain in ["enterprise-attack", "mobile-attack", "ics-attack"]:
165-
ttp_type = domain
186+
kb_name = domain
166187

167188
# Check external references for other TTP types
168189
external_refs = obj.get("external_references", [])
@@ -176,10 +197,12 @@ def get_ttp_type(obj: dict) -> str | None:
176197
"sector2stix": "sector",
177198
}
178199
if source_name in ttp_source_name_mapping:
179-
ttp_type = ttp_source_name_mapping[source_name]
180-
if ttp_type and (ttp_ids := external_id(obj)):
181-
extra["ttp_id"] = ttp_ids[0]
182-
return ttp_type, extra
200+
kb_name = ttp_source_name_mapping[source_name]
201+
if kb_name and (kb_ids := external_id(obj)):
202+
extra["kb_id"] = kb_ids[0]
203+
if kb_name and (kb_type := get_kb_type(obj)):
204+
extra["kb_type"] = kb_type
205+
return kb_name, extra
183206

184207

185208
def get_visibility(obj: dict) -> str:
@@ -209,12 +232,12 @@ def extract_object_metadata(obj: dict) -> dict:
209232
A dictionary containing:
210233
- id: The STIX object ID
211234
- type: The STIX object type
212-
- ttp_type: The TTP type if applicable (None otherwise)
235+
- knowledgebase: The source KnowledgeBase type if applicable (None otherwise)
213236
- values: The extracted values based on the object type
214237
"""
215238
obj_id = obj["id"]
216239
obj_type = obj["type"]
217-
ttp_type, ttp_extra = get_ttp_type(obj)
240+
kb_name, kb_extra = guess_kb_data(obj)
218241

219242
# Get the value configuration for this object type
220243
type_config = type_value_map.get(obj_type, {})
@@ -223,11 +246,11 @@ def extract_object_metadata(obj: dict) -> dict:
223246
# Extract values using get_values function
224247
values = get_values(obj, value_keys) or {}
225248

226-
values.update(ttp_extra)
249+
values.update(kb_extra)
227250
return {
228251
"stix_id": obj_id,
229252
"type": obj_type,
230-
"ttp_type": ttp_type,
253+
"knowledgebase": kb_name,
231254
"values": values,
232255
"modified": obj.get("modified"),
233256
"created": obj.get("created"),

obstracts/server/values/views.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,26 @@
11
import textwrap
2-
from rest_framework import viewsets, filters, mixins
3-
from rest_framework.response import Response
2+
from rest_framework import viewsets, mixins
43
from django_filters.rest_framework import (
54
DjangoFilterBackend,
65
FilterSet,
76
CharFilter,
8-
MultipleChoiceFilter,
97
BooleanFilter,
108
BaseCSVFilter,
119
)
1210
from django_filters.fields import ChoiceField
1311
from obstracts.server import autoschema as api_schema
1412

15-
from drf_spectacular.utils import extend_schema, extend_schema_view, OpenApiParameter
16-
from drf_spectacular.types import OpenApiTypes
17-
from django.db.models import F, Value, JSONField as DjangoJSONField, Min, Max
18-
from django.db.models.functions import JSONObject
13+
from drf_spectacular.utils import extend_schema, extend_schema_view
14+
from django.db.models import F, Min, Max
1915
from django.contrib.postgres.aggregates import ArrayAgg
2016

2117

2218
from obstracts.server.models import ObjectValue
2319
from obstracts.server.utils import Pagination
24-
from obstracts.server.values.values import sco_value_map, sdo_value_map
20+
from obstracts.server.values.values import sco_value_map, sdo_value_map, KB_TYPES
2521
from .serializers import ObjectValueSerializer
2622
from dogesec_commons.utils.ordering import Ordering
23+
from .filters import NormalizeDict
2724

2825
TTP_TYPES = [
2926
"cve",
@@ -111,7 +108,7 @@ class BaseObjectValueView(mixins.ListModelMixin, viewsets.GenericViewSet):
111108
pagination_class = Pagination("values")
112109
filter_backends = [DjangoFilterBackend, Ordering]
113110
filterset_class = ObjectValueFilterSet
114-
ordering_fields = ["stix_id", "type", "ttp_type"]
111+
ordering_fields = ["stix_id", "type", "knowledgebase", "value"]
115112
ordering = "stix_id_descending"
116113
openapi_tags = ["Object Values"]
117114

@@ -128,11 +125,12 @@ def get_queryset(self):
128125
# Aggregate all post_ids for each unique stix_id
129126
queryset = queryset.values("stix_id").annotate(
130127
type=F("type"),
131-
ttp_type=F("ttp_type"),
128+
knowledgebase=F("knowledgebase"),
132129
values=F("values"),
133130
matched_posts=ArrayAgg("file__post_id", distinct=True),
134131
created=Min("created"),
135132
modified=Max("modified"),
133+
value=NormalizeDict(F("values")),
136134
)
137135

138136
return queryset
@@ -175,8 +173,8 @@ class SCOValueView(BaseObjectValueView):
175173
"""View for STIX Cyber Observable Objects (SCOs) only."""
176174

177175
allowed_types = list(sco_value_map.keys())
178-
ordering_fields = ["values", "stix_id", "type"]
179-
ordering = "values_ascending"
176+
ordering_fields = ["value", "stix_id", "type"]
177+
ordering = "value_ascending"
180178

181179
class filterset_class(ObjectValueFilterSet):
182180
types = ChoiceCSVFilter(
@@ -225,11 +223,11 @@ class SDOValueView(BaseObjectValueView):
225223
"""View for STIX Domain Objects (SDOs) only."""
226224

227225
allowed_types = list(sdo_value_map.keys())
228-
ordering_fields = ["stix_id", "type", "ttp_type", "values", "created", "modified"]
226+
ordering_fields = ["stix_id", "type", "knowledgebase", "value", "created", "modified"]
229227

230228
class filterset_class(ObjectValueFilterSet):
231-
ttp_types = ChoiceCSVFilter(
232-
field_name="ttp_type",
229+
knowledgebases = ChoiceCSVFilter(
230+
field_name="knowledgebase",
233231
help_text="Filter results by source of TTP object (cve, cwe, enterprise-attack, mobile-attack, ics-attack, capec, location, disarm, atlas, sector)",
234232
choices=[(c, c) for c in TTP_TYPES],
235233
)
@@ -238,3 +236,9 @@ class filterset_class(ObjectValueFilterSet):
238236
help_text="Filter the results by one or more STIX Domain Object types",
239237
choices=[(c, c) for c in sdo_value_map.keys()],
240238
)
239+
240+
kb_type = ChoiceCSVFilter(
241+
field_name="values__kb_type",
242+
help_text="Filter results by knowledge base type.",
243+
choices=[(c, c) for c in KB_TYPES.keys()],
244+
)

0 commit comments

Comments
 (0)