# frozen_string_literal: true class ActivityPub # Procesar las actividades a medida que llegan class ProcessJob < ApplicationJob attr_reader :body # Procesa la actividad en segundo plano # # @param :body [String] # @param :initial_state [Symbol,String] def perform(site:, body:, initial_state: :paused) @body = body @site = site ActiveRecord::Base.connection_pool.with_connection do ::ActivityPub.transaction do # Crea todos los registros necesarios y actualiza el estado actor.present? instance.present? object.present? activity_pub.present? activity_pub.update(aasm_state: initial_state) activity.update_activity_pub_state! end end # Al generar una excepción, en lugar de seguir intentando, enviamos # el reporte. rescue Exception => e ExceptionNotifier.notify_exception(e, data: { site: site.name, activity: original_activity }) end private # Si el objeto ya viene incorporado en la actividad o lo tenemos # que traer remotamente. # # @return [Bool] def object_embedded? @object_embedded ||= original_activity[:object].is_a?(Hash) end # Encuentra la URI del objeto o falla si no la encuentra. # # @return [String] def object_uri @object_uri ||= ::ActivityPub.uri_from_object(original_activity[:object]) ensure raise ActiveRecord::RecordNotFound, 'object id missing' if @object_uri.blank? end # Atajo a la instancia # # @return [ActivityPub::Instance] def instance actor.instance end # Genera un objeto a partir de la actividad. Si el objeto ya # existe, actualiza su contenido. Si el objeto no viene # incorporado, obtenemos el contenido más tarde. # # @return [ActivityPub::Object] def object @object ||= ::ActivityPub::Object.lock.find_or_initialize_by(uri: object_uri).tap do |o| o.content = original_object if object_embedded? o.save! # XXX: el objeto necesita ser guardado antes de poder # procesarlo. No usamos GlobalID porque el tipo de objeto # cambia y produce un error de deserialización. ::ActivityPub::FetchJob.perform_later(site: site, object_id: o.id) unless object_embedded? end end # Genera el seguimiento del estado del objeto con respecto al # sitio. # # @return [ActivityPub] def activity_pub @activity_pub ||= site.activity_pubs.lock.find_or_create_by!(site: site, actor: actor, instance: instance, object_id: object.id, object_type: object.type) end # Crea la actividad y la vincula con el estado # # @return [ActivityPub::Activity] def activity @activity ||= ::ActivityPub::Activity .type_from(original_activity) .lock .find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a| a.content = original_activity.dup a.content[:object] = object.uri a.save! end end # Actor, si no hay instancia, la crea en el momento, junto con # su estado de moderación. # # @return [Actor] def actor @actor ||= ::ActivityPub::Actor.lock.find_or_initialize_by(uri: original_activity[:actor]).tap do |a| unless a.instance a.instance = ::ActivityPub::Instance.lock.find_or_create_by(hostname: URI.parse(a.uri).hostname) ::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance) end site.instance_moderations.lock.find_or_create_by(instance: a.instance) a.save! site.actor_moderations.lock.find_or_create_by(actor: a) ::ActivityPub::FetchJob.perform_later(site: site, object_id: a.object.id) end end # @return [Hash,String] def original_object @original_object ||= original_activity[:object].dup.tap do |o| o[:@context] = original_activity[:@context].dup end end # Descubre la actividad recibida, generando un error si la # actividad no está dirigida a nosotres. # # @todo Validar formato con Dry::Schema # @return [Hash] def original_activity @original_activity ||= FastJsonparser.parse(body).tap do |activity| raise '@context missing' unless activity[:@context].present? raise 'id missing' unless activity[:id].present? raise 'object missing' unless activity[:object].present? end end end end