Archived
1
0

Configuration loading and NATS messaging tests.

This commit is contained in:
2019-05-19 14:41:03 +05:00
parent 1839a73717
commit 2e747d51ef
35 changed files with 8575 additions and 18 deletions

View File

@@ -2,6 +2,7 @@ package nats
import (
// stdlib
"errors"
"log"
"sync"
@@ -51,26 +52,26 @@ func messageHandler(msg *nats.Msg) {
}
// Shutdown unsubscribes from topic and disconnects from NATS.
func Shutdown() {
func Shutdown() error {
log.Println("Unsuscribing from NATS topic...")
err := natsSubscription.Unsubscribe()
if err != nil {
log.Println("ERROR unsubscribing", Topic, "topic:", err.Error())
return errors.New("ERROR unsubscribing " + Topic + " topic: " + err.Error())
}
if natsConn != nil {
log.Println("Closing connection to NATS...")
natsConn.Close()
} else {
log.Println("Connection to NATS wasn't established")
}
return nil
}
// StartListening connects to NATS and starts to listen for messages.
func StartListening() {
func StartListening() error {
nc, err := nats.Connect(config.Cfg.NATS.ConnectionString)
if err != nil {
log.Fatalln("Failed to connect to NATS:", err.Error())
return errors.New("Failed to connect to NATS:" + err.Error())
}
natsConn = nc
@@ -81,7 +82,10 @@ func StartListening() {
// ffmpeger will receive this message!
sub, err1 := nc.Subscribe(Topic, messageHandler)
if err1 != nil {
log.Fatalln("Failed to subscribe to", Topic, "topic:", err1.Error())
return errors.New("Failed to subscribe to " + Topic + " topic: " + err1.Error())
}
natsSubscription = sub
log.Println("Subscribed to topic", Topic)
return nil
}

112
nats/nats_test.go Normal file
View File

@@ -0,0 +1,112 @@
package nats
import (
// stdlib
"flag"
"testing"
// local
"github.com/pztrn/ffmpeger/config"
// other
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/require"
)
func TestNATSInitialization(t *testing.T) {
Initialize()
require.NotNil(t, handlers)
require.Empty(t, handlers)
}
func TestNATSStartListeningAndShutdown(t *testing.T) {
flag.CommandLine = flag.NewFlagSet("ffmpeger-test-nats", flag.ExitOnError)
Initialize()
require.NotNil(t, handlers)
require.Empty(t, handlers)
config.Initialize()
config.Cfg.NATS.ConnectionString = "nats://127.0.0.1:14222"
err := StartListening()
require.Nil(t, err)
err1 := Shutdown()
require.Nil(t, err1)
}
func TestNATSShutdownWithoutConnection(t *testing.T) {
Initialize()
require.NotNil(t, handlers)
require.Empty(t, handlers)
err := Shutdown()
require.NotNil(t, err)
}
func TestNATSConnectToWrongAddress(t *testing.T) {
flag.CommandLine = flag.NewFlagSet("ffmpeger-test-nats", flag.ExitOnError)
Initialize()
require.NotNil(t, handlers)
require.Empty(t, handlers)
config.Initialize()
config.Cfg.NATS.ConnectionString = "nats://127.0.0.1:14223"
err := StartListening()
require.NotNil(t, err)
}
func TestNATSAddHandler(t *testing.T) {
d := func(data []byte) {}
Initialize()
require.NotNil(t, handlers)
require.Empty(t, handlers)
hndl := &Handler{
Name: "testhandler",
Func: d,
}
AddHandler(hndl)
}
func TestNATSReceiveMessage(t *testing.T) {
flag.CommandLine = flag.NewFlagSet("ffmpeger-test-nats", flag.ExitOnError)
Initialize()
require.NotNil(t, handlers)
require.Empty(t, handlers)
config.Initialize()
config.Cfg.NATS.ConnectionString = "nats://127.0.0.1:14222"
err := StartListening()
require.Nil(t, err)
received := make(chan bool, 1)
d := func(data []byte) {
t.Log("Received data:", data)
received <- true
}
hndl := &Handler{
Name: "testhandler",
Func: d,
}
AddHandler(hndl)
// Send message.
nc, err1 := nats.Connect(config.Cfg.NATS.ConnectionString)
require.Nil(t, err1)
err2 := nc.Publish(Topic, []byte("Hello, world!"))
require.Nil(t, err2)
<-received
nc.Close()
err3 := Shutdown()
require.Nil(t, err3)
}