返回 Skill 列表
extension
分类: 开发与工程 · 无需 API Key

messaging-testing

消息代理的集成测试模式:Redis Pub/Sub、NATS、Pulsar、SQS、ActiveMQ、Azure Service Bus和Google Pub/Sub。基于容器和模拟器的Java、Node.js和Python测试。使用时机:用户提到“消息测试”、“redis pubsub测试”、“nats测试”、“pulsar测试”、“sqs测试”、“localstack测试”、“activemq测试”、“azure service bus测试”、“google pubsub测试”、“消息队列测试”。不适用于:Kafka测试 - 请使用`messaging-testing-kafka`;RabbitMQ测试 - 请使用`messaging-testing-rabbitmq`;通用testcontainers - 请使用`testcontainers`技能

作者: jakexiaohub (GitHub)

Messaging Integration Testing — Multi-Broker Reference

Dedicated skills: For Kafka testing, see `messaging-testing-kafka`. For RabbitMQ testing, see `messaging-testing-rabbitmq`.

Redis Pub/Sub

Java: Testcontainers GenericContainer

/**
 * Integration test for Redis Pub/Sub against a disposable Redis container.
 *
 * <p>A message is published on the {@code orders} channel through
 * {@link StringRedisTemplate} and must be delivered to a listener registered
 * on a {@link RedisMessageListenerContainer}.
 */
@SpringBootTest
@Testcontainers
class RedisPubSubTest {

    // Static so that one container is shared by every test in the class.
    @Container
    @ServiceConnection(name = "redis")
    static GenericContainer<?> redis =
        new GenericContainer<>("redis:7-alpine").withExposedPorts(6379);

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Publishes one JSON payload and waits (max. 5 s) for the listener to see it.
     *
     * @throws Exception if the wait is interrupted or the listener container
     *                   fails to initialise or shut down
     */
    @Test
    void shouldPublishAndReceive() throws Exception {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(1);

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate.getConnectionFactory());
        container.addMessageListener((message, pattern) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            received.add(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            latch.countDown();
        }, new ChannelTopic("orders"));

        try {
            container.afterPropertiesSet();
            container.start();

            redisTemplate.convertAndSend("orders", "{\"orderId\":\"123\"}");

            // The latch replaces a fixed sleep: the test continues as soon as the message arrives.
            assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
            assertThat(received).hasSize(1);
            assertThat(received.get(0)).contains("123");
        } finally {
            // Always release the subscription, even when an assertion above fails.
            container.stop();
        }
    }
}

Node.js: ioredis + Testcontainers

import { GenericContainer } from "testcontainers";
import Redis from "ioredis";

// Shared fixtures, assigned in beforeAll and released in afterAll.
// The container type is derived from GenericContainer.start() so no extra import is needed.
let container: Awaited<ReturnType<GenericContainer["start"]>>;
let publisher: Redis;
let subscriber: Redis;

// Starts one Redis container for the whole file and opens two connections:
// a connection in subscriber mode cannot issue PUBLISH, so publisher and
// subscriber need separate clients.
beforeAll(async () => {
  container = await new GenericContainer("redis:7-alpine")
    .withExposedPorts(6379)
    .start();
  const url = `redis://${container.getHost()}:${container.getMappedPort(6379)}`;
  publisher = new Redis(url);
  subscriber = new Redis(url);
}, 30_000);

// Optional chaining keeps teardown from throwing a TypeError when beforeAll
// failed part-way; the finally block guarantees the container is stopped even
// if closing a client rejects.
afterAll(async () => {
  try {
    await publisher?.quit();
    await subscriber?.quit();
  } finally {
    await container?.stop();
  }
});

// Publishes one JSON message on "orders" and verifies the subscriber receives it.
it("should pub/sub", async () => {
  const messages: string[] = [];

  // Attach the handler before subscribing so no delivery can be missed, and
  // expose the first delivery as a promise instead of sleeping a fixed time.
  let notifyReceived: () => void = () => {};
  const received = new Promise<void>((resolve) => {
    notifyReceived = resolve;
  });
  const onMessage = (_channel: string, payload: string): void => {
    messages.push(payload);
    notifyReceived();
  };
  subscriber.on("message", onMessage);

  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    await subscriber.subscribe("orders");
    await publisher.publish("orders", JSON.stringify({ orderId: "123" }));

    // Fail fast with a clear error if nothing arrives within 2 s.
    const timedOut = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(
        () => reject(new Error("Timed out waiting for a message on 'orders'")),
        2_000,
      );
    });
    await Promise.race([received, timedOut]);

    expect(messages).toHaveLength(1);
    expect(JSON.parse(messages[0]).orderId).toBe("123");
  } finally {
    clearTimeout(timer);
    // Do not leak this test's handler into other tests sharing the client.
    subscriber.off("message", onMessage);
  }
});

