package main import ( "database/sql" "testing" ) func TestDBWriteQueueWritesRecordsAsync(t *testing.T) { st := openTestStore(t) defer st.Close() queue := newDBWriteQueue(st) record := textMessageTestRecord("queued") queue.EnqueueRecord(record, mqttClientInfo{ClientID: "client-1"}) record["text"] = "mutated after enqueue" queue.Close() var text, clientID string if err := rawTestDB(t, st).QueryRow("SELECT text, mqtt_client_id FROM text_message WHERE from_id = ?", "!12345678").Scan(&text, &clientID); err != nil { t.Fatal(err) } if text != "queued" || clientID != "client-1" { t.Fatalf("queued row = text %q client %q, want queued/client-1", text, clientID) } } func TestDBWriteQueueWritesDiscardAsync(t *testing.T) { st := openTestStore(t) defer st.Close() queue := newDBWriteQueue(st) record := map[string]any{"topic": "msh/test", "error": "bad packet"} queue.EnqueueDiscard(record, []byte{1, 2, 3}, mqttClientInfo{RemoteAddr: "127.0.0.1:1883"}) record["error"] = "mutated after enqueue" queue.Close() var topic, reason, rawBase64, remoteAddr string if err := rawTestDB(t, st).QueryRow("SELECT topic, error, raw_base64, mqtt_remote_addr FROM discard_details").Scan(&topic, &reason, &rawBase64, &remoteAddr); err != nil { t.Fatal(err) } if topic != "msh/test" || reason != "bad packet" || rawBase64 != "AQID" || remoteAddr != "127.0.0.1:1883" { t.Fatalf("discard row = %q/%q/%q/%q, want queued values", topic, reason, rawBase64, remoteAddr) } } func TestDBWriteQueueLen(t *testing.T) { queue := &dbWriteQueue{jobs: make(chan dbWriteJob, 1)} queue.enqueue(dbWriteJob{run: func() error { return nil }}) if queue.Len() != 1 { t.Fatalf("queue.Len() = %d, want 1", queue.Len()) } } func TestDBWriteQueueIgnoresUnsupportedRecordType(t *testing.T) { st := openTestStore(t) defer st.Close() queue := newDBWriteQueue(st) queue.EnqueueRecord(map[string]any{"type": "empty_packet", "from": "!12345678"}, mqttClientInfo{}) queue.Close() var count int if err := rawTestDB(t, st).QueryRow("SELECT COUNT(*) FROM text_message").Scan(&count); err != nil { t.Fatal(err) } if count != 0 { t.Fatalf("text_message count = %d, want 0", count) } } func TestDBWriteQueueNilStore(t *testing.T) { if queue := newDBWriteQueue(nil); queue != nil { t.Fatalf("newDBWriteQueue(nil) = %#v, want nil", queue) } var queue *dbWriteQueue queue.EnqueueRecord(textMessageTestRecord("ignored"), mqttClientInfo{}) queue.EnqueueDiscard(map[string]any{"topic": "ignored"}, []byte{1}, mqttClientInfo{}) queue.Close() } func TestDBWriteQueueRecordValidationErrorDoesNotStopWorker(t *testing.T) { st := openTestStore(t) defer st.Close() queue := newDBWriteQueue(st) badRecord := textMessageTestRecord("bad") delete(badRecord, "from") queue.EnqueueRecord(badRecord, mqttClientInfo{}) queue.EnqueueRecord(textMessageTestRecord("good"), mqttClientInfo{}) queue.Close() var text string if err := rawTestDB(t, st).QueryRow("SELECT text FROM text_message").Scan(&text); err != nil { t.Fatal(err) } if text != "good" { t.Fatalf("text = %q, want good", text) } var missing sql.NullString if err := rawTestDB(t, st).QueryRow("SELECT text FROM text_message WHERE text = ?", "bad").Scan(&missing); err != sql.ErrNoRows { t.Fatalf("bad row error = %v, want sql.ErrNoRows", err) } }