返回 Skill 列表
extension
分类: 开发与工程无需 API Key

redis-pubsub

Redis Pub/Sub 和 Streams 用于消息传递。涵盖发布/订阅、带有消费者组的流以及实时模式。适用于轻量级消息传递和实时功能。使用场景:用户提到“redis pub/sub”、“redis streams”、“实时消息传递”、“轻量级消息传递”、“模式订阅”,询问关于“快速消息传递”、“简单的发布/订阅”、“使用 Redis 的事件流”。不适用场景:需要保证交付 - 使用 `kafka`、`rabbitmq` 或 `pulsar`;复杂的路由 - 使用 `rabbitmq`;高耐久性需求 - 使用 `kafka`;企业级功能 - 使用 `activemq`;云原生 - 使用云服务提供商。

person作者: jakexiaohubgithub

Redis Pub/Sub & Streams Core Knowledge

Full Reference: See advanced.md for Streams consumer groups (Node.js, Java, Python), pending message recovery, TLS configuration, and Sentinel setup.

Deep Knowledge: Use mcp__documentation__fetch_docs with technology: redis-pubsub for comprehensive documentation.

Quick Start (Docker)

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes

Core Concepts

| Feature | Pub/Sub | Streams | |---------|---------|---------| | Persistence | No | Yes | | Consumer Groups | No | Yes | | Message History | No | Yes | | Delivery | Fire-and-forget | At-least-once | | Use Case | Real-time broadcast | Event sourcing, queues |

Pub/Sub (Node.js)

import Redis from 'ioredis';

const publisher = new Redis({ host: 'localhost', port: 6379 });
const subscriber = new Redis({ host: 'localhost', port: 6379 });

// Subscriber
subscriber.subscribe('orders', 'notifications', (err, count) => {
  console.log(`Subscribed to ${count} channels`);
});

subscriber.on('message', (channel, message) => {
  const data = JSON.parse(message);
  console.log(`Received on ${channel}:`, data);
});

// Pattern subscription
subscriber.psubscribe('order.*');
subscriber.on('pmessage', (pattern, channel, message) => {
  console.log(`Pattern ${pattern}, Channel ${channel}:`, message);
});

// Publisher
await publisher.publish('orders', JSON.stringify({
  orderId: '123',
  status: 'created',
}));

Pub/Sub (Java - Spring)

@Configuration
public class RedisPubSubConfig {
    @Bean
    public RedisMessageListenerContainer container(
            RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(orderListener(), new ChannelTopic("orders"));
        container.addMessageListener(orderPatternListener(), new PatternTopic("order.*"));
        return container;
    }

    @Bean
    public MessageListener orderListener() {
        return (message, pattern) -> {
            String body = new String(message.getBody());
            Order order = objectMapper.readValue(body, Order.class);
            processOrder(order);
        };
    }
}

@Service
public class OrderPublisher {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public void publishOrder(Order order) {
        redisTemplate.convertAndSend("orders",
            objectMapper.writeValueAsString(order));
    }
}

Pub/Sub (Python)

import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Subscriber (run in thread)
def subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe('orders')
    pubsub.psubscribe('order.*')

    for message in pubsub.listen():
        if message['type'] in ('message', 'pmessage'):
            data = json.loads(message['data'])
            print(f"Received: {data}")

# Publisher
r.publish('orders', json.dumps({'orderId': '123', 'status': 'created'}))

Stream Commands Reference

| Command | Description | |---------|-------------| | XADD | Add entry to stream | | XREAD | Read entries | | XREADGROUP | Read with consumer group | | XACK | Acknowledge processing | | XPENDING | List pending entries | | XCLAIM | Claim pending entry | | XTRIM | Trim stream size |

When NOT to Use This Skill

  • Guaranteed message delivery - Pub/Sub is fire-and-forget
  • Complex routing patterns - RabbitMQ is better
  • High-throughput event sourcing - Kafka provides better throughput
  • Multi-datacenter replication - Kafka or Pulsar handle this better

Anti-Patterns

| Anti-Pattern | Why It's Bad | Solution | |--------------|--------------|----------| | Pub/Sub for critical data | No persistence | Use Streams | | No XTRIM on streams | Memory growth | Set MAXLEN or auto-trim | | Single consumer group | Can't scale | Use consumer groups | | No XPENDING monitoring | Lost messages | Monitor and claim pending | | No ACK after processing | Premature consumption | ACK only after success |

Quick Troubleshooting

| Issue | Likely Cause | Fix | |-------|--------------|-----| | Messages not received | No active subscribers | Pub/Sub requires active subscribers | | Stream growing unbounded | No XTRIM | Add MAXLEN limit | | High memory usage | Large stream | Trim streams | | Pending messages growing | Consumer crashes | Implement XCLAIM recovery | | Duplicate processing | Consumer group issue | Ensure unique consumer IDs |

Production Checklist

  • [ ] Authentication enabled
  • [ ] TLS configured
  • [ ] Sentinel/Cluster for HA
  • [ ] Stream trimming configured
  • [ ] Consumer group monitoring
  • [ ] Pending message handling
  • [ ] Memory limits set
  • [ ] Persistence configured (AOF)

Monitoring Metrics

| Metric | Alert Threshold | |--------|-----------------| | Stream length | > 1000000 | | Pending entries | > 10000 | | Consumer lag | > 1000 | | Memory usage | > 80% |

Reference Documentation

Available topics: basics, pubsub, streams, patterns