package poller_test import ( "context" "encoding/json" "testing" "time" "github.com/bsm/redislock" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" goredis "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/staack/the-other-dude/poller/internal/bus" "github.com/staack/the-other-dude/poller/internal/store" "github.com/staack/the-other-dude/poller/internal/testutil" ) // TestPollPublishConsumeCycle_Integration verifies the complete pipeline: // // 1. DeviceStore reads devices from real PostgreSQL // 2. Publisher sends status events through real NATS JetStream // 3. A NATS consumer receives the events with correct data // 4. Redis distributed lock can be obtained and released // // The actual PollDevice function requires a real RouterOS device, so we test // the integration seams individually and verify they compose correctly. func TestPollPublishConsumeCycle_Integration(t *testing.T) { if testing.Short() { t.Skip("skipping integration test in short mode") } ctx := context.Background() tenantID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" dummyCreds := []byte("dummy-encrypted-credentials") // --- Phase 1: PostgreSQL + DeviceStore --- connStr, pgCleanup := testutil.SetupPostgres(t) defer pgCleanup() v7 := "7.16" major7 := 7 deviceID := testutil.InsertTestDevice(t, connStr, store.Device{ TenantID: tenantID, IPAddress: "10.0.0.1", APIPort: 8728, APISSLPort: 8729, EncryptedCredentials: dummyCreds, RouterOSVersion: &v7, MajorVersion: &major7, }) ds, err := store.NewDeviceStore(ctx, connStr) require.NoError(t, err) defer ds.Close() devices, err := ds.FetchDevices(ctx) require.NoError(t, err) require.Len(t, devices, 1) assert.Equal(t, deviceID, devices[0].ID) assert.Equal(t, tenantID, devices[0].TenantID) // --- Phase 2: NATS + Publisher --- natsURL, natsCleanup := testutil.SetupNATS(t) defer natsCleanup() pub, err := bus.NewPublisher(natsURL) require.NoError(t, err) defer pub.Close() // Create a consumer to verify events. nc, err := nats.Connect(natsURL) require.NoError(t, err) defer nc.Close() js, err := jetstream.New(nc) require.NoError(t, err) cons, err := js.CreateOrUpdateConsumer(ctx, "DEVICE_EVENTS", jetstream.ConsumerConfig{ FilterSubject: "device.status.>", AckPolicy: jetstream.AckNonePolicy, }) require.NoError(t, err) // Simulate what PollDevice does after connecting to a device: // publish a status event with data from the fetched device. dev := devices[0] statusEvent := bus.DeviceStatusEvent{ DeviceID: dev.ID, TenantID: dev.TenantID, Status: "online", LastSeen: time.Now().UTC().Format(time.RFC3339), } err = pub.PublishStatus(ctx, statusEvent) require.NoError(t, err) // Verify consumer receives the event. msgBatch, err := cons.Fetch(1, jetstream.FetchMaxWait(5*time.Second)) require.NoError(t, err) var received *jetstream.Msg for msg := range msgBatch.Messages() { received = &msg break } require.NotNil(t, received, "consumer should receive the status event") var got bus.DeviceStatusEvent err = json.Unmarshal((*received).Data(), &got) require.NoError(t, err) assert.Equal(t, dev.ID, got.DeviceID) assert.Equal(t, dev.TenantID, got.TenantID) assert.Equal(t, "online", got.Status) // --- Phase 3: Redis distributed lock --- redisAddr, redisCleanup := testutil.SetupRedis(t) defer redisCleanup() rdb := goredis.NewClient(&goredis.Options{Addr: redisAddr}) defer rdb.Close() locker := redislock.New(rdb) lockKey := "poll:device:" + dev.ID lock, err := locker.Obtain(ctx, lockKey, 10*time.Second, nil) require.NoError(t, err, "should obtain Redis distributed lock") // A second attempt should fail (lock held). _, err = locker.Obtain(ctx, lockKey, 10*time.Second, nil) assert.ErrorIs(t, err, redislock.ErrNotObtained, "second lock attempt should fail") // Release and re-obtain. err = lock.Release(ctx) require.NoError(t, err, "should release lock") lock2, err := locker.Obtain(ctx, lockKey, 10*time.Second, nil) require.NoError(t, err, "should re-obtain lock after release") _ = lock2.Release(ctx) } // TestSchedulerReconcile_WithRealDB_Integration verifies that the Scheduler's // reconciliation loop correctly starts and stops device polling goroutines // when backed by a real PostgreSQL database. // // We test this by running the Scheduler for a brief period and verifying it // fetches devices and starts goroutines. Since PollDevice requires real // RouterOS hardware, the goroutines will fail on the poll cycle (no device to // connect to), but the scheduler's reconciliation logic is the integration // point we are testing here. func TestSchedulerReconcile_WithRealDB_Integration(t *testing.T) { if testing.Short() { t.Skip("skipping integration test in short mode") } ctx := context.Background() tenantID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" dummyCreds := []byte("dummy-encrypted-credentials") connStr, pgCleanup := testutil.SetupPostgres(t) defer pgCleanup() // Insert 2 devices. id1 := testutil.InsertTestDevice(t, connStr, store.Device{ TenantID: tenantID, IPAddress: "10.0.0.1", APIPort: 8728, APISSLPort: 8729, EncryptedCredentials: dummyCreds, }) id2 := testutil.InsertTestDevice(t, connStr, store.Device{ TenantID: tenantID, IPAddress: "10.0.0.2", APIPort: 8728, APISSLPort: 8729, EncryptedCredentials: dummyCreds, }) ds, err := store.NewDeviceStore(ctx, connStr) require.NoError(t, err) defer ds.Close() // Verify DeviceStore returns both devices (integration seam check). devices, err := ds.FetchDevices(ctx) require.NoError(t, err) require.Len(t, devices, 2) returnedIDs := make(map[string]bool) for _, d := range devices { returnedIDs[d.ID] = true } assert.True(t, returnedIDs[id1], "device 1 should be fetched from real DB") assert.True(t, returnedIDs[id2], "device 2 should be fetched from real DB") }