5
0
Fork 0
mirror of https://0xacab.org/sutty/sutty synced 2024-11-22 14:56:22 +00:00

feat: procesar actividades en segundo plano

en lugar de hacerlo en el momento, respondemos lo más rápido posible a
la social inbox, ya que un webhook fallado genera un error en la social
inbox, que genera un error en la instancia remota, que va a volver a
intentar muchas veces.

ahora recibimos una vez, si falla el procesamiento, lo detenemos para
que alguien humane actúe al respecto.
This commit is contained in:
f 2024-03-16 14:15:32 -03:00
parent 3a0f1584c1
commit adfd6c78bb
No known key found for this signature in database
2 changed files with 167 additions and 158 deletions

View file

@ -5,196 +5,51 @@ module Api
module Webhooks module Webhooks
# Recibe webhooks de la Social Inbox # Recibe webhooks de la Social Inbox
# #
# @todo Mover todo a un Job que obtenga el objeto remoto antes de
# instanciar el objeto localmente en lugar de arreglarlo después y
# poder responder lo más rápido posible el webhook.
# @see {https://www.w3.org/TR/activitypub/} # @see {https://www.w3.org/TR/activitypub/}
class SocialInboxController < BaseController class SocialInboxController < BaseController
include Api::V1::Webhooks::Concerns::WebhookConcern include Api::V1::Webhooks::Concerns::WebhookConcern
# Validar que el token sea correcto
before_action :usuarie
# Cuando una actividad ingresa en la cola de moderación, la # Cuando una actividad ingresa en la cola de moderación, la
# recibimos por acá # recibimos por acá
# #
# Vamos a recibir Create, Update, Delete, Follow, Undo y obtener # Vamos a recibir Create, Update, Delete, Follow, Undo,
# el objeto dentro de cada una para guardar un estado asociado # Announce, Like y obtener el objeto dentro de cada una para
# al sitio. # guardar un estado asociado al sitio.
# #
# El objeto del estado puede ser un objeto o une actore, # El objeto del estado puede ser un objeto o une actore,
# dependiendo de la actividad. # dependiendo de la actividad.
def moderationqueued def moderationqueued
# Devuelve un error si el token no es válido process! :paused
usuarie.present?
::ActivityPub.transaction do
# Crea todos los registros necesarios y actualiza el estado
actor.present?
instance.present?
object.present?
activity_pub.present?
activity.update_activity_pub_state!
end
rescue ActiveRecord::RecordInvalid => e
ExceptionNotifier.notify_exception(e,
data: { site: site.name, usuarie: usuarie.email,
activity: original_activity })
ensure
head :accepted head :accepted
end end
# Cuando la Social Inbox acepta una actividad, la recibimos # Cuando la Social Inbox acepta una actividad, la recibimos
# igual y la guardamos por si cambiamos de idea. # igual y la guardamos por si cambiamos de idea.
#
# @todo DRY
def onapproved def onapproved
::ActivityPub.transaction do process! :approved
actor.present?
instance.present?
object.present?
activity.present?
activity_pub.update(aasm_state: 'approved')
activity.update_activity_pub_state!
end
head :accepted head :accepted
end end
# Cuando la Social Inbox rechaza una actividad, la recibimos # Cuando la Social Inbox rechaza una actividad, la recibimos
# igual y la guardamos por si cambiamos de idea. # igual y la guardamos por si cambiamos de idea.
#
# @todo DRY
def onrejected def onrejected
::ActivityPub.transaction do process! :rejected
actor.present?
instance.present?
object.present?
activity.present?
activity_pub.update(aasm_state: 'rejected')
end
head :accepted head :accepted
end end
private private
# Si el objeto ya viene incorporado en la actividad o lo tenemos # Envía la actividad para procesamiento por separado.
# que traer remotamente.
# #
# @return [Bool] # @param initial_state [Symbol]
def object_embedded? def process!(initial_state)
@object_embedded ||= original_activity[:object].is_a?(Hash) ::ActivityPub::ProcessJob.perform_later(site: site, body: request.raw_post, initial_state: initial_state)
end
# Encuentra la URI del objeto o falla si no la encuentra.
#
# @return [String]
def object_uri
@object_uri ||= ::ActivityPub.uri_from_object(original_activity[:object])
ensure
raise ActiveRecord::RecordNotFound, 'object id missing' if @object_uri.blank?
end
# Atajo a la instancia
#
# @return [ActivityPub::Instance]
def instance
actor.instance
end
# Genera un objeto a partir de la actividad. Si el objeto ya
# existe, actualiza su contenido. Si el objeto no viene
# incorporado, obtenemos el contenido más tarde.
#
# @return [ActivityPub::Object]
def object
@object ||= ::ActivityPub::Object.find_or_initialize_by(uri: object_uri).tap do |o|
# XXX: Si el objeto es una actividad, esto siempre va a ser
# Generic
o.type ||= 'ActivityPub::Object::Generic'
if object_embedded?
o.content = original_object
begin
type = original_object[:type].presence
o.type = "ActivityPub::Object::#{type}".constantize if type
rescue NameError
end
end
o.save!
# XXX: el objeto necesita ser guardado antes de poder
# procesarlo. No usamos GlobalID porque el tipo de objeto
# cambia y produce un error de deserialización.
::ActivityPub::FetchJob.perform_later(site: site, object_id: o.id) unless object_embedded?
end
end
# Genera el seguimiento del estado del objeto con respecto al
# sitio.
#
# @return [ActivityPub]
def activity_pub
@activity_pub ||= site.activity_pubs.find_or_create_by!(site: site, actor: actor, instance: instance, object_id: object.id, object_type: object.type)
end
# Crea la actividad y la vincula con el estado
#
# @return [ActivityPub::Activity]
def activity
@activity ||=
::ActivityPub::Activity
.type_from(original_activity)
.find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a|
a.content = original_activity.dup
a.content[:object] = object.uri
a.save!
end
end
# Actor, si no hay instancia, la crea en el momento, junto con
# su estado de moderación.
#
# @return [Actor]
def actor
@actor ||= ::ActivityPub::Actor.find_or_initialize_by(uri: original_activity[:actor]).tap do |a|
unless a.instance
a.instance = ::ActivityPub::Instance.find_or_create_by(hostname: URI.parse(a.uri).hostname)
::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance)
end
site.instance_moderations.find_or_create_by(instance: a.instance)
a.save!
site.actor_moderations.find_or_create_by(actor: a)
::ActivityPub::ActorFetchJob.perform_later(site: site, actor: a)
end
end
# Descubre la actividad recibida, generando un error si la
# actividad no está dirigida a nosotres.
#
# @todo Validar formato
# @return [Hash]
def original_activity
@original_activity ||= FastJsonparser.parse(request.raw_post).tap do |activity|
raise '@context missing' unless activity[:@context].presence
raise 'id missing' unless activity[:id].presence
raise 'object missing' unless activity[:object].presence
rescue RuntimeError => e
raise ActiveRecord::RecordNotFound, e.message
end
end
# @return [Hash,String]
def original_object
@original_object ||= original_activity[:object].dup.tap do |o|
o[:@context] = original_activity[:@context].dup
end
end end
end end
end end

