commit 1f205e32c87d23fb0085af18a3f643554e58c4c3 Author: Валерий Стадченко Date: Mon Sep 5 10:46:14 2022 +0300 Начальный импорт diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..882ca7f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 Valery Stadchenko + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..486794e --- /dev/null +++ b/go.mod @@ -0,0 +1,12 @@ +module github.com/PCManiac/rabbit_reconnector + +go 1.17 + +require ( + github.com/PCManiac/logrus_init v0.0.0 + github.com/caarlos0/env/v6 v6.10.0 // indirect + github.com/google/uuid v1.3.0 + github.com/sirupsen/logrus v1.9.0 + github.com/streadway/amqp v1.0.0 + golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..04cb62a --- /dev/null +++ b/go.sum @@ -0,0 +1,31 @@ +github.com/PCManiac/logrus_init v0.0.0 h1:sn8aQyHLmN/AMTkZfKCQo4mOB5Ckhs+FL85ygNybyMw= +github.com/PCManiac/logrus_init v0.0.0/go.mod h1:mDbCY64MUIrl+gcI37TQGpb22ADZlY8WBJ6tR2pteqk= +github.com/caarlos0/env/v6 v6.10.0 h1:lA7sxiGArZ2KkiqpOQNf8ERBRWI+v8MWIH+eGjSN22I= +github.com/caarlos0/env/v6 v6.10.0/go.mod h1:hvp/ryKXKipEkcuYjs9mI4bBCg+UI0Yhgm5Zu0ddvwc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 h1:v6hYoSR9T5oet+pMXwUWkbiVqx/63mlHjefrHmxwfeY= +golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/reconnector.go b/reconnector.go new file mode 100644 index 0000000..36eb3ee --- /dev/null +++ b/reconnector.go @@ -0,0 +1,123 @@ +package rabbit_reconnector + +import ( + "errors" + "time" + + _ "github.com/PCManiac/logrus_init" + log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" +) + +type Reconnector interface { + RabbitConnect() + GetConnection() *amqp.Connection + SubscribeTo(exhangeName string, queueName string, routingKey string) (<-chan amqp.Delivery, *amqp.Channel, error) + publishResponse(ch *amqp.Channel, exchangeName string, replyTo string, correlationId string, body string, sessionId string, content_type string) (err error) + executeRPC(ch *amqp.Channel, exhangeName string, body []byte, strTimeout string, routingKey string) (response []byte, err error) + publishTo(ch *amqp.Channel, exhangeName string, routingKey string, body string) (err error) +} + +type ReconnectorEventHandler interface { + AfterReconnect(AmqpConnection *amqp.Connection, ExitSignal chan bool) error +} + +type server struct { + AmqpConnection *amqp.Connection + AmqpCloseError chan *amqp.Error + ExitSignal chan bool + amqpHostName string + handler ReconnectorEventHandler +} + +func (s *server) DoRabbitConnect() (err error) { + log.WithFields(log.Fields{ + "proc": "RabbitConnect", + }).Warn("RabbitConnect started") + + var rabbit *amqp.Connection + + for i := 0; i < 60; i++ { + rabbit, err = amqp.Dial(s.amqpHostName) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "stage": "dial", + "proc": "RabbitConnect", + }).Error("AMQP error") + + time.Sleep(500 * time.Millisecond) + continue + } + + s.AmqpConnection = rabbit + + go func() { + <-rabbit.NotifyClose(make(chan *amqp.Error)) + log.WithFields(log.Fields{ + "proc": "RabbitConnect goroutine", + }).Error("NotifyClose received. Sending to reconnect.") + s.AmqpCloseError <- amqp.ErrClosed + }() + + if s.handler != nil { + s.ExitSignal = make(chan bool, 1) + if err := s.handler.AfterReconnect(s.AmqpConnection, s.ExitSignal); err != nil { + log.WithFields(log.Fields{ + "error": err, + "proc": "RabbitConnect", + }).Error("Failed to AfterReconnect") + return err + } + } + + log.WithFields(log.Fields{ + "proc": "RabbitConnect", + }).Warn("RabbitConnect Ok") + return nil + } + + log.WithFields(log.Fields{ + "proc": "RabbitConnect", + }).Error("AMQP connect error") + return errors.New("amqp connect error") +} + +func (s *server) RabbitConnect() { + for { + <-s.AmqpCloseError + + log.WithFields(log.Fields{ + "proc": "RabbitReConnector", + }).Error("amqpCloseError received. Reconnecting.") + + s.ExitSignal <- true + + err := s.DoRabbitConnect() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "stage": "RabbitConnect", + "proc": "RabbitReConnector", + }).Panic("AMQP error") + } + } +} + +func (s *server) GetConnection() *amqp.Connection { + return s.AmqpConnection +} + +func New(amqpHost string, handler ReconnectorEventHandler) Reconnector { + s := server{ + AmqpCloseError: make(chan *amqp.Error), + amqpHostName: amqpHost, + ExitSignal: make(chan bool, 1), + } + + if handler != nil { + s.handler = handler + } + + return &s +} diff --git a/rpc.go b/rpc.go new file mode 100644 index 0000000..bc657c2 --- /dev/null +++ b/rpc.go @@ -0,0 +1,310 @@ +package rabbit_reconnector + +import ( + "errors" + "time" + + _ "github.com/PCManiac/logrus_init" + "github.com/google/uuid" + log "github.com/sirupsen/logrus" + "github.com/streadway/amqp" +) + +func (s *server) SubscribeTo(exhangeName string, queueName string, routingKey string) (<-chan amqp.Delivery, *amqp.Channel, error) { + ch, err := s.AmqpConnection.Channel() + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "stage": "Channel create", + "proc": "RabbitConnect", + }).Error("AMQP error") + return nil, nil, err + } + + err = ch.ExchangeDeclare( + exhangeName, // name + "direct", // type + true, // durable + true, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "exchange declare", + }).Error("AMQP error.") + return nil, nil, err + } + + q, err := ch.QueueDeclare( + queueName, // name + true, // durable + true, // delete when unused + queueName != "", // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "queue declare", + }).Error("AMQP error.") + return nil, nil, err + } + err = ch.QueueBind( + q.Name, // queue name + routingKey, // routing key + exhangeName, // exchange + false, + nil, + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "bind", + }).Error("AMQP error.") + return nil, nil, err + } + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + queueName != "", // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "consume", + }).Error("AMQP error.") + return nil, nil, err + } + return msgs, ch, nil +} + +func (s *server) publishResponse(ch *amqp.Channel, exchangeName string, replyTo string, correlationId string, body string, sessionId string, content_type string) (err error) { + err1 := ch.Publish( + exchangeName, // exchange + replyTo, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: content_type, + CorrelationId: correlationId, + Body: []byte(body), + }) + + if err1 != nil { + log.WithFields(log.Fields{ + "error": err1, + "session_id": sessionId, + "proc": "publishResponse", + }).Error("Failed to publish to RPC") + return err + } else { + log.WithFields(log.Fields{ + "body": body, + "session_id": sessionId, + "proc": "publishResponse", + "exchange": exchangeName, + }).Trace("Published to RPC") + return nil + } +} + +func (s *server) executeRPC(ch *amqp.Channel, exhangeName string, body []byte, strTimeout string, routingKey string) (response []byte, err error) { + err = ch.ExchangeDeclare( + exhangeName, // name + "direct", // type + true, // durable + true, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "exchange declare", + "proc": "executeRPC", + }).Error("AMQP error.") + return nil, err + } + + q, err := ch.QueueDeclare( + "", // name + true, // durable + true, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "queue declare", + "proc": "executeRPC", + }).Error("AMQP error.") + return nil, err + } + + err = ch.QueueBind( + q.Name, // queue name + q.Name, // routing key + exhangeName, // exchange + false, + nil) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "bind", + "proc": "executeRPC", + }).Error("AMQP error.") + return nil, err + } + + corrID := uuid.NewString() + + err = ch.Publish( + exhangeName, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: corrID, + ReplyTo: q.Name, + Body: body, + }) + + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "publish", + "proc": "executeRPC", + }).Error("AMQP error.") + return nil, err + } else { + log.WithFields(log.Fields{ + "body": body, + "exchange": exhangeName, + "stage": "publish", + "proc": "executeRPC", + }).Trace("RPC published") + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + true, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "consume", + "proc": "executeRPC", + }).Error("AMQP error.") + return nil, err + } + + timeout_interval, errt := time.ParseDuration(strTimeout) + if errt != nil { + timeout_interval, _ = time.ParseDuration("10s") + log.WithFields(log.Fields{ + "config_timeout": strTimeout, + "proc": "executeRPC", + }).Warn("Failed to parse timeout") + } + + timeout := time.After(timeout_interval) + for { + select { + case d := <-msgs: + if corrID == d.CorrelationId { + res := string(d.Body) + + log.WithFields(log.Fields{ + "response": res, + "exchange": exhangeName, + "stage": "for loop", + "proc": "executeRPC", + }).Trace("AMQP RPC response") + + return d.Body, nil + } + return + case <-timeout: + log.WithFields(log.Fields{ + "error": "AMQP RPC timeout", + "exchange": exhangeName, + "stage": "for loop", + "proc": "executeRPC", + }).Error("AMQP timeout") + return nil, errors.New("timeout") + } + } +} + +func (s *server) publishTo(ch *amqp.Channel, exhangeName string, routingKey string, body string) (err error) { + err = ch.ExchangeDeclare( + exhangeName, // name + "direct", // type + true, // durable + true, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "exchange declare", + }).Error("AMQP error.") + return err + } + + err = ch.Publish( + exhangeName, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(body), + }) + + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "exchange": exhangeName, + "stage": "publish", + }).Error("AMQP error.") + return err + } else { + log.WithFields(log.Fields{ + "body": body, + "exchange": exhangeName, + "stage": "publish", + }).Trace("Data published") + } + + return nil +}