adding leader ellections

This commit is contained in:
2023-12-19 09:58:48 +01:00
parent c894e3438f
commit 83d4161537
3 changed files with 63 additions and 1 deletions

View File

@@ -33,6 +33,8 @@ func InitData(url, username, password string) error {
}
Client = cli
initPlayout()
initSpeaker()
initLeader()
dlog.Infoln("Data Repo init completed")
return nil
}

60
data/leaders.go Normal file
View File

@@ -0,0 +1,60 @@
package data
import (
"context"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/client/v3/concurrency"
)
const playout_controller_key string = "/v1/leader/speaker"
const primary_speaker_key string = "/v1/leader/playout"
const default_leader_ttl int64 = 5
var session *concurrency.Session
var llog *logrus.Entry
func initLeader() error {
llog = dlog.WithField("component", "leader")
sess, err := concurrency.NewSession(Client, concurrency.WithTTL(int(default_leader_ttl)))
if err != nil {
llog.Errorln(err)
return err
}
llog.Debugln("started session lease (", sess.Lease(), ")")
return nil
}
// BecomePlayoutController attemts to become the playout controller. Will block until became leader. No error if leader.
func BecomePlayoutController(leader_id string) error {
election := concurrency.NewElection(session, playout_controller_key)
ctx := context.Background()
return election.Campaign(ctx, leader_id)
}
func BecomePrimarySpeaker(leader_id string) error {
election := concurrency.NewElection(session, primary_speaker_key)
ctx := context.Background()
return election.Campaign(ctx, leader_id)
}
func GetPlayoutController() (string, error) {
election := concurrency.NewElection(session, playout_controller_key)
ctx := context.Background()
resp, err := election.Leader(ctx)
if err != nil {
llog.Errorln(err)
return "", err
}
return string(resp.Kvs[0].Value), nil
}
// GetSpeakerLeader gets the primary speaker id
func GetSpeakerLeader() (string, error) {
election := concurrency.NewElection(session, primary_speaker_key)
ctx := context.Background()
resp, err := election.Leader(ctx)
if err != nil {
llog.Errorln(err)
return "", err
}
return string(resp.Kvs[0].Value), nil
}

View File

@@ -60,7 +60,7 @@ func GetSpeaker(id string) (*api.Speaker, error) {
// SendHeartbeat sends a speaker state to the db
func SendHeartbeat(speaker *api.Speaker) error {
l := plog.WithField("method", "put")
l := slog.WithField("method", "put")
data, err := yaml.Marshal(speaker)
if err != nil {