User:BurritoBazooka/watcher script.py
Appearance
Forks
[edit]If you fork, please add a link back to this page, and add a link to your fork to the following list. However, you are welcome to apply contributions directly to this page.
Forks:
- (None)
Code
[edit]import feedparser
import subprocess, threading
import time, atexit
import random
# Very hastily written script to let me watch certain pages for vandalism while doing other things.
# Contributions welcome.
# Maintainer/Author: [[User:BurritoBazooka]] <http://en.wiki.x.io/wiki/Special:EmailUser/BurritoBazooka>
# License: CC BY-SA 3.0 License and the GFDL. A hyperlink or URL is sufficient attribution.
# TODO:
# Use rcstream instead of polling (I wrote similar code for Wikidata somewhere) http://wikitech.wikimedia.org/wiki/RCStream
# More features:
# Expand to have all articles on my watchlist
# Get random recent diffs for review (biased towards those on watchlist, new accounts, tagged edits, and those without an edit summary)
# UI for reviewing diffs, as well as notifying user when page under review gets more changes
# Allow user to quickly revert changes and warn another user (possibly not using pywikibot, but just direct web links)
articles = ["Main_Page", "Outer_space", "User_talk:BurritoBazooka"] # Edit this list to your liking
last_date = None
def tell(*args):
print(*args)
subprocess.run(["notify-send", " ".join(args)])
speak("beep beep")
def speak(what_to_say="No input given", speed=240, gap=0, lang="en"):
espeak = subprocess.Popen(["espeak", "-s"+str(speed), "-g"+str(gap), "-v"+str(lang), str(what_to_say)], stdin=subprocess.PIPE)
class Watcher:
def __init__(self, url):
self.last_date = None
self.url = url
self.running = False
def do_watch(self):
d = feedparser.parse(self.url)
rev = d['entries'][0]
date = rev['published_parsed']
if self.last_date is None or date > self.last_date:
tell(time.asctime(date), rev['title'])
self.last_date = date
def loop(self, delay=15):
M = 1000
v = (delay/5)*M # random sleep time avoids hitting server with all our requests at once
self.running = True
while self.running:
self.do_watch()
sleeptime = delay + (random.randint(-v, v) / M)
time.sleep(sleeptime)
def stop(self):
self.running = False
if __name__ == "__main__":
wt = {}
url_template = "http://en.wiki.x.io/w/index.php?title={}&action=history&feed=rss"
urls = [url_template.format(article) for article in articles]
for u in urls:
w = Watcher(u)
t = threading.Thread(target=w.loop)
wt[w] = t
atexit.register(w.stop)
t.start()
time.sleep(0.25)