NATS

Java: Testcontainers

/**
 * Integration test for NATS core publish/subscribe against a disposable
 * NATS container.
 */
@SpringBootTest
@Testcontainers
class NatsTest {

    // Static so that one container is shared by every test in the class.
    @Container
    static GenericContainer<?> nats =
        new GenericContainer<>("nats:2.10-alpine").withExposedPorts(4222);

    /** Builds the client URL from the container's host and mapped port. */
    private static String natsUrl() {
        return "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222);
    }

    /** Exposes the container URL to the Spring context as {@code nats.url}. */
    @DynamicPropertySource
    static void natsProperties(DynamicPropertyRegistry registry) {
        registry.add("nats.url", NatsTest::natsUrl);
    }

    /**
     * Subscribes to {@code orders}, publishes one payload and waits (max. 5 s)
     * for it to be delivered.
     *
     * @throws Exception if connecting, waiting for the message or closing fails
     */
    @Test
    void shouldPublishAndSubscribe() throws Exception {
        // try-with-resources closes the connection even when an assertion fails.
        try (Connection nc = Nats.connect(natsUrl())) {
            CompletableFuture<Message> future = new CompletableFuture<>();
            Dispatcher dispatcher = nc.createDispatcher(future::complete);
            dispatcher.subscribe("orders");

            // Encode/decode explicitly as UTF-8 rather than with the platform default charset.
            nc.publish("orders",
                "{\"orderId\":\"123\"}".getBytes(java.nio.charset.StandardCharsets.UTF_8));

            Message msg = future.get(5, TimeUnit.SECONDS);
            assertThat(new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8))
                .contains("123");
        }
    }
}

Node.js: nats + Testcontainers

import { GenericContainer } from "testcontainers";
import { connect, StringCodec } from "nats";

// Starts a NATS container, publishes one message on "orders" and verifies
// that a subscriber receives it. The 30 s jest timeout covers container start-up.
it("should pub/sub via NATS", async () => {
  const container = await new GenericContainer("nats:2.10-alpine")
    .withExposedPorts(4222)
    .start();
  try {
    const nc = await connect({
      servers: `nats://${container.getHost()}:${container.getMappedPort(4222)}`,
    });
    try {
      const sc = StringCodec();
      const messages: string[] = [];

      // max: 1 ends the iterator after the first delivery; timeout makes the
      // iterator throw if nothing arrives, so the test never hangs or sleeps.
      const sub = nc.subscribe("orders", { max: 1, timeout: 5_000 });
      const consumed = (async () => {
        for await (const msg of sub) {
          messages.push(sc.decode(msg.data));
        }
      })();

      nc.publish("orders", sc.encode(JSON.stringify({ orderId: "123" })));
      await nc.flush();
      await consumed;

      expect(messages).toHaveLength(1);
    } finally {
      // Close the connection even when an assertion above fails.
      await nc.close();
    }
  } finally {
    await container.stop();
  }
}, 30_000);

Apache Pulsar

Java: PulsarContainer + @ServiceConnection (Spring Boot 3.2+)

/**
 * Template for a Spring Boot integration test against a Pulsar container.
 *
 * <p>The test sends one message through {@link PulsarTemplate}; the assertion
 * body is an intentional placeholder that must be filled in with a check on
 * the application's own consumer.
 */
@SpringBootTest
@Testcontainers
class PulsarTest {

    // Static so that one container is shared by every test in the class.
    @Container
    @ServiceConnection
    static PulsarContainer pulsar = new PulsarContainer("apachepulsar/pulsar:3.2.0");

    @Autowired
    private PulsarTemplate<String> pulsarTemplate;

    /** Sends "order-123" to the "orders" topic and polls for up to 10 s. */
    @Test
    void shouldProduceAndConsume() throws Exception {
        pulsarTemplate.send("orders", "order-123");

        // Consumer verifies via listener or direct consumer API
        // NOTE(review): the lambda below is empty, so this test currently passes
        // without verifying anything — add a real assertion before relying on it.
        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> {
                // Assert side effect of listener processing
            });
    }
}

Dependencies

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>pulsar</artifactId>
    <scope>test</scope>
</dependency>

Amazon SQS (LocalStack)

Java: LocalStack + @DynamicPropertySource

/**
 * Template for a Spring Cloud AWS SQS integration test backed by LocalStack.
 *
 * <p>The queue is created inside the container before any test runs; the
 * final assertion body is an intentional placeholder for a check on the
 * application's own consumer.
 */
