Начальный импорт

main v0.0.0
commit 1f205e32c8
  1. 21
      LICENSE
  2. 12
      go.mod
  3. 31
      go.sum
  4. 123
      reconnector.go
  5. 310
      rpc.go

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 Valery Stadchenko
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,12 @@
module github.com/PCManiac/rabbit_reconnector
go 1.17
require (
github.com/PCManiac/logrus_init v0.0.0
github.com/caarlos0/env/v6 v6.10.0 // indirect
github.com/google/uuid v1.3.0
github.com/sirupsen/logrus v1.9.0
github.com/streadway/amqp v1.0.0
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect
)

@ -0,0 +1,31 @@
github.com/PCManiac/logrus_init v0.0.0 h1:sn8aQyHLmN/AMTkZfKCQo4mOB5Ckhs+FL85ygNybyMw=
github.com/PCManiac/logrus_init v0.0.0/go.mod h1:mDbCY64MUIrl+gcI37TQGpb22ADZlY8WBJ6tR2pteqk=
github.com/caarlos0/env/v6 v6.10.0 h1:lA7sxiGArZ2KkiqpOQNf8ERBRWI+v8MWIH+eGjSN22I=
github.com/caarlos0/env/v6 v6.10.0/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY=
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -0,0 +1,123 @@
package rabbit_reconnector
import (
"errors"
"time"
_ "github.com/PCManiac/logrus_init"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
type Reconnector interface {
RabbitConnect()
GetConnection() *amqp.Connection
SubscribeTo(exhangeName string, queueName string, routingKey string) (<-chan amqp.Delivery, *amqp.Channel, error)
publishResponse(ch *amqp.Channel, exchangeName string, replyTo string, correlationId string, body string, sessionId string, content_type string) (err error)
executeRPC(ch *amqp.Channel, exhangeName string, body []byte, strTimeout string, routingKey string) (response []byte, err error)
publishTo(ch *amqp.Channel, exhangeName string, routingKey string, body string) (err error)
}
type ReconnectorEventHandler interface {
AfterReconnect(AmqpConnection *amqp.Connection, ExitSignal chan bool) error
}
type server struct {
AmqpConnection *amqp.Connection
AmqpCloseError chan *amqp.Error
ExitSignal chan bool
amqpHostName string
handler ReconnectorEventHandler
}
func (s *server) DoRabbitConnect() (err error) {
log.WithFields(log.Fields{
"proc": "RabbitConnect",
}).Warn("RabbitConnect started")
var rabbit *amqp.Connection
for i := 0; i < 60; i++ {
rabbit, err = amqp.Dial(s.amqpHostName)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"stage": "dial",
"proc": "RabbitConnect",
}).Error("AMQP error")
time.Sleep(500 * time.Millisecond)
continue
}
s.AmqpConnection = rabbit
go func() {
<-rabbit.NotifyClose(make(chan *amqp.Error))
log.WithFields(log.Fields{
"proc": "RabbitConnect goroutine",
}).Error("NotifyClose received. Sending to reconnect.")
s.AmqpCloseError <- amqp.ErrClosed
}()
if s.handler != nil {
s.ExitSignal = make(chan bool, 1)
if err := s.handler.AfterReconnect(s.AmqpConnection, s.ExitSignal); err != nil {
log.WithFields(log.Fields{
"error": err,
"proc": "RabbitConnect",
}).Error("Failed to AfterReconnect")
return err
}
}
log.WithFields(log.Fields{
"proc": "RabbitConnect",
}).Warn("RabbitConnect Ok")
return nil
}
log.WithFields(log.Fields{
"proc": "RabbitConnect",
}).Error("AMQP connect error")
return errors.New("amqp connect error")
}
func (s *server) RabbitConnect() {
for {
<-s.AmqpCloseError
log.WithFields(log.Fields{
"proc": "RabbitReConnector",
}).Error("amqpCloseError received. Reconnecting.")
s.ExitSignal <- true
err := s.DoRabbitConnect()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"stage": "RabbitConnect",
"proc": "RabbitReConnector",
}).Panic("AMQP error")
}
}
}
func (s *server) GetConnection() *amqp.Connection {
return s.AmqpConnection
}
func New(amqpHost string, handler ReconnectorEventHandler) Reconnector {
s := server{
AmqpCloseError: make(chan *amqp.Error),
amqpHostName: amqpHost,
ExitSignal: make(chan bool, 1),
}
if handler != nil {
s.handler = handler
}
return &s
}

310
rpc.go

