diff --git a/Gemfile b/Gemfile index 20ab04d5..765a1118 100644 --- a/Gemfile +++ b/Gemfile @@ -83,6 +83,7 @@ gem 'rubanok' gem 'after_commit_everywhere', '~> 1.0' gem 'aasm' +gem 'que-web' # database gem 'hairtrigger' diff --git a/Gemfile.lock b/Gemfile.lock index 5e7c5d52..1b176336 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -97,6 +97,7 @@ GEM ast (2.4.2) autoprefixer-rails (10.4.13.0) execjs (~> 2) + base64 (0.2.0) bcrypt (3.1.20-x86_64-linux-musl) bcrypt_pbkdf (1.1.0-x86_64-linux-musl) benchmark-ips (2.12.0) @@ -371,6 +372,8 @@ GEM i18n (>= 0.6.10, < 2) request_store (~> 1.0) multi_xml (0.6.0) + mustermann (3.0.0) + ruby2_keywords (~> 0.0.1) net-imap (0.4.9) date net-protocol @@ -410,12 +413,18 @@ GEM pundit (2.3.1) activesupport (>= 3.0.0) que (2.2.1) + que-web (0.10.0) + que (>= 1) + sinatra racc (1.7.3-x86_64-linux-musl) rack (2.2.8) rack-cors (2.0.1) rack (>= 2.0.0) rack-mini-profiler (3.1.0) rack (>= 1.2.0) + rack-protection (3.2.0) + base64 (>= 0.1.0) + rack (~> 2.2, >= 2.2.4) rack-proxy (0.7.7) rack rack-test (2.1.0) @@ -514,6 +523,7 @@ GEM ruby-statistics (3.0.2) ruby-vips (2.2.0) ffi (~> 1.12) + ruby2_keywords (0.0.5) ruby2ruby (2.5.0) ruby_parser (~> 3.1) sexp_processor (~> 4.6) @@ -540,6 +550,11 @@ GEM sexp_processor (4.17.0) simpleidn (0.2.1) unf (~> 0.1.4) + sinatra (3.2.0) + mustermann (~> 3.0) + rack (~> 2.2, >= 2.2.4) + rack-protection (= 3.2.0) + tilt (~> 2.0) sourcemap (0.1.1) spring (4.1.1) spring-watcher-listen (2.1.0) @@ -627,7 +642,7 @@ DEPENDENCIES devise devise-i18n devise_invitable - distributed-press-api-client (~> 0.4.0) + distributed-press-api-client (~> 0.4.1) dotenv-rails down ed25519 @@ -671,6 +686,7 @@ DEPENDENCIES puma pundit que + que-web rack-cors rack-mini-profiler rails (~> 6.1.0) diff --git a/app/controllers/api/v1/webhooks/social_inbox_controller.rb b/app/controllers/api/v1/webhooks/social_inbox_controller.rb index 7b17c47b..6ac91a51 100644 --- a/app/controllers/api/v1/webhooks/social_inbox_controller.rb +++ b/app/controllers/api/v1/webhooks/social_inbox_controller.rb @@ -5,196 +5,51 @@ module Api module Webhooks # Recibe webhooks de la Social Inbox # - # @todo Mover todo a un Job que obtenga el objeto remoto antes de - # instanciar el objeto localmente en lugar de arreglarlo después y - # poder responder lo más rápido posible el webhook. # @see {https://www.w3.org/TR/activitypub/} class SocialInboxController < BaseController include Api::V1::Webhooks::Concerns::WebhookConcern + # Validar que el token sea correcto + before_action :usuarie + # Cuando una actividad ingresa en la cola de moderación, la # recibimos por acá # - # Vamos a recibir Create, Update, Delete, Follow, Undo y obtener - # el objeto dentro de cada una para guardar un estado asociado - # al sitio. + # Vamos a recibir Create, Update, Delete, Follow, Undo, + # Announce, Like y obtener el objeto dentro de cada una para + # guardar un estado asociado al sitio. # # El objeto del estado puede ser un objeto o une actore, # dependiendo de la actividad. def moderationqueued - # Devuelve un error si el token no es válido - usuarie.present? + process! :paused - ::ActivityPub.transaction do - - # Crea todos los registros necesarios y actualiza el estado - actor.present? - instance.present? - object.present? - activity_pub.present? - - activity.update_activity_pub_state! - end - rescue ActiveRecord::RecordInvalid => e - ExceptionNotifier.notify_exception(e, - data: { site: site.name, usuarie: usuarie.email, - activity: original_activity }) - ensure head :accepted end # Cuando la Social Inbox acepta una actividad, la recibimos # igual y la guardamos por si cambiamos de idea. - # - # @todo DRY def onapproved - ::ActivityPub.transaction do - actor.present? - instance.present? - object.present? - activity.present? - activity_pub.update(aasm_state: 'approved') - activity.update_activity_pub_state! - end + process! :approved head :accepted end # Cuando la Social Inbox rechaza una actividad, la recibimos # igual y la guardamos por si cambiamos de idea. - # - # @todo DRY def onrejected - ::ActivityPub.transaction do - actor.present? - instance.present? - object.present? - activity.present? - activity_pub.update(aasm_state: 'rejected') - end + process! :rejected head :accepted end private - # Si el objeto ya viene incorporado en la actividad o lo tenemos - # que traer remotamente. + # Envía la actividad para procesamiento por separado. # - # @return [Bool] - def object_embedded? - @object_embedded ||= original_activity[:object].is_a?(Hash) - end - - # Encuentra la URI del objeto o falla si no la encuentra. - # - # @return [String] - def object_uri - @object_uri ||= ::ActivityPub.uri_from_object(original_activity[:object]) - ensure - raise ActiveRecord::RecordNotFound, 'object id missing' if @object_uri.blank? - end - - # Atajo a la instancia - # - # @return [ActivityPub::Instance] - def instance - actor.instance - end - - # Genera un objeto a partir de la actividad. Si el objeto ya - # existe, actualiza su contenido. Si el objeto no viene - # incorporado, obtenemos el contenido más tarde. - # - # @return [ActivityPub::Object] - def object - @object ||= ::ActivityPub::Object.find_or_initialize_by(uri: object_uri).tap do |o| - # XXX: Si el objeto es una actividad, esto siempre va a ser - # Generic - o.type ||= 'ActivityPub::Object::Generic' - - if object_embedded? - o.content = original_object - begin - type = original_object[:type].presence - o.type = "ActivityPub::Object::#{type}".constantize if type - rescue NameError - end - end - - o.save! - - # XXX: el objeto necesita ser guardado antes de poder - # procesarlo. No usamos GlobalID porque el tipo de objeto - # cambia y produce un error de deserialización. - ::ActivityPub::FetchJob.perform_later(site: site, object_id: o.id) unless object_embedded? - end - end - - # Genera el seguimiento del estado del objeto con respecto al - # sitio. - # - # @return [ActivityPub] - def activity_pub - @activity_pub ||= site.activity_pubs.find_or_create_by!(site: site, actor: actor, instance: instance, object_id: object.id, object_type: object.type) - end - - # Crea la actividad y la vincula con el estado - # - # @return [ActivityPub::Activity] - def activity - @activity ||= - ::ActivityPub::Activity - .type_from(original_activity) - .find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a| - a.content = original_activity.dup - a.content[:object] = object.uri - a.save! - end - end - - # Actor, si no hay instancia, la crea en el momento, junto con - # su estado de moderación. - # - # @return [Actor] - def actor - @actor ||= ::ActivityPub::Actor.find_or_initialize_by(uri: original_activity[:actor]).tap do |a| - unless a.instance - a.instance = ::ActivityPub::Instance.find_or_create_by(hostname: URI.parse(a.uri).hostname) - - ::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance) - end - - site.instance_moderations.find_or_create_by(instance: a.instance) - - a.save! - - site.actor_moderations.find_or_create_by(actor: a) - - ::ActivityPub::ActorFetchJob.perform_later(site: site, actor: a) - end - end - - # Descubre la actividad recibida, generando un error si la - # actividad no está dirigida a nosotres. - # - # @todo Validar formato - # @return [Hash] - def original_activity - @original_activity ||= FastJsonparser.parse(request.raw_post).tap do |activity| - raise '@context missing' unless activity[:@context].presence - raise 'id missing' unless activity[:id].presence - raise 'object missing' unless activity[:object].presence - rescue RuntimeError => e - raise ActiveRecord::RecordNotFound, e.message - end - end - - # @return [Hash,String] - def original_object - @original_object ||= original_activity[:object].dup.tap do |o| - o[:@context] = original_activity[:@context].dup - end + # @param initial_state [Symbol] + def process!(initial_state) + ::ActivityPub::ProcessJob.perform_later(site: site, body: request.raw_post, initial_state: initial_state) end end end diff --git a/app/jobs/activity_pub/process_job.rb b/app/jobs/activity_pub/process_job.rb new file mode 100644 index 00000000..6554b44d --- /dev/null +++ b/app/jobs/activity_pub/process_job.rb @@ -0,0 +1,154 @@ +# frozen_string_literal: true + +class ActivityPub + # Procesar las actividades a medida que llegan + class ProcessJob < ApplicationJob + attr_reader :body + + # Procesa la actividad en segundo plano + # + # @param :body [String] + # @param :initial_state [Symbol,String] + def perform(site:, body:, initial_state: :paused) + @body = body + @site = site + + ActiveRecord::Base.connection_pool.with_connection do + ::ActivityPub.transaction do + # Crea todos los registros necesarios y actualiza el estado + actor.present? + instance.present? + object.present? + activity_pub.present? + activity_pub.update(aasm_state: initial_state) + + activity.update_activity_pub_state! + end + end + # Al generar una excepción, en lugar de seguir intentando, enviamos + # el reporte. + rescue Exception => e + ExceptionNotifier.notify_exception(e, data: { site: site.name, activity: original_activity }) + end + + private + + # Si el objeto ya viene incorporado en la actividad o lo tenemos + # que traer remotamente. + # + # @return [Bool] + def object_embedded? + @object_embedded ||= original_activity[:object].is_a?(Hash) + end + + # Encuentra la URI del objeto o falla si no la encuentra. + # + # @return [String] + def object_uri + @object_uri ||= ::ActivityPub.uri_from_object(original_activity[:object]) + ensure + raise ActiveRecord::RecordNotFound, 'object id missing' if @object_uri.blank? + end + + # Atajo a la instancia + # + # @return [ActivityPub::Instance] + def instance + actor.instance + end + + # Genera un objeto a partir de la actividad. Si el objeto ya + # existe, actualiza su contenido. Si el objeto no viene + # incorporado, obtenemos el contenido más tarde. + # + # @return [ActivityPub::Object] + def object + @object ||= ::ActivityPub::Object.find_or_initialize_by(uri: object_uri).tap do |o| + # XXX: Si el objeto es una actividad, esto siempre va a ser + # Generic + o.type ||= 'ActivityPub::Object::Generic' + + if object_embedded? + o.content = original_object + begin + type = original_object[:type].presence + o.type = "ActivityPub::Object::#{type}".constantize if type + rescue NameError + end + end + + o.save! + + # XXX: el objeto necesita ser guardado antes de poder + # procesarlo. No usamos GlobalID porque el tipo de objeto + # cambia y produce un error de deserialización. + ::ActivityPub::FetchJob.perform_later(site: site, object_id: o.id) unless object_embedded? + end + end + + # Genera el seguimiento del estado del objeto con respecto al + # sitio. + # + # @return [ActivityPub] + def activity_pub + @activity_pub ||= site.activity_pubs.find_or_create_by!(site: site, actor: actor, instance: instance, + object_id: object.id, object_type: object.type) + end + + # Crea la actividad y la vincula con el estado + # + # @return [ActivityPub::Activity] + def activity + @activity ||= + ::ActivityPub::Activity + .type_from(original_activity) + .find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a| + a.content = original_activity.dup + a.content[:object] = object.uri + a.save! + end + end + + # Actor, si no hay instancia, la crea en el momento, junto con + # su estado de moderación. + # + # @return [Actor] + def actor + @actor ||= ::ActivityPub::Actor.find_or_initialize_by(uri: original_activity[:actor]).tap do |a| + unless a.instance + a.instance = ::ActivityPub::Instance.find_or_create_by(hostname: URI.parse(a.uri).hostname) + + ::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance) + end + + site.instance_moderations.find_or_create_by(instance: a.instance) + + a.save! + + site.actor_moderations.find_or_create_by(actor: a) + + ::ActivityPub::ActorFetchJob.perform_later(site: site, actor: a) + end + end + + # @return [Hash,String] + def original_object + @original_object ||= original_activity[:object].dup.tap do |o| + o[:@context] = original_activity[:@context].dup + end + end + + # Descubre la actividad recibida, generando un error si la + # actividad no está dirigida a nosotres. + # + # @todo Validar formato con Dry::Schema + # @return [Hash] + def original_activity + @original_activity ||= FastJsonparser.parse(body).tap do |activity| + raise '@context missing' unless activity[:@context].present? + raise 'id missing' unless activity[:id].present? + raise 'object missing' unless activity[:object].present? + end + end + end +end diff --git a/app/views/components/_btn_base.haml b/app/views/components/_btn_base.haml index f9227482..faa5c85f 100644 --- a/app/views/components/_btn_base.haml +++ b/app/views/components/_btn_base.haml @@ -3,7 +3,7 @@ - local_assigns[:method] ||= 'patch' - local_assigns[:class] ||= 'btn-secondary' - local_assigns[:class] = "btn #{local_assigns[:class]}" +- local_assigns.delete(:text) --# @todo path es obligatorio -= button_to local_assigns[:path], **local_assigns.compact do += button_to(path, **local_assigns.compact) do = text diff --git a/app/views/components/_dropdown_button.haml b/app/views/components/_dropdown_button.haml index c0f12754..d6de6c8e 100644 --- a/app/views/components/_dropdown_button.haml +++ b/app/views/components/_dropdown_button.haml @@ -1,4 +1,6 @@ -# @param name [String] @param value [String] -%button.dropdown-item{type: 'submit', data: { target: 'dropdown.item' }, name: name, value: value, **local_assigns.compact } + @param text [String] +- local_assigns.delete(:text) +%button.dropdown-item{type: 'submit', data: { target: 'dropdown.item' }, name: name, value: value, **local_assigns.compact }= text diff --git a/config/initializers/que_web.rb b/config/initializers/que_web.rb new file mode 100644 index 00000000..192256db --- /dev/null +++ b/config/initializers/que_web.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +Que::Web.use(Rack::Auth::Basic) do |user, password| + [user, password] == [ENV['HTTP_BASIC_USER'], ENV['HTTP_BASIC_PASSWORD']] +end diff --git a/config/routes.rb b/config/routes.rb index 054b7f4d..4d43d66a 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -4,6 +4,9 @@ Rails.application.routes.draw do devise_for :usuaries get '/.well-known/change-password', to: redirect('/usuaries/edit') + require 'que/web' + mount Que::Web => '/que' + root 'application#index' constraints(Constraints::ApiSubdomain.new) do