From 429b95e65dadcb015b0d86e88c9a5bfdf59fae1b Mon Sep 17 00:00:00 2001 From: Thorsten Eckel Date: Mon, 16 Sep 2019 11:15:12 +0200 Subject: [PATCH] Maintenance: Make ES connection establishment more stable by: - performing a retry when Net::OpenTimeout exceptions occur - cache the result of ES version check --- lib/search_index_backend.rb | 86 +++++++++++++++++++--------------- lib/tasks/search_index_es.rake | 12 +++-- lib/user_agent.rb | 45 +++++++++++++----- 3 files changed, 87 insertions(+), 56 deletions(-) diff --git a/lib/search_index_backend.rb b/lib/search_index_backend.rb index 1c8dfac71..cbb346678 100644 --- a/lib/search_index_backend.rb +++ b/lib/search_index_backend.rb @@ -19,11 +19,12 @@ info about used search index machine url, {}, { - json: true, - open_timeout: 8, - read_timeout: 14, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + json: true, + open_timeout: 8, + read_timeout: 14, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) Rails.logger.info "# #{response.code}" @@ -82,11 +83,12 @@ update processors response = UserAgent.delete( url, { - json: true, - open_timeout: 8, - read_timeout: 60, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + json: true, + open_timeout: 8, + read_timeout: 60, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) Rails.logger.info "# #{response.code}" @@ -106,11 +108,12 @@ update processors url, item, { - json: true, - open_timeout: 8, - read_timeout: 60, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + json: true, + open_timeout: 8, + read_timeout: 60, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) Rails.logger.info "# #{response.code}" @@ -179,11 +182,12 @@ create/update/delete index url, data[:data], { - json: true, - open_timeout: 8, - read_timeout: 60, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + json: true, + open_timeout: 8, + read_timeout: 60, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) Rails.logger.info "# #{response.code}" @@ -217,11 +221,12 @@ add new object to search index url, data, { - json: true, - open_timeout: 8, - read_timeout: 60, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + json: true, + open_timeout: 8, + read_timeout: 60, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) Rails.logger.info "# #{response.code}" @@ -254,10 +259,11 @@ remove whole data from index response = UserAgent.delete( url, { - open_timeout: 8, - read_timeout: 60, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + open_timeout: 8, + read_timeout: 60, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) Rails.logger.info "# #{response.code}" @@ -365,11 +371,12 @@ remove whole data from index url, query_data, { - json: true, - open_timeout: 5, - read_timeout: 14, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + json: true, + open_timeout: 5, + read_timeout: 14, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) @@ -512,11 +519,12 @@ example for aggregations within one year url, data, { - json: true, - open_timeout: 5, - read_timeout: 14, - user: Setting.get('es_user'), - password: Setting.get('es_password'), + json: true, + open_timeout: 5, + read_timeout: 14, + open_socket_tries: 3, + user: Setting.get('es_user'), + password: Setting.get('es_password'), } ) diff --git a/lib/tasks/search_index_es.rake b/lib/tasks/search_index_es.rake index 9d9b044cb..a1282b156 100644 --- a/lib/tasks/search_index_es.rake +++ b/lib/tasks/search_index_es.rake @@ -275,12 +275,14 @@ end # get es version def es_version - info = SearchIndexBackend.info - number = nil - if info.present? - number = info['version']['number'].to_s + @es_version ||= begin + info = SearchIndexBackend.info + number = nil + if info.present? + number = info['version']['number'].to_s + end + number end - number end # no es_pipeline for elasticsearch 5.5 and lower diff --git a/lib/user_agent.rb b/lib/user_agent.rb index 86ee27502..748b65d25 100644 --- a/lib/user_agent.rb +++ b/lib/user_agent.rb @@ -59,9 +59,12 @@ returns # start http call begin total_timeout = options[:total_timeout] || 60 - Timeout.timeout(total_timeout) do - response = http.request(request) - return process(request, response, uri, count, params, options) + + handled_open_timeout(options[:open_socket_tries]) do + Timeout.timeout(total_timeout) do + response = http.request(request) + return process(request, response, uri, count, params, options) + end end rescue => e log(url, request, nil, options) @@ -112,9 +115,12 @@ returns # start http call begin total_timeout = options[:total_timeout] || 60 - Timeout.timeout(total_timeout) do - response = http.request(request) - return process(request, response, uri, count, params, options) + + handled_open_timeout(options[:open_socket_tries]) do + Timeout.timeout(total_timeout) do + response = http.request(request) + return process(request, response, uri, count, params, options) + end end rescue => e log(url, request, nil, options) @@ -164,9 +170,12 @@ returns # start http call begin total_timeout = options[:total_timeout] || 60 - Timeout.timeout(total_timeout) do - response = http.request(request) - return process(request, response, uri, count, params, options) + + handled_open_timeout(options[:open_socket_tries]) do + Timeout.timeout(total_timeout) do + response = http.request(request) + return process(request, response, uri, count, params, options) + end end rescue => e log(url, request, nil, options) @@ -209,9 +218,11 @@ returns # start http call begin total_timeout = options[:total_timeout] || 60 - Timeout.timeout(total_timeout) do - response = http.request(request) - return process(request, response, uri, count, {}, options) + handled_open_timeout(options[:open_socket_tries]) do + Timeout.timeout(total_timeout) do + response = http.request(request) + return process(request, response, uri, count, {}, options) + end end rescue => e log(url, request, nil, options) @@ -488,6 +499,16 @@ returns ) end + def self.handled_open_timeout(tries) + tries ||= 1 + + tries.times do |index| + yield + rescue Net::OpenTimeout + raise if (index + 1) == tries + end + end + class Result attr_reader :error