以前書いた「[Google App Engine][twitter][oauth]ログインを自作する」の頃から少し仕様が変わったようなので、コードを少し見やすく整理して新しくクラスを作り、あわせてツイートするメソッドも追加しました。
#twitter.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Minimal OAuth 1.0 (HMAC-SHA1) client for Twitter on Google App Engine.

Written for the Python 2 runtime: it relies on ``urllib.quote`` and
``urllib.urlencode`` and sends requests through App Engine's ``urlfetch``.
"""
import base64
import datetime
import hashlib
import hmac
import logging
import random
import time
import urllib

from google.appengine.api import urlfetch


class LoginTwitter(object):
    """Sign-in-with-Twitter flow plus a helper that posts a status update.

    Typical flow:

    1. ``requestOAuthToken(callback_url)`` obtains a request token.
    2. Redirect the user to ``getRedirectOauthAuthenticateUrl()``.
    3. In the callback handler call ``requestAccessToken(token, verifier)``.
    4. ``statusesUpdate(access_token, access_token_secret, text)`` tweets.

    NOTE: the constructor takes the consumer *secret* first and the consumer
    *key* second; this order is kept for backward compatibility.
    """

    # Alphabet the nonce is drawn from (same characters as before; the
    # backslash is now written as an explicit escape).
    _NONCE_CHARS = (
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz123456789"
        "[]{}!$%&'()-^\\:;*+><"
    )
    _NONCE_LENGTH = 20

    def __init__(self, consumer_secret, consumer_key):
        """Store the application credentials and the fixed OAuth parameters.

        Args:
            consumer_secret: the application's consumer (API) secret.
            consumer_key: the application's consumer (API) key.
        """
        self.consumer_secret = consumer_secret
        # Empty until statusesUpdate() supplies a token secret.
        self.access_token_secret = ""
        self.method = "POST"
        self.request_token_url = "https://api.twitter.com/oauth/request_token"
        self.oauth_authenticate_url = (
            "https://api.twitter.com/oauth/authenticate?oauth_token="
        )
        self.access_token_url = "https://api.twitter.com/oauth/access_token"
        self.statuses_update_url = (
            "https://api.twitter.com/1.1/statuses/update.json"
        )
        # OAuth protocol parameters. The nonce and timestamp set here are
        # replaced with fresh values by _request() before every call.
        self.prms = {
            "oauth_consumer_key": consumer_key,
            "oauth_nonce": LoginTwitter._nonce(),
            "oauth_signature_method": 'HMAC-SHA1',
            "oauth_version": '1.0',
            "oauth_timestamp": LoginTwitter._timeStamp(),
        }

    @classmethod
    def _timeStamp(cls):
        """Return the current Unix time in whole seconds, as a string."""
        return str(int(time.time()))

    @classmethod
    def _nonce(cls):
        """Return a random 20-character string to use as ``oauth_nonce``."""
        return "".join(
            random.choice(cls._NONCE_CHARS) for _ in range(cls._NONCE_LENGTH)
        )

    @staticmethod
    def _percent_encode(value):
        """Percent-encode ``value`` the way OAuth 1.0 requires (RFC 3986).

        ``urllib.quote_plus`` is not suitable here: it turns a space into
        ``+``, which produces a signature the server cannot reproduce.
        Only unreserved characters (letters, digits, ``-._~``) stay as-is.
        """
        return urllib.quote(value, safe="~")

    @staticmethod
    def _parse_response(body):
        """Split a ``k1=v1&k2=v2`` response body into a dict.

        Pairs without ``=`` map to an empty string instead of raising.
        Values are returned as received (not URL-decoded).
        """
        parsed = {}
        for pair in body.split("&"):
            key, _, value = pair.partition("=")
            if key:
                parsed[key] = value
        return parsed

    def _oauth_params(self):
        """Return the ``oauth_*`` entries of ``self.prms`` minus the signature."""
        return dict(
            (key, value) for key, value in self.prms.items()
            if key.startswith("oauth_") and key != "oauth_signature"
        )

    def _signature(self, request_url, request_prms=None):
        """Compute the HMAC-SHA1 ``oauth_signature`` for a request.

        Args:
            request_url: the endpoint URL being called.
            request_prms: optional dict of form-body parameters; they are
                part of the signature base string alongside the OAuth
                protocol parameters.

        Returns:
            The base64-encoded signature string.
        """
        encode = self._percent_encode
        # A signature left over from a previous request must never be signed,
        # which is why _oauth_params() drops "oauth_signature".
        params = self._oauth_params()
        if request_prms:
            params.update(request_prms)
        normalized = "&".join(
            encode(key) + "=" + encode(params[key]) for key in sorted(params)
        )
        base_string = "&".join(
            [encode(self.method), encode(request_url), encode(normalized)]
        )
        signing_key = "%s&%s" % (
            encode(self.consumer_secret),
            encode(self.access_token_secret),
        )
        digest = hmac.new(signing_key, base_string, hashlib.sha1).digest()
        return base64.b64encode(digest)

    def getOauthToken(self):
        """Return the current ``oauth_token`` or ``None`` if none is set."""
        return self.prms.get("oauth_token")

    def getRedirectOauthAuthenticateUrl(self):
        """Return the Twitter authenticate URL for the current request token.

        Returns ``None`` when no ``oauth_token`` has been obtained yet.
        """
        token = self.prms.get("oauth_token")
        if token is None:
            return None
        return self.oauth_authenticate_url + token

    def requestAccessToken(self, oauth_token, oauth_verifier):
        """Exchange a request token and verifier for an access token.

        Args:
            oauth_token: the request token returned to the callback URL.
            oauth_verifier: the verifier returned to the callback URL.

        Returns:
            A dict of the fields in Twitter's response on success, or
            ``None`` when the request failed. (The previous code built this
            dict but never returned it.)
        """
        self.prms["oauth_token"] = oauth_token
        response = self._request(
            self.access_token_url, {"oauth_verifier": oauth_verifier}
        )
        if not response:
            return None
        return self._parse_response(response)

    def requestOAuthToken(self, oauth_callback_url):
        """Obtain a request token and remember it in ``self.prms``.

        Args:
            oauth_callback_url: URL Twitter redirects to after authentication.

        Returns:
            ``True`` if Twitter confirmed the callback and returned a token,
            ``False`` otherwise.
        """
        self.prms["oauth_callback"] = oauth_callback_url
        response = self._request(
            self.request_token_url, {"oauth_callback": oauth_callback_url}
        )
        if not response:
            return False
        result = self._parse_response(response)
        if result.get("oauth_callback_confirmed") != "true":
            return False
        if "oauth_token" not in result:
            return False
        self.prms["oauth_token"] = result["oauth_token"]
        return True

    def _request(self, request_url, request_prms):
        """POST a signed request and return the response body.

        Args:
            request_url: the endpoint URL.
            request_prms: dict of parameters sent as the form-encoded body.

        Returns:
            The response body when the status code is 200, else ``None``.
        """
        # Every request needs its own nonce and timestamp; reusing the ones
        # created in __init__ makes a second call on this instance a replay.
        self.prms["oauth_nonce"] = self._nonce()
        self.prms["oauth_timestamp"] = self._timeStamp()
        self.prms["oauth_signature"] = self._signature(
            request_url, request_prms
        )

        encode = self._percent_encode
        # Only OAuth protocol parameters belong in the Authorization header.
        authorization = ",".join(
            '%s="%s"' % (encode(key), encode(self.prms[key]))
            for key in sorted(self.prms)
            if key.startswith("oauth_")
        )
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "OAuth " + authorization,
        }
        result = urlfetch.fetch(
            url=request_url,
            payload=urllib.urlencode(request_prms),
            method=urlfetch.POST,
            headers=headers,
        )
        logging.info(result.status_code)
        if result.status_code == 200:
            # The body is deliberately not logged on success: the
            # access-token response contains oauth_token_secret.
            return result.content
        logging.warning(result.content)
        return None

    def statusesUpdate(self, access_token, access_token_secret, status):
        """Post ``status`` as a tweet on behalf of the given user.

        Args:
            access_token: the user's OAuth access token.
            access_token_secret: the matching access token secret.
            status: the tweet text (byte string, or unicode which is
                encoded as UTF-8).

        Returns:
            The response body from the update endpoint on HTTP 200,
            otherwise ``None``.
        """
        if not isinstance(status, str):
            # Python 2 unicode: urllib's quoting only handles byte strings.
            status = status.encode("utf-8")
        self.prms["oauth_token"] = access_token
        self.access_token_secret = access_token_secret
        # The status travels in the request body only; it is signed via
        # request_prms and is not stored in self.prms.
        return self._request(self.statuses_update_url, {"status": status})
0 コメント:
コメントを投稿