xmpp-twit
Last-modified: 2014-08-16 (土) 12:41:50 (1940d)

概要 †
XMPPで送信したインスタントメッセージを、twitterに登録するボット。
フォローしている人のツイートの表示と、こちらからのツイートの登録のみ可能。
ダウンロード †
http://ytyng.com/develop/xmpp-twit.1.zip 7KB
動作環境 †
Python2.6で動作します。
動作には、python-xmpp パッケージが必要です。
Ubuntu の場合は、
$ sudo apt-get install python-xmpp
でインストールできます。
設定 †
config.py をテキストエディタで直接編集してください。
起動方法 †
$ twitter-control.sh start
を実行すると、指定したXMPPアカウントにログインし、常駐します。
ツイートを登録するには †
このボットのアカウントに向かって、適当に長い文字(5文字以上)を発言すると、Twitterにツイートとして登録します。
フォローしている人のツイートをみるには †
このボットのアカウントに向かって、"t"と発言すると、フォローしている人のツイートをタイムラインで最新5件を取得し、返してきます。
また、t以外の短い文字("a"など)を発言すると、フォローしている人別の最新1件のツイートを、最新5件表示します。(1ユーザーにつき1つのツイートのみ表示)
ライセンス †
フリーです。ご自由にどうぞ。
本スクリプトを使用したことによる損害について、作者ytyng.comは一切の責任を負いません。
ソースコード †
config.py †
#!/usr/bin/python # -*- coding: utf-8 -*- account = { # xmpp(Jabber)のログインID 'JID' : 'yourid@jabber', # xmpp(Jabber)のログインパスワード 'Password': 'yourpassword', # xmpp(Jabber)サービスが稼動しているサーバ 'Server' : '127.0.0.1', # xmpp(Jabber)サービスのポート 'Port' : 5222, # xmpp(Jabber)のログインリソース名(任意) 'Resource': 'xmpppy-bot', } # twitterのログインID twitterAccount = "" # twitterのログインパスワード twitterPassword = "" #この文字数以上だとつぶやき statusPostThreshold = 5 timelineCount = 5 helpText = u"""\ 長い文字の入力:つぶやき t:タイムライン h:ヘルプ その他:ユーザーの最新つぶやきリスト """
xmpp-twitter-bot.py (メインスクリプト) †
#!/usr/bin/python # -*- coding: utf-8 -*- import xmpp import re import config as CONFIG import Twitter class ConnectionError: pass class AuthorizationError: pass class XmppBot: twitterAccount = "" twitterPassword = "" RE_RETWEET = re.compile(r"^(\d+)(@\S+)\s+(.+)$") def __init__(self): """ コンストラクタ """ jid = xmpp.JID(CONFIG.account['JID']) self.connection = xmpp.Client(jid.getDomain(), debug=[]) result = self.connection.connect(server=(CONFIG.account['Server'],CONFIG.account['Port'])) if result is None: raise ConnectionError result = self.connection.auth(jid.getNode(), CONFIG.account['Password'],CONFIG.account['Resource']) if result is None: raise AuthorizationError self.connection.RegisterHandler('message',self.parseMessage) self.connection.sendInitPresence() def loop(self): """ メインループ """ try: while self.connection.Process(1): pass except KeyboardInterrupt: pass def getSender(self,mess) : """ 送信者を取得 """ senderFull = unicode(mess.getFrom()) if senderFull.find('/')+1: sender,resource=senderFull.split('/',1) else: sender,resource=senderFull,'' return sender def parseMessage(self,conn,mess): """ 受信したメッセージを解析する """ body=mess.getBody() if body is None: return regExpRetweet = self.RE_RETWEET.search(body) if regExpRetweet: #RTモード rtId = regExpRetweet.group(1) rtTo = regExpRetweet.group(2) rtBody = regExpRetweet.group(3) rtBody += " "+rtTo (headers,responseBody) = Twitter.updateStatus(CONFIG.twitterAccount,CONFIG.twitterPassword,rtBody,rtId) reply = u"登録しました(RT %s) … %s" % (rtId,headers['Status']) elif len(body) >= CONFIG.statusPostThreshold: #つぶやく (headers,responseBody) = Twitter.updateStatus(CONFIG.twitterAccount,CONFIG.twitterPassword,body) reply = u"登録しました … %s" % headers['Status'] elif body == "t" : #タイムライン表示 reply = Twitter.getFriendsTimelineText(CONFIG.twitterAccount,CONFIG.twitterPassword,CONFIG.timelineCount) elif body == "h" : #ヘルプ表示 reply = CONFIG.helpText else: #ステータスリストを取得 #reply = Twitter.getFriendsTimelineText(CONFIG.twitterAccount,CONFIG.twitterPassword,CONFIG.timelineCount) reply = Twitter.getStatusFriendsText(CONFIG.twitterAccount,CONFIG.twitterPassword) conn.send(xmpp.Message(mess.getFrom(),reply)) bot = XmppBot() bot.twitterAccount = CONFIG.twitterAccount bot.twitterPassword = CONFIG.twitterPassword bot.loop()
Twitter.py †
#!/usr/bin/python # -*- coding: utf-8 -*- import urllib2 import urllib import time import datetime from xml.dom.minidom import parseString OUTPUT_DATE_FORMAT = "%Y-%m-%d %H:%M" def updateStatus(userId,password,status,replyId=""): """ twitterを更新する """ API_URL="https://twitter.com/statuses/update.xml" if not type(status) == str: status = status.encode('UTF-8','ignore') passwordManager = urllib2.HTTPPasswordMgr() passwordManager.add_password('Twitter API', 'https://twitter.com/',userId, password) authHandler = urllib2.HTTPBasicAuthHandler(passwordManager) opener = urllib2.build_opener(authHandler) params = { "status" : status } if len(replyId) > 0: print("replyId=%s"%replyId) params["in_reply_to_status_id"] = replyId print params response = opener.open(API_URL,urllib.urlencode(params)) headers = response.headers responseBody = response.read() response.close() return headers,responseBody def getFriendsTimelineXml(userId,password,count): """ タイムラインXMLを取得 """ API_URL="https://twitter.com/statuses/friends_timeline.xml?count=%s" % count passwordManager = urllib2.HTTPPasswordMgr() passwordManager.add_password('Twitter API', 'https://twitter.com/',userId, password) authHandler = urllib2.HTTPBasicAuthHandler(passwordManager) opener = urllib2.build_opener(authHandler) response = opener.open(API_URL) headers = response.headers responseBody = response.read() response.close() return headers,responseBody def getFriendsTimelineText(userId,password,count): """ タイムラインを取得し、簡易パースして出力 """ (headers,xml) = getFriendsTimelineXml(userId,password,count) bufferArray = [] dom = parseString(xml) for status in dom.getElementsByTagName("status"): buffer = u"" createdAt = getText(status.getElementsByTagName("created_at")[0]) text = getText(status.getElementsByTagName("text")[0]) id = getText(status.getElementsByTagName("id")[0]) dateString = parseDateToDateTime(createdAt).strftime(OUTPUT_DATE_FORMAT) for user in status.getElementsByTagName("user"): name = getText(user.getElementsByTagName("name")[0]) buffer += "[%s] %s@%s\n" % (dateString,id,name) buffer += text + "\n" bufferArray.append(buffer) bufferArray.reverse() buffer = "----------\n".join(bufferArray) return buffer def getStatusFriendsXml(userId,password,count): """ フレンドごとの新着ステータスXMLを取得 """ API_URL="https://api.twitter.com/1/statuses/friends.xml" passwordManager = urllib2.HTTPPasswordMgr() passwordManager.add_password('Twitter API', 'https://api.twitter.com/',userId, password) authHandler = urllib2.HTTPBasicAuthHandler(passwordManager) opener = urllib2.build_opener(authHandler) response = opener.open(API_URL) headers = response.headers responseBody = response.read() response.close() return headers,responseBody def getStatusFriendsText(userId,password,count=""): """ フレンドごとの新着ステータスXMLを取得し、簡易パースして出力 """ (headers,xml) = getStatusFriendsXml(userId,password,count) userList = [] dom = parseString(xml) for user in dom.getElementsByTagName("user"): name = getText(user.getElementsByTagName("screen_name")[0]) for status in user.getElementsByTagName("status"): createdAt = getText(status.getElementsByTagName("created_at")[0]) text = getText(status.getElementsByTagName("text")[0]) id = getText(status.getElementsByTagName("id")[0]) dtCreate = parseDateToDateTime(createdAt) dateString = dtCreate.strftime(OUTPUT_DATE_FORMAT) userList.append({ 'name' : name, 'id' : id, 'text' : text, 'dtCreate' : dtCreate, 'dateString' : dateString, }) break #1つだけで良い #userList.sort(lambda a,b:b['dtCreate']-a['dtCreate']) userList.sort(compareUserList) bufferArray = [] for user in userList: buffer = u"" buffer += "[%s] %s@%s\n" % (user['dateString'],user['id'],user['name']) buffer += user['text'] + "\n" bufferArray.append(buffer) buffer = "----------\n".join(bufferArray) return buffer def compareUserList(a,b): if b['dtCreate'] > a['dtCreate']: return -1 elif b['dtCreate'] < a['dtCreate']: return +1 else: return 0 def getText(node): """ XMLエレメントの内包テキストを取得 """ rc = "" for n in node.childNodes: if n.nodeType == n.TEXT_NODE: rc = rc + n.data return rc def parseDateToDateTime(dateString): """ 日付を解析してDateTimeにする """ DATE_FORMAT="%a %b %d %H:%M:%S +0000 %Y" #t=time.strptime(dateString,DATE_FORMAT) #d=datetime.datetime(t.tm_year,t.tm_mon,t.tm_mday,t.tm_hour,t.tm_min,t.tm_sec) d=datetime.datetime.strptime(dateString,DATE_FORMAT) d = d.replace(tzinfo=UTC()).astimezone(JST()) return d #http://mitc.xrea.jp/diary/096 class UTC(datetime.tzinfo): def utcoffset(self, dt): return datetime.timedelta(0) def tzname(self, dt): return "UTC" def dst(self, dt): return datetime.timedelta(0) class JST(datetime.tzinfo): def utcoffset(self,dt): return datetime.timedelta(hours=9) def dst(self,dt): return datetime.timedelta(0) def tzname(self,dt): return "JST"