This commit is contained in:
aprilthepink 2024-04-09 19:48:18 +02:00
parent 1c09eb793d
commit b90a332b3c
13 changed files with 1572 additions and 1112 deletions

2077
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,18 +7,22 @@ authors = ["April John <aprl@acab.dev>"]
[dependencies] [dependencies]
tokio = { version = "1.20.0", features = ["rt", "macros"] } tokio = { version = "1.20.0", features = ["rt", "macros"] }
sea-orm = { version = "0.12.12", features = [
"sqlx-postgres",
"runtime-tokio-native-tls",
"with-json",
] }
serde = { version = "1.0.130", features = ["derive"] } serde = { version = "1.0.130", features = ["derive"] }
actix-web = "4" actix-web = "4"
env_logger = "0.11.0" env_logger = "0.11.0"
clap = { version = "4.3.14", features = ["derive"] } clap = { version = "4.3.14", features = ["derive"] }
activitypub_federation = "0.5.2"
[target.'cfg(unix)'.dependencies] anyhow = "1.0.81"
openssl = { version = "0.10.63", features = ["vendored"] } url = "2.5.0"
rand = "0.8.5"
tracing = "0.1.40"
async-trait = "0.1.79"
enum_delegate = "0.2.0"
chrono = "0.4.37"
activitystreams-kinds = "0.3.0"
thiserror = "1.0.58"
num_cpus = "1.16.0"
actix-web-prom = { version = "0.8.0", features = ["process"] }
[build-dependencies] [build-dependencies]
vcpkg = "0.2.15" vcpkg = "0.2.15"

View file

@ -33,6 +33,11 @@
inherit (cargoToml.package) name version; inherit (cargoToml.package) name version;
src = ./.; src = ./.;
cargoLock.lockFile = ./Cargo.lock; cargoLock.lockFile = ./Cargo.lock;
buildInputs = nonRustDeps;
nativeBuildInputs = with pkgs; [
rust-toolchain
pkg-config
];
}; };
# Rust dev environment # Rust dev environment

View file

@ -0,0 +1,74 @@
use crate::{
database::DatabaseHandle,
error::Error,
objects::{person::DbUser, post::Note},
utils::generate_object_id,
objects::post::DbPost,
};
use activitypub_federation::{
activity_sending::SendActivityTask,
config::Data,
fetch::object_id::ObjectId,
kinds::activity::CreateType,
protocol::{context::WithContext, helpers::deserialize_one_or_many},
traits::{ActivityHandler, Object},
};
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CreatePost {
pub(crate) actor: ObjectId<DbUser>,
#[serde(deserialize_with = "deserialize_one_or_many")]
pub(crate) to: Vec<Url>,
pub(crate) object: Note,
#[serde(rename = "type")]
pub(crate) kind: CreateType,
pub(crate) id: Url,
}
impl CreatePost {
pub async fn send(note: Note, inbox: Url, data: &Data<DatabaseHandle>) -> Result<(), Error> {
print!("Sending reply to {}", &note.attributed_to);
let create = CreatePost {
actor: note.attributed_to.clone(),
to: note.to.clone(),
object: note,
kind: CreateType::Create,
id: generate_object_id(data.domain())?,
};
let create_with_context = WithContext::new_default(create);
let sends =
SendActivityTask::prepare(&create_with_context, &data.local_user(), vec![inbox], data)
.await?;
for send in sends {
send.sign_and_send(data).await?;
}
Ok(())
}
}
#[async_trait::async_trait]
impl ActivityHandler for CreatePost {
type DataType = DatabaseHandle;
type Error = crate::error::Error;
fn id(&self) -> &Url {
&self.id
}
fn actor(&self) -> &Url {
self.actor.inner()
}
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
DbPost::verify(&self.object, &self.id, data).await?;
Ok(())
}
async fn receive(self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
DbPost::from_json(self.object, data).await?;
Ok(())
}
}

1
src/activities/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod create_post;

27
src/database.rs Normal file
View file

@ -0,0 +1,27 @@
use crate::{objects::person::DbUser, error::Error};
use anyhow::anyhow;
use std::sync::{Arc, Mutex};
pub type DatabaseHandle = Arc<Database>;
/// Our "database" which contains all known users (local and federated)
#[derive(Debug)]
pub struct Database {
pub users: Mutex<Vec<DbUser>>,
}
impl Database {
pub fn local_user(&self) -> DbUser {
let lock = self.users.lock().unwrap();
lock.first().unwrap().clone()
}
pub fn read_user(&self, name: &str) -> Result<DbUser, Error> {
let db_user = self.local_user();
if name == db_user.name {
Ok(db_user)
} else {
Err(anyhow!("Invalid user {name}").into())
}
}
}

26
src/error.rs Normal file
View file

