From 83d4161537635797191118454e2eba5992553441 Mon Sep 17 00:00:00 2001 From: Manuel Plonski Date: Tue, 19 Dec 2023 09:58:48 +0100 Subject: [PATCH] adding leader ellections --- data/core.go | 2 ++ data/leaders.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++ data/speaker.go | 2 +- 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 data/leaders.go diff --git a/data/core.go b/data/core.go index 8fc103c..5e5590a 100644 --- a/data/core.go +++ b/data/core.go @@ -33,6 +33,8 @@ func InitData(url, username, password string) error { } Client = cli initPlayout() + initSpeaker() + initLeader() dlog.Infoln("Data Repo init completed") return nil } diff --git a/data/leaders.go b/data/leaders.go new file mode 100644 index 0000000..2a92e9c --- /dev/null +++ b/data/leaders.go @@ -0,0 +1,60 @@ +package data + +import ( + "context" + + "github.com/sirupsen/logrus" + "go.etcd.io/etcd/client/v3/concurrency" +) + +const playout_controller_key string = "/v1/leader/speaker" +const primary_speaker_key string = "/v1/leader/playout" +const default_leader_ttl int64 = 5 + +var session *concurrency.Session +var llog *logrus.Entry + +func initLeader() error { + llog = dlog.WithField("component", "leader") + sess, err := concurrency.NewSession(Client, concurrency.WithTTL(int(default_leader_ttl))) + if err != nil { + llog.Errorln(err) + return err + } + llog.Debugln("started session lease (", sess.Lease(), ")") + return nil +} + +// BecomePlayoutController attemts to become the playout controller. Will block until became leader. No error if leader. +func BecomePlayoutController(leader_id string) error { + election := concurrency.NewElection(session, playout_controller_key) + ctx := context.Background() + return election.Campaign(ctx, leader_id) +} +func BecomePrimarySpeaker(leader_id string) error { + election := concurrency.NewElection(session, primary_speaker_key) + ctx := context.Background() + return election.Campaign(ctx, leader_id) +} +func GetPlayoutController() (string, error) { + election := concurrency.NewElection(session, playout_controller_key) + ctx := context.Background() + resp, err := election.Leader(ctx) + if err != nil { + llog.Errorln(err) + return "", err + } + return string(resp.Kvs[0].Value), nil +} + +// GetSpeakerLeader gets the primary speaker id +func GetSpeakerLeader() (string, error) { + election := concurrency.NewElection(session, primary_speaker_key) + ctx := context.Background() + resp, err := election.Leader(ctx) + if err != nil { + llog.Errorln(err) + return "", err + } + return string(resp.Kvs[0].Value), nil +} diff --git a/data/speaker.go b/data/speaker.go index 2fa93e4..1329d4e 100644 --- a/data/speaker.go +++ b/data/speaker.go @@ -60,7 +60,7 @@ func GetSpeaker(id string) (*api.Speaker, error) { // SendHeartbeat sends a speaker state to the db func SendHeartbeat(speaker *api.Speaker) error { - l := plog.WithField("method", "put") + l := slog.WithField("method", "put") data, err := yaml.Marshal(speaker) if err != nil {