Skip to content

Commit

Permalink
Merge pull request #112 from spq/correct-stats
Browse files Browse the repository at this point in the history
improve the statistics:
  • Loading branch information
peace-maker authored Aug 6, 2024
2 parents 81e95be + 337ec88 commit 78a2031
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 35 deletions.
9 changes: 9 additions & 0 deletions internal/index/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type (
Builder struct {
snapshots []*snapshot
knownPcaps []*pcapmetadata.PcapInfo
packetCount uint
indexDir string
snapshotDir string
snapshotFilename string
Expand Down Expand Up @@ -61,6 +62,7 @@ func New(pcapDir, indexDir, snapshotDir string, cachedKnownPcaps []*pcapmetadata
}
}
b.knownPcaps = append(b.knownPcaps, info)
b.packetCount += info.PacketCount
}
// load the snapshot file with the most packets covered
snapshotFiles, err := os.ReadDir(snapshotDir)
Expand Down Expand Up @@ -541,6 +543,9 @@ outer:
}

b.knownPcaps = append(b.knownPcaps, newPcapInfos...)
for _, pi := range newPcapInfos {
b.packetCount += pi.PacketCount
}
b.snapshots = newSnapshots

outputFiles := []string{}
Expand All @@ -551,6 +556,10 @@ outer:
return nProcessedPcaps, indexes, nil
}

func (b *Builder) PacketCount() uint {
return b.packetCount
}

