1# Copyright (C) 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15r"""Read a MultiCarrierSettings file and update CarrierSettings data.
16
17For APNs in the input file, they are simply appended to the apn list of the
18corresponding carrier in CarrierSettings data. If a new carrier (identified by
19canonical_name) appears in input, the other_carriers.textpb will be updated.
20
21How to run:
22
23update_carrier_data.par \
24--in_file=/tmp/tmpapns.textpb \
25--data_dir=/tmp/carrier/data
26"""
27
28from __future__ import absolute_import
29from __future__ import division
30from __future__ import print_function
31import argparse
32import copy
33import os
34import compare
35from google.protobuf import text_format
36import carrier_list_pb2
37import carrier_settings_pb2
38
39parser = argparse.ArgumentParser()
40parser.add_argument(
41    '--data_dir', default='./data', help='Folder path for CarrierSettings data')
42parser.add_argument(
43    '--in_file', default='./tmpapns.textpb', help='Temp APN file')
44FLAGS = parser.parse_args()
45
46TIER1_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'tier1_carriers.textpb')
47OTHER_CARRIERS_TEXTPB = os.path.join(FLAGS.data_dir, 'other_carriers.textpb')
48
49
50def equals_apn(a, b):
51  """Tell if two ApnItem proto are the same."""
52  a = compare.NormalizeRepeatedFields(copy.deepcopy(a))
53  b = compare.NormalizeRepeatedFields(copy.deepcopy(b))
54  # ignore 'name' field
55  a.ClearField('name')
56  b.ClearField('name')
57  return compare.Proto2Equals(a, b)
58
59
60def find_apn(apn, apns):
61  """Tell if apn is in apns."""
62  for a in apns:
63    if equals_apn(apn, a):
64      return True
65  return False
66
67
68def merge_mms_apn(a, b):
69  """Try to merge mmsc fields of b into a, if that's the only diff."""
70  aa = compare.NormalizeRepeatedFields(copy.deepcopy(a))
71  bb = compare.NormalizeRepeatedFields(copy.deepcopy(b))
72  # check if any fields other than mms are different
73  for field in ['name', 'mmsc_proxy', 'mmsc_proxy_port']:
74    aa.ClearField(field)
75    bb.ClearField(field)
76  if compare.Proto2Equals(aa, bb):
77    for field in ['mmsc_proxy', 'mmsc_proxy_port']:
78      if b.HasField(field):
79        setattr(a, field, getattr(b, field))
80
81
82def clean_apn(setting):
83  """Remove duplicated ApnItems from a CarrierSettings proto.
84
85  Args:
86    setting: a CarrierSettings proto
87
88  Returns:
89    None
90  """
91  if not setting.HasField('apns') or len(setting.apns.apn) <= 1:
92    return
93  apns = setting.apns.apn[:]
94  cleaned_apns = [a for n, a in enumerate(apns) if not find_apn(a, apns[:n])]
95  del setting.apns.apn[:]
96  setting.apns.apn.extend(cleaned_apns)
97
98
99def merge_apns(dest_apns, source_apns):
100  """Merge source_apns into dest_apns."""
101  for apn in dest_apns:
102    for source in source_apns:
103      merge_mms_apn(apn, source)
104  ret = list(dest_apns)
105  for source in source_apns:
106    if not find_apn(source, ret):
107      ret.append(source)
108  return ret
109
110
111def to_string(cid):
112  """Return the string representation of a CarrierId."""
113  ret = cid.mcc_mnc
114  if cid.HasField('spn'):
115    ret += 'SPN=' + cid.spn.upper()
116  if cid.HasField('imsi'):
117    ret += 'IMSI=' + cid.imsi.upper()
118  if cid.HasField('gid1'):
119    ret += 'GID1=' + cid.gid1.upper()
120  return ret
121
122
123def to_carrier_id(cid_string):
124  """Return the CarrierId from its string representation."""
125  cid = carrier_list_pb2.CarrierId()
126  if 'SPN=' in cid_string:
127    ind = cid_string.find('SPN=')
128    cid.mcc_mnc = cid_string[:ind]
129    cid.spn = cid_string[ind + len('SPN='):]
130  elif 'IMSI=' in cid_string:
131    ind = cid_string.find('IMSI=')
132    cid.mcc_mnc = cid_string[:ind]
133    cid.imsi = cid_string[ind + len('IMSI='):]
134  elif 'GID1=' in cid_string:
135    ind = cid_string.find('GID1=')
136    cid.mcc_mnc = cid_string[:ind]
137    cid.gid1 = cid_string[ind + len('GID1='):]
138  else:
139    cid.mcc_mnc = cid_string
140  return cid
141
142
143def get_input(path):
144  """Read input MultiCarrierSettings textpb file.
145
146  Args:
147    path: the path to input MultiCarrierSettings textpb file
148
149  Returns:
150    A MultiCarrierSettings. None when failed.
151  """
152  mcs = None
153  with open(path, 'r', encoding='utf-8') as f:
154    mcs = carrier_settings_pb2.MultiCarrierSettings()
155    text_format.Merge(f.read(), mcs)
156
157  return mcs
158
159
160def get_knowncarriers(files):
161  """Create a mapping from mccmnc and possible mvno data to canonical name.
162
163  Args:
164    files: list of paths to carrier list textpb files
165
166  Returns:
167    A dict, key is to_string(carrier_id), value is cname.
168  """
169  ret = dict()
170  for path in files:
171    with open(path, 'r', encoding='utf-8') as f:
172      carriers = carrier_list_pb2.CarrierList()
173      text_format.Merge(f.read(), carriers)
174      for carriermap in carriers.entry:
175        for cid in carriermap.carrier_id:
176          ret[to_string(cid)] = carriermap.canonical_name
177
178  return ret
179
180
181def clear_apn_fields_in_default_value(carrier_settings):
182
183  def clean(apn):
184    if apn.HasField('bearer_bitmask') and apn.bearer_bitmask == '0':
185      apn.ClearField('bearer_bitmask')
186    return apn
187
188  for apn in carrier_settings.apns.apn:
189    clean(apn)
190  return carrier_settings
191
192
193def merge_carrier_settings(patch, carrier_file):
194  """Merge a CarrierSettings into a base CarrierSettings in textpb file.
195
196  This function merge apns only. It assumes that the patch and base have the
197  same canonical_name.
198
199  Args:
200    patch: the carrier_settings to be merged
201    carrier_file: the path to the base carrier_settings file
202  """
203  # Load base
204  with open(carrier_file, 'r', encoding='utf-8') as f:
205    base_setting = text_format.ParseLines(f,
206                                          carrier_settings_pb2.CarrierSettings())
207
208  clean_apn(patch)
209  clean_apn(base_setting)
210
211  # Merge apns
212  apns = base_setting.apns.apn[:]
213  apns = merge_apns(apns, patch.apns.apn[:])
214  del base_setting.apns.apn[:]
215  base_setting.apns.apn.extend(apns)
216
217  # Write back
218  with open(carrier_file, 'w', encoding='utf-8') as f:
219    text_format.PrintMessage(base_setting, f, as_utf8=True)
220
221
222def merge_multi_carrier_settings(patch_list, carrier_file):
223  """Merge CarrierSettings into a base MultiCarrierSettings in textpb file.
224
225  This function merge apns only. The base may or may not contains an entry with
226  the same canonical_name as the patch.
227
228  Args:
229    patch_list: a list of CarrierSettings to be merged
230    carrier_file: the path to the base MultiCarrierSettings file
231  """
232  # Load base
233  with open(carrier_file, 'r', encoding='utf-8') as f:
234    base_settings = text_format.ParseLines(
235        f, carrier_settings_pb2.MultiCarrierSettings())
236
237  for patch in patch_list:
238    clean_apn(patch)
239    # find the (first and only) entry with patch.canonical_name and update it.
240    for setting in base_settings.setting:
241      if setting.canonical_name == patch.canonical_name:
242        clean_apn(setting)
243        apns = setting.apns.apn[:]
244        apns = merge_apns(apns, patch.apns.apn[:])
245        del setting.apns.apn[:]
246        setting.apns.apn.extend(apns)
247        break
248    # Or if no match, append it to base_settings
249    else:
250      base_settings.setting.extend([patch])
251
252  # Write back
253  with open(carrier_file, 'w', encoding='utf-8') as f:
254    text_format.PrintMessage(base_settings, f, as_utf8=True)
255
256
257def add_new_carriers(cnames, carrier_list_file):
258  """Add a new carrier into a CarrierList in textpb file.
259
260  The carrier_id of the new carrier is induced from the cname, assuming
261  that the cname is constructed by to_string.
262
263  Args:
264    cnames: a list of canonical_name of new carriers
265    carrier_list_file: the path to the CarrierList textpb file
266
267  Returns:
268    None
269  """
270  with open(carrier_list_file, 'r', encoding='utf-8') as f:
271    carriers = text_format.ParseLines(f, carrier_list_pb2.CarrierList())
272
273  for cname in cnames:
274    # Append the new carrier
275    new_carrier = carriers.entry.add()
276    new_carrier.canonical_name = cname
277    new_carrier.carrier_id.extend([to_carrier_id(cname)])
278
279  tmp = sorted(carriers.entry, key=lambda c: c.canonical_name)
280  del carriers.entry[:]
281  carriers.entry.extend(tmp)
282
283  with open(carrier_list_file, 'w', encoding='utf-8') as f:
284    text_format.PrintMessage(carriers, f, as_utf8=True)
285
286
287def add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers):
288  """Add APNs for carriers in others.textpb that doesn't have APNs, by mccmnc.
289
290  If a carrier defined as mccmnc + mvno_data doesn't hava APNs, it should use
291  APNs from the carrier defined as mccmnc only.
292
293  Modifies others.textpb file in-place.
294
295  If a carriersettingstool.no_apn_for_mvno_bool is defined as true for a MVNO,
296  the APNs from the corresponding MNO(by MCC/MNC) will not be used.
297
298  Args:
299    apns: a list of CarrierSettings message with APNs only.
300    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
301      A dict, key is to_string(carrier_id), value is cname.
302    other_carriers: parsed other carriers list; must not contain new carriers. A
303      dict, key is to_string(carrier_id), value is cname.
304  """
305  # Convert apns from a list to a map, key being the canonical_name
306  apns_dict = {
307      carrier_settings.canonical_name: carrier_settings
308      for carrier_settings in apns
309  }
310
311  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
312  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
313    others = text_format.ParseLines(others_textpb_file,
314                                    carrier_settings_pb2.MultiCarrierSettings())
315
316  for setting in others.setting:
317    if not setting.HasField('apns'):
318      carrier_id = to_carrier_id(setting.canonical_name)
319      if carrier_id.HasField('mvno_data'):
320        # in case we don't need MNO APN for this MVNO
321        skip_mno_apn = False
322        if setting.HasField('configs'):
323          for conf in setting.configs.config:
324            if conf.key == 'carriersettingstool.no_apn_for_mvno_bool':
325              skip_mno_apn = conf.bool_value
326              break
327        if skip_mno_apn:
328          continue
329        carrier_id.ClearField('mvno_data')
330        carrier_id_str_of_mccmnc = to_string(carrier_id)
331        cname_of_mccmnc = tier1_carriers.get(
332            carrier_id_str_of_mccmnc) or other_carriers.get(
333                carrier_id_str_of_mccmnc)
334        if cname_of_mccmnc:
335          apn = apns_dict.get(cname_of_mccmnc)
336          if apn:
337            setting.apns.CopyFrom(apn.apns)
338
339  sanitise_carrier_config(others.setting)
340
341  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
342    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
343
344def sanitise_carrier_config(setting):
345  """Remove temparary carrier config items that's only used for conversion tool"""
346  for carrier_setting in setting:
347    if carrier_setting.HasField('configs'):
348      configs = carrier_setting.configs.config[:]
349      del carrier_setting.configs.config[:]
350      for config in configs:
351        if not config.key.startswith('carriersettingstool.'):
352          carrier_setting.configs.config.append(config)
353
354def add_carrierconfig_for_new_carriers(cnames, tier1_carriers, other_carriers):
355  """Add carrier configs for new (non-tier-1) carriers.
356
357   For new carriers, ie. carriers existing in APN but not CarrierConfig:
358      - for <mccmnc>: copy carrier config of <mcc>.
359      - for <mccmnc>(GID1|SPN|IMSI)=<mvnodata>: copy carrier config of <mccmnc>,
360      or <mcc>.
361
362  Modifies others.textpb file in-place.
363
364  Args:
365    cnames: a list of canonical_name of new carriers.
366    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
367      A dict, key is to_string(carrier_id), value is cname.
368    other_carriers: parsed other carriers list; must not contain new carriers. A
369      dict, key is to_string(carrier_id), value is cname.
370  """
371  carrier_configs_map = {}
372
373  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
374  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
375    others = text_format.ParseLines(others_textpb_file,
376                                    carrier_settings_pb2.MultiCarrierSettings())
377    for setting in others.setting:
378      if setting.canonical_name in other_carriers:
379        carrier_configs_map[setting.canonical_name] = setting.configs
380  for cid_str, cname in tier1_carriers.items():
381    tier1_textpb = '%s/setting/%s.textpb' % (FLAGS.data_dir, cname)
382    with open(tier1_textpb, 'r', encoding='utf-8') as tier1_textpb_file:
383      tier1 = text_format.ParseLines(tier1_textpb_file,
384                                     carrier_settings_pb2.CarrierSettings())
385      carrier_configs_map[cid_str] = tier1.configs
386
387  for setting in others.setting:
388    if setting.canonical_name in cnames:
389      carrier_id = to_carrier_id(setting.canonical_name)
390      mccmnc = carrier_id.mcc_mnc
391      mcc = mccmnc[:3]
392      if mccmnc in carrier_configs_map:
393        setting.configs.config.extend(carrier_configs_map[mccmnc].config[:])
394      elif mcc in carrier_configs_map:
395        setting.configs.config.extend(carrier_configs_map[mcc].config[:])
396
397  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
398    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
399
400
401def cleanup_mcc_only_carriers():
402  """Removes mcc-only carriers from other_carriers.textpb & others.textpb.
403
404  Modifies other_carriers.textpb file & others.textpb file in-place.
405  """
406  mcc_only_carriers = set()
407
408  with open(
409      OTHER_CARRIERS_TEXTPB, 'r',
410      encoding='utf-8') as other_carriers_textpb_file:
411    other_carriers = text_format.ParseLines(other_carriers_textpb_file,
412                                            carrier_list_pb2.CarrierList())
413
414  other_carriers_entry_with_mccmnc = []
415  for carrier in other_carriers.entry:
416    for carrier_id in carrier.carrier_id:
417      if len(carrier_id.mcc_mnc) == 3:
418        mcc_only_carriers.add(carrier.canonical_name)
419      else:
420        other_carriers_entry_with_mccmnc.append(carrier)
421  del other_carriers.entry[:]
422  other_carriers.entry.extend(other_carriers_entry_with_mccmnc)
423
424  # Finish early if no mcc_only_carriers; that means no file modification
425  # required.
426  if not mcc_only_carriers:
427    return
428
429  with open(
430      OTHER_CARRIERS_TEXTPB, 'w',
431      encoding='utf-8') as other_carriers_textpb_file:
432    text_format.PrintMessage(
433        other_carriers, other_carriers_textpb_file, as_utf8=True)
434
435  others_textpb = os.path.join(FLAGS.data_dir, 'setting', 'others.textpb')
436  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
437    others = text_format.ParseLines(others_textpb_file,
438                                    carrier_settings_pb2.MultiCarrierSettings())
439  copy_others_setting = others.setting[:]
440  del others.setting[:]
441  others.setting.extend([
442      setting for setting in copy_others_setting
443      if setting.canonical_name not in mcc_only_carriers
444  ])
445
446  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
447    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
448
449
450def main():
451  apns = get_input(FLAGS.in_file).setting
452  tier1_carriers = get_knowncarriers([TIER1_CARRIERS_TEXTPB])
453  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])
454  new_carriers = []
455
456  # Step 1a: merge APNs into CarrierConfigs by canonical name.
457  # Also find out "new carriers" existing in APNs but not in CarrierConfigs.
458  other_carriers_patch = []
459  for carrier_settings in apns:
460    carrier_settings = clear_apn_fields_in_default_value(carrier_settings)
461
462    cname = carrier_settings.canonical_name
463    if cname in tier1_carriers.values():
464      merge_carrier_settings(carrier_settings,
465                             '%s/setting/%s.textpb' % (FLAGS.data_dir, cname))
466    else:
467      other_carriers_patch.append(carrier_settings)
468      if cname not in other_carriers.values():
469        new_carriers.append(cname)
470
471  merge_multi_carrier_settings(other_carriers_patch,
472                               '%s/setting/others.textpb' % FLAGS.data_dir)
473
474  # Step 1b: populate carrier configs for new carriers.
475  add_carrierconfig_for_new_carriers(new_carriers, tier1_carriers,
476                                     other_carriers)
477
478  # Step 2: merge new carriers into non-tier1 carrier list.
479  add_new_carriers(new_carriers, OTHER_CARRIERS_TEXTPB)
480  # Update other_carriers map
481  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])
482
483  # Step 3: merge APNs into CarrierConfigs by mccmnc: for a carrier defined
484  # as mccmnc + gid/spn/imsi, if it doesn't have any APNs, it should use APNs
485  # from carrier defined as mccmnc only.
486  # Only handle non-tier1 carriers, as tier1 carriers are assumed to be better
487  # maintained and are already having APNs defined.
488  add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers)
489
490  # Step 4: clean up mcc-only carriers; they're used in step 3 but should not
491  # be in final carrier settings to avoid confusing CarrierSettings app.
492  cleanup_mcc_only_carriers()
493
494
495if __name__ == '__main__':
496  main()
497