@SpringBootTest
@Testcontainers
class SqsTest {

    // Static so that one container is shared by every test in the class.
    @Container
    static LocalStackContainer localstack = new LocalStackContainer(
        DockerImageName.parse("localstack/localstack:3.4"))
        .withServices(Service.SQS);

    /** Points Spring Cloud AWS at the LocalStack endpoint, region and credentials. */
    @DynamicPropertySource
    static void sqsProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.aws.sqs.endpoint",
            () -> localstack.getEndpointOverride(Service.SQS).toString());
        registry.add("spring.cloud.aws.region.static", localstack::getRegion);
        registry.add("spring.cloud.aws.credentials.access-key", localstack::getAccessKey);
        registry.add("spring.cloud.aws.credentials.secret-key", localstack::getSecretKey);
    }

    /**
     * Creates {@code orders-queue} with the AWS CLI wrapper inside the container.
     *
     * @throws Exception if the command cannot be executed
     * @throws IllegalStateException if the command exits with a non-zero status
     */
    @BeforeAll
    static void createQueue() throws Exception {
        var result = localstack.execInContainer("awslocal", "sqs", "create-queue",
            "--queue-name", "orders-queue");
        // Surface a failed queue creation here instead of as an obscure send error later.
        if (result.getExitCode() != 0) {
            throw new IllegalStateException(
                "Failed to create SQS queue 'orders-queue': " + result.getStderr());
        }
    }

    @Autowired
    private SqsTemplate sqsTemplate;

    /** Sends one event to {@code orders-queue} and polls for up to 10 s. */
    @Test
    void shouldSendAndReceive() {
        sqsTemplate.send("orders-queue", new OrderEvent("123", "CREATED"));

        // NOTE(review): the lambda below is empty, so nothing is verified yet —
        // replace the placeholder with an assertion on the consumer's side effect.
        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> {
                // Assert consumer processed the message
            });
    }
}

Node.js: LocalStack + @aws-sdk

import { LocalstackContainer } from "@testcontainers/localstack";
import { SQSClient, CreateQueueCommand, SendMessageCommand, ReceiveMessageCommand } from "@aws-sdk/client-sqs";

// Starts LocalStack, creates a queue, sends one message and reads it back.
// The 60 s jest timeout covers pulling and starting the LocalStack image.
it("should send and receive SQS message", async () => {
  const container = await new LocalstackContainer("localstack/localstack:3.4").start();
  try {
    // LocalStack accepts any credentials; "test"/"test" is its conventional pair.
    const client = new SQSClient({
      endpoint: container.getConnectionUri(),
      region: "us-east-1",
      credentials: { accessKeyId: "test", secretAccessKey: "test" },
    });
    try {
      const { QueueUrl } = await client.send(
        new CreateQueueCommand({ QueueName: "test-queue" }),
      );

      await client.send(
        new SendMessageCommand({
          QueueUrl,
          MessageBody: JSON.stringify({ orderId: "123" }),
        }),
      );

      // Long polling (WaitTimeSeconds) avoids an empty response from a short poll.
      const { Messages } = await client.send(
        new ReceiveMessageCommand({ QueueUrl, WaitTimeSeconds: 5 }),
      );

      expect(Messages).toHaveLength(1);
      const body = Messages?.[0]?.Body;
      expect(body).toBeDefined();
      expect(JSON.parse(body ?? "{}").orderId).toBe("123");
    } finally {
      // Release the client's sockets even when an assertion fails.
      client.destroy();
    }
  } finally {
    await container.stop();
  }
}, 60_000);

Python: moto (Mock) or LocalStack

import boto3
from moto import mock_aws

@mock_aws
def test_sqs_send_receive():
    """Round-trip a single message through an SQS queue mocked by moto."""
    client = boto3.client("sqs", region_name="us-east-1")
    queue_url = client.create_queue(QueueName="test-queue")["QueueUrl"]

    client.send_message(QueueUrl=queue_url, MessageBody='{"orderId": "123"}')

    # Ask for at most one message and inspect what came back.
    received = client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=1
    )["Messages"]
    assert len(received) == 1
    assert "123" in received[0]["Body"]

ActiveMQ (Artemis)

Java: EmbeddedActiveMQ (In-Process)

/**
 * Template for a test that runs an in-process ActiveMQ Artemis broker.
 *
 * <p>The broker is configured without persistence or security and listens on
 * the in-VM acceptor {@code vm://0}. The test body is an intentional
 * placeholder.
 */
@SpringBootTest
class ActiveMQTest {

