From 94560f6ca298d79452f619c51025cf80caac25fb Mon Sep 17 00:00:00 2001 From: Martin Edenhofer Date: Fri, 8 Jan 2016 15:31:47 +0100 Subject: [PATCH] Prepared tweet class for twitter streaming. --- app/models/channel/driver/twitter.rb | 153 ++++++++++++++++------- lib/http/uri.rb | 6 + lib/{tweet.rb => tweet_base.rb} | 64 +++++++--- lib/tweet_rest.rb | 23 ++++ lib/tweet_stream.rb | 27 ++++ test/integration/twitter_browser_test.rb | 58 +++++++-- test/integration/twitter_test.rb | 21 +++- 7 files changed, 279 insertions(+), 73 deletions(-) create mode 100644 lib/http/uri.rb rename lib/{tweet.rb => tweet_base.rb} (82%) create mode 100644 lib/tweet_rest.rb create mode 100644 lib/tweet_stream.rb diff --git a/app/models/channel/driver/twitter.rb b/app/models/channel/driver/twitter.rb index db83a32cf..4ad84a193 100644 --- a/app/models/channel/driver/twitter.rb +++ b/app/models/channel/driver/twitter.rb @@ -49,14 +49,16 @@ class Channel::Driver::Twitter options = check_external_credential(options) - @tweet = Tweet.new(options[:auth]) - @sync = options[:sync] - @channel = channel + # check if stream scheduler is already running and return + + @rest_client = TweetRest.new(options[:auth]) + @sync = options[:sync] + @channel = channel Rails.logger.debug 'twitter fetch started' - fetch_search fetch_mentions + fetch_search fetch_direct_messages disconnect @@ -65,6 +67,7 @@ class Channel::Driver::Twitter { result: 'ok', + notice: '', } end @@ -94,78 +97,136 @@ class Channel::Driver::Twitter options = check_external_credential(options) - @tweet = Tweet.new(options[:auth]) - tweet = @tweet.from_article(article) + @rest_client = TweetRest.new(options[:auth]) + tweet = @rest_client.from_article(article) disconnect tweet end def disconnect - @tweet.disconnect + @stream_client.disconnect if @stream_client + @rest_client.disconnect if @rest_client + end + + def stream_instance(channel) + @channel = channel + options = @channel.options + @stream_client = TweetStream.new(options[:auth]) + end + + def stream + hashtags = [] + @channel.options['sync']['search'].each {|item| + hashtags.push item['term'] + } + filter = { + track: hashtags.join(','), + } + if @channel.options['sync']['mentions']['group_id'] != '' + filter[:replies] = 'all' + end + + @stream_client.client.user(filter) do |tweet| + next if tweet.class != Twitter::Tweet && tweet.class != Twitter::DirectMessage + next if Ticket::Article.find_by(message_id: tweet.id) + + # check direct message + if tweet.class == Twitter::DirectMessage + if @channel.options['sync']['direct_messages']['group_id'] != '' + next if direct_message_limit_reached(tweet) + @stream_client.to_group(tweet, @channel.options['sync']['direct_messages']['group_id'], @channel) + end + next + end + + next if @stream_client.tweet_limit_reached(tweet) + + # check if it's mention + if @channel.options['sync']['mentions']['group_id'] != '' + hit = false + if tweet.user_mentions + tweet.user_mentions.each {|user| + if user.id.to_s == @channel.options['user']['id'].to_s + hit = true + end + } + end + if hit + @stream_client.to_group(tweet, @channel.options['sync']['mentions']['group_id'], @channel) + next + end + end + + # check hashtags + if @channel.options['sync']['search'] && tweet.hashtags + hit = false + @channel.options['sync']['search'].each {|item| + tweet.hashtags.each {|hashtag| + next if item['term'] !~ /^#/ + if item['term'].sub(/^#/, '') == hashtag.text + hit = item + end + } + } + if hit + @stream_client.to_group(tweet, hit['group_id'], @channel) + next + end + end + + # check stings + if @channel.options['sync']['search'] + hit = false + body = tweet.text + @channel.options['sync']['search'].each {|item| + next if item['term'] =~ /^#/ + if body =~ /#{item['term']}/ + hit = item + end + } + if hit + @stream_client.to_group(tweet, hit['group_id'], @channel) + end + end + + end end private def fetch_search - return if !@sync[:search] return if @sync[:search].empty? - - # search results @sync[:search].each { |search| - result_type = search[:type] || 'mixed' - Rails.logger.debug " - searching for '#{search[:term]}'" - - counter = 0 - @tweet.client.search(search[:term], result_type: result_type).collect { |tweet| - - break if @sync[:limit] && @sync[:limit] <= counter - break if Ticket::Article.find_by(message_id: tweet.id) - - @tweet.to_group(tweet, search[:group_id], @channel) - - counter += 1 + @rest_client.client.search(search[:term], result_type: result_type).collect { |tweet| + next if Ticket::Article.find_by(message_id: tweet.id) + break if @rest_client.tweet_limit_reached(tweet) + @rest_client.to_group(tweet, search[:group_id], @channel) } } end def fetch_mentions - return if !@sync[:mentions] return if @sync[:mentions].empty? - Rails.logger.debug ' - searching for mentions' - - counter = 0 - @tweet.client.mentions_timeline.each { |tweet| - - break if @sync[:limit] && @sync[:limit] <= counter - break if Ticket::Article.find_by(message_id: tweet.id) - - @tweet.to_group(tweet, @sync[:mentions][:group_id], @channel) - - counter += 1 + @rest_client.client.mentions_timeline.each { |tweet| + next if Ticket::Article.find_by(message_id: tweet.id) + break if @rest_client.tweet_limit_reached(tweet) + @rest_client.to_group(tweet, @sync[:mentions][:group_id], @channel) } end def fetch_direct_messages - return if !@sync[:direct_messages] return if @sync[:direct_messages].empty? - Rails.logger.debug ' - searching for direct_messages' - - counter = 0 - @tweet.client.direct_messages.each { |tweet| - - break if @sync[:limit] && @sync[:limit] <= counter - break if Ticket::Article.find_by(message_id: tweet.id) - - @tweet.to_group(tweet, @sync[:direct_messages][:group_id], @channel) - - counter += 1 + @rest_client.client.direct_messages.each { |tweet| + next if Ticket::Article.find_by(message_id: tweet.id) + break if @rest_client.direct_message_limit_reached(tweet) + @rest_client.to_group(tweet, @sync[:direct_messages][:group_id], @channel) } end diff --git a/lib/http/uri.rb b/lib/http/uri.rb new file mode 100644 index 000000000..f5aef9d77 --- /dev/null +++ b/lib/http/uri.rb @@ -0,0 +1,6 @@ +# Monkey-patch HTTP::URI +class HTTP::URI + def port + 443 if self.https? + end +end diff --git a/lib/tweet.rb b/lib/tweet_base.rb similarity index 82% rename from lib/tweet.rb rename to lib/tweet_base.rb index 55ad83cf2..ff68e94f8 100644 --- a/lib/tweet.rb +++ b/lib/tweet_base.rb @@ -1,28 +1,12 @@ # Copyright (C) 2012-2015 Zammad Foundation, http://zammad-foundation.org/ require 'twitter' +load 'lib/http/uri.rb' -class Tweet +class TweetBase attr_accessor :client - def initialize(auth) - - @client = Twitter::REST::Client.new do |config| - config.consumer_key = auth[:consumer_key] - config.consumer_secret = auth[:consumer_secret] - config.access_token = auth[:oauth_token] - config.access_token_secret = auth[:oauth_token_secret] - end - - end - - def disconnect - - return if !@client - @client = nil - end - def user(tweet) if tweet.class == Twitter::DirectMessage @@ -161,6 +145,16 @@ class Tweet elsif tweet.class == Twitter::Tweet article_type = 'twitter status' from = "@#{tweet.user.screen_name}" + if tweet.user_mentions + tweet.user_mentions.each {|local_user| + if !to + to = '' + else + to + ', ' + end + to += "@#{local_user.screen_name}" + } + end in_reply_to = tweet.in_reply_to_status_id else fail "Unknown tweet type '#{tweet.class}'" @@ -187,6 +181,9 @@ class Tweet ticket = nil # use transaction + if @connection_type == 'stream' + ActiveRecord::Base.connection.reconnect! + end ActiveRecord::Base.transaction do UserInfo.current_user_id = 1 @@ -217,6 +214,9 @@ class Tweet Observer::Ticket::Notification.transaction end + if @connection_type == 'stream' + ActiveRecord::Base.connection.close + end ticket end @@ -250,4 +250,32 @@ class Tweet tweet end + def tweet_limit_reached(tweet) + max_count = 60 + if @connection_type == 'stream' + max_count = 15 + end + type_id = Ticket::Article::Type.lookup(name: 'twitter status').id + created_at = Time.zone.now - 15.minutes + if Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count > max_count + Rails.logger.info "Tweet limit reached, ignored tweed id (#{tweet.id})" + return true + end + false + end + + def direct_message_limit_reached(tweet) + max_count = 100 + if @connection_type == 'stream' + max_count = 40 + end + type_id = Ticket::Article::Type.lookup(name: 'twitter direct-message').id + created_at = Time.zone.now - 15.minutes + if Ticket::Article.where('created_at > ? AND type_id = ?', created_at, type_id).count > max_count + Rails.logger.info "Tweet direct message limit reached, ignored tweed id (#{tweet.id})" + return true + end + false + end + end diff --git a/lib/tweet_rest.rb b/lib/tweet_rest.rb new file mode 100644 index 000000000..d38d95bfd --- /dev/null +++ b/lib/tweet_rest.rb @@ -0,0 +1,23 @@ +# Copyright (C) 2012-2015 Zammad Foundation, http://zammad-foundation.org/ + +class TweetRest < TweetBase + + attr_accessor :client + + def initialize(auth) + @connection_type = 'rest' + @client = Twitter::REST::Client.new do |config| + config.consumer_key = auth[:consumer_key] + config.consumer_secret = auth[:consumer_secret] + config.access_token = auth[:oauth_token] + config.access_token_secret = auth[:oauth_token_secret] + end + + end + + def disconnect + return if !@client + @client = nil + end + +end diff --git a/lib/tweet_stream.rb b/lib/tweet_stream.rb new file mode 100644 index 000000000..1abf1d15f --- /dev/null +++ b/lib/tweet_stream.rb @@ -0,0 +1,27 @@ +# Copyright (C) 2012-2015 Zammad Foundation, http://zammad-foundation.org/ + +class TweetStream < TweetBase + + attr_accessor :client + + def initialize(auth) + @connection_type = 'stream' + @client = Twitter::Streaming::ClientCustom.new do |config| + config.consumer_key = auth[:consumer_key] + config.consumer_secret = auth[:consumer_secret] + config.access_token = auth[:oauth_token] + config.access_token_secret = auth[:oauth_token_secret] + end + + end + + def disconnect + if @client && @client.custom_connection_handle + @client.custom_connection_handle.close + end + + return if !@client + @client = nil + end + +end diff --git a/test/integration/twitter_browser_test.rb b/test/integration/twitter_browser_test.rb index 0d8f4246c..6593c2b9a 100644 --- a/test/integration/twitter_browser_test.rb +++ b/test/integration/twitter_browser_test.rb @@ -17,12 +17,24 @@ class TwitterBrowserTest < TestCase if !ENV['TWITTER_USER_LOGIN'] fail "ERROR: Need TWITTER_USER_LOGIN - hint TWITTER_USER_LOGIN='1234'" end - twitter_user_loign = ENV['TWITTER_USER_LOGIN'] + twitter_user_login = ENV['TWITTER_USER_LOGIN'] if !ENV['TWITTER_USER_PW'] fail "ERROR: Need TWITTER_USER_PW - hint TWITTER_USER_PW='1234'" end - twitter_pw = ENV['TWITTER_USER_PW'] + twitter_user_pw = ENV['TWITTER_USER_PW'] + + if !ENV['TWITTER_CUSTOMER_TOKEN'] + fail "ERROR: Need TWITTER_CUSTOMER_TOKEN - hint TWITTER_CUSTOMER_TOKEN='1234'" + end + twitter_customer_token = ENV['TWITTER_CUSTOMER_TOKEN'] + + if !ENV['TWITTER_CUSTOMER_TOKEN_SECRET'] + fail "ERROR: Need TWITTER_CUSTOMER_TOKEN_SECRET - hint TWITTER_CUSTOMER_TOKEN_SECRET='1234'" + end + twitter_customer_token_secret = ENV['TWITTER_CUSTOMER_TOKEN_SECRET'] + + hash = "#sweetcheck#{rand(99_999)}" @browser = browser_instance login( @@ -103,12 +115,12 @@ class TwitterBrowserTest < TestCase set( css: '#username_or_email', - value: twitter_user_loign, + value: twitter_user_login, no_click: true, #