View file

@ -0,0 +1,154 @@
# frozen_string_literal: true
class ActivityPub
# Procesar las actividades a medida que llegan
class ProcessJob < ApplicationJob
attr_reader :body
# Procesa la actividad en segundo plano
#
# @param :body [String]
# @param :initial_state [Symbol,String]
def perform(site:, body:, initial_state: :paused)
@body = body
@site = site
ActiveRecord::Base.connection_pool.with_connection do
::ActivityPub.transaction do
# Crea todos los registros necesarios y actualiza el estado
actor.present?
instance.present?
object.present?
activity_pub.present?
activity_pub.update(aasm_state: initial_state)
activity.update_activity_pub_state!
end
end
# Al generar una excepción, en lugar de seguir intentando, enviamos
# el reporte.
rescue Exception => e
ExceptionNotifier.notify_exception(e, data: { site: site.name, activity: original_activity })
end
private
# Si el objeto ya viene incorporado en la actividad o lo tenemos
# que traer remotamente.
#
# @return [Bool]
def object_embedded?
@object_embedded ||= original_activity[:object].is_a?(Hash)
end
# Encuentra la URI del objeto o falla si no la encuentra.
#
# @return [String]
def object_uri
@object_uri ||= ::ActivityPub.uri_from_object(original_activity[:object])
ensure
raise ActiveRecord::RecordNotFound, 'object id missing' if @object_uri.blank?
end
# Atajo a la instancia
#
# @return [ActivityPub::Instance]
def instance
actor.instance
end
# Genera un objeto a partir de la actividad. Si el objeto ya
# existe, actualiza su contenido. Si el objeto no viene
# incorporado, obtenemos el contenido más tarde.
#
# @return [ActivityPub::Object]
def object
@object ||= ::ActivityPub::Object.find_or_initialize_by(uri: object_uri).tap do |o|
# XXX: Si el objeto es una actividad, esto siempre va a ser
# Generic
o.type ||= 'ActivityPub::Object::Generic'
if object_embedded?
o.content = original_object
begin
type = original_object[:type].presence
o.type = "ActivityPub::Object::#{type}".constantize if type
rescue NameError
end
end
o.save!
# XXX: el objeto necesita ser guardado antes de poder
# procesarlo. No usamos GlobalID porque el tipo de objeto
# cambia y produce un error de deserialización.
::ActivityPub::FetchJob.perform_later(site: site, object_id: o.id) unless object_embedded?
end
end
# Genera el seguimiento del estado del objeto con respecto al
# sitio.
#
# @return [ActivityPub]
def activity_pub
@activity_pub ||= site.activity_pubs.find_or_create_by!(site: site, actor: actor, instance: instance,
object_id: object.id, object_type: object.type)
end
# Crea la actividad y la vincula con el estado
#
# @return [ActivityPub::Activity]
def activity
@activity ||=
::ActivityPub::Activity
.type_from(original_activity)
.find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a|
a.content = original_activity.dup
a.content[:object] = object.uri
a.save!
end
end
# Actor, si no hay instancia, la crea en el momento, junto con
# su estado de moderación.
#
# @return [Actor]
def actor
@actor ||= ::ActivityPub::Actor.find_or_initialize_by(uri: original_activity[:actor]).tap do |a|
unless a.instance
a.instance = ::ActivityPub::Instance.find_or_create_by(hostname: URI.parse(a.uri).hostname)
::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance)
end
site.instance_moderations.find_or_create_by(instance: a.instance)
a.save!
site.actor_moderations.find_or_create_by(actor: a)
::ActivityPub::ActorFetchJob.perform_later(site: site, actor: a)
end
end
# @return [Hash,String]
def original_object
@original_object ||= original_activity[:object].dup.tap do |o|
o[:@context] = original_activity[:@context].dup
end
end
# Descubre la actividad recibida, generando un error si la
# actividad no está dirigida a nosotres.
#
# @todo Validar formato con Dry::Schema
# @return [Hash]
def original_activity
@original_activity ||= FastJsonparser.parse(body).tap do |activity|
raise '@context missing' unless activity[:@context].present?
raise 'id missing' unless activity[:id].present?
raise 'object missing' unless activity[:object].present?
end
end
end
end