// Package cache wraps a go-redis standalone client or cluster client behind a
// small set of helpers (Ping, Info, Get, Set, Del, Publish, Subscribe) and a
// lazily created package-level instance.
package cache

import (
	"errors"
	"sync"
	"time"

	"github.com/go-redis/redis/v7"
)

// ErrNotConfigured is returned by the package-level helpers when neither
// SetRedisConfig nor SetRedisClusterConfig supplied an address before the
// first cache operation.
var ErrNotConfigured = errors.New("cache: redis is not configured")

// RedisConfig holds the connection settings for a standalone Redis server.
type RedisConfig struct {
	Addr         string
	UserName     string
	Password     string
	DB           int
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	PoolTimeout  time.Duration
	PoolSize     int
}

// RedisClusterConfig holds the connection settings for a Redis cluster.
type RedisClusterConfig struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs    []string
	UserName string
	Password string
	// DB is not forwarded to the cluster client: redis.ClusterOptions has no
	// database selector.
	DB int

	// The maximum number of retries before giving up. Command is retried
	// on network errors and MOVED/ASK redirects.
	// When zero, go-redis applies its own default.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	RouteByLatency bool

	//OnConnect func(*Conn) error

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration
	DialTimeout     time.Duration
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration
}

var (
	redisConfig        RedisConfig
	redisClusterConfig RedisClusterConfig
	once               sync.Once
	cache              *RedisCache
	isCluster          bool
)

// RedisCache wraps either a standalone client (c) or a cluster client (cc).
// Every method uses cc when it is non-nil and falls back to c otherwise.
type RedisCache struct {
	c  *redis.Client
	cc *redis.ClusterClient
}

// SetRedisConfig stores the standalone configuration read by NewRedisCache.
// The package-level helpers build their shared client only once, so call this
// before the first cache operation.
func SetRedisConfig(cfg RedisConfig) {
	redisConfig = cfg
}

// SetRedisClusterConfig stores the cluster configuration read by
// NewRedisClusterCache. The package-level helpers build their shared client
// only once, so call this before the first cache operation.
func SetRedisClusterConfig(cfg RedisClusterConfig) {
	redisClusterConfig = cfg
}

// NewRedisCache builds a RedisCache backed by a standalone client configured
// from the value passed to SetRedisConfig.
func NewRedisCache() *RedisCache {
	client := redis.NewClient(&redis.Options{
		Addr:         redisConfig.Addr,
		Username:     redisConfig.UserName,
		Password:     redisConfig.Password,
		DB:           redisConfig.DB,
		DialTimeout:  redisConfig.DialTimeout,
		ReadTimeout:  redisConfig.ReadTimeout,
		WriteTimeout: redisConfig.WriteTimeout,
		PoolSize:     redisConfig.PoolSize,
		PoolTimeout:  redisConfig.PoolTimeout,
	})

	// Eager ping. The outcome is deliberately discarded because this
	// constructor has no error return; use the Ping method to check the
	// connection explicitly.
	_, _ = client.Ping().Result()

	return &RedisCache{c: client}
}

// NewRedisClusterCache builds a RedisCache backed by a cluster client
// configured from the value passed to SetRedisClusterConfig.
func NewRedisClusterCache() *RedisCache {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:              redisClusterConfig.Addrs,
		Username:           redisClusterConfig.UserName,
		Password:           redisClusterConfig.Password,
		MaxRedirects:       redisClusterConfig.MaxRedirects,
		ReadOnly:           redisClusterConfig.ReadOnly,
		RouteByLatency:     redisClusterConfig.RouteByLatency,
		MaxRetries:         redisClusterConfig.MaxRetries,
		MinRetryBackoff:    redisClusterConfig.MinRetryBackoff,
		MaxRetryBackoff:    redisClusterConfig.MaxRetryBackoff,
		DialTimeout:        redisClusterConfig.DialTimeout,
		ReadTimeout:        redisClusterConfig.ReadTimeout,
		WriteTimeout:       redisClusterConfig.WriteTimeout,
		PoolSize:           redisClusterConfig.PoolSize,
		PoolTimeout:        redisClusterConfig.PoolTimeout,
		IdleTimeout:        redisClusterConfig.IdleTimeout,
		IdleCheckFrequency: redisClusterConfig.IdleCheckFrequency,
	})

	// Eager ping. The outcome is deliberately discarded because this
	// constructor has no error return; use the Ping method to check the
	// connection explicitly.
	_ = client.Ping()

	return &RedisCache{cc: client}
}