@ -0,0 +1,26 @@
use std::fmt::{Display, Formatter};
/// Necessary because of this issue: https://github.com/actix/actix-web/issues/1711
#[derive(Debug)]
pub struct Error(pub(crate) anyhow::Error);
impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl actix_web::ResponseError for Error {
fn error_response(&self) -> actix_web::HttpResponse {
actix_web::HttpResponse::InternalServerError().body(self.to_string())
}
}
impl<T> From<T> for Error
where
T: Into<anyhow::Error>,
{
fn from(t: T) -> Self {
Error(t.into())
}
}

97
src/http.rs Normal file
View file

@ -0,0 +1,97 @@
use crate::{
error::Error,
database::DatabaseHandle,
objects::person::{DbUser, PersonAcceptedActivities},
};
use activitypub_federation::{
actix_web::{inbox::receive_activity, signing_actor},
config::{Data, FederationConfig, FederationMiddleware},
fetch::webfinger::{build_webfinger_response, extract_webfinger_name},
protocol::context::WithContext,
traits::{Actor, Object},
FEDERATION_CONTENT_TYPE,
};
use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer};
use anyhow::anyhow;
use serde::Deserialize;
use tracing::info;
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = config.domain();
info!("Listening with actix-web on {hostname}");
let config = config.clone();
let server = HttpServer::new(move || {
App::new()
.wrap(FederationMiddleware::new(config.clone()))
//.route("/", web::get().to(http_get_system_user))
.route("/{user}", web::get().to(http_get_user))
.route("/{user}/inbox", web::post().to(http_post_user_inbox))
.route("/.well-known/webfinger", web::get().to(webfinger))
})
.bind(hostname)?
.run();
tokio::spawn(server);
Ok(())
}
/// Handles requests to fetch system user json over HTTP
/*pub async fn http_get_system_user(data: Data<DatabaseHandle>) -> Result<HttpResponse, Error> {
let json_user = data.system_user.clone().into_json(&data).await?;
Ok(HttpResponse::Ok()
.content_type(FEDERATION_CONTENT_TYPE)
.json(WithContext::new_default(json_user)))
}*/
/// Handles requests to fetch user json over HTTP
pub async fn http_get_user(
request: HttpRequest,
user_name: web::Path<String>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let signed_by = signing_actor::<DbUser>(&request, None, &data).await?;
// here, checks can be made on the actor or the domain to which
// it belongs, to verify whether it is allowed to access this resource
info!(
"Fetch user request is signed by system account {}",
signed_by.id()
);
let db_user = data.local_user();
if user_name.into_inner() == db_user.name {
let json_user = db_user.into_json(&data).await?;
Ok(HttpResponse::Ok()
.content_type(FEDERATION_CONTENT_TYPE)
.json(WithContext::new_default(json_user)))
} else {
Err(anyhow!("Invalid user").into())
}
}
/// Handles messages received in user inbox
pub async fn http_post_user_inbox(
request: HttpRequest,
body: Bytes,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
request, body, &data,
)
.await
}
#[derive(Deserialize)]
pub struct WebfingerQuery {
resource: String,
}
pub async fn webfinger(
query: web::Query<WebfingerQuery>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let name = extract_webfinger_name(&query.resource, &data)?;
let db_user = data.read_user(name)?;
Ok(HttpResponse::Ok().json(build_webfinger_response(
query.resource.clone(),
db_user.ap_id.into_inner(),
)))
}

View file

