From 0ed702992b509cbd6d435474db050228cb88354f Mon Sep 17 00:00:00 2001 From: f Date: Tue, 19 Mar 2024 12:39:41 -0300 Subject: [PATCH] fix: prevenir objetos multiplicados! parece que la forma en que estabamos creando indices unicos ya no funciona (??) asi que a veces estabamos creando objetos duplicados en threads. de paso actorfetchjob ya no es necesario. closes #15621 closes #15622 closes #15623 closes #15729 closes #15730 closes #15731 --- app/jobs/activity_pub/actor_fetch_job.rb | 26 ---------- app/jobs/activity_pub/fetch_job.rb | 2 +- app/jobs/activity_pub/process_job.rb | 17 +++---- app/models/activity_pub/actor.rb | 2 +- ...240319144735_add_missing_unique_indexes.rb | 48 +++++++++++++++++++ db/structure.sql | 28 +++++++++-- 6 files changed, 82 insertions(+), 41 deletions(-) delete mode 100644 app/jobs/activity_pub/actor_fetch_job.rb create mode 100644 db/migrate/20240319144735_add_missing_unique_indexes.rb diff --git a/app/jobs/activity_pub/actor_fetch_job.rb b/app/jobs/activity_pub/actor_fetch_job.rb deleted file mode 100644 index 598ecd83..00000000 --- a/app/jobs/activity_pub/actor_fetch_job.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -# Obtiene o actualiza el contenido de un objeto, usando las credenciales -# del sitio. -# -# XXX: Esto usa las credenciales del sitio para volver el objeto -# disponible para todo el CMS. Asumimos que el objeto devuelto es el -# mismo para todo el mundo y las credenciales solo son para -# autenticación. -class ActivityPub - class ActorFetchJob < ApplicationJob - self.priority = 50 - - def perform(site:, actor:) - ActivityPub::Actor.transaction do - response = site.social_inbox.dereferencer.get(uri: actor.uri) - - # @todo Fallar cuando la respuesta no funcione? - return unless response.success? - return if response.miss? && actor.content.present? - - actor.object.update(content: FastJsonparser.parse(response.body)) - end - end - end -end diff --git a/app/jobs/activity_pub/fetch_job.rb b/app/jobs/activity_pub/fetch_job.rb index beb585ff..e0033439 100644 --- a/app/jobs/activity_pub/fetch_job.rb +++ b/app/jobs/activity_pub/fetch_job.rb @@ -30,7 +30,7 @@ class ActivityPub current_type = object.type content = FastJsonparser.parse(response.body) - object.update!(content: content, type: ActivityPub::Object.type_from(content).name) + object.lock.update!(content: content, type: ActivityPub::Object.type_from(content).name) object = ::ActivityPub::Object.find(object_id) # Actualiza la mención diff --git a/app/jobs/activity_pub/process_job.rb b/app/jobs/activity_pub/process_job.rb index 9b72be43..4e278797 100644 --- a/app/jobs/activity_pub/process_job.rb +++ b/app/jobs/activity_pub/process_job.rb @@ -63,7 +63,7 @@ class ActivityPub # # @return [ActivityPub::Object] def object - @object ||= ::ActivityPub::Object.find_or_initialize_by(uri: object_uri).tap do |o| + @object ||= ::ActivityPub::Object.lock.find_or_initialize_by(uri: object_uri).tap do |o| o.content = original_object if object_embedded? o.save! @@ -80,8 +80,8 @@ class ActivityPub # # @return [ActivityPub] def activity_pub - @activity_pub ||= site.activity_pubs.find_or_create_by!(site: site, actor: actor, instance: instance, - object_id: object.id, object_type: object.type) + @activity_pub ||= site.activity_pubs.lock.find_or_create_by!(site: site, actor: actor, instance: instance, + object_id: object.id, object_type: object.type) end # Crea la actividad y la vincula con el estado @@ -91,6 +91,7 @@ class ActivityPub @activity ||= ::ActivityPub::Activity .type_from(original_activity) + .lock .find_or_initialize_by(uri: original_activity[:id], activity_pub: activity_pub, actor: actor).tap do |a| a.content = original_activity.dup a.content[:object] = object.uri @@ -103,20 +104,20 @@ class ActivityPub # # @return [Actor] def actor - @actor ||= ::ActivityPub::Actor.find_or_initialize_by(uri: original_activity[:actor]).tap do |a| + @actor ||= ::ActivityPub::Actor.lock.find_or_initialize_by(uri: original_activity[:actor]).tap do |a| unless a.instance - a.instance = ::ActivityPub::Instance.find_or_create_by(hostname: URI.parse(a.uri).hostname) + a.instance = ::ActivityPub::Instance.lock.find_or_create_by(hostname: URI.parse(a.uri).hostname) ::ActivityPub::InstanceFetchJob.perform_later(site: site, instance: a.instance) end - site.instance_moderations.find_or_create_by(instance: a.instance) + site.instance_moderations.lock.find_or_create_by(instance: a.instance) a.save! - site.actor_moderations.find_or_create_by(actor: a) + site.actor_moderations.lock.find_or_create_by(actor: a) - ::ActivityPub::ActorFetchJob.perform_later(site: site, actor: a) + ::ActivityPub::FetchJob.perform_later(site: site, actor: a.object) end end diff --git a/app/models/activity_pub/actor.rb b/app/models/activity_pub/actor.rb index a8cc0d5d..6a284025 100644 --- a/app/models/activity_pub/actor.rb +++ b/app/models/activity_pub/actor.rb @@ -33,7 +33,7 @@ class ActivityPub end def object - @object ||= ActivityPub::Object.find_or_initialize_by(uri: uri) + @object ||= ActivityPub::Object.lock.find_or_create_by(uri: uri) end def content diff --git a/db/migrate/20240319144735_add_missing_unique_indexes.rb b/db/migrate/20240319144735_add_missing_unique_indexes.rb new file mode 100644 index 00000000..7d18c8e8 --- /dev/null +++ b/db/migrate/20240319144735_add_missing_unique_indexes.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +# Parece que la sintaxis que veníamos usando para los índices únicos ya +# no es válida y por eso teníamos objetos duplicados. +class AddMissingUniqueIndexes < ActiveRecord::Migration[6.1] + def up + ActivityPub::Object.group(:uri).count.select { |_, v| v > 1 }.keys.each do |uri| + objects = ActivityPub::Object.where(uri: uri) + deleted_ids = objects[1..].map(&:delete).map(&:id) + + ActivityPub.where(object_id: deleted_ids).update_all(object_id: objects.first.id, updated_at: Time.now) + end + + ActivityPub::Actor.group(:uri).count.select { |_, v| v > 1 }.keys.each do |uri| + objects = ActivityPub::Actor.where(uri: uri) + deleted_ids = objects[1..].map(&:delete).map(&:id) + + ActivityPub.where(actor_id: deleted_ids).update_all(actor_id: objects.first.id, updated_at: Time.now) + ActorModeration.where(actor_id: deleted_ids).update_all(actor_id: objects.first.id, updated_at: Time.now) + ActivityPub::Activity.where(actor_id: deleted_ids).update_all(actor_id: objects.first.id, updated_at: Time.now) + ActivityPub::RemoteFlag.where(actor_id: deleted_ids).update_all(actor_id: objects.first.id, updated_at: Time.now) + end + + ActivityPub::Instance.group(:hostname).count.select { |_, v| v > 1 }.keys.each do |hostname| + objects = ActivityPub::Instance.where(hostname: hostname) + deleted_ids = objects[1..].map(&:delete).map(&:id) + + ActivityPub.where(instance_id: deleted_ids).update_all(instance_id: objects.first.id, updated_at: Time.now) + InstanceModeration.where(instance_id: deleted_ids).update_all(instance_id: objects.first.id, updated_at: Time.now) + ActivityPub::Actor.where(instance_id: deleted_ids).update_all(instance_id: objects.first.id, updated_at: Time.now) + end + + remove_index :activity_pub_instances, :hostname + remove_index :activity_pub_actors, :uri + add_index :activity_pub_instances, :hostname, unique: true + add_index :activity_pub_objects, :uri, unique: true + add_index :activity_pub_actors, :uri, unique: true + end + + def down + remove_index :activity_pub_instances, :hostname, unique: true + remove_index :activity_pub_objects, :uri, unique: true + remove_index :activity_pub_actors, :uri, unique: true + add_index :activity_pub_instances, :hostname + add_index :activity_pub_objects, :uri + add_index :activity_pub_actors, :uri + end +end diff --git a/db/structure.sql b/db/structure.sql index ed58ebec..21cf04d0 100644 --- a/db/structure.sql +++ b/db/structure.sql @@ -498,7 +498,8 @@ CREATE TABLE public.activity_pub_actors ( created_at timestamp(6) without time zone NOT NULL, updated_at timestamp(6) without time zone NOT NULL, instance_id uuid NOT NULL, - uri character varying NOT NULL + uri character varying NOT NULL, + mention character varying ); @@ -512,7 +513,7 @@ CREATE TABLE public.activity_pub_fediblocks ( updated_at timestamp(6) without time zone NOT NULL, title character varying NOT NULL, url character varying NOT NULL, - download_url character varying NOT NULL, + download_url character varying, format character varying NOT NULL, hostnames jsonb DEFAULT '[]'::jsonb ); @@ -557,6 +558,7 @@ CREATE TABLE public.activity_pub_remote_flags ( site_id bigint, actor_id uuid, message text, + content jsonb, aasm_state character varying DEFAULT 'waiting'::character varying NOT NULL ); @@ -2138,14 +2140,21 @@ CREATE INDEX index_activity_pub_actors_on_instance_id ON public.activity_pub_act -- Name: index_activity_pub_actors_on_uri; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX index_activity_pub_actors_on_uri ON public.activity_pub_actors USING btree (uri); +CREATE UNIQUE INDEX index_activity_pub_actors_on_uri ON public.activity_pub_actors USING btree (uri); -- -- Name: index_activity_pub_instances_on_hostname; Type: INDEX; Schema: public; Owner: - -- -CREATE INDEX index_activity_pub_instances_on_hostname ON public.activity_pub_instances USING btree (hostname); +CREATE UNIQUE INDEX index_activity_pub_instances_on_hostname ON public.activity_pub_instances USING btree (hostname); + + +-- +-- Name: index_activity_pub_objects_on_uri; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX index_activity_pub_objects_on_uri ON public.activity_pub_objects USING btree (uri); -- @@ -2701,6 +2710,15 @@ INSERT INTO "schema_migrations" (version) VALUES ('20240305164653'), ('20240305184854'), ('20240307201510'), -('20240307203039'); +('20240307203039'), +('20240313192134'), +('20240313204105'), +('20240314141536'), +('20240314153017'), +('20240314205923'), +('20240316203721'), +('20240318183846'), +('20240319124212'), +('20240319144735');