You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
123 lines
2.9 KiB
123 lines
2.9 KiB
package rabbit_reconnector
|
|
|
|
import (
|
|
"errors"
|
|
"time"
|
|
|
|
_ "github.com/PCManiac/logrus_init"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/streadway/amqp"
|
|
)
|
|
|
|
type Reconnector interface {
|
|
RabbitConnect()
|
|
GetConnection() *amqp.Connection
|
|
SubscribeTo(exhangeName string, queueName string, routingKey string) (<-chan amqp.Delivery, *amqp.Channel, error)
|
|
PublishResponse(ch *amqp.Channel, exchangeName string, replyTo string, correlationId string, body string, sessionId string, content_type string) (err error)
|
|
ExecuteRPC(ch *amqp.Channel, exhangeName string, body []byte, strTimeout string, routingKey string) (response []byte, err error)
|
|
PublishTo(ch *amqp.Channel, exhangeName string, routingKey string, body string) (err error)
|
|
}
|
|
|
|
type ReconnectorEventHandler interface {
|
|
AfterReconnect(AmqpConnection *amqp.Connection, ExitSignal chan bool) error
|
|
}
|
|
|
|
type server struct {
|
|
AmqpConnection *amqp.Connection
|
|
AmqpCloseError chan *amqp.Error
|
|
ExitSignal chan bool
|
|
amqpHostName string
|
|
handler ReconnectorEventHandler
|
|
}
|
|
|
|
func (s *server) DoRabbitConnect() (err error) {
|
|
log.WithFields(log.Fields{
|
|
"proc": "RabbitConnect",
|
|
}).Warn("RabbitConnect started")
|
|
|
|
var rabbit *amqp.Connection
|
|
|
|
for i := 0; i < 60; i++ {
|
|
rabbit, err = amqp.Dial(s.amqpHostName)
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"stage": "dial",
|
|
"proc": "RabbitConnect",
|
|
}).Error("AMQP error")
|
|
|
|
time.Sleep(500 * time.Millisecond)
|
|
continue
|
|
}
|
|
|
|
s.AmqpConnection = rabbit
|
|
|
|
go func() {
|
|
<-rabbit.NotifyClose(make(chan *amqp.Error))
|
|
log.WithFields(log.Fields{
|
|
"proc": "RabbitConnect goroutine",
|
|
}).Error("NotifyClose received. Sending to reconnect.")
|
|
s.AmqpCloseError <- amqp.ErrClosed
|
|
}()
|
|
|
|
if s.handler != nil {
|
|
s.ExitSignal = make(chan bool, 1)
|
|
if err := s.handler.AfterReconnect(s.AmqpConnection, s.ExitSignal); err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"proc": "RabbitConnect",
|
|
}).Error("Failed to AfterReconnect")
|
|
return err
|
|
}
|
|
}
|
|
|
|
log.WithFields(log.Fields{
|
|
"proc": "RabbitConnect",
|
|
}).Warn("RabbitConnect Ok")
|
|
return nil
|
|
}
|
|
|
|
log.WithFields(log.Fields{
|
|
"proc": "RabbitConnect",
|
|
}).Error("AMQP connect error")
|
|
return errors.New("amqp connect error")
|
|
}
|
|
|
|
func (s *server) RabbitConnect() {
|
|
for {
|
|
<-s.AmqpCloseError
|
|
|
|
log.WithFields(log.Fields{
|
|
"proc": "RabbitReConnector",
|
|
}).Error("amqpCloseError received. Reconnecting.")
|
|
|
|
s.ExitSignal <- true
|
|
|
|
err := s.DoRabbitConnect()
|
|
if err != nil {
|
|
log.WithFields(log.Fields{
|
|
"error": err,
|
|
"stage": "RabbitConnect",
|
|
"proc": "RabbitReConnector",
|
|
}).Panic("AMQP error")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *server) GetConnection() *amqp.Connection {
|
|
return s.AmqpConnection
|
|
}
|
|
|
|
func New(amqpHost string, handler ReconnectorEventHandler) Reconnector {
|
|
s := server{
|
|
AmqpCloseError: make(chan *amqp.Error),
|
|
amqpHostName: amqpHost,
|
|
ExitSignal: make(chan bool, 1),
|
|
}
|
|
|
|
if handler != nil {
|
|
s.handler = handler
|
|
}
|
|
|
|
return &s
|
|
}
|
|
|