    private static EmbeddedActiveMQ embeddedActiveMQ;

    /** Starts the embedded broker once for the whole class. */
    @BeforeAll
    static void startBroker() throws Exception {
        Configuration config = new ConfigurationImpl()
            .setPersistenceEnabled(false)
            .setSecurityEnabled(false)
            .addAcceptorConfiguration("invm", "vm://0");
        embeddedActiveMQ = new EmbeddedActiveMQ().setConfiguration(config);
        embeddedActiveMQ.start();
    }

    /** Stops the embedded broker, if it was created. */
    @AfterAll
    static void stopBroker() throws Exception {
        // Guard against a failed start: a NullPointerException here would mask
        // the original start-up error in the test report.
        if (embeddedActiveMQ != null) {
            embeddedActiveMQ.stop();
        }
    }

    @Test
    void shouldSendAndReceive() {
        // Use JMS or Spring JmsTemplate to send/receive
        // NOTE(review): placeholder — this test currently verifies nothing.
    }
}

Java: Testcontainers

// Artemis broker container: exposes port 61616 (used for the broker URL below)
// and port 8161, and is considered ready once a log line containing AMQ241004 appears.
@Container
static GenericContainer<?> artemis =
    new GenericContainer<>("apache/activemq-artemis:2.33.0")
        .waitingFor(Wait.forLogMessage(".*AMQ241004.*", 1))
        .withExposedPorts(61616, 8161);

/** Registers the broker URL and credentials as Spring Artemis properties. */
@DynamicPropertySource
static void jmsProperties(DynamicPropertyRegistry registry) {
    // Resolved lazily: the mapped port is only known once the container runs.
    registry.add("spring.artemis.broker-url", () -> {
        String host = artemis.getHost();
        Integer port = artemis.getMappedPort(61616);
        return "tcp://" + host + ":" + port;
    });
    registry.add("spring.artemis.password", () -> "artemis");
    registry.add("spring.artemis.user", () -> "artemis");
}

Azure Service Bus

Emulator Container

// Azure Service Bus emulator container, AMQP on port 5672.
// NOTE(review): the emulator documentation describes a companion SQL container
// and a SQL_SERVER setting; confirm this single-container setup starts in your
// environment. Consider pinning a concrete image tag instead of "latest".
@Container
static GenericContainer<?> servicebus =
    new GenericContainer<>("mcr.microsoft.com/azure-messaging/servicebus-emulator:latest")
        .withExposedPorts(5672)
        .withEnv("ACCEPT_EULA", "Y")
        .withEnv("MSSQL_SA_PASSWORD", "StrongPassword1!")
        .waitingFor(Wait.forListeningPort());

/**
 * Registers the emulator connection string for Spring Cloud Azure.
 *
 * <p>{@code UseDevelopmentEmulator=true} is part of the connection string
 * format documented for the emulator, and {@code SAS_KEY_VALUE} is the
 * literal key used in the documented example.
 */
@DynamicPropertySource
static void sbProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.azure.servicebus.connection-string", () ->
        "Endpoint=sb://" + servicebus.getHost() + ":" +
        servicebus.getMappedPort(5672) +
        ";SharedAccessKeyName=RootManageSharedAccessKey" +
        ";SharedAccessKey=SAS_KEY_VALUE" +
        ";UseDevelopmentEmulator=true;");
}

Node.js: Emulator + @azure/service-bus

import { ServiceBusClient } from "@azure/service-bus";
import { GenericContainer } from "testcontainers";

// Start emulator container, then:
// Sends one message to "test-queue", receives it back and settles it.
const client = new ServiceBusClient(connectionString);
const sender = client.createSender("test-queue");
const receiver = client.createReceiver("test-queue");
try {
  await sender.sendMessages({ body: { orderId: "123" } });

  const [message] = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 });
  // receiveMessages resolves with an empty array on timeout; fail with a clear
  // message instead of a TypeError on `message.body`.
  if (!message) {
    throw new Error("No message received from 'test-queue' within 5000 ms");
  }
  expect(message.body.orderId).toBe("123");
  // Settle the message so it is removed from the queue.
  await receiver.completeMessage(message);
} finally {
  // Release AMQP links and the connection even when an assertion fails.
  await receiver.close();
  await sender.close();
  await client.close();
}

Google Pub/Sub

Emulator (gcloud)

// Pub/Sub emulator container: runs the gcloud emulator on port 8085 for project
// "test-project" and is considered ready once "Server started" is logged.
@Container
static GenericContainer<?> pubsub =
    new GenericContainer<>("gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators")
        .withCommand("gcloud", "beta", "emulators", "pubsub", "start",
            "--host-port=0.0.0.0:8085", "--project=test-project")
        .withExposedPorts(8085)
        .waitingFor(Wait.forLogMessage(".*Server started.*", 1));

