|
| 1 | +""" |
| 2 | +Command to migrate course-level notification preferences to account-level preferences. |
| 3 | +""" |
| 4 | +import logging |
| 5 | +from typing import Dict, List, Any, Iterator |
| 6 | +from collections import defaultdict |
| 7 | + |
| 8 | +from django.core.management.base import BaseCommand, CommandParser |
| 9 | +from django.db import transaction |
| 10 | + |
| 11 | +from openedx.core.djangoapps.notifications.email_notifications import EmailCadence |
| 12 | +from openedx.core.djangoapps.notifications.models import CourseNotificationPreference, NotificationPreference |
| 13 | +from openedx.core.djangoapps.notifications.utils import aggregate_notification_configs |
| 14 | + |
| 15 | +logger = logging.getLogger(__name__) |
| 16 | + |
| 17 | +DEFAULT_BATCH_SIZE = 1000 |
| 18 | + |
| 19 | + |
| 20 | +class Command(BaseCommand): |
| 21 | + """ |
| 22 | + Migrates course-level notification preferences to account-level notification preferences. |
| 23 | +
|
| 24 | + This command processes users in batches, aggregates their course-level preferences, |
| 25 | + and creates new account-level preferences. It includes a dry-run mode. |
| 26 | + Existing account-level preferences for a processed user will be deleted before |
| 27 | + new ones are created to ensure idempotency. |
| 28 | + """ |
| 29 | + help = "Migrates course-level notification preferences to account-level preferences for all relevant users." |
| 30 | + |
| 31 | + def add_arguments(self, parser: CommandParser): |
| 32 | + parser.add_argument( |
| 33 | + '--batch-size', |
| 34 | + type=int, |
| 35 | + default=DEFAULT_BATCH_SIZE, |
| 36 | + help=f"The number of users to process in each batch. Default: {DEFAULT_BATCH_SIZE}" |
| 37 | + ) |
| 38 | + parser.add_argument( |
| 39 | + '--dry-run', |
| 40 | + action='store_true', |
| 41 | + help="Simulate the migration without making any database changes." |
| 42 | + ) |
| 43 | + |
| 44 | + @staticmethod |
| 45 | + def _get_user_ids_to_process() -> Iterator[int]: |
| 46 | + """ |
| 47 | + Yields all distinct user IDs with course notification preferences. |
| 48 | + """ |
| 49 | + logger.info("Fetching all distinct user IDs with course notification preferences...") |
| 50 | + user_id_queryset = (CourseNotificationPreference |
| 51 | + .objects |
| 52 | + .values_list('user_id', flat=True) |
| 53 | + .distinct()) |
| 54 | + # The iterator with chunk_size is memory efficient for fetching the IDs themselves. |
| 55 | + yield from user_id_queryset.iterator() |
| 56 | + |
| 57 | + @staticmethod |
| 58 | + def _create_preference_object( |
| 59 | + user_id: int, |
| 60 | + app_name: str, |
| 61 | + notification_type: str, |
| 62 | + values: Dict[str, Any] |
| 63 | + ) -> NotificationPreference: |
| 64 | + """ |
| 65 | + Helper function to create a NotificationPreference instance. |
| 66 | + """ |
| 67 | + return NotificationPreference( |
| 68 | + user_id=user_id, |
| 69 | + app=app_name, |
| 70 | + type=notification_type, |
| 71 | + web=values.get('web'), |
| 72 | + email=values.get('email'), |
| 73 | + push=values.get('push'), |
| 74 | + email_cadence=values.get('email_cadence', EmailCadence.DAILY) |
| 75 | + ) |
| 76 | + |
| 77 | + def _create_preferences_from_configs( |
| 78 | + self, |
| 79 | + user_id: int, |
| 80 | + course_preferences_configs: List[Dict] |
| 81 | + ) -> List[NotificationPreference]: |
| 82 | + """ |
| 83 | + Processes a list of preference configs for a single user. |
| 84 | + Returns a list of NotificationPreference objects to be created. |
| 85 | + """ |
| 86 | + new_account_preferences: List[NotificationPreference] = [] |
| 87 | + |
| 88 | + if not course_preferences_configs: |
| 89 | + logger.debug(f"No course preferences found for user {user_id}. Skipping.") |
| 90 | + return new_account_preferences |
| 91 | + |
| 92 | + aggregated_data = aggregate_notification_configs(course_preferences_configs) |
| 93 | + |
| 94 | + for app_name, app_config in aggregated_data.items(): |
| 95 | + if not isinstance(app_config, dict): |
| 96 | + logger.warning( |
| 97 | + f"Malformed app_config for app '{app_name}' for user {user_id}. " |
| 98 | + f"Expected dict, got {type(app_config)}. Skipping app." |
| 99 | + ) |
| 100 | + continue |
| 101 | + |
| 102 | + notif_types = app_config.get('notification_types', {}) |
| 103 | + if not isinstance(notif_types, dict): |
| 104 | + logger.warning( |
| 105 | + f"Malformed 'notification_types' for app '{app_name}' for user {user_id}. Expected dict, " |
| 106 | + f"got {type(notif_types)}. Skipping notification_types." |
| 107 | + ) |
| 108 | + continue |
| 109 | + |
| 110 | + # Handle regular notification types |
| 111 | + for notification_type, values in notif_types.items(): |
| 112 | + if notification_type == 'core': |
| 113 | + continue |
| 114 | + if values is None or not isinstance(values, dict): |
| 115 | + logger.warning( |
| 116 | + f"Skipping malformed notification type data for '{notification_type}' " |
| 117 | + f"in app '{app_name}' for user {user_id}." |
| 118 | + ) |
| 119 | + continue |
| 120 | + new_account_preferences.append( |
| 121 | + self._create_preference_object(user_id, app_name, notification_type, values) |
| 122 | + ) |
| 123 | + |
| 124 | + # Handle core notification types |
| 125 | + core_types_list = app_config.get('core_notification_types', []) |
| 126 | + if not isinstance(core_types_list, list): |
| 127 | + logger.warning( |
| 128 | + f"Malformed 'core_notification_types' for app '{app_name}' for user {user_id}. " |
| 129 | + f"Expected list, got {type(core_types_list)}. Skipping core_notification_types." |
| 130 | + ) |
| 131 | + continue |
| 132 | + |
| 133 | + core_values = notif_types.get('core', {}) |
| 134 | + if not isinstance(core_values, dict): |
| 135 | + logger.warning( |
| 136 | + f"Malformed values for 'core' notification types in app '{app_name}' for user {user_id}. " |
| 137 | + f"Expected dict, got {type(core_values)}. Using empty defaults." |
| 138 | + ) |
| 139 | + core_values = {} |
| 140 | + |
| 141 | + for core_type_name in core_types_list: |
| 142 | + if core_type_name is None or not isinstance(core_type_name, str): |
| 143 | + logger.warning( |
| 144 | + f"Skipping malformed core_type_name: '{core_type_name}' in app '{app_name}' for user {user_id}." |
| 145 | + ) |
| 146 | + continue |
| 147 | + new_account_preferences.append( |
| 148 | + self._create_preference_object(user_id, app_name, core_type_name, core_values) |
| 149 | + ) |
| 150 | + return new_account_preferences |
| 151 | + |
| 152 | + def _process_batch(self, user_ids: List[int]) -> List[NotificationPreference]: |
| 153 | + """ |
| 154 | + Fetches all preferences for a batch of users and processes them. |
| 155 | + """ |
| 156 | + all_new_preferences: List[NotificationPreference] = [] |
| 157 | + |
| 158 | + # 1. Fetch all preference data for the batch in a single query. |
| 159 | + course_prefs = CourseNotificationPreference.objects.filter( |
| 160 | + user_id__in=user_ids |
| 161 | + ).values('user_id', 'notification_preference_config') |
| 162 | + |
| 163 | + # 2. Group the fetched data by user_id in memory. |
| 164 | + prefs_by_user = defaultdict(list) |
| 165 | + for pref in course_prefs: |
| 166 | + prefs_by_user[pref['user_id']].append(pref['notification_preference_config']) |
| 167 | + |
| 168 | + # 3. Process each user's grouped data. |
| 169 | + for user_id, configs in prefs_by_user.items(): |
| 170 | + user_new_preferences = self._create_preferences_from_configs(user_id, configs) |
| 171 | + if user_new_preferences: |
| 172 | + all_new_preferences.extend(user_new_preferences) |
| 173 | + logger.debug(f"User {user_id}: Aggregated {len(configs)} course preferences " |
| 174 | + f"into {len(user_new_preferences)} account preferences.") |
| 175 | + else: |
| 176 | + logger.debug(f"User {user_id}: No account preferences generated from {len(configs)} " |
| 177 | + f"course preferences.") |
| 178 | + |
| 179 | + return all_new_preferences |
| 180 | + |
| 181 | + def handle(self, *args: Any, **options: Any): |
| 182 | + dry_run = options['dry_run'] |
| 183 | + batch_size = options['batch_size'] |
| 184 | + |
| 185 | + if dry_run: |
| 186 | + logger.info(self.style.WARNING("Performing a DRY RUN. No changes will be made to the database.")) |
| 187 | + else: |
| 188 | + # Clear all existing preferences once at the beginning. |
| 189 | + # This is more efficient and safer than deleting per-user. |
| 190 | + NotificationPreference.objects.all().delete() |
| 191 | + logger.info('Cleared all existing account-level notification preferences.') |
| 192 | + |
| 193 | + user_id_iterator = self._get_user_ids_to_process() |
| 194 | + |
| 195 | + user_id_batch: List[int] = [] |
| 196 | + total_users_processed = 0 |
| 197 | + total_preferences_created = 0 |
| 198 | + |
| 199 | + for user_id in user_id_iterator: |
| 200 | + user_id_batch.append(user_id) |
| 201 | + |
| 202 | + if len(user_id_batch) >= batch_size: |
| 203 | + try: |
| 204 | + with transaction.atomic(): |
| 205 | + # Process the entire batch of users |
| 206 | + preferences_to_create = self._process_batch(user_id_batch) |
| 207 | + |
| 208 | + if preferences_to_create: |
| 209 | + if not dry_run: |
| 210 | + NotificationPreference.objects.bulk_create(preferences_to_create) |
| 211 | + |
| 212 | + total_preferences_created += len(preferences_to_create) |
| 213 | + logger.info( |
| 214 | + self.style.SUCCESS( |
| 215 | + f"Batch complete. {'Would create' if dry_run else 'Created'} " |
| 216 | + f"{len(preferences_to_create)} preferences for {len(user_id_batch)} users." |
| 217 | + ) |
| 218 | + ) |
| 219 | + else: |
| 220 | + logger.info(f"Batch complete. No preferences to create for {len(user_id_batch)} users.") |
| 221 | + |
| 222 | + total_users_processed += len(user_id_batch) |
| 223 | + user_id_batch = [] # Reset the batch |
| 224 | + |
| 225 | + except Exception as e: # pylint: disable=broad-except |
| 226 | + logger.error(f"Failed to process batch containing users {user_id_batch}: {e}", exc_info=True) |
| 227 | + # The transaction for the whole batch will be rolled back. |
| 228 | + # Clear the batch to continue with the next set of users. |
| 229 | + user_id_batch = [] |
| 230 | + |
| 231 | + if total_users_processed > 0 and total_users_processed % (batch_size * 5) == 0: |
| 232 | + logger.info(f"PROGRESS: Total users processed so far: {total_users_processed}. " |
| 233 | + f"Total preferences {'would be' if dry_run else ''} " |
| 234 | + f"created: {total_preferences_created}") |
| 235 | + |
| 236 | + # Process any remaining users in the last, smaller batch |
| 237 | + if user_id_batch: |
| 238 | + try: |
| 239 | + with transaction.atomic(): |
| 240 | + preferences_to_create = self._process_batch(user_id_batch) |
| 241 | + if preferences_to_create: |
| 242 | + if not dry_run: |
| 243 | + NotificationPreference.objects.bulk_create(preferences_to_create) |
| 244 | + total_preferences_created += len(preferences_to_create) |
| 245 | + logger.info( |
| 246 | + self.style.SUCCESS( |
| 247 | + f"Final batch complete. {'Would create' if dry_run else 'Created'} " |
| 248 | + f"{len(preferences_to_create)} preferences for {len(user_id_batch)} users." |
| 249 | + ) |
| 250 | + ) |
| 251 | + total_users_processed += len(user_id_batch) |
| 252 | + except Exception as e: # pylint: disable=broad-except |
| 253 | + logger.error(f"Failed to process final batch of users {user_id_batch}: {e}", exc_info=True) |
| 254 | + |
| 255 | + logger.info( |
| 256 | + self.style.SUCCESS( |
| 257 | + f"Migration complete. Processed {total_users_processed} users. " |
| 258 | + f"{'Would have created' if dry_run else 'Created'} a total of {total_preferences_created} " |
| 259 | + f"account-level preferences." |
| 260 | + ) |
| 261 | + ) |
| 262 | + if dry_run: |
| 263 | + logger.info(self.style.WARNING("DRY RUN finished. No actual changes were made.")) |
0 commit comments