From adfd6c78bbb583ea0dc8936358152d771771fded Mon Sep 17 00:00:00 2001 From: f Date: Sat, 16 Mar 2024 14:15:32 -0300 Subject: [PATCH] feat: procesar actividades en segundo plano MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit en lugar de hacerlo en el momento, respondemos lo más rápido posible a la social inbox, ya que un webhook fallado genera un error en la social inbox, que genera un error en la instancia remota, que va a volver a intentar muchas veces. ahora recibimos una vez, si falla el procesamiento, lo detenemos para que alguien humane actúe al respecto. --- .../v1/webhooks/social_inbox_controller.rb | 171 ++---------------- app/jobs/activity_pub/process_job.rb | 154 ++++++++++++++++ 2 files changed, 167 insertions(+), 158 deletions(-) create mode 100644 app/jobs/activity_pub/process_job.rb diff --git a/app/controllers/api/v1/webhooks/social_inbox_controller.rb b/app/controllers/api/v1/webhooks/social_inbox_controller.rb index 7b17c47b..6ac91a51 100644 --- a/app/controllers/api/v1/webhooks/social_inbox_controller.rb +++ b/app/controllers/api/v1/webhooks/social_inbox_controller.rb @@ -5,196 +5,51 @@ module Api module Webhooks # Recibe webhooks de la Social Inbox # - # @todo Mover todo a un Job que obtenga el objeto remoto antes de - # instanciar el objeto localmente en lugar de arreglarlo después y - # poder responder lo más rápido posible el webhook. # @see {https://www.w3.org/TR/activitypub/} class SocialInboxController < BaseController include Api::V1::Webhooks::Concerns::WebhookConcern + # Validar que el token sea correcto + before_action :usuarie + # Cuando una actividad ingresa en la cola de moderación, la # recibimos por acá # - # Vamos a recibir Create, Update, Delete, Follow, Undo y obtener - # el objeto dentro de cada una para guardar un estado asociado - # al sitio. + # Vamos a recibir Create, Update, Delete, Follow, Undo, + # Announce, Like y obtener el objeto dentro de cada una para + # guardar un estado asociado al sitio. # # El objeto del estado puede ser un objeto o une actore, # dependiendo de la actividad. def moderationqueued - # Devuelve un error si el token no es válido - usuarie.present? + process! :paused - ::ActivityPub.transaction do - - # Crea todos los registros necesarios y actualiza el estado - actor.present? - instance.present? - object.present? - activity_pub.present? - - activity.update_activity_pub_state! - end - rescue ActiveRecord::RecordInvalid => e - ExceptionNotifier.notify_exception(e, - data: { site: site.name, usuarie: usuarie.email, - activity: original_activity }) - ensure head :accepted end # Cuando la Social Inbox acepta una actividad, la recibimos # igual y la guardamos por si cambiamos de idea. - # - # @todo DRY def onapproved - ::ActivityPub.transaction do - actor.present? - instance.present? - object.present? - activity.present? - activity_pub.update(aasm_state: 'approved') - activity.update_activity_pub_state! - end + process! :approved head :accepted end # Cuando la Social Inbox rechaza una actividad, la recibimos # igual y la guardamos por si cambiamos de idea. - # - # @todo DRY def onrejected - ::ActivityPub.transaction do - actor.present? - instance.present? - object.present? - activity.present? - activity_pub.update(aasm_state: 'rejected') - end + process! :rejected head :accepted end private - # Si el objeto ya viene incorporado en la actividad o lo tenemos - # que traer remotamente. + # Envía la actividad para procesamiento por separado. # - # @return [Bool] - def object_embedded? - @object_embedded ||= original_activity[:object].is_a?(Hash) - end - - # Encuentra la URI del objeto o falla si no la encuentra. - # - # @return [String] - def object_uri - @object_uri ||= ::ActivityPub.uri_from_object(original_activity[:object]) - ensure - raise ActiveRecord::RecordNotFound, 'object id missing' if @object_uri.blank? - end - - # Atajo a la instancia - # - # @return [ActivityPub::Instance] - def instance - actor.instance - end - - # Genera un objeto a partir de la actividad. Si el objeto ya - # existe, actualiza su contenido. Si el objeto no viene - # incorporado, obtenemos el contenido más tarde. - # - # @return [ActivityPub::Object] - def object - @object ||= ::ActivityPub::Object.find_or_initialize_by(uri: object_uri).tap do |o| - # XXX: Si el objeto es una actividad, esto siempre va a ser - # Generic - o.type ||= 'ActivityPub::Object::Generic' - - if object_embedded? - o.content = original_object - begin - type = original_object[:type].presence - o.type = "ActivityPub::Object::#{type}".constantize if type - rescue NameError - end - end - - o.save! - - # XXX: el objeto necesita ser guardado antes de poder - # procesarlo. No usamos GlobalID porque el tipo de objeto - # cambia y produce un error de deserialización. - ::ActivityPub::FetchJob.perform_later(site: site, object_id: o.id) unless object_embedded? - end - end - - # Genera el seguimiento del estado del objeto con respecto al - # sitio. - # - # @return [ActivityPub] - def activity_pub - @activity_pub ||= site.activity_pubs.find_or_create_by!(site: site, actor: actor, instance: instance, object_id: object.id, object_type: object.type) - end - - # Crea la actividad y la vincula con el estado - # - # @return [ActivityPub::Activity] - def activity - @activity ||= - ::ActivityPub::Activity - .type_from(original_activity) - .find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a| - a.content = original_activity.dup - a.content[:object] = object.uri - a.save! - end - end - - # Actor, si no hay instancia, la crea en el momento, junto con - # su estado de moderación. - # - # @return [Actor] - def actor - @actor ||= ::ActivityPub::Actor.find_or_initialize_by(uri: original_activity[:actor]).tap do |a| - unless a.instance - a.instance = ::ActivityPub::Instance.find_or_create_by(hostname: URI.parse(a.uri).hostname) - - ::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance) - end - - site.instance_moderations.find_or_create_by(instance: a.instance) - - a.save! - - site.actor_moderations.find_or_create_by(actor: a) - - ::ActivityPub::ActorFetchJob.perform_later(site: site, actor: a) - end - end - - # Descubre la actividad recibida, generando un error si la - # actividad no está dirigida a nosotres. - # - # @todo Validar formato - # @return [Hash] - def original_activity - @original_activity ||= FastJsonparser.parse(request.raw_post).tap do |activity| - raise '@context missing' unless activity[:@context].presence - raise 'id missing' unless activity[:id].presence - raise 'object missing' unless activity[:object].presence - rescue RuntimeError => e - raise ActiveRecord::RecordNotFound, e.message - end - end - - # @return [Hash,String] - def original_object - @original_object ||= original_activity[:object].dup.tap do |o| - o[:@context] = original_activity[:@context].dup - end + # @param initial_state [Symbol] + def process!(initial_state) + ::ActivityPub::ProcessJob.perform_later(site: site, body: request.raw_post, initial_state: initial_state) end end end diff --git a/app/jobs/activity_pub/process_job.rb b/app/jobs/activity_pub/process_job.rb new file mode 100644 index 00000000..6554b44d --- /dev/null +++ b/app/jobs/activity_pub/process_job.rb @@ -0,0 +1,154 @@ +# frozen_string_literal: true + +class ActivityPub + # Procesar las actividades a medida que llegan + class ProcessJob < ApplicationJob + attr_reader :body + + # Procesa la actividad en segundo plano + # + # @param :body [String] + # @param :initial_state [Symbol,String] + def perform(site:, body:, initial_state: :paused) + @body = body + @site = site + + ActiveRecord::Base.connection_pool.with_connection do + ::ActivityPub.transaction do + # Crea todos los registros necesarios y actualiza el estado + actor.present? + instance.present? + object.present? + activity_pub.present? + activity_pub.update(aasm_state: initial_state) + + activity.update_activity_pub_state! + end + end + # Al generar una excepción, en lugar de seguir intentando, enviamos + # el reporte. + rescue Exception => e + ExceptionNotifier.notify_exception(e, data: { site: site.name, activity: original_activity }) + end + + private + + # Si el objeto ya viene incorporado en la actividad o lo tenemos + # que traer remotamente. + # + # @return [Bool] + def object_embedded? + @object_embedded ||= original_activity[:object].is_a?(Hash) + end + + # Encuentra la URI del objeto o falla si no la encuentra. + # + # @return [String] + def object_uri + @object_uri ||= ::ActivityPub.uri_from_object(original_activity[:object]) + ensure + raise ActiveRecord::RecordNotFound, 'object id missing' if @object_uri.blank? + end + + # Atajo a la instancia + # + # @return [ActivityPub::Instance] + def instance + actor.instance + end + + # Genera un objeto a partir de la actividad. Si el objeto ya + # existe, actualiza su contenido. Si el objeto no viene + # incorporado, obtenemos el contenido más tarde. + # + # @return [ActivityPub::Object] + def object + @object ||= ::ActivityPub::Object.find_or_initialize_by(uri: object_uri).tap do |o| + # XXX: Si el objeto es una actividad, esto siempre va a ser + # Generic + o.type ||= 'ActivityPub::Object::Generic' + + if object_embedded? + o.content = original_object + begin + type = original_object[:type].presence + o.type = "ActivityPub::Object::#{type}".constantize if type + rescue NameError + end + end + + o.save! + + # XXX: el objeto necesita ser guardado antes de poder + # procesarlo. No usamos GlobalID porque el tipo de objeto + # cambia y produce un error de deserialización. + ::ActivityPub::FetchJob.perform_later(site: site, object_id: o.id) unless object_embedded? + end + end + + # Genera el seguimiento del estado del objeto con respecto al + # sitio. + # + # @return [ActivityPub] + def activity_pub + @activity_pub ||= site.activity_pubs.find_or_create_by!(site: site, actor: actor, instance: instance, + object_id: object.id, object_type: object.type) + end + + # Crea la actividad y la vincula con el estado + # + # @return [ActivityPub::Activity] + def activity + @activity ||= + ::ActivityPub::Activity + .type_from(original_activity) + .find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a| + a.content = original_activity.dup + a.content[:object] = object.uri + a.save! + end + end + + # Actor, si no hay instancia, la crea en el momento, junto con + # su estado de moderación. + # + # @return [Actor] + def actor + @actor ||= ::ActivityPub::Actor.find_or_initialize_by(uri: original_activity[:actor]).tap do |a| + unless a.instance + a.instance = ::ActivityPub::Instance.find_or_create_by(hostname: URI.parse(a.uri).hostname) + + ::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance) + end + + site.instance_moderations.find_or_create_by(instance: a.instance) + + a.save! + + site.actor_moderations.find_or_create_by(actor: a) + + ::ActivityPub::ActorFetchJob.perform_later(site: site, actor: a) + end + end + + # @return [Hash,String] + def original_object + @original_object ||= original_activity[:object].dup.tap do |o| + o[:@context] = original_activity[:@context].dup + end + end + + # Descubre la actividad recibida, generando un error si la + # actividad no está dirigida a nosotres. + # + # @todo Validar formato con Dry::Schema + # @return [Hash] + def original_activity + @original_activity ||= FastJsonparser.parse(body).tap do |activity| + raise '@context missing' unless activity[:@context].present? + raise 'id missing' unless activity[:id].present? + raise 'object missing' unless activity[:object].present? + end + end + end +end