123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- package cache
- import (
- "context"
- "errors"
- "time"
- "github.com/go-redis/redis/v8"
- )
- var (
- ctx = context.Background()
- defaultRedis *RedisCache
- )
- // RedisOptions options
- type RedisOptions struct {
- DB int
- PoolSize int
- DialTimeout time.Duration
- ReadTimeout time.Duration
- WriteTimeout time.Duration
- PoolTimeout time.Duration
- Password string
- Addr string
- }
- // RedisClusterOptions cluster option
- type RedisClusterOptions struct {
- DB int
- PoolSize int
- MaxRetries int
- MinRetryBackoff time.Duration
- MaxRetryBackoff time.Duration
- DialTimeout time.Duration
- ReadTimeout time.Duration
- WriteTimeout time.Duration
- PoolTimeout time.Duration
- IdleTimeout time.Duration
- IdleCheckFrequency time.Duration
- Password string
- Addrs []string
- }
- // RedisCache define
- type RedisCache struct {
- cluster bool
- ps *redis.PubSub
- c *redis.Client
- cc *redis.ClusterClient
- }
- // NewRedisCache new RedisCache object
- func NewRedisCache(opt RedisOptions) (rc *RedisCache, err error) {
- rc = new(RedisCache)
- c := redis.NewClient(&redis.Options{
- Addr: opt.Addr,
- Password: opt.Password,
- DialTimeout: opt.DialTimeout,
- ReadTimeout: opt.ReadTimeout,
- WriteTimeout: opt.WriteTimeout,
- PoolSize: opt.PoolSize,
- PoolTimeout: opt.PoolTimeout,
- })
- _, err = c.Ping(ctx).Result()
- rc.c = c
- rc.cluster = false
- return
- }
- // NewRedisClusterCache new RedisCluster object
- func NewRedisClusterCache(opt RedisClusterOptions) (rc *RedisCache, err error) {
- rc = new(RedisCache)
- var cfg redis.ClusterOptions
- cfg.Addrs = opt.Addrs
- cfg.Password = opt.Password
- cfg.DialTimeout = opt.DialTimeout
- cfg.ReadTimeout = opt.ReadTimeout
- cfg.WriteTimeout = opt.WriteTimeout
- cfg.PoolSize = opt.PoolSize
- cfg.PoolTimeout = opt.PoolTimeout
- cfg.IdleTimeout = opt.IdleTimeout
- cfg.IdleCheckFrequency = opt.IdleCheckFrequency
- c := redis.NewClusterClient(&cfg)
- _, err = c.Ping(ctx).Result()
- rc.cc = c
- rc.cluster = true
- return
- }
- // Reconnect reconnect
- func (c RedisCache) Reconnect() (s string, err error) {
- if c.cluster {
- s, err = c.cc.Ping(ctx).Result()
- return
- }
- s, err = c.c.Ping(ctx).Result()
- return
- }
- // Get get value from cache
- func (c RedisCache) Get(k string) (string, error) {
- if c.cluster {
- return c.cc.Get(ctx, k).Result()
- }
- return c.c.Get(ctx, k).Result()
- }
- // Set key-value to cache
- func (c RedisCache) Set(k, v string, expiration time.Duration) (string, error) {
- if c.cluster {
- return c.cc.Set(ctx, k, v, expiration).Result()
- }
- return c.c.Set(ctx, k, v, expiration).Result()
- }
- // Del delete key from cache
- func (c RedisCache) Del(ks ...string) (int64, error) {
- if c.cluster {
- return c.cc.Del(ctx, ks...).Result()
- }
- return c.c.Del(ctx, ks...).Result()
- }
- // Publish posts the message to the channel.
- func (c RedisCache) Publish(channel string, message interface{}) (int64, error) {
- if c.cluster {
- return c.cc.Publish(ctx, channel, message).Result()
- }
- return c.c.Publish(ctx, channel, message).Result()
- }
- // Subscribe subscribes the client to the specified channels.
- func (c *RedisCache) Subscribe(channels ...string) error {
- if c.cluster {
- c.ps = c.cc.Subscribe(ctx, channels...)
- } else {
- c.ps = c.c.Subscribe(ctx, channels...)
- }
- return c.ps.Ping(ctx)
- }
- // CloseSubscribe close
- func (c *RedisCache) CloseSubscribe() error {
- if c.ps == nil {
- return nil
- }
- return c.ps.Close()
- }
- // Unsubscribe the client from the given channels, or from all of them if none is given.
- func (c *RedisCache) Unsubscribe(channels ...string) error {
- if c.ps == nil {
- return nil
- }
- return c.ps.Unsubscribe(ctx, channels...)
- }
- // ReceiveSubscribeMessage returns a Message or error ignoring Subscription and Pong messages. This is low-level API and in most cases Channel should be used instead.
- func (c *RedisCache) ReceiveSubscribeMessage() (channel, message string, err error) {
- if c.ps == nil {
- err = errors.New("not init subscribe")
- return
- }
- var msg *redis.Message
- msg, err = c.ps.ReceiveMessage(ctx)
- if err != nil {
- return
- }
- channel = msg.Channel
- message = msg.Payload
- return
- }
- // Info redis info
- func (c RedisCache) Info(section ...string) (string, error) {
- if c.cluster {
- return c.cc.Info(ctx, section...).Result()
- }
- return c.c.Info(ctx, section...).Result()
- }
- // ClusterInfo redis cluster info
- func (c RedisCache) ClusterInfo() (string, error) {
- if c.cluster {
- return c.cc.ClusterInfo(ctx).Result()
- }
- return c.c.ClusterInfo(ctx).Result()
- }
- // SetDefaultRedisOption set default option
- func SetDefaultRedisOption(opt RedisOptions) (err error) {
- defaultRedis, err = NewRedisCache(opt)
- return
- }
- // SetDefaultRedisClusterOption set default cluster option
- func SetDefaultRedisClusterOption(opt RedisClusterOptions) (err error) {
- defaultRedis, err = NewRedisClusterCache(opt)
- return
- }
- // RedisReconnect re connect
- func RedisReconnect() (string, error) {
- return defaultRedis.Reconnect()
- }
- // RedisGet get value from cache
- func RedisGet(k string) (string, error) {
- return defaultRedis.Get(k)
- }
- // RedisSet key-value to cache
- func RedisSet(k, v string, expiration time.Duration) (string, error) {
- return defaultRedis.Set(k, v, expiration)
- }
- // RedisDel delete keys from cache
- func RedisDel(keys ...string) (int64, error) {
- return defaultRedis.Del(keys...)
- }
- // RedisInfo redis info
- func RedisInfo(section ...string) (string, error) {
- return defaultRedis.Info(section...)
- }
- // RedisClusterInfo redis cluster info
- func RedisClusterInfo() (string, error) {
- return defaultRedis.ClusterInfo()
- }
- // Publish posts the message to the channel.
- func RedisPublish(channel string, message interface{}) (int64, error) {
- return defaultRedis.Publish(channel, message)
- }
- // Subscribe subscribes the client to the specified channels.
- func RedisSubscribe(channels ...string) error {
- return defaultRedis.Subscribe(channels...)
- }
- // RedisCloseSubscribe close
- func RedisCloseSubscribe() error {
- return defaultRedis.CloseSubscribe()
- }
- // Unsubscribe the client from the given channels, or from all of them if none is given.
- func RedisUnsubscribe(channels ...string) error {
- return defaultRedis.Unsubscribe(channels...)
- }
- // RedisReceiveSubscribeMessage returns a Message or error ignoring Subscription and Pong messages. This is low-level API and in most cases Channel should be used instead.
- func RedisReceiveSubscribeMessage() (channel, message string, err error) {
- return defaultRedis.ReceiveSubscribeMessage()
- }
|