import ujson as json import re from datetime import datetime, timedelta import logging import sys import flask from flask.ext.login import LoginManager, login_user, login_required, current_user, logout_user, login_fresh, \ confirm_login from flask.ext.sqlalchemy import SQLAlchemy from flask.ext.wtf import Form import requests from wtforms import StringField from wtforms.validators import DataRequired logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.DEBUG) handler = logging.FileHandler('wkburned.log') handler.setLevel(logging.WARN) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) console_handler.setFormatter(formatter) logger.addHandler(handler) logger.addHandler(console_handler) app = flask.Flask(__name__) logger.debug("Configuring application.") app.config.update( PORT=5000, JSON_AS_ASCII=False ) app.config.from_pyfile('config.py', silent=True) logger.debug("Setting up login manager.") login_manager = LoginManager() login_manager.init_app(app) logger.debug("Setting up database.") db = SQLAlchemy(app) filter_regex = re.compile('\d+([,-]\d+)*') logger.debug("Setup complete") class LoginForm(Form): api_key = StringField('api_key', validators=[DataRequired()]) class User(db.Model): __tablename__ = 'users' api_key = db.Column(db.String, unique=True, primary_key=True) radicals = db.Column(db.String) kanji = db.Column(db.String) vocabulary = db.Column(db.String) username = db.Column(db.String) gravatar = db.Column(db.String) last_updated = db.Column(db.DateTime) # auto_init and username should only be changed for testing purposes def __init__(self, api_key, auto_init=True, username=None): logger.info("Creating new user with %s", api_key) self.api_key = api_key self.username = username self.last_updated = datetime.min if auto_init: # pragma: no cover self.last_updated = datetime.utcnow() self.parse_radicals_and_userdata() self.parse_kanji() self.parse_vocabulary() # If a user with this name already exists, delete it as their api key has probably changed to the new input. if User.query.filter(User.username == self.username).first(): logger.info("User with username '%s' already exists, deleting old entry", self.username) db.session.delete(User.query.filter(User.username == self.username).first()) db.session.add(self) db.session.commit() def is_authenticated(self): return True def is_active(self): return True def is_anonymous(self): return False def get_id(self): return str(self.api_key) def parse_radicals_and_userdata(self): # pragma: no cover logger.debug("Parsing radicals and userdata for %s", self.api_key) response = requests.get("https://www.wanikani.com/api/user/" + self.api_key + "/radicals/") response.raise_for_status() data = response.json() if data.get('error'): logger.error("Error response from WK when updating radicals and userdata. %s", data['error']['message']) raise ValueError(data['error']['message']) self.username = data['user_information']['username'] self.gravatar = data['user_information']['gravatar'] if data.get('requested_information'): self.radicals = json.dumps(data['requested_information'], ensure_ascii=False) def parse_kanji(self): # pragma: no cover logger.debug("Parsing kanji for %s", self.api_key) response = requests.get("https://www.wanikani.com/api/user/" + self.api_key + "/kanji/") response.raise_for_status() data = response.json() if data.get('error'): logger.error("Error response from WK when updating kanji. %s", data['error']['message']) raise ValueError(data['error']['message']) if data.get('requested_information'): self.kanji = json.dumps(data['requested_information'], ensure_ascii=False) def parse_vocabulary(self): # pragma: no cover logger.debug("Parsing vocabulary for %s", self.api_key) response = requests.get("https://www.wanikani.com/api/user/" + self.api_key + "/vocabulary/") response.raise_for_status() data = response.json() if data.get('error'): logger.error("Error response from WK when updating vocabulary. %s", data['error']['message']) raise ValueError(data['error']['message']) if data.get('requested_information'): self.vocabulary = json.dumps(data['requested_information']['general'], ensure_ascii=False) def update_all(self): # pragma: no cover logger.info("Updating all info for %s", self.api_key) if (datetime.utcnow() - self.last_updated) > timedelta(hours=1): try: self.parse_radicals_and_userdata() self.parse_kanji() self.parse_vocabulary() self.last_updated = datetime.utcnow() db.session.commit() except requests.exceptions.SSLError: logger.error("%s received an SSL error trying to update. Ensure SSL certs are installed.", self.api_key) raise ValueError('Backend SSL Certificate Error, contact administrator.') else: logger.warn("%s tried to update too frequently", self.api_key) raise ValueError('Cannot refresh now, try again later.') def parse_range(input_range): if filter_regex.match(input_range): result = [] components = input_range.split(',') for item in components: if item.isdigit(): result.append(int(item)) else: range_ends = item.split('-') result.extend(list(range(int(range_ends[0]), int(range_ends[1]) + 1))) return result def get_items_with_level_restriction(level_range, item_state, item_types): items = [] radical_count = 0 loads = json.loads if 'radical' in item_types and current_user.radicals: items.extend({'item_type': 'radical', 'answer': item['meaning'], 'question': item['image'] if item['image'] else item['character']} for item in loads(current_user.radicals) if item['user_specific'] and item['user_specific']['srs'] in item_state and item['level'] in level_range) radical_count = len(items) kanji_count = 0 if 'kanji' in item_types and current_user.kanji: for item in filter((lambda x: x['user_specific'] and x['user_specific']['srs'] in item_state and x['level'] in level_range), loads(current_user.kanji)): items.extend([{'item_type': 'kanji', 'question': item['character'], 'answer': item['onyomi'] if item['important_reading'] == 'onyomi' else item['kunyomi'], 'answer_type': 'kana'}, {'item_type': 'kanji', 'question': item['character'], 'answer': item['meaning'], 'answer_type': 'eng'}]) kanji_count = int((len(items) - radical_count) / 2) vocabulary_count = 0 if 'vocab' in item_types and current_user.vocabulary: for item in filter((lambda x: x['user_specific'] and x['user_specific']['srs'] in item_state and x['level'] in level_range), loads(current_user.vocabulary)): items.extend([{'item_type': 'vocabulary', 'question': item['character'], 'answer': item['kana'], 'answer_type': 'kana'}, {'item_type': 'vocabulary', 'question': item['character'], 'answer': item['meaning'], 'answer_type': 'eng'}]) vocabulary_count = int((len(items) - (radical_count + (kanji_count * 2))) / 2) if not items: return flask.jsonify(error="No items within these filter parameters") else: return flask.jsonify(radical_count=radical_count, kanji_count=kanji_count, vocabulary_count=vocabulary_count, item_list=items) @login_manager.user_loader def load_user(api_key): return User.query.get(str(api_key)) @app.route('/', methods=('GET', 'POST')) def show_home(): if current_user.is_authenticated: return flask.redirect(flask.url_for('show_quiz')) form = LoginForm() if form.validate_on_submit(): input_data = str(form.api_key.data) user = User.query.get(input_data) if not user: user = User.query.filter(User.username == input_data).first() if user: login_user(user, remember=True) return flask.redirect(flask.url_for('show_quiz')) elif len(input_data) == 32: # pragma: no cover try: new_user = User(input_data) login_user(new_user, remember=True) return flask.redirect(flask.url_for('show_quiz')) except ValueError as err: flask.flash(err) return flask.render_template("welcome.html", form=form) else: flask.flash("API Key length invalid, or username not already in database.") return flask.render_template("welcome.html", form=form) return flask.render_template("welcome.html", form=form) @login_required @app.route('/quiz') def show_quiz(): if not login_fresh(): if not current_user.is_anonymous and User.query.get(current_user.api_key): confirm_login() return flask.render_template("quiz.html") else: logout_user() return flask.redirect(flask.url_for('show_home')) return flask.render_template("quiz.html") @app.route('/user_items') @login_required def get_items(): level_range = list(range(0, 61)) if flask.request.args.get('level_range') and flask.request.args.get('level_range') is not '': level_range = parse_range(flask.request.args.get('level_range')) item_state = ['burned'] if flask.request.args.get('item_state') and flask.request.args.get('item_state') is not '': item_state = flask.request.args.get('item_state').split(',') item_types = ['radical', 'kanji', 'vocab'] if flask.request.args.get('item_types') and flask.request.args.get('item_types') is not '': item_types = flask.request.args.get('item_types').split(',') return get_items_with_level_restriction(level_range, item_state, item_types) @app.route('/refresh', methods=['POST']) @login_required def refresh_api(): # pragma: no cover try: current_user.update_all() return flask.jsonify(last_refresh=datetime_format(current_user.last_updated)) except ValueError as err: logger.error("Error during /refresh. %s", str(err)) return str(err), 500 @app.route('/logout') def logout(): logout_user() return flask.redirect(flask.url_for('show_home')) @login_manager.unauthorized_handler def unauthorized(): return flask.redirect(flask.url_for('show_home')) @app.template_filter('datetime_format') def datetime_format(input_data): return input_data.strftime("%d %B %Y %I:%M%p") if __name__ == '__main__': db.create_all() app.run(threaded=True, port=app.config['PORT'])