reddit_scraper.py
import praw
import os
import argparse
import gspread
import csv
import logging
from datetime import datetime, timedelta
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
# Constants
SCRAPER_FOLDER_NAME = 'scraper'
REDDIT_FOLDER_NAME = 'reddit'
SCOPES = ["https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive.file",
"https://www.googleapis.com/auth/drive"]
OAUTH_CREDENTIALS_FILE = 'google_credentials_oauth.json'
STORAGE_HEADERS = ["date", "title", "post_content", "post_vote_count", "top_comment", "comment_vote_count", "url"]
# Reddit app credentials are read from the "Scraper" section of a praw.ini file (see the example below)
reddit = praw.Reddit(
    site_name="Scraper"
)
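# Example praw.ini section matching site_name="Scraper". The values are placeholders,
# not real credentials; see the PRAW docs for the full set of supported options:
#
#     [Scraper]
#     client_id=YOUR_CLIENT_ID
#     client_secret=YOUR_CLIENT_SECRET
#     user_agent=reddit_scraper by u/YOUR_USERNAME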
def get_posts_up_to_date(subreddit_name, min_upvotes, max_posts, storage):
    logging.info(f"Starting to scrape posts from subreddit: {subreddit_name}")

    # Check if the storage target exists and get the most recent post date if available
    filename = f"{subreddit_name}_{min_upvotes}"
    last_date_in_file = storage.get_last_post_date(filename)
    if last_date_in_file:
        logging.info(f"Last post date found in storage: {last_date_in_file}")
    else:
        logging.info("No previous posts found in storage. Retrieving all posts.")

    subreddit = reddit.subreddit(subreddit_name)

    # If a last saved date is available, use it as the lower bound; otherwise, retrieve all posts
    if last_date_in_file:
        end_timestamp = int(datetime.strptime(last_date_in_file, '%Y-%m-%d %H:%M:%S UTC').timestamp())
    else:
        end_timestamp = None  # No date limit if no date is found in storage

    posts = []
    two_days_ago = datetime.utcnow() - timedelta(days=2)

    logging.info("Retrieving posts from Reddit...")
    # Note: Reddit's listing endpoints appear to return only roughly the most recent 1000 posts.
    queried_post_count = 0
    # Use PRAW's `new` generator to paginate through posts
    for post in subreddit.new(limit=None):  # Pagination is handled automatically
        queried_post_count += 1
        # Check if the post has enough upvotes and if its creation date is within the limit
        if post.score >= min_upvotes:
            post_created_time = post.created_utc
            # Skip posts created within the last 2 days
            if post_created_time >= two_days_ago.timestamp():
                continue
            # Stop once posts are older than the last saved date (if one was found)
            if end_timestamp and post_created_time <= end_timestamp:
                logging.info("Reached posts older than the last saved post. Stopping retrieval.")
                break
            # Find the highest-scoring top-level comment, ignoring deleted/removed ones
            top_comment_body = ""
            top_comment_score = 0
            post.comments.replace_more(limit=0)  # Drop "load more comments" placeholders (top-level comments only)
            if len(post.comments) > 0:
                top_comment = max(
                    post.comments,
                    key=lambda comment: comment.score if comment.body != '[deleted]' and comment.body != '[removed]' else 0)
                top_comment_body = top_comment.body
                top_comment_score = top_comment.score
            # Add the post to the list
            posts.append({
                "date": datetime.utcfromtimestamp(post.created_utc).strftime('%Y-%m-%d %H:%M:%S UTC'),
                "title": post.title,
                "post_content": post.selftext,
                "post_vote_count": post.score,
                "top_comment": top_comment_body,
                "comment_vote_count": top_comment_score,
                "url": post.url,
            })
            logging.info(f"Retrieved post from date: {posts[-1]['date']}. Retrieved posts: {len(posts)}")
            if len(posts) >= max_posts:
                logging.info(f"Reached the maximum number of posts to retrieve: {max_posts}")
                break

    logging.info(f"Queried a total of {queried_post_count} posts")
    # Reverse so posts are stored oldest-first (Reddit returns them newest-first)
    posts.reverse()

    # Write the data to the selected storage method
    logging.info(f"Writing {len(posts)} posts to storage")
    storage.write_posts(filename, posts)
    logging.info("Finished writing posts to storage")
class CSVStorage:
    @staticmethod
    def get_last_post_date(filename):
        """
        Check if the CSV file exists, and if so, return the date of the last entry in the file.
        """
        # Check if the file exists
        filepath = filename + '.csv'
        if not os.path.isfile(filepath):
            return None  # No file exists, so no date to return

        try:
            # Open the CSV and retrieve the last post date
            with open(filepath, mode="r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                rows = list(reader)
                if len(rows) == 0:
                    return None  # File exists but is empty
                return rows[-1]["date"]  # Return the date of the last entry
        except Exception as e:
            logging.error(f"Error reading CSV file: {e}")
            return None

    @staticmethod
    def write_posts(filename, posts):
        """
        Append posts to a CSV file. Create the file if it doesn't exist.
        """
        # Check if the file exists
        filepath = filename + '.csv'
        file_exists = os.path.isfile(filepath)

        # Open the file in append mode
        with open(filepath, mode="a", newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=STORAGE_HEADERS)
            # If the file does not exist or is empty, write the headers first
            if not file_exists or os.stat(filepath).st_size == 0:
                writer.writeheader()
            # Write the post data to the CSV file
            for post in posts:
                writer.writerow(post)
class GoogleSheetStorage:
    def __init__(self):
        self._initialize_google_client()
        self.folder_id = self._setup_drive_directory()

    def get_last_post_date(self, filename):
        """
        Check if the Google Sheet exists in the specified folder, and if so, return the date of the last entry in the sheet.
        """
        query = f"mimeType='application/vnd.google-apps.spreadsheet' and name='{filename}' and '{self.folder_id}' in parents and trashed=false"
        results = self.drive_service.files().list(q=query, fields="files(id, name)").execute()
        items = results.get('files', [])

        if not items:
            logging.info(f"Google Sheet with name '{filename}' not found. Creating a new sheet.")
            # If the sheet does not exist, create it in the specified folder
            file_metadata = {
                'name': filename,
                'mimeType': 'application/vnd.google-apps.spreadsheet',
                'parents': [self.folder_id]
            }
            sheet = self.drive_service.files().create(body=file_metadata, fields='id').execute()
            sheet_id = sheet.get('id')
            sheet = self.client.open_by_key(sheet_id).sheet1
            sheet.append_row(STORAGE_HEADERS)
            return None
        else:
            # Open the existing sheet
            sheet_id = items[0]['id']
            sheet = self.client.open_by_key(sheet_id).sheet1
            records = sheet.get_all_records()
            if len(records) == 0:
                return None  # Sheet exists but is empty
            return records[-1]["date"]  # Return the date of the last entry

    def write_posts(self, filename, posts):
        """
        Append posts to a Google Sheet in the specified folder. Create the sheet if it doesn't exist.
        """
        query = f"mimeType='application/vnd.google-apps.spreadsheet' and name='{filename}' and '{self.folder_id}' in parents and trashed=false"
        results = self.drive_service.files().list(q=query, fields="files(id, name)").execute()
        items = results.get('files', [])

        if not items:
            logging.info(f"Google Sheet with name '{filename}' not found. Creating a new sheet.")
            # Create the sheet in the specified folder
            file_metadata = {
                'name': filename,
                'mimeType': 'application/vnd.google-apps.spreadsheet',
                'parents': [self.folder_id]
            }
            sheet = self.drive_service.files().create(body=file_metadata, fields='id').execute()
            sheet_id = sheet.get('id')
            sheet = self.client.open_by_key(sheet_id).sheet1
            sheet.append_row(STORAGE_HEADERS)
        else:
            sheet_id = items[0]['id']
            sheet = self.client.open_by_key(sheet_id).sheet1

        # Write the post data to the Google Sheet
        for post in posts:
            sheet.append_row(
                [post["date"], post["title"], post["post_content"], post["post_vote_count"], post["top_comment"],
                 post["comment_vote_count"], post["url"]])
    def _initialize_google_client(self):
        """
        Set up OAuth flow to get user credentials and return a Google Sheets client.
        """
        flow = InstalledAppFlow.from_client_secrets_file(OAUTH_CREDENTIALS_FILE, SCOPES)
        self.creds = flow.run_local_server(port=0)
        self.client = gspread.authorize(self.creds)

    def _setup_drive_directory(self):
        """
        Check if the 'scraper' directory exists in Google Drive, and create it if it does not.
        Then, check if the 'reddit' directory exists within 'scraper', and create it if it does not.
        """
        self.drive_service = build('drive', 'v3', credentials=self.creds, cache_discovery=False)
        scraper_folder_id = self._get_or_create_folder(SCRAPER_FOLDER_NAME)
        reddit_folder_id = self._get_or_create_folder(REDDIT_FOLDER_NAME, parent_id=scraper_folder_id)
        return reddit_folder_id

    def _get_or_create_folder(self, folder_name, parent_id=None):
        """
        Check if a folder exists by name and parent, create it if it doesn't exist, and return its ID.
        """
        query = f"mimeType='application/vnd.google-apps.folder' and name='{folder_name}' and trashed=false"
        if parent_id:
            query += f" and '{parent_id}' in parents"
        results = self.drive_service.files().list(q=query, fields="files(id, name)").execute()
        items = results.get('files', [])

        if items:
            return items[0]['id']
        else:
            # Create the folder
            file_metadata = {
                'name': folder_name,
                'mimeType': 'application/vnd.google-apps.folder'
            }
            if parent_id:
                file_metadata['parents'] = [parent_id]
            folder = self.drive_service.files().create(body=file_metadata, fields='id').execute()
            return folder.get('id')
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Scrape Reddit posts and save them to a storage file.")
parser.add_argument("subreddit_name", type=str, help="The name of the subreddit to scrape.")
parser.add_argument("--min_upvotes", type=int, default=1,
help="The minimum number of upvotes required for a post to be included (default is 1).")
parser.add_argument("--max_posts", type=int, default=10,
help="The maximum number of posts to retrieve (default is 10).")
parser.add_argument("--storage", type=str, choices=["csv", "gs"], default="csv",
help="The storage to store the data: 'csv' or 'gs' for google_sheet (default is 'csv').")
args = parser.parse_args()
if args.storage == "csv":
storage = CSVStorage()
elif args.storage == "gs":
storage = GoogleSheetStorage()
else:
raise ValueError("Invalid storage option")
get_posts_up_to_date(subreddit_name=args.subreddit_name, min_upvotes=args.min_upvotes,
max_posts=args.max_posts, storage=storage)
# TODO: Make it so that when changing the Headers, I don't have to change the implementations.
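# Example invocations (the subreddit name and thresholds below are illustrative, not defaults):
#
#     python reddit_scraper.py learnpython --min_upvotes 50 --max_posts 200
#     python reddit_scraper.py learnpython --storage gs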