loadbalancer.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package http
  2. import (
  3. "context"
  4. "io"
  5. "net/url"
  6. "strings"
  7. "time"
  8. "github.com/go-kit/kit/endpoint"
  9. "github.com/go-kit/kit/log"
  10. "github.com/go-kit/kit/sd"
  11. "github.com/go-kit/kit/sd/lb"
  12. consulsd "github.com/go-kit/kit/sd/consul"
  13. )
  14. // dec DecodeRequestFunc, enc EncodeResponseFunc
  15. // LoadBalancer endpoint
  16. type LoadBalancer struct {
  17. Ctx context.Context
  18. Inst *consulsd.Instancer
  19. Route string
  20. Dec DecodeResponseFunc
  21. Logger log.Logger
  22. }
  23. var (
  24. // retry max
  25. retryMax = 3
  26. // retry timeout
  27. retryTimeout = 500 * time.Millisecond
  28. )
  29. func factory(ctx context.Context, method, router string, dec DecodeResponseFunc) sd.Factory {
  30. return func(instance string) (endpoint.Endpoint, io.Closer, error) {
  31. if !strings.HasPrefix(instance, "http") {
  32. instance = "http://" + instance
  33. }
  34. tgt, err := url.Parse(instance)
  35. if err != nil {
  36. return nil, nil, err
  37. }
  38. return ClientRequestEndpoint(ctx, tgt, method, router, dec), nil, nil
  39. }
  40. }
  41. // FactoryLoadBalancer factory load balance
  42. func FactoryLoadBalancer(ctx context.Context, instancer *consulsd.Instancer, method, route string, dec DecodeResponseFunc, logger log.Logger) endpoint.Endpoint {
  43. endpointer := sd.NewEndpointer(
  44. instancer,
  45. factory(ctx, method, route, dec),
  46. logger)
  47. balancer := lb.NewRoundRobin(endpointer)
  48. return lb.Retry(retryMax, retryTimeout, balancer)
  49. }
  50. // GetLoadBalancer add GET router to load balance
  51. func GetLoadBalancer(loadbalancer LoadBalancer) endpoint.Endpoint {
  52. endpointer := sd.NewEndpointer(loadbalancer.Inst, factory(loadbalancer.Ctx, `GET`, loadbalancer.Route, loadbalancer.Dec), loadbalancer.Logger)
  53. balancer := lb.NewRoundRobin(endpointer)
  54. return lb.Retry(retryMax, retryTimeout, balancer)
  55. }
  56. // PostLoadBalancer add POST router to load balance
  57. func PostLoadBalancer(loadbalancer LoadBalancer) endpoint.Endpoint {
  58. endpointer := sd.NewEndpointer(loadbalancer.Inst, factory(loadbalancer.Ctx, `POST`, loadbalancer.Route, loadbalancer.Dec), loadbalancer.Logger)
  59. balancer := lb.NewRoundRobin(endpointer)
  60. return lb.Retry(retryMax, retryTimeout, balancer)
  61. }
  62. // PutLoadBalancer add PUT router to load balance
  63. func PutLoadBalancer(loadbalancer LoadBalancer) endpoint.Endpoint {
  64. endpointer := sd.NewEndpointer(loadbalancer.Inst, factory(loadbalancer.Ctx, `PUT`, loadbalancer.Route, loadbalancer.Dec), loadbalancer.Logger)
  65. balancer := lb.NewRoundRobin(endpointer)
  66. return lb.Retry(retryMax, retryTimeout, balancer)
  67. }
  68. // HeadLoadBalancer add HEAD router to load balance
  69. func HeadLoadBalancer(loadbalancer LoadBalancer) endpoint.Endpoint {
  70. endpointer := sd.NewEndpointer(loadbalancer.Inst, factory(loadbalancer.Ctx, `HEAD`, loadbalancer.Route, loadbalancer.Dec), loadbalancer.Logger)
  71. balancer := lb.NewRoundRobin(endpointer)
  72. return lb.Retry(retryMax, retryTimeout, balancer)
  73. }
  74. // DeleteLoadBalancer add DELETE router to load balance
  75. func DeleteLoadBalancer(loadbalancer LoadBalancer) endpoint.Endpoint {
  76. endpointer := sd.NewEndpointer(loadbalancer.Inst, factory(loadbalancer.Ctx, `DELETE`, loadbalancer.Route, loadbalancer.Dec), loadbalancer.Logger)
  77. balancer := lb.NewRoundRobin(endpointer)
  78. return lb.Retry(retryMax, retryTimeout, balancer)
  79. }
  80. // OptionsLoadBalancer add OPTIONS router to load balance
  81. func OptionsLoadBalancer(loadbalancer LoadBalancer) endpoint.Endpoint {
  82. endpointer := sd.NewEndpointer(loadbalancer.Inst, factory(loadbalancer.Ctx, `OPTIONS`, loadbalancer.Route, loadbalancer.Dec), loadbalancer.Logger)
  83. balancer := lb.NewRoundRobin(endpointer)
  84. return lb.Retry(retryMax, retryTimeout, balancer)
  85. }
  86. // PatchLoadBalancer add PATCH router to load balance
  87. func PatchLoadBalancer(loadbalancer LoadBalancer) endpoint.Endpoint {
  88. endpointer := sd.NewEndpointer(loadbalancer.Inst, factory(loadbalancer.Ctx, `PATCH`, loadbalancer.Route, loadbalancer.Dec), loadbalancer.Logger)
  89. balancer := lb.NewRoundRobin(endpointer)
  90. return lb.Retry(retryMax, retryTimeout, balancer)
  91. }