- Refactored clearbit enrichment backend to use dedicated objects for syncing and extracted mapping logic to ExternalSync.

- Replaced String keys with Symbols to reduce memory load.
- Added tests 🚀
- Added a basic Group factory. The simplicity of the Group model makes it ideal for tests that only need a simple model.
This commit is contained in:
Thorsten Eckel 2017-03-09 11:32:05 +01:00
parent aa6ba57cde
commit bd690f6039
7 changed files with 586 additions and 314 deletions

View file

@ -2,4 +2,83 @@
# Record that links a local object to its counterpart in an external source
# and keeps the payload received last (declared via `store :last_payload`).
#
# The class level helpers translate a remote payload into local attribute
# values (`map`) and apply those values to a local object (`changed?`).
class ExternalSync < ApplicationModel
  store :last_payload

  # Classes whose instances are taken over as attribute values as they are.
  # Ruby has no `Bool` class - boolean true is an instance of TrueClass.
  # Falsy values (nil / false) never reach this check because they abort
  # the lookup in `extract`.
  STORABLE_CLASSES = [String, Integer, Float, TrueClass].freeze

  class << self

    # Assigns the given remote values to the attributes of a local object
    # without overwriting values that differ from the previous sync.
    #
    # An attribute is assigned only if it is part of `object.attributes`,
    # differs from the remote value and is either blank or still equal to
    # the value of the previous sync. The object is not saved here.
    #
    # @param object [#attributes, #[], #[]=] local object to update
    # @param previous_changes [Hash, nil] attribute => value of the previous sync
    # @param current_changes [Hash] attribute => value of the current sync
    # @return [Boolean] true if at least one attribute was assigned
    def changed?(object:, previous_changes: {}, current_changes:)
      changed = false
      previous_changes ||= {}
      current_changes.each do |attribute, value|
        # only real attributes may be written (e.g. never :destroy)
        next if !object.attributes.key?(attribute.to_s)
        next if object[attribute] == value
        # a present value which differs from the previous sync was changed locally
        next if object[attribute].present? && object[attribute] != previous_changes[attribute]

        begin
          object[attribute] = value
          changed = true
        rescue => e
          Rails.logger.error "ERROR: Unable to assign attribute #{attribute} to object #{object.class.name}: #{e.inspect}"
        end
      end
      changed
    end

    # Builds a Hash of local attribute values out of a remote structure.
    #
    # @param mapping [Hash] remote key path (e.g. 'person.name.fullName') => local key
    # @param source [Hash, Object] remote structure, Hash keys are symbolized deeply
    # @return [Hash{Symbol => Object}] local key => value, blank values are skipped
    #   and the first mapping entry providing a value for a local key wins
    def map(mapping: {}, source:)
      information_source = source.is_a?(Hash) ? source.deep_symbolize_keys : source.clone

      result = {}
      mapping.each do |remote_key, local_key|
        local_key_sym = local_key.to_sym
        next if result[local_key_sym].present?

        value = extract(remote_key, information_source)
        next if value.blank?

        result[local_key_sym] = value
      end
      result
    end

    private

    # Follows a dot separated key path through a structure of Hashes and/or
    # objects.
    #
    # @param remote_key [String] dot separated path
    # @param structure [Hash, Object, nil] structure to read from
    # @return [String, Integer, Float, true, nil] the first storable value found
    #   on the path or nil if the path can't be resolved
    def extract(remote_key, structure)
      return if !structure

      information_source = structure.clone
      result             = nil
      remote_key.split('.').each do |segment|
        segment_sym = segment.to_sym

        if information_source.is_a?(Hash)
          value = information_source[segment_sym]
        elsif information_source.respond_to?(segment_sym)
          # prevent accessing non-attributes (e.g. destroy)
          if information_source.respond_to?(:attributes)
            break if !information_source.attributes.key?(segment)
          end
          value = information_source.send(segment_sym)
        end
        break if !value

        if STORABLE_CLASSES.any? { |klass| value.is_a?(klass) }
          result = value
          break
        end

        # descend into the sub structure with the next path segment
        information_source = value
      end
      result
    end
  end
end

View file