@ -0,0 +1,310 @@
package rabbit_reconnector
import (
"errors"
"time"
_ "github.com/PCManiac/logrus_init"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
func (s *server) SubscribeTo(exhangeName string, queueName string, routingKey string) (<-chan amqp.Delivery, *amqp.Channel, error) {
ch, err := s.AmqpConnection.Channel()
if err != nil {
log.WithFields(log.Fields{
"error": err,
"stage": "Channel create",
"proc": "RabbitConnect",
}).Error("AMQP error")
return nil, nil, err
}
err = ch.ExchangeDeclare(
exhangeName, // name
"direct", // type
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "exchange declare",
}).Error("AMQP error.")
return nil, nil, err
}
q, err := ch.QueueDeclare(
queueName, // name
true, // durable
true, // delete when unused
queueName != "", // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "queue declare",
}).Error("AMQP error.")
return nil, nil, err
}
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exhangeName, // exchange
false,
nil,
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "bind",
}).Error("AMQP error.")
return nil, nil, err
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
queueName != "", // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "consume",
}).Error("AMQP error.")
return nil, nil, err
}
return msgs, ch, nil
}
func (s *server) publishResponse(ch *amqp.Channel, exchangeName string, replyTo string, correlationId string, body string, sessionId string, content_type string) (err error) {
err1 := ch.Publish(
exchangeName, // exchange
replyTo, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: content_type,
CorrelationId: correlationId,
Body: []byte(body),
})
if err1 != nil {
log.WithFields(log.Fields{
"error": err1,
"session_id": sessionId,
"proc": "publishResponse",
}).Error("Failed to publish to RPC")
return err
} else {
log.WithFields(log.Fields{
"body": body,
"session_id": sessionId,
"proc": "publishResponse",
"exchange": exchangeName,
}).Trace("Published to RPC")
return nil
}
}
func (s *server) executeRPC(ch *amqp.Channel, exhangeName string, body []byte, strTimeout string, routingKey string) (response []byte, err error) {
err = ch.ExchangeDeclare(
exhangeName, // name
"direct", // type
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "exchange declare",
"proc": "executeRPC",
}).Error("AMQP error.")
return nil, err
}
q, err := ch.QueueDeclare(
"", // name
true, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "queue declare",
"proc": "executeRPC",
}).Error("AMQP error.")
return nil, err
}
err = ch.QueueBind(
q.Name, // queue name
q.Name, // routing key
exhangeName, // exchange
false,
nil)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "bind",
"proc": "executeRPC",
}).Error("AMQP error.")
return nil, err
}
corrID := uuid.NewString()
err = ch.Publish(
exhangeName, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrID,
ReplyTo: q.Name,
Body: body,
})
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "publish",
"proc": "executeRPC",
}).Error("AMQP error.")
return nil, err
} else {
log.WithFields(log.Fields{
"body": body,
"exchange": exhangeName,
"stage": "publish",
"proc": "executeRPC",
}).Trace("RPC published")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
true, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "consume",
"proc": "executeRPC",
}).Error("AMQP error.")
return nil, err
}
timeout_interval, errt := time.ParseDuration(strTimeout)
if errt != nil {
timeout_interval, _ = time.ParseDuration("10s")
log.WithFields(log.Fields{
"config_timeout": strTimeout,
"proc": "executeRPC",
}).Warn("Failed to parse timeout")
}
timeout := time.After(timeout_interval)
for {
select {
case d := <-msgs:
if corrID == d.CorrelationId {
res := string(d.Body)
log.WithFields(log.Fields{
"response": res,
"exchange": exhangeName,
"stage": "for loop",
"proc": "executeRPC",
}).Trace("AMQP RPC response")
return d.Body, nil
}
return
case <-timeout:
log.WithFields(log.Fields{
"error": "AMQP RPC timeout",
"exchange": exhangeName,
"stage": "for loop",
"proc": "executeRPC",
}).Error("AMQP timeout")
return nil, errors.New("timeout")
}
}
}
func (s *server) publishTo(ch *amqp.Channel, exhangeName string, routingKey string, body string) (err error) {
err = ch.ExchangeDeclare(
exhangeName, // name
"direct", // type
true, // durable
true, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "exchange declare",
}).Error("AMQP error.")
return err
}
err = ch.Publish(
exhangeName, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.WithFields(log.Fields{
"error": err,
"exchange": exhangeName,
"stage": "publish",
}).Error("AMQP error.")
return err
} else {
log.WithFields(log.Fields{
"body": body,
"exchange": exhangeName,
"stage": "publish",
}).Trace("Data published")
}
return nil
}
Loading…
Cancel
Save