2024-08-11 03:51:22 +02:00
|
|
|
package main
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"crypto/ed25519"
|
|
|
|
|
"crypto/rand"
|
2024-08-20 22:43:26 +02:00
|
|
|
"crypto/tls"
|
2024-08-11 03:51:22 +02:00
|
|
|
"database/sql"
|
|
|
|
|
"database/sql/driver"
|
|
|
|
|
"net/http"
|
|
|
|
|
"os"
|
|
|
|
|
"os/signal"
|
2024-08-29 00:46:32 +02:00
|
|
|
"slices"
|
2024-08-11 03:51:22 +02:00
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
|
|
"entgo.io/ent/dialect"
|
|
|
|
|
entsql "entgo.io/ent/dialect/sql"
|
|
|
|
|
"git.devminer.xyz/devminer/unitel"
|
2024-08-29 00:46:32 +02:00
|
|
|
"git.devminer.xyz/devminer/unitel/unitelhttp"
|
|
|
|
|
"git.devminer.xyz/devminer/unitel/unitelsql"
|
2024-08-11 03:51:22 +02:00
|
|
|
"github.com/go-logr/logr"
|
|
|
|
|
"github.com/go-logr/zerologr"
|
|
|
|
|
pgx "github.com/jackc/pgx/v5/stdlib"
|
|
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
|
"github.com/rs/zerolog/log"
|
2024-08-28 00:25:25 +02:00
|
|
|
"github.com/versia-pub/versia-go/config"
|
|
|
|
|
"github.com/versia-pub/versia-go/ent"
|
2024-08-29 00:46:32 +02:00
|
|
|
"github.com/versia-pub/versia-go/ent/instancemetadata"
|
2024-08-28 00:25:25 +02:00
|
|
|
"github.com/versia-pub/versia-go/internal/database"
|
2024-08-29 00:46:32 +02:00
|
|
|
"github.com/versia-pub/versia-go/internal/repository"
|
|
|
|
|
"github.com/versia-pub/versia-go/internal/repository/repo_impls"
|
|
|
|
|
"github.com/versia-pub/versia-go/internal/service/svc_impls"
|
|
|
|
|
"github.com/versia-pub/versia-go/internal/task"
|
|
|
|
|
"github.com/versia-pub/versia-go/internal/task/task_impls"
|
2024-08-28 00:25:25 +02:00
|
|
|
"github.com/versia-pub/versia-go/internal/utils"
|
2024-08-29 00:46:32 +02:00
|
|
|
"github.com/versia-pub/versia-go/internal/validators/val_impls"
|
2024-08-28 00:25:25 +02:00
|
|
|
"github.com/versia-pub/versia-go/pkg/taskqueue"
|
2024-08-11 03:51:22 +02:00
|
|
|
"modernc.org/sqlite"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix
|
|
|
|
|
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func main() {
|
2024-08-29 00:46:32 +02:00
|
|
|
rootCtx, cancelRoot := context.WithCancel(context.Background())
|
|
|
|
|
|
2024-08-11 03:51:22 +02:00
|
|
|
zerolog.SetGlobalLevel(zerolog.TraceLevel)
|
|
|
|
|
zerologr.NameFieldName = "logger"
|
|
|
|
|
zerologr.NameSeparator = "/"
|
|
|
|
|
zerologr.SetMaxV(2)
|
|
|
|
|
|
|
|
|
|
config.Load()
|
|
|
|
|
|
|
|
|
|
tel, err := unitel.Initialize(config.C.Telemetry)
|
|
|
|
|
if err != nil {
|
2024-08-20 22:43:26 +02:00
|
|
|
log.Fatal().Err(err).Msg("Failed to initialize telemetry")
|
2024-08-11 03:51:22 +02:00
|
|
|
}
|
|
|
|
|
|
2024-08-20 22:43:26 +02:00
|
|
|
httpClient := &http.Client{
|
2024-08-24 01:26:56 +02:00
|
|
|
Transport: unitelhttp.NewTracedTransport(
|
|
|
|
|
tel,
|
2024-08-20 22:43:26 +02:00
|
|
|
&http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: config.C.Telemetry.Environment == "development"}},
|
|
|
|
|
[]string{"origin", "x-nonce", "x-signature", "x-signed-by", "sentry-trace", "sentry-baggage"},
|
2024-08-24 14:35:19 +02:00
|
|
|
[]string{"host", "x-nonce", "x-signature", "x-signed-by", "sentry-trace", "sentry-baggage"},
|
2024-08-24 01:26:56 +02:00
|
|
|
unitelhttp.WithLogger(zerologr.New(&log.Logger).WithName("http-client")),
|
2024-08-24 14:35:19 +02:00
|
|
|
unitelhttp.WithTracePropagation(shouldPropagate),
|
2024-08-11 03:51:22 +02:00
|
|
|
),
|
2024-08-20 22:43:26 +02:00
|
|
|
}
|
|
|
|
|
|
2024-08-11 03:51:22 +02:00
|
|
|
log.Debug().Msg("Opening database connection")
|
|
|
|
|
var db *ent.Client
|
|
|
|
|
if strings.HasPrefix(config.C.DatabaseURI, "postgres://") {
|
|
|
|
|
db, err = openDB(tel, false, config.C.DatabaseURI)
|
|
|
|
|
} else {
|
|
|
|
|
db, err = openDB(tel, true, config.C.DatabaseURI)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
2024-08-20 22:43:26 +02:00
|
|
|
log.Fatal().Err(err).Msg("Failed opening connection to the database")
|
2024-08-11 03:51:22 +02:00
|
|
|
}
|
|
|
|
|
defer db.Close()
|
|
|
|
|
|
|
|
|
|
nc, err := nats.Connect(config.C.NATSURI)
|
|
|
|
|
if err != nil {
|
2024-08-20 22:43:26 +02:00
|
|
|
log.Fatal().Err(err).Msg("Failed to connect to NATS")
|
2024-08-11 03:51:22 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug().Msg("Starting taskqueue client")
|
2024-08-29 00:46:32 +02:00
|
|
|
tq, err := taskqueue.NewClient(config.C.NATSStreamName, nc, tel, zerologr.New(&log.Logger).WithName("taskqueue-client"))
|
2024-08-11 03:51:22 +02:00
|
|
|
if err != nil {
|
2024-08-20 22:43:26 +02:00
|
|
|
log.Fatal().Err(err).Msg("Failed to create taskqueue client")
|
2024-08-11 03:51:22 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Debug().Msg("Running schema migration")
|
|
|
|
|
if err := migrateDB(db, zerologr.New(&log.Logger).WithName("migrate-db"), tel); err != nil {
|
2024-08-20 22:43:26 +02:00
|
|
|
log.Fatal().Err(err).Msg("Failed to run schema migration")
|
2024-08-11 03:51:22 +02:00
|
|
|
}
|
|
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
log.Debug().Msg("Initializing instance")
|
|
|
|
|
if err := initInstance(db, tel); err != nil {
|
|
|
|
|
log.Fatal().Err(err).Msg("Failed to initialize instance")
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-11 03:51:22 +02:00
|
|
|
// Stateless services
|
|
|
|
|
|
2024-08-20 22:43:26 +02:00
|
|
|
requestSigner := svc_impls.NewRequestSignerImpl(tel, zerologr.New(&log.Logger).WithName("request-signer"))
|
2024-08-24 01:26:56 +02:00
|
|
|
federationService := svc_impls.NewFederationServiceImpl(httpClient, tel, zerologr.New(&log.Logger).WithName("federation-service"))
|
2024-08-11 03:51:22 +02:00
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
// Repositories
|
2024-08-11 03:51:22 +02:00
|
|
|
|
2024-08-20 22:43:26 +02:00
|
|
|
repos := repo_impls.NewManagerImpl(
|
|
|
|
|
db, tel, zerologr.New(&log.Logger).WithName("repositories"),
|
|
|
|
|
func(db *ent.Client, log logr.Logger, telemetry *unitel.Telemetry) repository.UserRepository {
|
|
|
|
|
return repo_impls.NewUserRepositoryImpl(federationService, db, log, telemetry)
|
|
|
|
|
},
|
|
|
|
|
repo_impls.NewNoteRepositoryImpl,
|
|
|
|
|
repo_impls.NewFollowRepositoryImpl,
|
|
|
|
|
func(db *ent.Client, log logr.Logger, telemetry *unitel.Telemetry) repository.InstanceMetadataRepository {
|
|
|
|
|
return repo_impls.NewInstanceMetadataRepositoryImpl(federationService, db, log, telemetry)
|
|
|
|
|
},
|
|
|
|
|
)
|
2024-08-11 03:51:22 +02:00
|
|
|
|
|
|
|
|
// Validators
|
|
|
|
|
|
|
|
|
|
bodyValidator := val_impls.NewBodyValidator(zerologr.New(&log.Logger).WithName("validation-service"))
|
|
|
|
|
requestValidator := val_impls.NewRequestValidator(repos, tel, zerologr.New(&log.Logger).WithName("request-validator"))
|
|
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
// Task handlers
|
|
|
|
|
|
|
|
|
|
notes := task_impls.NewNoteHandler(federationService, repos, tel, zerologr.New(&log.Logger).WithName("task-note-handler"))
|
|
|
|
|
notesSet := registerTaskHandler(rootCtx, "notes", tq, notes)
|
|
|
|
|
|
|
|
|
|
taskManager := task_impls.NewManager(notes, tel, zerologr.New(&log.Logger).WithName("task-manager"))
|
|
|
|
|
|
2024-08-11 03:51:22 +02:00
|
|
|
// Services
|
|
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
taskService := svc_impls.NewTaskServiceImpl(taskManager, tel, zerologr.New(&log.Logger).WithName("task-service"))
|
2024-08-11 03:51:22 +02:00
|
|
|
userService := svc_impls.NewUserServiceImpl(repos, federationService, tel, zerologr.New(&log.Logger).WithName("user-service"))
|
|
|
|
|
noteService := svc_impls.NewNoteServiceImpl(federationService, taskService, repos, tel, zerologr.New(&log.Logger).WithName("note-service"))
|
|
|
|
|
followService := svc_impls.NewFollowServiceImpl(federationService, repos, tel, zerologr.New(&log.Logger).WithName("follow-service"))
|
2024-08-20 22:43:26 +02:00
|
|
|
inboxService := svc_impls.NewInboxService(federationService, repos, tel, zerologr.New(&log.Logger).WithName("inbox-service"))
|
|
|
|
|
instanceMetadataService := svc_impls.NewInstanceMetadataServiceImpl(federationService, repos, tel, zerologr.New(&log.Logger).WithName("instance-metadata-service"))
|
2024-08-11 03:51:22 +02:00
|
|
|
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
if config.C.Mode == config.ModeWeb || config.C.Mode == config.ModeCombined {
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
if err := server(
|
|
|
|
|
rootCtx,
|
|
|
|
|
tel,
|
|
|
|
|
db,
|
|
|
|
|
nc,
|
|
|
|
|
federationService,
|
|
|
|
|
requestSigner,
|
|
|
|
|
bodyValidator,
|
|
|
|
|
requestValidator,
|
|
|
|
|
userService,
|
|
|
|
|
noteService,
|
|
|
|
|
followService,
|
|
|
|
|
instanceMetadataService,
|
|
|
|
|
inboxService,
|
|
|
|
|
); err != nil {
|
|
|
|
|
log.Fatal().Err(err).Msg("Failed to start server")
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
2024-08-24 16:59:36 +02:00
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
maybeRunTaskHandler(rootCtx, "notes", notesSet, &wg)
|
2024-08-11 03:51:22 +02:00
|
|
|
|
|
|
|
|
signalCh := make(chan os.Signal, 1)
|
|
|
|
|
signal.Notify(signalCh, os.Interrupt)
|
|
|
|
|
<-signalCh
|
|
|
|
|
|
|
|
|
|
log.Info().Msg("Shutting down")
|
|
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
cancelRoot()
|
2024-08-11 03:51:22 +02:00
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-24 01:26:56 +02:00
|
|
|
func openDB(tel *unitel.Telemetry, isSqlite bool, uri string) (*ent.Client, error) {
|
|
|
|
|
s := tel.StartSpan(context.Background(), "function", "main.openDB")
|
2024-08-11 03:51:22 +02:00
|
|
|
defer s.End()
|
|
|
|
|
|
|
|
|
|
var drv driver.Driver
|
|
|
|
|
var dialectType string
|
|
|
|
|
var dbType string
|
|
|
|
|
|
|
|
|
|
if isSqlite {
|
|
|
|
|
log.Debug().Msg("Opening SQLite database connection")
|
|
|
|
|
drv = &sqliteDriver{Driver: &sqlite.Driver{}}
|
|
|
|
|
dialectType = dialect.SQLite
|
|
|
|
|
dbType = "sqlite"
|
|
|
|
|
} else {
|
|
|
|
|
log.Debug().Msg("Opening PostgreSQL database connection")
|
|
|
|
|
drv = &pgx.Driver{}
|
|
|
|
|
dialectType = dialect.Postgres
|
|
|
|
|
dbType = "postgres"
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-24 01:26:56 +02:00
|
|
|
sql.Register(dialectType+"-traced", unitelsql.NewTracedSQL(tel, drv, dbType))
|
2024-08-11 03:51:22 +02:00
|
|
|
|
|
|
|
|
db, err := sql.Open(dialectType+"-traced", uri)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
entDrv := entsql.OpenDB(dialectType, db)
|
|
|
|
|
return ent.NewClient(ent.Driver(entDrv)), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func migrateDB(db *ent.Client, log logr.Logger, telemetry *unitel.Telemetry) error {
|
|
|
|
|
s := telemetry.StartSpan(context.Background(), "function", "main.migrateDB")
|
|
|
|
|
defer s.End()
|
|
|
|
|
ctx := s.Context()
|
|
|
|
|
|
|
|
|
|
log.V(1).Info("Migrating database schema")
|
|
|
|
|
if err := db.Schema.Create(ctx); err != nil {
|
|
|
|
|
log.Error(err, "Failed to migrate database schema")
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.V(1).Info("Database migration complete")
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
func initInstance(db *ent.Client, telemetry *unitel.Telemetry) error {
|
|
|
|
|
s := telemetry.StartSpan(context.Background(), "function", "main.initInstance")
|
2024-08-11 03:51:22 +02:00
|
|
|
defer s.End()
|
|
|
|
|
ctx := s.Context()
|
|
|
|
|
|
|
|
|
|
tx, err := database.BeginTx(ctx, db, telemetry)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
defer func(tx *database.Tx) {
|
|
|
|
|
if err := tx.Finish(); err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("Failed to finish transaction")
|
|
|
|
|
}
|
|
|
|
|
}(tx)
|
|
|
|
|
ctx = tx.Context()
|
|
|
|
|
|
2024-08-20 22:43:26 +02:00
|
|
|
pub, priv, err := ed25519.GenerateKey(rand.Reader)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("Failed to generate keypair")
|
2024-08-11 03:51:22 +02:00
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2024-08-20 22:43:26 +02:00
|
|
|
err = tx.InstanceMetadata.Create().
|
|
|
|
|
SetIsRemote(false).
|
|
|
|
|
SetURI(utils.InstanceMetadataAPIURL().String()).
|
|
|
|
|
SetName(config.C.InstanceName).
|
|
|
|
|
SetNillableDescription(config.C.InstanceDescription).
|
|
|
|
|
SetHost(config.C.Host).
|
|
|
|
|
SetPrivateKey(priv).
|
|
|
|
|
SetPublicKey(pub).
|
|
|
|
|
SetPublicKeyAlgorithm("ed25519").
|
|
|
|
|
SetSoftwareName("versia-go").
|
|
|
|
|
SetSoftwareVersion("0.0.1").
|
|
|
|
|
SetSharedInboxURI(utils.SharedInboxAPIURL().String()).
|
|
|
|
|
SetAdminsURI(utils.InstanceMetadataAdminsAPIURL().String()).
|
|
|
|
|
SetModeratorsURI(utils.InstanceMetadataModeratorsAPIURL().String()).
|
|
|
|
|
SetSupportedVersions([]string{"0.4.0"}).
|
|
|
|
|
SetSupportedExtensions([]string{}).
|
|
|
|
|
//
|
|
|
|
|
OnConflictColumns(instancemetadata.FieldHost).
|
|
|
|
|
UpdateName().
|
|
|
|
|
UpdateDescription().
|
|
|
|
|
UpdateHost().
|
|
|
|
|
UpdateSoftwareName().
|
|
|
|
|
UpdateSoftwareVersion().
|
|
|
|
|
UpdateSharedInboxURI().
|
|
|
|
|
UpdateAdminsURI().
|
|
|
|
|
UpdateModeratorsURI().
|
|
|
|
|
UpdateSupportedVersions().
|
|
|
|
|
UpdateSupportedExtensions().
|
|
|
|
|
Exec(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error().Err(err).Msg("Failed to create server metadata")
|
|
|
|
|
return err
|
2024-08-11 03:51:22 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tx.MarkForCommit()
|
|
|
|
|
|
|
|
|
|
return tx.Finish()
|
|
|
|
|
}
|
2024-08-25 00:14:03 +02:00
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
func registerTaskHandler[T task.Handler](ctx context.Context, name string, tq *taskqueue.Client, handler T) *taskqueue.Set {
|
|
|
|
|
s, err := tq.Set(ctx, name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatal().Err(err).Str("handler", name).Msg("Could not create taskset for task handler")
|
|
|
|
|
}
|
2024-08-25 00:14:03 +02:00
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
handler.Register(s)
|
2024-08-25 00:14:03 +02:00
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
return s
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func maybeRunTaskHandler(ctx context.Context, name string, set *taskqueue.Set, wg *sync.WaitGroup) {
|
|
|
|
|
l := log.With().Str("handler", name).Logger()
|
|
|
|
|
|
|
|
|
|
if config.C.Mode == config.ModeWeb {
|
|
|
|
|
l.Warn().Strs("requested", config.C.Consumers).Msg("Not starting task handler, as this process is running in web mode")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if config.C.Mode == config.ModeConsumer && !slices.Contains(config.C.Consumers, name) {
|
|
|
|
|
l.Warn().Strs("requested", config.C.Consumers).Msg("Not starting task handler, as it wasn't requested")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wg.Add(1)
|
2024-08-25 00:14:03 +02:00
|
|
|
|
2024-08-29 00:46:32 +02:00
|
|
|
c := set.Consumer(name)
|
|
|
|
|
if err := c.Start(ctx); err != nil {
|
|
|
|
|
l.Fatal().Err(err).Msg("Could not start task handler")
|
2024-08-25 00:14:03 +02:00
|
|
|
}
|
2024-08-29 00:46:32 +02:00
|
|
|
|
|
|
|
|
l.Info().Msg("Started task handler")
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
|
|
<-ctx.Done()
|
|
|
|
|
l.Debug().Msg("Got signal to stop task handler")
|
|
|
|
|
|
|
|
|
|
c.Close()
|
|
|
|
|
|
|
|
|
|
l.Info().Msg("Stopped task handler")
|
|
|
|
|
}()
|
2024-08-25 00:14:03 +02:00
|
|
|
}
|