func (b *Builder) KnownPcaps() []*pcapmetadata.PcapInfo {
return b.knownPcaps
}
56 changes: 38 additions & 18 deletions internal/index/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ const (

type (
PcapStatistics struct {
PcapCount int
ImportJobCount int
StreamCount int
PacketCount int
PcapCount int
PacketCount int
ImportJobCount int
IndexCount int
StreamCount int
StreamRecordCount int
PacketRecordCount int
}

Event struct {
Expand Down Expand Up @@ -108,7 +111,8 @@ type (

builder *builder.Builder
indexes []*index.Reader
nStreams, nPackets int
nStreamRecords int
nPacketRecords int
nextStreamID uint64
nUnmergeableIndexes int
stateFilename string
Expand Down Expand Up @@ -140,6 +144,8 @@ type (
PcapCount int
StreamCount int
PacketCount int
StreamRecordCount int
PacketRecordCount int
MergeJobRunning bool
TaggingJobRunning bool
ConverterJobRunning bool
Expand Down Expand Up @@ -249,8 +255,8 @@ func New(pcapDir, indexDir, snapshotDir, stateDir, converterDir string) (*Manage
continue
}
mgr.indexes = append(mgr.indexes, idx)
mgr.nStreams += idx.StreamCount()
mgr.nPackets += idx.PacketCount()
mgr.nStreamRecords += idx.StreamCount()
mgr.nPacketRecords += idx.PacketCount()
if next := idx.MaxStreamID() + 1; mgr.nextStreamID < next {
mgr.nextStreamID = next
}
Expand Down Expand Up @@ -596,8 +602,8 @@ func (mgr *Manager) importPcapJob(filenames []string, nextStreamID uint64, exist
// add new indexes if some were created
if len(createdIndexes) > 0 {
mgr.indexes = append(mgr.indexes, createdIndexes...)
mgr.nStreams += newStreamCount
mgr.nPackets += newPacketCount
mgr.nStreamRecords += newStreamCount
mgr.nPacketRecords += newPacketCount
mgr.nextStreamID = newNextStreamID
mgr.lock(createdIndexes)
mgr.addedStreamsDuringTaggingJob.Or(addedStreams)
Expand All @@ -623,10 +629,13 @@ func (mgr *Manager) importPcapJob(filenames []string, nextStreamID uint64, exist
mgr.event(Event{
Type: "pcapProcessed",
PcapStats: PcapStatistics{
PcapCount: len(mgr.builder.KnownPcaps()),
ImportJobCount: len(mgr.importJobs),
StreamCount: mgr.nStreams,
PacketCount: mgr.nPackets,
PcapCount: len(mgr.builder.KnownPcaps()),
ImportJobCount: len(mgr.importJobs),
StreamCount: int(mgr.nextStreamID),
PacketCount: int(mgr.builder.PacketCount()),
IndexCount: len(mgr.indexes),
StreamRecordCount: mgr.nStreamRecords,
PacketRecordCount: mgr.nPacketRecords,
},
})
mgr.triggerPcapProcessedWebhooks(filenames[:processedFiles])
Expand All @@ -643,7 +652,7 @@ func (mgr *Manager) startMergeJobIfNeeded() {
return
}
}
nStreams := mgr.nStreams
nStreams := mgr.nStreamRecords
for i, idx := range mgr.indexes {
c := idx.StreamCount()
nStreams -= c
Expand Down Expand Up @@ -715,14 +724,23 @@ func (mgr *Manager) mergeIndexesJob(offset int, indexes []*index.Reader, release
mgr.lock(mergedIndexes)
mgr.indexes = append(mgr.indexes[:offset], append(mergedIndexes, mgr.indexes[offset+len(indexes):]...)...)
mgr.nUnmergeableIndexes += len(mergedIndexes) - 1
mgr.nStreams += streamsDiff
mgr.nPackets += packetsDiff
mgr.nStreamRecords += streamsDiff
mgr.nPacketRecords += packetsDiff
}
mgr.mergeJobRunning = false
mgr.startMergeJobIfNeeded()
releaser.release(mgr)
mgr.event(Event{
Type: "indexesMerged",
PcapStats: PcapStatistics{
PcapCount: len(mgr.builder.KnownPcaps()),
ImportJobCount: len(mgr.importJobs),
StreamCount: int(mgr.nextStreamID),
PacketCount: int(mgr.builder.PacketCount()),
IndexCount: len(mgr.indexes),
StreamRecordCount: mgr.nStreamRecords,
PacketRecordCount: mgr.nPacketRecords,
},
})
}
}
Expand Down Expand Up @@ -813,8 +831,10 @@ func (mgr *Manager) Status() Statistics {
IndexLockCount: locks,
PcapCount: len(mgr.builder.KnownPcaps()),
ImportJobCount: len(mgr.importJobs),
StreamCount: mgr.nStreams,
PacketCount: mgr.nPackets,
StreamRecordCount: mgr.nStreamRecords,
PacketRecordCount: mgr.nPacketRecords,
StreamCount: int(mgr.nextStreamID),
PacketCount: int(mgr.builder.PacketCount()),
MergeJobRunning: mgr.mergeJobRunning,
TaggingJobRunning: mgr.taggingJobRunning,
ConverterJobRunning: mgr.converterJobRunning,
Expand Down
2 changes: 2 additions & 0 deletions web/src/apiClient.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ export function isStatistics(obj: unknown): obj is Statistics {
typeof typedObj["PcapCount"] === "number" &&
typeof typedObj["ImportJobCount"] === "number" &&
typeof typedObj["StreamCount"] === "number" &&
typeof typedObj["StreamRecordCount"] === "number" &&
typeof typedObj["PacketCount"] === "number" &&
typeof typedObj["PacketRecordCount"] === "number" &&
typeof typedObj["MergeJobRunning"] === "boolean" &&
typeof typedObj["TaggingJobRunning"] === "boolean" &&
typeof typedObj["ConverterJobRunning"] === "boolean"
Expand Down
2 changes: 2 additions & 0 deletions web/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ export type Statistics = {
PcapCount: number;
ImportJobCount: number;
StreamCount: number;
StreamRecordCount: number;
PacketCount: number;
PacketRecordCount: number;
MergeJobRunning: boolean;
TaggingJobRunning: boolean;
ConverterJobRunning: boolean;
Expand Down
5 changes: 2 additions & 3 deletions web/src/components/Status.vue
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<v-card-title>Status</v-card-title>
<v-simple-table>
<tbody>
<tr v-for="(value, name) in status" :key="name">
<tr v-for="(value, name) in store.status || []" :key="name">
<th>{{ name }}</th>
<td width="100%">{{ value }}</td>
</tr>
Expand All @@ -26,12 +26,11 @@

<script lang="ts" setup>
import ToolBar from "./ToolBar.vue";
import { computed, onMounted } from "vue";
import { onMounted } from "vue";
import { useRootStore } from "@/stores";
import { EventBus } from "./EventBus";
const store = useRootStore();
const status = computed(() => store.status);
onMounted(() => {
updateStatus();
Expand Down
14 changes: 9 additions & 5 deletions web/src/stores/websocket.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Generated type guards for "websocket.ts".
* WARNING: Do not manually change this file.
*/
import { Event, TagEvent, ConverterEvent, PcapProcessedEvent } from "./websocket";
import { Event, TagEvent, ConverterEvent, PcapStatsEvent } from "./websocket";

export function isEvent(obj: unknown): obj is Event {
const typedObj = obj as Event
Expand Down Expand Up @@ -68,19 +68,23 @@ export function isConverterEvent(obj: unknown): obj is ConverterEvent {
)
}

export function isPcapProcessedEvent(obj: unknown): obj is PcapProcessedEvent {
const typedObj = obj as PcapProcessedEvent
export function isPcapStatsEvent(obj: unknown): obj is PcapStatsEvent {
const typedObj = obj as PcapStatsEvent
return (
(typedObj !== null &&
typeof typedObj === "object" ||
typeof typedObj === "function") &&
typedObj["Type"] === "pcapProcessed" &&
(typedObj["Type"] === "indexesMerged" ||
typedObj["Type"] === "pcapProcessed") &&
(typedObj["PcapStats"] !== null &&
typeof typedObj["PcapStats"] === "object" ||
typeof typedObj["PcapStats"] === "function") &&
typeof typedObj["PcapStats"]["PcapCount"] === "number" &&
typeof typedObj["PcapStats"]["PacketCount"] === "number" &&
typeof typedObj["PcapStats"]["ImportJobCount"] === "number" &&
typeof typedObj["PcapStats"]["IndexCount"] === "number" &&
typeof typedObj["PcapStats"]["StreamCount"] === "number" &&
typeof typedObj["PcapStats"]["PacketCount"] === "number"
typeof typedObj["PcapStats"]["StreamRecordCount"] === "number" &&
typeof typedObj["PcapStats"]["PacketRecordCount"] === "number"
)
}
25 changes: 16 additions & 9 deletions web/src/stores/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { useStreamsStore } from "./streams";
import {
isConverterEvent,
isEvent,
isPcapProcessedEvent,
isPcapStatsEvent,
isTagEvent,
} from "./websocket.guard";

Expand Down Expand Up @@ -45,14 +45,17 @@ export type ConverterEvent = {

export type PcapStats = {
PcapCount: number;
PacketCount: number;
ImportJobCount: number;
IndexCount: number;
StreamCount: number;
PacketCount: number;
StreamRecordCount: number;
PacketRecordCount: number;
};

/** @see {isPcapProcessedEvent} ts-auto-guard:type-guard */
export type PcapProcessedEvent = {
Type: "pcapProcessed";
/** @see {isPcapStatsEvent} ts-auto-guard:type-guard */
export type PcapStatsEvent = {
Type: "pcapProcessed" | "indexesMerged";
PcapStats: PcapStats;
};

Expand Down Expand Up @@ -174,16 +177,20 @@ export function setupWebsocket() {
}
break;
case "pcapProcessed":
if (!isPcapProcessedEvent(e)) {
console.error("Invalid pcap processed event:", e);
case "indexesMerged":
if (!isPcapStatsEvent(e)) {
console.error("Invalid pcap stats event:", e);
return;
}
streamsStore.outdated = true;
if (e.Type == "pcapProcessed") streamsStore.outdated = true;
if (store.status != null) {
store.status.PcapCount = e.PcapStats.PcapCount;
store.status.PacketCount = e.PcapStats.PacketCount;
store.status.ImportJobCount = e.PcapStats.ImportJobCount;
store.status.IndexCount = e.PcapStats.IndexCount;
store.status.StreamCount = e.PcapStats.StreamCount;
store.status.PacketCount = e.PcapStats.PacketCount;
store.status.StreamRecordCount = e.PcapStats.StreamRecordCount;
store.status.PacketRecordCount = e.PcapStats.PacketRecordCount;
}
break;
default:
Expand Down

0 comments on commit 78a2031

Please sign in to comment.