@ -1,12 +1,26 @@
use actix_web::{get, middleware, web, App, Error, HttpResponse, HttpServer}; use activitypub_federation::config::{FederationConfig, FederationMiddleware};
use sea_orm::{Database, DatabaseConnection}; use actix_web::{get, http::KeepAlive, middleware, web, App, Error, HttpResponse, HttpServer};
use actix_web_prom::PrometheusMetricsBuilder;
use clap::Parser;
use database::Database;
use http::{http_get_user, http_post_user_inbox, webfinger};
use objects::person::DbUser;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::env; use tokio::signal;
use std::time::Duration; use std::{
collections::HashMap, env, net::ToSocketAddrs, sync::{Arc, Mutex}
};
mod database;
mod objects;
mod activities;
mod utils;
mod error;
mod http;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct State { struct State {
db: DatabaseConnection, database: Arc<Database>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -14,41 +28,83 @@ struct Response {
health: bool, health: bool,
} }
#[derive(Parser, Debug)]
#[clap(author = "April John", version, about)]
/// Application configuration
struct Args {
/// whether to be verbose
#[arg(short = 'v')]
verbose: bool,
/// optional parse arg for config file
#[arg()]
config_file: Option<String>,
}
#[get("/")] #[get("/")]
async fn index(_: web::Data<State>) -> actix_web::Result<HttpResponse, Error> { async fn index(_: web::Data<State>) -> actix_web::Result<HttpResponse, Error> {
Ok(HttpResponse::Ok().json(Response { health: true })) Ok(HttpResponse::Ok().json(Response { health: true }))
} }
const DOMAIN: &str = "example.com";
const LOCAL_USER_NAME: &str = "example";
#[actix_web::main] #[actix_web::main]
async fn main() -> actix_web::Result<()> { async fn main() -> actix_web::Result<(), anyhow::Error> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let server_url = env::var("LISTEN").unwrap_or("127.0.0.1:8080".to_string()); let server_url = env::var("LISTEN").unwrap_or("127.0.0.1:8080".to_string());
let mut opts = let local_user = DbUser::new(env::var("FEDERATED_DOMAIN").unwrap_or(DOMAIN.to_string()).as_str(), env::var("LOCAL_USER_NAME").unwrap_or(LOCAL_USER_NAME.to_string()).as_str()).unwrap();
sea_orm::ConnectOptions::new(env::var("DATABASE_URL").expect("DATABASE_URL ust be set"));
opts.max_connections(5)
.min_connections(1)
.connect_timeout(Duration::from_secs(8))
.acquire_timeout(Duration::from_secs(8))
.idle_timeout(Duration::from_secs(8))
.max_lifetime(Duration::from_secs(8));
let db: DatabaseConnection = Database::connect(opts) let database = Arc::new(Database {
.await users: Mutex::new(vec![local_user]),
.expect("Failed to connect to database"); });
let state = State { db }; let state = State { database };
let _ = HttpServer::new(move || { let data = FederationConfig::builder()
.domain(env::var("FEDERATED_DOMAIN").expect("FEDERATED_DOMAIN must be set"))
.app_data(state.clone().database)
.build().await?;
let mut labels = HashMap::new();
labels.insert("domain".to_string(), env::var("FEDERATED_DOMAIN").expect("FEDERATED_DOMAIN must be set").to_string());
labels.insert("name".to_string(), env::var("LOCAL_USER_NAME").expect("LOCAL_USER_NAME must be set").to_string());
let prometheus = PrometheusMetricsBuilder::new("api")
.endpoint("/metrics")
.const_labels(labels)
.build()
.unwrap();
let http_server = HttpServer::new(move || {
App::new() App::new()
.app_data(web::Data::new(state.clone())) .app_data(web::Data::new(state.clone()))
.wrap(middleware::Logger::default()) // enable logger .wrap(middleware::Logger::default()) // enable logger
.wrap(prometheus.clone())
.wrap(FederationMiddleware::new(data.clone()))
.route("/{user}", web::get().to(http_get_user))
.route("/{user}/inbox", web::post().to(http_post_user_inbox))
.route("/.well-known/webfinger", web::get().to(webfinger))
.service(index) .service(index)
}) })
.bind(&server_url)? .bind(&server_url)?
.run() .workers(num_cpus::get())
.await?; .shutdown_timeout(20)
.keep_alive(KeepAlive::Os)
.run();
tokio::spawn(http_server);
match signal::ctrl_c().await {
Ok(()) => {},
Err(err) => {
eprintln!("Unable to listen for shutdown signal: {}", err);
// we also shut down in case of error
},
}
Ok(()) Ok(())
} }

2
src/objects/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod person;
pub mod post;

140
src/objects/person.rs Normal file
View file

