数据库
首页 > 数据库> > Redis-高级篇

Redis-高级篇

作者:互联网

发布订阅

先创建频道名redisWechat

redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages… (press Ctrl-C to quit)

  1. “subscribe”
  2. “redisChat”
  3. (integer) 1

重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就
能接收到消息。

redis 127.0.0.1:6379> PUBLISH redisChat “Redis is a great caching technique”
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat “Learn redis by runoob.com”
(integer) 1

  1. “message”
  2. “redisChat”
  3. “Redis is a great caching technique”
  4. “message”
  5. “redisChat”
  6. “Learn redis by runoob.com”

在这里插入图片描述


//发布者
public class Publisher extends Thread{

    private final JedisPool jedisPool;

    public Publisher(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }


    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();//从连接池子中获取连接
        while (true){
            try {
                String input = reader.readLine();
                if (!"quit".equals(input)){
                    jedis.publish("mychannel",input);
                }else{
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
//订阅者
public class Subscriber extends JedisPubSub {

    @Override
    //收到消息会调用这个方法
    public void onMessage(String channel, String message) {
        System.out.println(String.format("receive one message from channel %s,message is %s",channel,message));
    }

    @Override
    //订阅频道时会调用
    public void onSubscribe(String channel, int subscribedChannels) {

        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }

    @Override
    //取消订阅时会调用
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }
}

//订阅频道
public class SubThread extends Thread{

    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();
    private final String channel = "mychannel";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            //通过subscriber的api去订阅,入参是订阅者和频道名
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            jedis.close();
        }
    }
}

//测试类
public class PubSubDemo {
    public static void main(String[] args) {
        // 连接redis服务端
        JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "119.23.69.255", 6379);

        SubThread subThread1 = new SubThread(jedisPool); //订阅者1
        subThread1.start();
        SubThread subThread2 = new SubThread(jedisPool); //订阅者2
        subThread2.start();
        Publisher publisher = new Publisher(jedisPool); //发布者
        publisher.start();

    }
}

1 即时聊天
2 微博订阅,推送
3 异步处理等…

redis: 如果发布一条消息,没有订阅者的话,这条消息就会丢失,不会存在内存中
rabbitmq: 如果发布一条消息,如果没有消费者消费该队列,那么这条消息就会一直存在队列中

消费者负载均衡

rabbitmq:队列可以被多个消费者同时监控消费,但是每一条消息只能被消费一次,由于rabbitmq的消费确 认机制,因此它能够根据消费者的消费能力而调整它的负载;
redis: 发布订阅模式,一个队列可以被多个消费者同时订阅,当有消息到达时,会将该消息依次发送给每个订 阅者;

队列监控

rabbitmq : 实现了后台监控平台,可以在该平台上看到所有创建的队列的详细情况,良好的后台管理平台可以 方便我们更好的使用;
redis : 没有所谓的监控平台

总结
redis: 轻量级,低延迟,高并发,低可靠性;
rabbitmq:重量级,高可靠,异步,不保证实时;
rabbitmq是一个专门的AMQP协议队列,他的优势就在于提供可靠的队列服务,并且可做到异步,而redis主 要是用于缓存的,redis的发布订阅模块,可用于实现及时性,且可靠性低的功能。

标签:订阅,String,Redis,redis,高级,jedisPool,public,channel
来源: https://blog.csdn.net/ccccc202/article/details/116571777