// Ping sends PING through the underlying client and returns its reply.
func (c RedisCache) Ping() (string, error) {
	if c.cc != nil {
		return c.cc.Ping().Result()
	}
	return c.c.Ping().Result()
}

// Get returns the value stored under key.
func (c RedisCache) Get(key string) (string, error) {
	if c.cc != nil {
		return c.cc.Get(key).Result()
	}
	return c.c.Get(key).Result()
}

// Set stores value under key; expiration is passed through to the client's
// Set call.
func (c RedisCache) Set(key, value string, expiration time.Duration) error {
	if c.cc != nil {
		return c.cc.Set(key, value, expiration).Err()
	}
	return c.c.Set(key, value, expiration).Err()
}

// Del deletes key and returns the count reported by the client.
func (c RedisCache) Del(key string) (int64, error) {
	if c.cc != nil {
		return c.cc.Del(key).Result()
	}
	return c.c.Del(key).Result()
}

// Info returns server information. A cluster-backed cache returns the result
// of ClusterInfo; a standalone one returns the result of Info.
func (c RedisCache) Info() (string, error) {
	if c.cc != nil {
		return c.cc.ClusterInfo().Result()
	}
	return c.c.Info().Result()
}

// Subscribe subscribes to channels and blocks, calling cb with the channel
// and payload of every received message. When receiving fails, cb is called
// one last time with empty strings and the error, the subscription is closed,
// and the same error is returned.
func (c RedisCache) Subscribe(channels string, cb func(channel string, message string, err error)) error {
	var pubsub *redis.PubSub
	if c.cc != nil {
		pubsub = c.cc.Subscribe(channels)
	} else {
		pubsub = c.c.Subscribe(channels)
	}
	defer pubsub.Close()

	for {
		msg, err := pubsub.ReceiveMessage()
		if err != nil {
			cb("", "", err)
			return err
		}
		cb(msg.Channel, msg.Payload, nil)
	}
}

// Publish sends message to channel.
func (c RedisCache) Publish(channel, message string) error {
	if c.cc != nil {
		return c.cc.Publish(channel, message).Err()
	}
	return c.c.Publish(channel, message).Err()
}

// connect returns the shared RedisCache, creating it on the first call.
// A cluster configuration takes precedence over a standalone one. When neither
// has an address configured it returns ErrNotConfigured instead of a nil
// cache, so callers never dereference a nil *RedisCache.
func connect() (*RedisCache, error) {
	// All initialisation happens inside once.Do so that concurrent callers
	// observe a fully constructed cache without an unsynchronised read.
	once.Do(func() {
		// An instance assigned before the first call is kept as-is.
		if cache != nil {
			return
		}

		isCluster = len(redisClusterConfig.Addrs) > 0
		switch {
		case isCluster:
			cache = NewRedisClusterCache()
		case len(redisConfig.Addr) > 0:
			cache = NewRedisCache()
		}
	})

	if cache == nil {
		return nil, ErrNotConfigured
	}
	return cache, nil
}

// Ping sends PING through the shared cache.
func Ping() (string, error) {
	c, err := connect()
	if err != nil {
		return "", err
	}
	return c.Ping()
}

// Info returns server information from the shared cache.
func Info() (string, error) {
	c, err := connect()
	if err != nil {
		return "", err
	}
	return c.Info()
}

// Get returns the value stored under key in the shared cache.
func Get(key string) (string, error) {
	c, err := connect()
	if err != nil {
		return "", err
	}
	return c.Get(key)
}

// Set stores value under key in the shared cache.
func Set(key, value string, expiration time.Duration) error {
	c, err := connect()
	if err != nil {
		return err
	}
	return c.Set(key, value, expiration)
}

// Del deletes key from the shared cache.
func Del(key string) (int64, error) {
	c, err := connect()
	if err != nil {
		return 0, err
	}
	return c.Del(key)
}

// Publish sends message to channel through the shared cache.
func Publish(channel, message string) error {
	c, err := connect()
	if err != nil {
		return err
	}
	return c.Publish(channel, message)
}

// Subscribe subscribes to channels through the shared cache and blocks while
// delivering messages to cb; see RedisCache.Subscribe.
func Subscribe(channels string, cb func(channel string, message string, err error)) error {
	c, err := connect()
	if err != nil {
		return err
	}
	return c.Subscribe(channels, cb)
}