/** Registers the emulator address and project id as Spring Cloud GCP properties. */
@DynamicPropertySource
static void pubsubProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.gcp.project-id", () -> "test-project");
    // Resolved lazily: the mapped port is only known once the container runs.
    registry.add("spring.cloud.gcp.pubsub.emulator-host", () -> {
        String host = pubsub.getHost();
        Integer port = pubsub.getMappedPort(8085);
        return host + ":" + port;
    });
}

Node.js: Emulator + @google-cloud/pubsub

import { PubSub } from "@google-cloud/pubsub";
import { GenericContainer } from "testcontainers";

// Starts the Pub/Sub emulator, publishes one message and receives it through
// the subscription's "message" event.
const container = await new GenericContainer(
  "gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators")
  .withExposedPorts(8085)
  .withCommand("gcloud", "beta", "emulators", "pubsub", "start",
    "--host-port=0.0.0.0:8085", "--project=test-project")
  .start();

try {
  // The client library routes all traffic to the emulator when this variable is set.
  process.env.PUBSUB_EMULATOR_HOST =
    `${container.getHost()}:${container.getMappedPort(8085)}`;

  const pubsub = new PubSub({ projectId: "test-project" });
  const [topic] = await pubsub.createTopic("test-topic");
  // Create the subscription before publishing: a subscription only receives
  // messages published after it exists.
  const [subscription] = await topic.createSubscription("test-sub");

  try {
    await topic.publishMessage({ data: Buffer.from(JSON.stringify({ orderId: "123" })) });

    // Subscription is an event emitter and has no pull() method; attaching a
    // "message" listener opens the streaming pull.
    let timer: ReturnType<typeof setTimeout> | undefined;
    let message: { data: Buffer; ack(): void };
    try {
      message = await new Promise<{ data: Buffer; ack(): void }>((resolve, reject) => {
        timer = setTimeout(
          () => reject(new Error("Timed out waiting for a message on 'test-sub'")),
          5_000,
        );
        subscription.once("message", resolve);
        subscription.once("error", reject);
      });
    } finally {
      clearTimeout(timer);
    }

    expect(JSON.parse(message.data.toString()).orderId).toBe("123");
    // Acknowledge so the message is not redelivered.
    message.ack();
  } finally {
    await subscription.close();
  }
} finally {
  await container.stop();
}

Python: Emulator + google-cloud-pubsub

import os
from google.cloud import pubsub_v1

def test_pubsub(pubsub_container):
    """Publish one message to the Pub/Sub emulator and pull it back.

    Args:
        pubsub_container: fixture providing a started emulator container that
            exposes port 8085 (defined outside this snippet).
    """
    # The client library routes all traffic to the emulator when this is set.
    os.environ["PUBSUB_EMULATOR_HOST"] = (
        f"{pubsub_container.get_container_host_ip()}:"
        f"{pubsub_container.get_exposed_port(8085)}"
    )
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("test-project", "test-topic")
    publisher.create_topic(request={"name": topic_path})

    # The context manager closes the subscriber's channel when the test ends.
    with pubsub_v1.SubscriberClient() as subscriber:
        sub_path = subscriber.subscription_path("test-project", "test-sub")
        # Create the subscription BEFORE publishing: a subscription only receives
        # messages published after it exists.
        subscriber.create_subscription(request={"name": sub_path, "topic": topic_path})

        future = publisher.publish(topic_path, b'{"orderId": "123"}')
        future.result(timeout=5)

        # Bound the pull so a missing message fails the test instead of hanging.
        response = subscriber.pull(
            request={"subscription": sub_path, "max_messages": 1},
            timeout=5,
        )
        assert len(response.received_messages) == 1
        assert b"123" in response.received_messages[0].message.data

Cross-Broker Best Practices

| Do | Don't |
|----|-------|
| Use static containers shared across tests | Create a container per test method |
| Use `@ServiceConnection` when available | Hardcode connection properties |
| Use `await()` or latches for async assertions | Use `Thread.sleep()` |
| Clean up resources in `@AfterAll` | Leave connections/containers open |
| Use emulators for cloud services in CI | Connect to real cloud services in tests |
| Pin specific image versions | Use the `latest` tag |

Reference Documentation

Cross-reference: For Kafka testing, see `messaging-testing-kafka`. For RabbitMQ testing, see `messaging-testing-rabbitmq`. For generic Testcontainers patterns, see the `testcontainers` skill.