1# Copyright (C) 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15r"""Read a MultiCarrierSettings file and update CarrierSettings data. 16 17For APNs in the input file, they are simply appended to the apn list of the 18corresponding carrier in CarrierSettings data. If a new carrier (identified by 19canonical_name) appears in input, the other_carriers.textpb will be updated. 20 21How to run: 22 23update_carrier_data.par \ 24--in_file=/tmp/tmpapns.textpb \ 25--data_dir=/tmp/carrier/data 26""" 27 28from __future__ import absolute_import 29from __future__ import division 30from __future__ import print_function 31import argparse 32import copy 33import os 34import compare 35from google.protobuf import text_format 36import carrier_list_pb2 37import carrier_settings_pb2 38 39parser = argparse.ArgumentParser() 40parser.add_argument( 41 '--data_dir', default='./data', help='Folder path for CarrierSettings data') 42parser.add_argument( 43 '--in_file', default='./tmpapns.textpb', help='Temp APN file') 44FLAGS = parser.parse_args() 45 46TIER1_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'tier1_carriers.textpb') 47OTHER_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'other_carriers.textpb') 48 49 50def equals_apn(a, b): 51 """Tell if two ApnItem proto are the same.""" 52 a = compare.NormalizeRepeatedFields(copy.deepcopy(a)) 53 b = compare.NormalizeRepeatedFields(copy.deepcopy(b)) 54 # ignore 'name' field 55 a.ClearField('name') 56 b.ClearField('name') 57 return compare.Proto2Equals(a, b) 58 59 60def find_apn(apn, apns): 61 """Tell if apn is in apns.""" 62 for a in apns: 63 if equals_apn(apn, a): 64 return True 65 return False 66 67 68def merge_mms_apn(a, b): 69 """Try to merge mmsc fields of b into a, if that's the only diff.""" 70 aa = compare.NormalizeRepeatedFields(copy.deepcopy(a)) 71 bb = compare.NormalizeRepeatedFields(copy.deepcopy(b)) 72 # check if any fields other than mms are different 73 for field in ['name', 'mmsc_proxy', 'mmsc_proxy_port']: 74 aa.ClearField(field) 75 bb.ClearField(field) 76 if compare.Proto2Equals(aa, bb): 77 for field in ['mmsc_proxy', 'mmsc_proxy_port']: 78 if b.HasField(field): 79 setattr(a, field, getattr(b, field)) 80 81 82def clean_apn(setting): 83 """Remove duplicated ApnItems from a CarrierSettings proto. 84 85 Args: 86 setting: a CarrierSettings proto 87 88 Returns: 89 None 90 """ 91 if not setting.HasField('apns') or len(setting.apns.apn) <= 1: 92 return 93 apns = setting.apns.apn[:] 94 cleaned_apns = [a for n, a in enumerate(apns) if not find_apn(a, apns[:n])] 95 del setting.apns.apn[:] 96 setting.apns.apn.extend(cleaned_apns) 97 98 99def merge_apns(dest_apns, source_apns): 100 """Merge source_apns into dest_apns.""" 101 for apn in dest_apns: 102 for source in source_apns: 103 merge_mms_apn(apn, source) 104 ret = list(dest_apns) 105 for source in source_apns: 106 if not find_apn(source, ret): 107 ret.append(source) 108 return ret 109 110 111def to_string(cid): 112 """Return the string representation of a CarrierId.""" 113 ret = cid.mcc_mnc 114 if cid.HasField('spn'): 115 ret += 'SPN=' + cid.spn.upper() 116 if cid.HasField('imsi'): 117 ret += 'IMSI=' + cid.imsi.upper() 118 if cid.HasField('gid1'): 119 ret += 'GID1=' + cid.gid1.upper() 120 return ret 121 122 123def to_carrier_id(cid_string): 124 """Return the CarrierId from its string representation.""" 125 cid = carrier_list_pb2.CarrierId() 126 if 'SPN=' in cid_string: 127 ind = cid_string.find('SPN=') 128 cid.mcc_mnc = cid_string[:ind] 129 cid.spn = cid_string[ind + len('SPN='):] 130 elif 'IMSI=' in cid_string: 131 ind = cid_string.find('IMSI=') 132 cid.mcc_mnc = cid_string[:ind] 133 cid.imsi = cid_string[ind + len('IMSI='):] 134 elif 'GID1=' in cid_string: 135 ind = cid_string.find('GID1=') 136 cid.mcc_mnc = cid_string[:ind] 137 cid.gid1 = cid_string[ind + len('GID1='):] 138 else: 139 cid.mcc_mnc = cid_string 140 return cid 141 142 143def get_input(path): 144 """Read input MultiCarrierSettings textpb file. 145 146 Args: 147 path: the path to input MultiCarrierSettings textpb file 148 149 Returns: 150 A MultiCarrierSettings. None when failed. 151 """ 152 mcs = None 153 with open(path, 'r', encoding='utf-8') as f: 154 mcs = carrier_settings_pb2.MultiCarrierSettings() 155 text_format.Merge(f.read(), mcs) 156 157 return mcs 158 159 160def get_knowncarriers(files): 161 """Create a mapping from mccmnc and possible mvno data to canonical name. 162 163 Args: 164 files: list of paths to carrier list textpb files 165 166 Returns: 167 A dict, key is to_string(carrier_id), value is cname. 168 """ 169 ret = dict() 170 for path in files: 171 with open(path, 'r', encoding='utf-8') as f: 172 carriers = carrier_list_pb2.CarrierList() 173 text_format.Merge(f.read(), carriers) 174 for carriermap in carriers.entry: 175 for cid in carriermap.carrier_id: 176 ret[to_string(cid)] = carriermap.canonical_name 177 178 return ret 179 180 181def clear_apn_fields_in_default_value(carrier_settings): 182 183 def clean(apn): 184 if apn.HasField('bearer_bitmask') and apn.bearer_bitmask == '0': 185 apn.ClearField('bearer_bitmask') 186 return apn 187 188 for apn in carrier_settings.apns.apn: 189 clean(apn) 190 return carrier_settings 191 192 193def merge_carrier_settings(patch, carrier_file): 194 """Merge a CarrierSettings into a base CarrierSettings in textpb file. 195 196 This function merge apns only. It assumes that the patch and base have the 197 same canonical_name. 198 199 Args: 200 patch: the carrier_settings to be merged 201 carrier_file: the path to the base carrier_settings file 202 """ 203 # Load base 204 with open(carrier_file, 'r', encoding='utf-8') as f: 205 base_setting = text_format.ParseLines(f, 206 carrier_settings_pb2.CarrierSettings()) 207 208 clean_apn(patch) 209 clean_apn(base_setting) 210 211 # Merge apns 212 apns = base_setting.apns.apn[:] 213 apns = merge_apns(apns, patch.apns.apn[:]) 214 del base_setting.apns.apn[:] 215 base_setting.apns.apn.extend(apns) 216 217 # Write back 218 with open(carrier_file, 'w', encoding='utf-8') as f: 219 text_format.PrintMessage(base_setting, f, as_utf8=True) 220 221 222def merge_multi_carrier_settings(patch_list, carrier_file): 223 """Merge CarrierSettings into a base MultiCarrierSettings in textpb file. 224 225 This function merge apns only. The base may or may not contains an entry with 226 the same canonical_name as the patch. 227 228 Args: 229 patch_list: a list of CarrierSettings to be merged 230 carrier_file: the path to the base MultiCarrierSettings file 231 """ 232 # Load base 233 with open(carrier_file, 'r', encoding='utf-8') as f: 234 base_settings = text_format.ParseLines( 235 f, carrier_settings_pb2.MultiCarrierSettings()) 236 237 for patch in patch_list: 238 clean_apn(patch) 239 # find the (first and only) entry with patch.canonical_name and update it. 240 for setting in base_settings.setting: 241 if setting.canonical_name == patch.canonical_name: 242 clean_apn(setting) 243 apns = setting.apns.apn[:] 244 apns = merge_apns(apns, patch.apns.apn[:]) 245 del setting.apns.apn[:] 246 setting.apns.apn.extend(apns) 247 break 248 # Or if no match, append it to base_settings 249 else: 250 base_settings.setting.extend([patch]) 251 252 # Write back 253 with open(carrier_file, 'w', encoding='utf-8') as f: 254 text_format.PrintMessage(base_settings, f, as_utf8=True) 255 256 257def add_new_carriers(cnames, carrier_list_file): 258 """Add a new carrier into a CarrierList in textpb file. 259 260 The carrier_id of the new carrier is induced from the cname, assuming 261 that the cname is constructed by to_string. 262 263 Args: 264 cnames: a list of canonical_name of new carriers 265 carrier_list_file: the path to the CarrierList textpb file 266 267 Returns: 268 None 269 """ 270 with open(carrier_list_file, 'r', encoding='utf-8') as f: 271 carriers = text_format.ParseLines(f, carrier_list_pb2.CarrierList()) 272 273 for cname in cnames: 274 # Append the new carrier 275 new_carrier = carriers.entry.add() 276 new_carrier.canonical_name = cname 277 new_carrier.carrier_id.extend([to_carrier_id(cname)]) 278 279 tmp = sorted(carriers.entry, key=lambda c: c.canonical_name) 280 del carriers.entry[:] 281 carriers.entry.extend(tmp) 282 283 with open(carrier_list_file, 'w', encoding='utf-8') as f: 284 text_format.PrintMessage(carriers, f, as_utf8=True) 285 286 287def add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers): 288 """Add APNs for carriers in others.textpb that doesn't have APNs, by mccmnc. 289 290 If a carrier defined as mccmnc + mvno_data doesn't hava APNs, it should use 291 APNs from the carrier defined as mccmnc only. 292 293 Modifies others.textpb file in-place. 294 295 Args: 296 apns: a list of CarrierSettings message with APNs only. 297 tier1_carriers: parsed tier-1 carriers list; must not contain new carriers. 298 A dict, key is to_string(carrier_id), value is cname. 299 other_carriers: parsed other carriers list; must not contain new carriers. A 300 dict, key is to_string(carrier_id), value is cname. 301 """ 302 # Convert apns from a list to a map, key being the canonical_name 303 apns_dict = { 304 carrier_settings.canonical_name: carrier_settings 305 for carrier_settings in apns 306 } 307 308 others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir 309 with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file: 310 others = text_format.ParseLines(others_textpb_file, 311 carrier_settings_pb2.MultiCarrierSettings()) 312 313 for setting in others.setting: 314 if not setting.HasField('apns'): 315 carrier_id = to_carrier_id(setting.canonical_name) 316 if carrier_id.HasField('mvno_data'): 317 carrier_id.ClearField('mvno_data') 318 carrier_id_str_of_mccmnc = to_string(carrier_id) 319 cname_of_mccmnc = tier1_carriers.get( 320 carrier_id_str_of_mccmnc) or other_carriers.get( 321 carrier_id_str_of_mccmnc) 322 if cname_of_mccmnc: 323 apn = apns_dict.get(cname_of_mccmnc) 324 if apn: 325 setting.apns.CopyFrom(apn.apns) 326 327 with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file: 328 text_format.PrintMessage(others, others_textpb_file, as_utf8=True) 329 330 331def add_carrierconfig_for_new_carriers(cnames, tier1_carriers, other_carriers): 332 """Add carrier configs for new (non-tier-1) carriers. 333 334 For new carriers, ie. carriers existing in APN but not CarrierConfig: 335 - for <mccmnc>: copy carrier config of <mcc>. 336 - for <mccmnc>(GID1|SPN|IMSI)=<mvnodata>: copy carrier config of <mccmnc>, 337 or <mcc>. 338 339 Modifies others.textpb file in-place. 340 341 Args: 342 cnames: a list of canonical_name of new carriers. 343 tier1_carriers: parsed tier-1 carriers list; must not contain new carriers. 344 A dict, key is to_string(carrier_id), value is cname. 345 other_carriers: parsed other carriers list; must not contain new carriers. A 346 dict, key is to_string(carrier_id), value is cname. 347 """ 348 carrier_configs_map = {} 349 350 others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir 351 with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file: 352 others = text_format.ParseLines(others_textpb_file, 353 carrier_settings_pb2.MultiCarrierSettings()) 354 for setting in others.setting: 355 if setting.canonical_name in other_carriers: 356 carrier_configs_map[setting.canonical_name] = setting.configs 357 for cid_str, cname in tier1_carriers.items(): 358 tier1_textpb = '%s/setting/%s.textpb' % (FLAGS.data_dir, cname) 359 with open(tier1_textpb, 'r', encoding='utf-8') as tier1_textpb_file: 360 tier1 = text_format.ParseLines(tier1_textpb_file, 361 carrier_settings_pb2.CarrierSettings()) 362 carrier_configs_map[cid_str] = tier1.configs 363 364 for setting in others.setting: 365 if setting.canonical_name in cnames: 366 carrier_id = to_carrier_id(setting.canonical_name) 367 mccmnc = carrier_id.mcc_mnc 368 mcc = mccmnc[:3] 369 if mccmnc in carrier_configs_map: 370 setting.configs.config.extend(carrier_configs_map[mccmnc].config[:]) 371 elif mcc in carrier_configs_map: 372 setting.configs.config.extend(carrier_configs_map[mcc].config[:]) 373 374 with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file: 375 text_format.PrintMessage(others, others_textpb_file, as_utf8=True) 376 377 378def cleanup_mcc_only_carriers(): 379 """Removes mcc-only carriers from other_carriers.textpb & others.textpb. 380 381 Modifies other_carriers.textpb file & others.textpb file in-place. 382 """ 383 mcc_only_carriers = set() 384 385 with open( 386 OTHER_CARRIERS_TEXTPB, 'r', 387 encoding='utf-8') as other_carriers_textpb_file: 388 other_carriers = text_format.ParseLines(other_carriers_textpb_file, 389 carrier_list_pb2.CarrierList()) 390 391 other_carriers_entry_with_mccmnc = [] 392 for carrier in other_carriers.entry: 393 for carrier_id in carrier.carrier_id: 394 if len(carrier_id.mcc_mnc) == 3: 395 mcc_only_carriers.add(carrier.canonical_name) 396 else: 397 other_carriers_entry_with_mccmnc.append(carrier) 398 del other_carriers.entry[:] 399 other_carriers.entry.extend(other_carriers_entry_with_mccmnc) 400 401 # Finish early if no mcc_only_carriers; that means no file modification 402 # required. 403 if not mcc_only_carriers: 404 return 405 406 with open( 407 OTHER_CARRIERS_TEXTPB, 'w', 408 encoding='utf-8') as other_carriers_textpb_file: 409 text_format.PrintMessage( 410 other_carriers, other_carriers_textpb_file, as_utf8=True) 411 412 others_textpb = os.path.join(FLAGS.data_dir, 'setting', 'others.textpb') 413 with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file: 414 others = text_format.ParseLines(others_textpb_file, 415 carrier_settings_pb2.MultiCarrierSettings()) 416 copy_others_setting = others.setting[:] 417 del others.setting[:] 418 others.setting.extend([ 419 setting for setting in copy_others_setting 420 if setting.canonical_name not in mcc_only_carriers 421 ]) 422 423 with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file: 424 text_format.PrintMessage(others, others_textpb_file, as_utf8=True) 425 426 427def main(): 428 apns = get_input(FLAGS.in_file).setting 429 tier1_carriers = get_knowncarriers([TIER1_CARRIERS_TEXTPB]) 430 other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB]) 431 new_carriers = [] 432 433 # Step 1a: merge APNs into CarrierConfigs by canonical name. 434 # Also find out "new carriers" existing in APNs but not in CarrierConfigs. 435 other_carriers_patch = [] 436 for carrier_settings in apns: 437 carrier_settings = clear_apn_fields_in_default_value(carrier_settings) 438 439 cname = carrier_settings.canonical_name 440 if cname in tier1_carriers.values(): 441 merge_carrier_settings(carrier_settings, 442 '%s/setting/%s.textpb' % (FLAGS.data_dir, cname)) 443 else: 444 other_carriers_patch.append(carrier_settings) 445 if cname not in other_carriers.values(): 446 new_carriers.append(cname) 447 448 merge_multi_carrier_settings(other_carriers_patch, 449 '%s/setting/others.textpb' % FLAGS.data_dir) 450 451 # Step 1b: populate carrier configs for new carriers. 452 add_carrierconfig_for_new_carriers(new_carriers, tier1_carriers, 453 other_carriers) 454 455 # Step 2: merge new carriers into non-tier1 carrier list. 456 add_new_carriers(new_carriers, OTHER_CARRIERS_TEXTPB) 457 # Update other_carriers map 458 other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB]) 459 460 # Step 3: merge APNs into CarrierConfigs by mccmnc: for a carrier defined 461 # as mccmnc + gid/spn/imsi, if it doesn't have any APNs, it should use APNs 462 # from carrier defined as mccmnc only. 463 # Only handle non-tier1 carriers, as tier1 carriers are assumed to be better 464 # maintained and are already having APNs defined. 465 add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers) 466 467 # Step 4: clean up mcc-only carriers; they're used in step 3 but should not 468 # be in final carrier settings to avoid confusing CarrierSettings app. 469 cleanup_mcc_only_carriers() 470 471 472if __name__ == '__main__': 473 main() 474