mirror of
https://github.com/versia-pub/versia-go.git
synced 2026-03-13 04:29:15 +01:00
refactor!: working WD-4 user discovery
This commit is contained in:
parent
cf0053312d
commit
61891d891a
91 changed files with 12768 additions and 5562 deletions
|
|
@ -71,15 +71,15 @@ type Client struct {
|
|||
log logr.Logger
|
||||
}
|
||||
|
||||
func NewClient(ctx context.Context, name string, natsClient *nats.Conn, telemetry *unitel.Telemetry, log logr.Logger) (*Client, error) {
|
||||
func NewClient(ctx context.Context, streamName string, natsClient *nats.Conn, telemetry *unitel.Telemetry, log logr.Logger) (*Client, error) {
|
||||
js, err := jetstream.New(natsClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s, err := js.CreateStream(ctx, jetstream.StreamConfig{
|
||||
Name: name,
|
||||
Subjects: []string{name + ".*"},
|
||||
Name: streamName,
|
||||
Subjects: []string{streamName + ".*"},
|
||||
MaxConsumers: -1,
|
||||
MaxMsgs: -1,
|
||||
Discard: jetstream.DiscardOld,
|
||||
|
|
@ -89,7 +89,7 @@ func NewClient(ctx context.Context, name string, natsClient *nats.Conn, telemetr
|
|||
AllowDirect: true,
|
||||
})
|
||||
if errors.Is(err, nats.ErrStreamNameAlreadyInUse) {
|
||||
s, err = js.Stream(ctx, name)
|
||||
s, err = js.Stream(ctx, streamName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -100,8 +100,8 @@ func NewClient(ctx context.Context, name string, natsClient *nats.Conn, telemetr
|
|||
stopCh := make(chan struct{})
|
||||
|
||||
c := &Client{
|
||||
name: name,
|
||||
subject: name + ".tasks",
|
||||
name: streamName,
|
||||
subject: streamName + ".tasks",
|
||||
|
||||
handlers: map[string][]Handler{},
|
||||
|
||||
|
|
@ -145,7 +145,7 @@ func (c *Client) Submit(ctx context.Context, task Task) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.log.V(1).Info("submitted task", "id", task.ID, "type", task.Type, "sequence", msg.Sequence)
|
||||
c.log.V(2).Info("Submitted task", "id", task.ID, "type", task.Type, "sequence", msg.Sequence)
|
||||
|
||||
s.AddAttribute("messaging.message.id", msg.Sequence)
|
||||
|
||||
|
|
@ -153,7 +153,7 @@ func (c *Client) Submit(ctx context.Context, task Task) error {
|
|||
}
|
||||
|
||||
func (c *Client) RegisterHandler(type_ string, handler Handler) {
|
||||
c.log.V(2).Info("registering handler", "type", type_)
|
||||
c.log.V(2).Info("Registering handler", "type", type_)
|
||||
|
||||
if _, ok := c.handlers[type_]; !ok {
|
||||
c.handlers[type_] = []Handler{}
|
||||
|
|
@ -161,13 +161,11 @@ func (c *Client) RegisterHandler(type_ string, handler Handler) {
|
|||
c.handlers[type_] = append(c.handlers[type_], handler)
|
||||
}
|
||||
|
||||
func (c *Client) Start(ctx context.Context) error {
|
||||
c.log.Info("starting")
|
||||
func (c *Client) StartConsumer(ctx context.Context, consumerGroup string) error {
|
||||
c.log.Info("Starting consumer")
|
||||
|
||||
sub, err := c.js.CreateConsumer(ctx, c.name, jetstream.ConsumerConfig{
|
||||
// TODO: set name properly
|
||||
Name: "versia-go",
|
||||
Durable: "versia-go",
|
||||
Durable: consumerGroup,
|
||||
DeliverPolicy: jetstream.DeliverAllPolicy,
|
||||
ReplayPolicy: jetstream.ReplayInstantPolicy,
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
|
|
@ -191,16 +189,16 @@ func (c *Client) Start(ctx context.Context) error {
|
|||
msg, err := m.Next()
|
||||
if err != nil {
|
||||
if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
|
||||
c.log.Info("stopping")
|
||||
c.log.Info("Stopping")
|
||||
return
|
||||
}
|
||||
|
||||
c.log.Error(err, "failed to get next message")
|
||||
c.log.Error(err, "Failed to get next message")
|
||||
break
|
||||
}
|
||||
|
||||
if err := c.handleTask(ctx, msg); err != nil {
|
||||
c.log.Error(err, "failed to handle task")
|
||||
c.log.Error(err, "Failed to handle task")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -224,7 +222,7 @@ func (c *Client) handleTask(ctx context.Context, msg jetstream.Msg) error {
|
|||
var w taskWrapper
|
||||
if err := json.Unmarshal(data, &w); err != nil {
|
||||
if err := msg.Nak(); err != nil {
|
||||
c.log.Error(err, "failed to nak message")
|
||||
c.log.Error(err, "Failed to nak message")
|
||||
}
|
||||
|
||||
return err
|
||||
|
|
@ -246,21 +244,21 @@ func (c *Client) handleTask(ctx context.Context, msg jetstream.Msg) error {
|
|||
|
||||
handlers, ok := c.handlers[w.Task.Type]
|
||||
if !ok {
|
||||
c.log.V(1).Info("no handler for task", "type", w.Task.Type)
|
||||
c.log.V(2).Info("No handler for task", "type", w.Task.Type)
|
||||
return msg.Nak()
|
||||
}
|
||||
|
||||
var errs CombinedError
|
||||
for _, handler := range handlers {
|
||||
if err := handler(ctx, w.Task); err != nil {
|
||||
c.log.Error(err, "handler failed", "type", w.Task.Type)
|
||||
c.log.Error(err, "Handler failed", "type", w.Task.Type)
|
||||
errs.Errors = append(errs.Errors, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs.Errors) > 0 {
|
||||
if err := msg.Nak(); err != nil {
|
||||
c.log.Error(err, "failed to nak message")
|
||||
c.log.Error(err, "Failed to nak message")
|
||||
errs.Errors = append(errs.Errors, err)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue