transfer @zwaxman's code from drive

This commit is contained in:
2026-03-04 14:10:10 +00:00
commit 4399945a7b
2 changed files with 293 additions and 0 deletions

254
classify_members.py Normal file
View File

@@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""
Member Classification Script
Classifies members from an email list into three categories:
- not-in-good-standing: Members whose status is not "Member in Good Standing"
- existing-member: Members in good standing who already exist in the member database
- new-member: Members in good standing who are not yet in the member database
"""
import csv
import argparse
import os
import re
from pathlib import Path
from collections import defaultdict
def extract_date_from_filename(filename):
    """
    Find a date token such as "20FEB2026" inside a filename.

    The token is one or two digits, three letters, then four digits
    (e.g. "AllMembersEmails-20FEB2026.csv" yields "20FEB2026"). Letters are
    matched case-insensitively and the result is always upper-cased.

    Args:
        filename: Input filename

    Returns:
        The first matching date token in upper case, or None if the
        filename contains no such token
    """
    # day digits + three-letter month + four-digit year, any letter case
    date_token = re.compile(r'(\d{1,2}[A-Z]{3}\d{4})', re.IGNORECASE)
    found = date_token.search(filename)
    return found.group(1).upper() if found else None
def load_member_emails(member_db_path):
    """
    Load email addresses from the member database CSV file.

    The first line of the file is treated as a descriptive line and skipped;
    the second line is the header row, which must contain an
    'Email address' column (surrounding whitespace in header names is
    ignored). Every following row contributes its value from that column.

    Args:
        member_db_path: Path to the member database CSV file

    Returns:
        set: Set of email addresses, stripped and lowercased for
        case-insensitive matching. The set is empty when the file is
        missing, unreadable, has no header row, or lacks the
        'Email address' column; a message is printed in each of those cases.
    """
    member_emails = set()
    try:
        # newline='' is what the csv module requires so that line breaks
        # inside quoted fields are parsed intact; utf-8-sig drops a leading
        # byte-order mark if the export added one.
        with open(member_db_path, 'r', encoding='utf-8-sig', newline='') as f:
            reader = csv.reader(f)
            # Skip the first descriptive line
            next(reader, None)
            # Read header row
            headers = next(reader, None)
            if not headers:
                print(f"Warning: {member_db_path} appears to be empty or malformed")
                return member_emails
            # Compare against stripped names so a header written as
            # "Name, Email address" (padding after the comma) is still found.
            column_names = [name.strip() for name in headers]
            try:
                email_col_index = column_names.index('Email address')
            except ValueError:
                print(f"Error: 'Email address' column not found in {member_db_path}")
                print(f"Available columns: {headers}")
                return member_emails
            # Read all email addresses; short rows simply have no email cell
            for row in reader:
                if len(row) > email_col_index:
                    email = row[email_col_index].strip().lower()
                    if email:
                        member_emails.add(email)
        print(f"Loaded {len(member_emails)} email addresses from member database")
        return member_emails
    except FileNotFoundError:
        print(f"Error: File not found: {member_db_path}")
        return member_emails
    except Exception as e:
        # Deliberate best-effort boundary: report the problem and hand back
        # whatever was collected instead of raising to the caller.
        print(f"Error reading {member_db_path}: {e}")
        return member_emails
def classify_members(all_members_path, member_db_path):
    """
    Classify members from the email list into three categories.

    A row whose 'membership_status' is not exactly 'Member in Good Standing'
    goes to the first list. Remaining rows go to the second list when their
    lowercased 'email' is present in the member database, otherwise to the
    third list.

    Args:
        all_members_path: Path to the all members email CSV file; its
            basename is also searched for a date token such as "20FEB2026"
        member_db_path: Path to the member database CSV file

    Returns:
        tuple: (not_in_good_standing, existing_members, new_members, date_string)
        Each of the first three is a list of dictionaries representing CSV
        rows. date_string is "UNKNOWN_DATE" when the filename carries no
        date. If the file is missing, unreadable, or lacks the required
        columns, the three lists are empty and a message is printed.
    """
    # Load existing member emails
    member_emails = load_member_emails(member_db_path)

    # Extract date from filename
    date_string = extract_date_from_filename(os.path.basename(all_members_path))
    if not date_string:
        print(f"Warning: Could not extract date from filename: {all_members_path}")
        date_string = "UNKNOWN_DATE"

    # Classify members
    not_in_good_standing = []
    existing_members = []
    new_members = []
    try:
        # utf-8-sig strips a byte-order mark that would otherwise be glued to
        # the first header name (e.g. '\ufeffemail'); newline='' is what the
        # csv module requires for correct handling of quoted line breaks.
        with open(all_members_path, 'r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f)
            # fieldnames is None for an empty file; treat that as "no columns"
            found_columns = reader.fieldnames or []
            # Verify required columns exist
            if 'email' not in found_columns or 'membership_status' not in found_columns:
                print("Error: Required columns not found. Expected 'email' and 'membership_status'")
                print(f"Found columns: {reader.fieldnames}")
                return [], [], [], date_string

            for row in reader:
                # DictReader fills cells missing from a short row with None,
                # so fall back to '' before stripping.
                email = (row.get('email') or '').strip()
                membership_status = (row.get('membership_status') or '').strip()
                # Check membership status
                if membership_status != 'Member in Good Standing':
                    not_in_good_standing.append(row)
                elif email.lower() in member_emails:
                    # Email exists in member database (case-insensitive)
                    existing_members.append(row)
                else:
                    new_members.append(row)

        # Every row lands in exactly one list, so the total is their sum
        row_count = len(not_in_good_standing) + len(existing_members) + len(new_members)
        print(f"Processed {row_count} rows")
        print(f" - Not in good standing: {len(not_in_good_standing)}")
        print(f" - Existing members: {len(existing_members)}")
        print(f" - New members: {len(new_members)}")
    except FileNotFoundError:
        print(f"Error: File not found: {all_members_path}")
        return [], [], [], date_string
    except Exception as e:
        # Best-effort boundary: partial results are discarded on any read error
        print(f"Error reading {all_members_path}: {e}")
        return [], [], [], date_string
    return not_in_good_standing, existing_members, new_members, date_string
def write_csv_file(data, output_path, fieldnames):
    """
    Save a list of row dictionaries as a CSV file with a header line.

    Any failure while opening or writing is reported with a printed message
    rather than raised.

    Args:
        data: List of dictionaries representing rows
        output_path: Path to output CSV file
        fieldnames: List of column names, written as the header and used
            to order each row's values
    """
    try:
        with open(output_path, mode='w', newline='', encoding='utf-8') as out_file:
            dict_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
            dict_writer.writeheader()
            dict_writer.writerows(data)
        # Reached only when the file was written and closed without error
        summary = f" Written: {output_path} ({len(data)} rows)"
        print(summary)
    except Exception as err:
        print(f"Error writing {output_path}: {err}")
def main():
    """
    Command-line entry point for the member classification script.

    Reads the --all-members and --member-db options, classifies the members,
    creates a directory named after the date string, and writes one CSV file
    per category into it. Returns early, after printing a warning, when
    there are no rows to write.
    """
    arg_parser = argparse.ArgumentParser(
        description='Classify members into not-in-good-standing, existing-member, and new-member categories'
    )
    arg_parser.add_argument(
        '--all-members',
        default='AllMembersEmails-20FEB2026.csv',
        help='Path to the all members email CSV file (default: AllMembersEmails-20FEB2026.csv)'
    )
    arg_parser.add_argument(
        '--member-db',
        default='nnjdsamembers.csv',
        help='Path to the member database CSV file (default: nnjdsamembers.csv)'
    )
    options = arg_parser.parse_args()

    banner = "=" * 60
    print(banner)
    print("Member Classification Script")
    print(banner)
    print(f"All members file: {options.all_members}")
    print(f"Member database: {options.member_db}")
    print()

    # Split the input rows into the three categories
    not_in_good_standing, existing_members, new_members, date_string = classify_members(
        options.all_members,
        options.member_db
    )
    if not date_string:
        print("Error: Could not determine output directory name")
        return

    # The output directory is named after the date string
    output_dir = Path(date_string)
    output_dir.mkdir(exist_ok=True)
    print(f"\nOutput directory: {output_dir}/")

    # Column names come from the first row of the first non-empty category
    # (all rows are read from the same file, so they share one structure)
    first_row = next(
        (
            group[0]
            for group in (not_in_good_standing, existing_members, new_members)
            if group
        ),
        None,
    )
    if first_row is None:
        print("Warning: No data to write")
        return
    fieldnames = list(first_row.keys())

    # One output file per category, written in a fixed order
    print("\nWriting output files...")
    outputs = [
        (not_in_good_standing, output_dir / f"{date_string}_not-in-good-standing.csv"),
        (existing_members, output_dir / f"{date_string}_existing-member.csv"),
        (new_members, output_dir / f"{date_string}_new-member.csv"),
    ]
    for rows, destination in outputs:
        write_csv_file(rows, destination, fieldnames)

    print("\n" + banner)
    print("Classification complete!")
    print(banner)


if __name__ == '__main__':
    main()