@ -25,10 +25,8 @@ class Transaction::ClearbitEnrichment
# return if we run import mode
return if Setting.get('import_mode')
return if @item[:object] != 'User'
return if @item[:type] != 'create'
return if !Setting.get('clearbit_integration')
config = Setting.get('clearbit_config')
@ -38,317 +36,10 @@ class Transaction::ClearbitEnrichment
user = User.lookup(id: @item[:object_id])
return if !user
Transaction::ClearbitEnrichment.sync_user(user)
end
=begin
sync all users against clearbit
Transaction::ClearbitEnrichment.sync
=end
# Calls sync_user for every user returned by User.of_role(Role.signup_roles).
def self.sync
  User.of_role(Role.signup_roles).each do |user|
    sync_user(user)
  end
end
=begin
sync one user against clearbit
Transaction::ClearbitEnrichment.sync_user(user)
users = [...]
users.each {|user|
Transaction::ClearbitEnrichment.sync_user(user)
}
=end
# Syncs a single user (and optionally the organization) against clearbit.
#
# NOTE(review): this body reads like the pre-refactoring implementation followed by
# the new delegation to Enrichment::Clearbit::User (the last four statements). The
# surrounding text is a rendered commit diff without +/- markers, so presumably only
# one of the two variants exists in the real file - confirm against the repository
# before changing any code here.
#
# As written the method
# - fetches the clearbit data for user.email,
# - maps it via the 'user_sync' / 'organization_sync' entries of the
#   'clearbit_config' setting,
# - assigns changed values to the user and tracks the payload in ExternalSync,
# - creates or updates the organization if company data is present and
#   'organization_autocreate' is true.
def self.sync_user(user)
UserInfo.current_user_id = 1
return if user.email.empty?
data = fetch(user.email)
#p 'OO: ' + data.inspect
return if !data
config = Setting.get('clearbit_config')
return if !config
# get new user sync attributes
user_sync = config['user_sync']
user_sync_values = {}
if user_sync
user_sync.each { |callback, destination|
# the first callback providing a value for a destination wins
# NOTE(review): the hash starts empty, so this calls empty? on nil - presumably
# a project core extension makes nil respond to empty?, confirm
next if !user_sync_values[destination].empty?
value = _replace(callback, data)
next if !value
user_sync_values[destination] = value
}
end
# get new organization sync attributes
organization_sync = config['organization_sync']
organization_sync_values = {}
if organization_sync
organization_sync.each { |callback, destination|
next if !organization_sync_values[destination].empty?
value = _replace(callback, data)
next if !value
organization_sync_values[destination] = value
}
end
# get latest user synced attributes
external_syn_user = nil
user_sync_values_last_time = {}
if data && data['person'] && data['person']['id']
external_syn_user = ExternalSync.find_by(
source: 'clearbit',
source_id: data['person']['id'],
object: 'User',
o_id: user.id,
)
if external_syn_user && external_syn_user.last_payload
# map the payload of the previous sync with the same callbacks
user_sync.each { |callback, destination|
next if !user_sync_values_last_time[destination].empty?
value = _replace(callback, external_syn_user.last_payload)
next if !value
user_sync_values_last_time[destination] = value
}
end
end
# if person record exists
user_has_changed = false
user_sync_values.each { |destination, value|
attribute = destination.sub(/^user\./, '')
next if user[attribute] == value
# keep non empty local values which differ from the value of the previous sync
next if !user[attribute].empty? && user_sync_values_last_time[destination] != user[attribute]
begin
user[attribute] = value
rescue => e
Rails.logger.error "ERROR: Unable to assign user.#{attribute}: #{e.inspect}"
end
# NOTE: this flag is also set if the assignment above raised
user_has_changed = true
}
if user_has_changed
user.updated_by_id = 1
# remember the payload for the next sync (update or create the ExternalSync record)
if data['person'] && data['person']['id']
if external_syn_user
external_syn_user.last_payload = data
external_syn_user.save
else
external_syn_user = ExternalSync.create(
source: 'clearbit',
source_id: data['person']['id'],
object: 'User',
o_id: user.id,
last_payload: data,
)
end
end
end
# if no company record exists or no organization should be created
if !data['company'] || config['organization_autocreate'] != true
if user_has_changed
user.save
end
Observer::Transaction.commit
return
end
# if company record exists
external_syn_organization = ExternalSync.find_by(
source: 'clearbit',
source_id: data['company']['id'],
)
# create new organization
if !external_syn_organization
# if organization is already assigned, do not create a new one
if user.organization_id
if user_has_changed
user.save
Observer::Transaction.commit
end
return
end
# can't create organization without name
if organization_sync_values['organization.name'].empty?
Observer::Transaction.commit
return
end
# find by name
organization = Organization.find_by(name: organization_sync_values['organization.name'])
# create new organization
if !organization
organization = Organization.new(
shared: config['organization_shared'],
)
organization_sync_values.each { |destination, value|
attribute = destination.sub(/^organization\./, '')
next if !organization[attribute].empty?
begin
organization[attribute] = value
rescue => e
Rails.logger.error "ERROR: Unable to assign organization.#{attribute}: #{e.inspect}"
end
}
organization.save
end
# link the (found or created) organization to the clearbit company id
ExternalSync.create(
source: 'clearbit',
source_id: data['company']['id'],
object: 'Organization',
o_id: organization.id,
last_payload: data,
)
# assign new organization to user
if !user.organization_id
user.organization_id = organization.id
user.save
end
Observer::Transaction.commit
return
end
# get latest organization synced attributes
organization_sync_values_last_time = {}
if external_syn_organization && external_syn_organization.last_payload
organization_sync.each { |callback, destination|
next if !organization_sync_values_last_time[destination].empty?
value = _replace(callback, external_syn_organization.last_payload)
next if !value
organization_sync_values_last_time[destination] = value
}
end
# update existing organization
organization = Organization.find(external_syn_organization[:o_id])
organization_has_changed = false
organization_sync_values.each { |destination, value|
attribute = destination.sub(/^organization\./, '')
next if organization[attribute] == value
# keep non empty local values which differ from the value of the previous sync
next if !organization[attribute].empty? && organization_sync_values_last_time[destination] != organization[attribute]
begin
organization[attribute] = value
rescue => e
Rails.logger.error "ERROR: Unable to assign organization.#{attribute}: #{e.inspect}"
end
# NOTE: this flag is also set if the assignment above raised
organization_has_changed = true
}
if organization_has_changed
organization.updated_by_id = 1
organization.save
external_syn_organization.last_payload = data
external_syn_organization.save
end
# assign new organization to user
if !user.organization_id
user_has_changed = true
user.organization_id = organization.id
end
if user_has_changed
user.save
end
# NOTE(review): the statements below look like the complete new method body
# (delegation to the dedicated enrichment object) - see the note at the top
user_enrichment = Enrichment::Clearbit::User.new(user)
return if !user_enrichment.synced?
Observer::Transaction.commit
true
end
# Resolves a callback of the form "object.method[.method...]" against data.
#
# data[object] is the starting point, every further path segment is resolved
# by calling the method of that name if the current value responds to it and
# by an index lookup otherwise. Resolving stops at the first segment which
# can't be resolved.
#
# Returns
# - nil if data or data[object] is missing,
# - "<callback> (not allowed)" if the callback matches the blacklist below,
# - the value reached if it is exactly a String, nil otherwise.
def self._replace(callback, data)
  target = nil
  chain  = nil
  if callback =~ /\A ( [\w]+ )\.( [\w\.]+ ) \z/x
    target = Regexp.last_match(1)
    chain  = Regexp.last_match(2)
  end

  return unless data
  return unless data[target]

  # validation: refuse callbacks which contain one of the listed method names
  if callback =~ /(`|\.(|\s*)(save|destroy|delete|remove|drop|update\(|update_att|create\(|new|all|where|find))/i
    return "#{callback} (not allowed)"
  end
  return unless target && chain

  # walk along the path as long as the segments can be resolved
  current = data[target]
  chain.split('.').each do |step|
    if current.respond_to?(step.to_sym)
      current = current.send(step.to_sym)
    elsif current[step]
      current = current[step]
    else
      break
    end
  end

  current.class == String ? current : nil
end
# Loads the clearbit data for an email address.
#
# Outside of the production environment the content of the file
# test/fixtures/clearbit/<email>.json is parsed and returned if that file exists.
# Otherwise Clearbit::Enrichment is queried with the api key of the
# 'clearbit_config' setting and the request is recorded via HttpLog.
#
# Returns the parsed fixture or the clearbit result. Returns nil if there is no
# config, the api key is empty or the clearbit request raised.
def self.fetch(email)
  unless Rails.env.production?
    fixture = "#{Rails.root}/test/fixtures/clearbit/#{email}.json"
    if File.exist?(fixture)
      content = IO.binread(fixture)
      return JSON.parse(content) if content
    end
  end

  config = Setting.get('clearbit_config')
  return unless config
  return if config['api_key'].empty?

  # log entry, status and response get replaced by the outcome of the request
  log_entry = {
    direction: 'out',
    facility:  'clearbit',
    url:       "clearbit -> #{email}",
    status:    200,
    ip:        nil,
    request:   { content: email },
    response:  {},
    method:    'GET',
  }

  begin
    Clearbit.key = config['api_key']
    result = Clearbit::Enrichment.find(email: email, stream: true)
    log_entry[:response] = { code: 200, content: result.to_s }
  rescue => e
    log_entry[:status]   = 500
    log_entry[:response] = { code: 500, content: e.inspect }
  end

  HttpLog.create(log_entry)
  result
end
end

View file

@ -0,0 +1,144 @@
module Enrichment
module Clearbit
class Organization
def initialize(user:, payload:)
@user = user
@payload = payload
@source = 'clearbit'
@config = Setting.get('clearbit_config')
@object = 'Organization'
end
def synced?
return false if !@config
# TODO
UserInfo.current_user_id = 1
return false if !mapping?
return false if !changes?
# create new organization
return organization_created? if !remote_id? || !external_found?
# update existing organization
organization = existing_organization
return true if @user.organization_id
# assign new organization to user
update_user(organization)
end
private
def mapping?
@mapping = @config['organization_sync'].dup
return false if @mapping.blank?
# TODO: Refactoring:
# Currently all target keys are prefixed with
# organization.
# which is not necessary since the target object
# is allways an organization
@mapping.transform_values! { |value| value.sub('organization.', '') }
true
end
def changes?
@current_changes = ExternalSync.map(
mapping: @mapping,
source: @payload
)
@current_changes.present?
end
def remote_id?
return if !@payload['company']
@remote_id = @payload['company']['id']
end
def external_found?
return true if @external_organization
@external_organization = ExternalSync.find_by(
source: @source,
source_id: @remote_id,
object: @object,
)
@external_organization.present?
end
def organization_created?
# if organization is already assigned, do not create a new one
return false if @user.organization_id
# can't create organization without name
return false if @current_changes[:name].empty?
organization = create_current
# assign new organization to user
update_user(organization)
end
def create_current
organization = ::Organization.find_by(name: @current_changes[:name])
return organization if organization
organization = ::Organization.new(
shared: @config['organization_shared'],
)
return organization if !ExternalSync.changed?(
object: organization,
current_changes: @current_changes,
)
organization.save
ExternalSync.create(
source: @source,
source_id: @remote_id,
object: @object,
o_id: organization.id,
last_payload: @payload,
)
organization
end
def load_previous_changes
last_payload = @external_organization.last_payload
return if !last_payload
@previous_changes = ExternalSync.map(
mapping: @mapping,
source: last_payload
)
end
def existing_organization
load_previous_changes
organization = ::Organization.find(@external_organization[:o_id])
return organization if !ExternalSync.changed?(
object: organization,
previous_changes: @previous_changes,
current_changes: @current_changes,
)
organization.updated_by_id = 1
organization.save
@external_organization.last_payload = @payload
@external_organization.save
organization
end
def update_user(organization)
@user.organization_id = organization.id
true
end
end
end
end

View file

@ -0,0 +1,166 @@
module Enrichment
  module Clearbit
    # Synchronizes a local user with the data clearbit provides for the email
    # address of the user and - if 'organization_autocreate' is configured -
    # triggers the organization sync for the company part of the payload.
    class User

      # @param user local user to synchronize
      def initialize(user)
        @local_user = user
        @source     = 'clearbit'
        @config     = Setting.get('clearbit_config')
      end

      # Runs the sync and saves the user if something was changed.
      #
      # @return [Boolean] true if user attributes were changed and/or the
      #   organization sync reported a change, false otherwise (also without
      #   config, email, mapping or payload)
      def synced?
        return false if !@config
        return false if @local_user.email.blank?

        # TODO
        UserInfo.current_user_id = 1

        return false if !mapping?

        payload = fetch
        return false if !payload

        attributes_changed = attributes_changed?(payload)

        organization_synced = false
        if payload['company'] && @config['organization_autocreate']
          organization = Enrichment::Clearbit::Organization.new(
            user:    @local_user,
            payload: payload
          )
          organization_synced = organization.synced?
        end

        return false if !attributes_changed && !organization_synced

        # at least one of both flags is set at this point
        @local_user.save
        true
      end

      private

      # Prepares @mapping from the 'user_sync' config entry.
      # Returns false if there is nothing to map.
      def mapping?
        @mapping = @config['user_sync'].dup
        return false if @mapping.blank?

        # TODO: Refactoring:
        # All target keys are currently prefixed with "user."
        # which is not necessary since the target object
        # is always a user
        @mapping.transform_values! { |value| value.sub('user.', '') }
        true
      end

      # Returns the mapped values of the previous sync or nil if the payload
      # has no person id or no previous sync is known.
      def load_remote(data)
        return if !remote_id?(data)
        return if !external_found?
        load_previous_changes
      end

      # Reads the id of the person out of the payload (kept in @remote_id).
      def remote_id?(data)
        return if !data
        return if !data['person']
        @remote_id = data['person']['id']
      end

      # Looks up the ExternalSync record of the user (kept in @external_user).
      def external_found?
        return true if @external_user
        @external_user = ExternalSync.find_by(
          source:    @source,
          source_id: @remote_id,
          object:    @local_user.class.name,
          o_id:      @local_user.id,
        )
        @external_user.present?
      end

      # Maps the payload of the previous sync (kept in @previous_changes).
      def load_previous_changes
        last_payload = @external_user.last_payload
        return if !last_payload
        @previous_changes = ExternalSync.map(
          mapping: @mapping,
          source:  last_payload
        )
      end

      # Applies the mapped payload values to the user (without saving it) and
      # stores the payload for the next sync if the payload has a person id.
      def attributes_changed?(payload)
        current_changes = ExternalSync.map(
          mapping: @mapping,
          source:  payload
        )
        # ExternalSync.map always returns a Hash - nothing to do if it is empty
        return false if current_changes.blank?

        previous_changes = load_remote(payload)

        return false if !ExternalSync.changed?(
          object:           @local_user,
          previous_changes: previous_changes,
          current_changes:  current_changes,
        )

        @local_user.updated_by_id = 1

        return true if !@remote_id

        store_current(payload)
        true
      end

      # Stores the payload in the (existing or new) ExternalSync record of the user.
      def store_current(payload)
        if !external_found?
          @external_user = ExternalSync.new(
            source:    @source,
            source_id: @remote_id,
            object:    @local_user.class.name,
            o_id:      @local_user.id,
          )
        end
        @external_user.last_payload = payload
        @external_user.save
      end

      # Loads the clearbit data for the email address of the user.
      #
      # Outside of the production environment the content of the file
      # test/fixtures/clearbit/<email>.json is used if that file exists.
      # Otherwise clearbit is queried and the request is recorded via HttpLog.
      # Returns nil if the api key is empty or the request raised.
      def fetch
        if !Rails.env.production?
          filename = "#{Rails.root}/test/fixtures/clearbit/#{@local_user.email}.json"
          if File.exist?(filename)
            data = IO.binread(filename)
            return JSON.parse(data) if data
          end
        end

        return if @config['api_key'].empty?

        record = {
          direction: 'out',
          facility:  'clearbit',
          url:       "clearbit -> #{@local_user.email}",
          status:    200,
          ip:        nil,
          request:   { content: @local_user.email },
          response:  {},
          method:    'GET',
        }

        begin
          ::Clearbit.key = @config['api_key']
          result = ::Clearbit::Enrichment.find(email: @local_user.email, stream: true)
          record[:response] = { code: 200, content: result.to_s }
        rescue => e
          record[:status]   = 500
          record[:response] = { code: 500, content: e.inspect }
        end
        HttpLog.create(record)
        result
      end

      class << self

        # Runs the sync for every user returned by ::User.of_role(Role.signup_roles).
        #
        # The leading :: is required: a plain `User` resolves to
        # Enrichment::Clearbit::User inside of this namespace, which does not
        # provide `of_role`.
        def all
          users = ::User.of_role(Role.signup_roles)
          users.each do |user|
            new(user).synced?
          end
        end
      end
    end
  end
end

175
spec/external_sync_spec.rb Normal file
View file

@ -0,0 +1,175 @@
require 'rails_helper'
RSpec.describe ExternalSync do

  # applying remote values to a local record
  context '#changed?' do

    it 'keeps ActiveRecord instance unchanged on local but no remote changes' do
      group         = create(:group)
      remote_values = { name: 'Changed' }

      changed = described_class.changed?(
        object:           group,
        previous_changes: remote_values,
        current_changes:  remote_values.dup,
      )

      expect(changed).to be false
      expect(group.changed?).to be false
    end

    it 'keeps ActiveRecord instance unchanged on local and remote changes' do
      group = create(:group)

      changed = described_class.changed?(
        object:           group,
        previous_changes: { name: 'Initial' },
        current_changes:  { name: 'Changed' },
      )

      expect(changed).to be false
      expect(group.changed?).to be false
    end

    it 'changes ActiveRecord instance attribute(s) for remote changes' do
      group = create(:group)

      changed = described_class.changed?(
        object:           group,
        previous_changes: { name: group.name },
        current_changes:  { name: 'Changed' },
      )

      expect(changed).to be true
      expect(group.changed?).to be true
    end

    it 'prevents ActiveRecord method calls' do
      group = create(:group)

      # :destroy is a method but no attribute of the record
      changed = described_class.changed?(
        object:           group,
        previous_changes: { name: group.name },
        current_changes:  { destroy: 'Changed' },
      )

      expect(changed).to be false
      expect(group.changed?).to be false
      expect(group.destroyed?).to be false
    end
  end

  # translating a remote structure into local attribute values
  context '#map' do

    it 'maps to symbol keys' do
      mapped = described_class.map(
        mapping: { 'key' => 'key' },
        source:  { 'key' => 'value' },
      )

      expect(mapped).to eq(key: 'value')
    end

    it 'resolves deep structures' do
      payload = {
        'sub' => {
          'structure' => {
            'key' => 'value'
          }
        }
      }

      mapped = described_class.map(
        mapping: { 'sub.structure.key' => 'key' },
        source:  payload,
      )

      expect(mapped).to eq(key: 'value')

      # check if sub structure is untouched
      expect(payload['sub'].key?('structure')).to be true
    end

    it 'skips irrelevant keys' do
      payload = {
        'key'     => 'value',
        'skipped' => 'skipped'
      }

      mapped = described_class.map(mapping: { 'key' => 'key' }, source: payload)

      expect(mapped).to eq(key: 'value')
    end

    it 'can handle object instances' do
      instance = double(name: 'value')

      mapped = described_class.map(mapping: { 'name' => 'key' }, source: instance)

      expect(mapped).to eq(key: 'value')
    end

    it 'can handle ActiveRecord instances' do
      group = create(:group, name: 'value')

      mapped = described_class.map(mapping: { 'name' => 'key' }, source: group)

      expect(mapped).to eq(key: 'value')
    end

    it 'prevents ActiveRecord method calls' do
      group   = create(:group, name: 'value')
      mapping = {
        'name'    => 'key',
        'destroy' => 'evil'
      }

      mapped = described_class.map(mapping: mapping, source: group)

      expect(mapped).to eq(key: 'value')
      expect(group.destroyed?).to be false
    end
  end
end

14
spec/factories/group.rb Normal file
View file

@ -0,0 +1,14 @@
FactoryGirl.define do

  # global sequence used to generate a different group name per record
  sequence :test_group_name do |n|
    "TestGroup#{n}"
  end

  # Basic Group factory, useful for tests which only need some simple model.
  # All values are given as blocks: static attribute values are deprecated in
  # later releases of the factory library, the block syntax works in both.
  factory :group do
    name { generate(:test_group_name) }
    created_by_id { 1 }
    updated_by_id { 1 }
  end
end

View file

@ -110,7 +110,8 @@ class ClearbitTest < ActiveSupport::TestCase
assert_equal('changed by my self', customer2_lookup.note)
assert_equal('Norsk-Data-Straße 1, 61352 Bad Homburg vor der Höhe, Germany', customer2_lookup.address)
Transaction::ClearbitEnrichment.sync_user(customer2)
customer2_enrichment = Enrichment::Clearbit::User.new(customer2)
customer2_enrichment.synced?
Scheduler.worker(true)
customer2_lookup = User.lookup(id: customer2.id)
@ -126,7 +127,8 @@ class ClearbitTest < ActiveSupport::TestCase
note: 'changed by my self',
)
Transaction::ClearbitEnrichment.sync_user(customer2)
customer2_enrichment = Enrichment::Clearbit::User.new(customer2)
customer2_enrichment.synced?
Scheduler.worker(true)
customer2_lookup = User.lookup(id: customer2.id)
@ -141,7 +143,8 @@ class ClearbitTest < ActiveSupport::TestCase
email: 'me2@example.com',
)
Transaction::ClearbitEnrichment.sync_user(customer2)
customer2_enrichment = Enrichment::Clearbit::User.new(customer2)
customer2_enrichment.synced?
Scheduler.worker(true)
customer2_lookup = User.lookup(id: customer2.id)