@ -0,0 +1,140 @@
use crate::{activities::create_post::CreatePost, database::DatabaseHandle, error::Error};
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
http_signatures::generate_actor_keypair,
kinds::actor::PersonType,
protocol::{public_key::PublicKey, verification::verify_domains_match},
traits::{ActivityHandler, Actor, Object},
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use url::Url;
#[derive(Debug, Clone)]
pub struct DbUser {
pub name: String,
pub ap_id: ObjectId<DbUser>,
pub inbox: Url,
// exists for all users (necessary to verify http signatures)
pub public_key: String,
// exists only for local users
pub private_key: Option<String>,
last_refreshed_at: DateTime<Utc>,
pub followers: Vec<Url>,
pub local: bool,
}
/// List of all activities which this actor can receive.
#[derive(Deserialize, Serialize, Debug)]
#[serde(untagged)]
#[enum_delegate::implement(ActivityHandler)]
pub enum PersonAcceptedActivities {
CreateNote(CreatePost),
}
impl DbUser {
pub fn new(hostname: &str, name: &str) -> Result<DbUser, Error> {
let ap_id = Url::parse(&format!("https://{}/{}", hostname, &name))?.into();
let inbox = Url::parse(&format!("https://{}/{}/inbox", hostname, &name))?;
let keypair = generate_actor_keypair()?;
Ok(DbUser {
name: name.to_string(),
ap_id,
inbox,
public_key: keypair.public_key,
private_key: Some(keypair.private_key),
last_refreshed_at: Utc::now(),
followers: vec![],
local: true,
})
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Person {
#[serde(rename = "type")]
kind: PersonType,
preferred_username: String,
id: ObjectId<DbUser>,
inbox: Url,
public_key: PublicKey,
}
#[async_trait::async_trait]
impl Object for DbUser {
type DataType = DatabaseHandle;
type Kind = Person;
type Error = Error;
fn last_refreshed_at(&self) -> Option<DateTime<Utc>> {
Some(self.last_refreshed_at)
}
async fn read_from_id(
object_id: Url,
data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
let users = data.users.lock().unwrap();
let res = users
.clone()
.into_iter()
.find(|u| u.ap_id.inner() == &object_id);
Ok(res)
}
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
Ok(Person {
preferred_username: self.name.clone(),
kind: Default::default(),
id: self.ap_id.clone(),
inbox: self.inbox.clone(),
public_key: self.public_key(),
})
}
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
}
async fn from_json(
json: Self::Kind,
_data: &Data<Self::DataType>,
) -> Result<Self, Self::Error> {
Ok(DbUser {
name: json.preferred_username,
ap_id: json.id,
inbox: json.inbox,
public_key: json.public_key.public_key_pem,
private_key: None,
last_refreshed_at: Utc::now(),
followers: vec![],
local: false,
})
}
}
impl Actor for DbUser {
fn id(&self) -> Url {
self.ap_id.inner().clone()
}
fn public_key_pem(&self) -> &str {
&self.public_key
}
fn private_key_pem(&self) -> Option<String> {
self.private_key.clone()
}
fn inbox(&self) -> Url {
self.inbox.clone()
}
}

104
src/objects/post.rs Normal file
View file

@ -0,0 +1,104 @@
use crate::{
activities::create_post::CreatePost,
database::DatabaseHandle,
error::Error,
utils::generate_object_id,
objects::person::DbUser,
};
use activitypub_federation::{
config::Data,
fetch::object_id::ObjectId,
kinds::{object::NoteType, public},
protocol::{helpers::deserialize_one_or_many, verification::verify_domains_match},
traits::{Actor, Object},
};
use activitystreams_kinds::link::MentionType;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Clone, Debug)]
pub struct DbPost {
pub text: String,
pub ap_id: ObjectId<DbPost>,
pub creator: ObjectId<DbUser>,
pub local: bool,
}
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Note {
#[serde(rename = "type")]
kind: NoteType,
id: ObjectId<DbPost>,
pub(crate) attributed_to: ObjectId<DbUser>,
#[serde(deserialize_with = "deserialize_one_or_many")]
pub(crate) to: Vec<Url>,
content: String,
in_reply_to: Option<ObjectId<DbPost>>,
tag: Vec<Mention>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Mention {
pub href: Url,
#[serde(rename = "type")]
pub kind: MentionType,
}
#[async_trait::async_trait]
impl Object for DbPost {
type DataType = DatabaseHandle;
type Kind = Note;
type Error = Error;
async fn read_from_id(
_object_id: Url,
_data: &Data<Self::DataType>,
) -> Result<Option<Self>, Self::Error> {
Ok(None)
}
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
unimplemented!()
}
async fn verify(
json: &Self::Kind,
expected_domain: &Url,
_data: &Data<Self::DataType>,
) -> Result<(), Self::Error> {
verify_domains_match(json.id.inner(), expected_domain)?;
Ok(())
}
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
println!(
"Received post with content {} and id {}",
&json.content, &json.id
);
let creator = json.attributed_to.dereference(data).await?;
let post = DbPost {
text: json.content,
ap_id: json.id.clone(),
creator: json.attributed_to.clone(),
local: false,
};
let mention = Mention {
href: creator.ap_id.clone().into_inner(),
kind: Default::default(),
};
let note = Note {
kind: Default::default(),
id: generate_object_id(data.domain())?.into(),
attributed_to: data.local_user().ap_id,
to: vec![public()],
content: format!("Hello {}", creator.name),
in_reply_to: Some(json.id.clone()),
tag: vec![mention],
};
CreatePost::send(note, creator.shared_inbox_or_inbox(), data).await?;
Ok(post)
}
}

13
src/utils.rs Normal file
View file

@ -0,0 +1,13 @@
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use url::{ParseError, Url};
/// Just generate random url as object id. In a real project, you probably want to use
/// an url which contains the database id for easy retrieval (or store the random id in db).
pub fn generate_object_id(domain: &str) -> Result<Url, ParseError> {
let id: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(7)
.map(char::from)
.collect();
Url::parse(&format!("https://{}/objects/{}", domain, id))
}