1# Copyright (C) 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15r"""Read a MultiCarrierSettings file and update CarrierSettings data.
16
17For APNs in the input file, they are simply appended to the apn list of the
18corresponding carrier in CarrierSettings data. If a new carrier (identified by
19canonical_name) appears in input, the other_carriers.textpb will be updated.
20
21How to run:
22
23update_carrier_data.par \
24--in_file=/tmp/tmpapns.textpb \
25--data_dir=/tmp/carrier/data
26"""
27
28from __future__ import absolute_import
29from __future__ import division
30from __future__ import print_function
31import argparse
32import copy
33import os
34import compare
35from google.protobuf import text_format
36import carrier_list_pb2
37import carrier_settings_pb2
38
# Command-line flags. They are parsed as soon as this module is loaded, so
# every definition below can read FLAGS.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--data_dir',
    help='Folder path for CarrierSettings data',
    default='./data')
parser.add_argument(
    '--in_file',
    help='Temp APN file',
    default='./tmpapns.textpb')
FLAGS = parser.parse_args()

# Carrier list files that live directly under --data_dir.
TIER1_CARRIERS_TEXTPB, OTHER_CARRIERS_TEXTPB = (
    os.path.join(FLAGS.data_dir, basename)
    for basename in ('tier1_carriers.textpb', 'other_carriers.textpb'))
48
49
def equals_apn(a, b):
  """Return whether two ApnItem protos match once 'name' is disregarded.

  The comparison runs on normalized deep copies of the inputs; the 'name'
  field is cleared on those copies before they are compared.
  """
  prepared = []
  for item in (a, b):
    clone = compare.NormalizeRepeatedFields(copy.deepcopy(item))
    # 'name' does not take part in the comparison.
    clone.ClearField('name')
    prepared.append(clone)
  lhs, rhs = prepared
  return compare.Proto2Equals(lhs, rhs)
58
59
def find_apn(apn, apns):
  """Return True if any item of apns equals apn according to equals_apn."""
  return any(equals_apn(apn, candidate) for candidate in apns)
66
67
def merge_mms_apn(a, b):
  """Copy b's mmsc proxy fields onto a when nothing else distinguishes them.

  The two ApnItems are compared on normalized deep copies with 'name',
  'mmsc_proxy' and 'mmsc_proxy_port' cleared. Only if those copies are equal
  is a modified in place, and only for the mmsc fields that b has set.
  """
  mms_fields = ('mmsc_proxy', 'mmsc_proxy_port')
  stripped_a = compare.NormalizeRepeatedFields(copy.deepcopy(a))
  stripped_b = compare.NormalizeRepeatedFields(copy.deepcopy(b))
  for stripped in (stripped_a, stripped_b):
    for field in ('name',) + mms_fields:
      stripped.ClearField(field)

  # Some field other than name/mmsc differs: leave a untouched.
  if not compare.Proto2Equals(stripped_a, stripped_b):
    return

  for field in mms_fields:
    if b.HasField(field):
      setattr(a, field, getattr(b, field))
80
81
def clean_apn(setting):
  """Drop repeated ApnItems from a CarrierSettings proto, in place.

  An ApnItem is dropped when find_apn matches it against any ApnItem that
  precedes it in the list, so the first occurrence is the one that stays.

  Args:
    setting: a CarrierSettings proto

  Returns:
    None
  """
  if not setting.HasField('apns'):
    return
  if len(setting.apns.apn) <= 1:
    return

  original = setting.apns.apn[:]
  kept = []
  for index, candidate in enumerate(original):
    # Compare against everything that came earlier in the original list.
    if find_apn(candidate, original[:index]):
      continue
    kept.append(candidate)

  del setting.apns.apn[:]
  setting.apns.apn.extend(kept)
97
98
def merge_apns(dest_apns, source_apns):
  """Combine source_apns with dest_apns and return the result as a new list.

  First every destination APN gets a chance to absorb mmsc fields from every
  source APN (see merge_mms_apn). Then each source APN that find_apn does not
  locate in the result so far is appended after the destination APNs.
  """
  for dest in dest_apns:
    for candidate in source_apns:
      merge_mms_apn(dest, candidate)

  merged = [*dest_apns]
  for candidate in source_apns:
    if find_apn(candidate, merged):
      continue
    merged.append(candidate)
  return merged
109
110
def to_string(cid):
  """Build the string form of a CarrierId.

  The result is mcc_mnc followed by 'SPN=', 'IMSI=' and 'GID1=' sections, in
  that order, for each of those fields that is set; values are upper-cased.
  """
  pieces = [cid.mcc_mnc]
  for field, tag in (('spn', 'SPN='), ('imsi', 'IMSI='), ('gid1', 'GID1=')):
    if cid.HasField(field):
      pieces.append(tag + getattr(cid, field).upper())
  return ''.join(pieces)
121
122
def to_carrier_id(cid_string):
  """Parse a CarrierId out of its string representation.

  The first of 'SPN=', 'IMSI=', 'GID1=' (checked in that order) found in
  cid_string splits it: the text before the tag becomes mcc_mnc and the text
  after it becomes the matching field. Without any tag the whole string is
  taken as mcc_mnc.
  """
  cid = carrier_list_pb2.CarrierId()
  for tag, field in (('SPN=', 'spn'), ('IMSI=', 'imsi'), ('GID1=', 'gid1')):
    mcc_mnc, found, mvno_value = cid_string.partition(tag)
    if found:
      cid.mcc_mnc = mcc_mnc
      setattr(cid, field, mvno_value)
      return cid
  cid.mcc_mnc = cid_string
  return cid
141
142
def get_input(path):
  """Read input MultiCarrierSettings textpb file.

  Args:
    path: the path to input MultiCarrierSettings textpb file

  Returns:
    The parsed MultiCarrierSettings proto. This function never returns None;
    failures are reported through exceptions.

  Raises:
    OSError: if the file cannot be opened or read.
    text_format.ParseError: if the content is not a valid textpb for
      MultiCarrierSettings.
  """
  with open(path, 'r', encoding='utf-8') as f:
    content = f.read()
  mcs = carrier_settings_pb2.MultiCarrierSettings()
  text_format.Merge(content, mcs)
  return mcs
158
159
def get_knowncarriers(files):
  """Map every known carrier id to the canonical name of its carrier.

  Args:
    files: list of paths to carrier list textpb files

  Returns:
    A dict, key is to_string(carrier_id), value is cname. If the same key
    shows up more than once, the last one read wins.
  """
  cname_by_cid = {}
  for path in files:
    carriers = carrier_list_pb2.CarrierList()
    with open(path, 'r', encoding='utf-8') as f:
      text_format.Merge(f.read(), carriers)
    cname_by_cid.update(
        (to_string(cid), entry.canonical_name)
        for entry in carriers.entry
        for cid in entry.carrier_id)
  return cname_by_cid
179
180
def clear_apn_fields_in_default_value(carrier_settings):
  """Clear bearer_bitmask on every APN where it is explicitly set to '0'.

  Args:
    carrier_settings: a message whose apns.apn items are edited in place.

  Returns:
    The same carrier_settings object that was passed in.
  """
  for apn in carrier_settings.apns.apn:
    if not apn.HasField('bearer_bitmask'):
      continue
    # '0' is presumably the field's default value (hence the function name).
    if apn.bearer_bitmask == '0':
      apn.ClearField('bearer_bitmask')
  return carrier_settings
191
192
def merge_carrier_settings(patch, carrier_file):
  """Merge the APNs of a CarrierSettings into a CarrierSettings textpb file.

  Only APNs are merged. The patch and the base stored in carrier_file are
  assumed to share the same canonical_name; this is not checked here.
  Duplicate APNs are removed from both the patch and the base before merging,
  and carrier_file is rewritten with the result.

  Args:
    patch: the carrier_settings to be merged
    carrier_file: the path to the base carrier_settings file
  """
  # Read the base settings from disk.
  base_setting = carrier_settings_pb2.CarrierSettings()
  with open(carrier_file, 'r', encoding='utf-8') as base_file:
    text_format.ParseLines(base_file, base_setting)

  for message in (patch, base_setting):
    clean_apn(message)

  # Replace the base APN list with the merged one.
  merged = merge_apns(base_setting.apns.apn[:], patch.apns.apn[:])
  del base_setting.apns.apn[:]
  base_setting.apns.apn.extend(merged)

  # Persist the updated base.
  with open(carrier_file, 'w', encoding='utf-8') as base_file:
    text_format.PrintMessage(base_setting, base_file, as_utf8=True)
220
221
def merge_multi_carrier_settings(patch_list, carrier_file):
  """Merge CarrierSettings patches into a MultiCarrierSettings textpb file.

  Only APNs are merged. For each patch, the first base entry with the same
  canonical_name receives the patch's APNs; when the base has no such entry,
  the whole patch is appended as a new entry. carrier_file is rewritten with
  the result.

  Args:
    patch_list: a list of CarrierSettings to be merged
    carrier_file: the path to the base MultiCarrierSettings file
  """
  # Read the base settings from disk.
  base_settings = carrier_settings_pb2.MultiCarrierSettings()
  with open(carrier_file, 'r', encoding='utf-8') as base_file:
    text_format.ParseLines(base_file, base_settings)

  for patch in patch_list:
    clean_apn(patch)
    target = next(
        (entry for entry in base_settings.setting
         if entry.canonical_name == patch.canonical_name),
        None)
    if target is None:
      # Unknown canonical_name: the patch becomes a new entry.
      base_settings.setting.extend([patch])
      continue
    clean_apn(target)
    merged = merge_apns(target.apns.apn[:], patch.apns.apn[:])
    del target.apns.apn[:]
    target.apns.apn.extend(merged)

  # Persist the updated base.
  with open(carrier_file, 'w', encoding='utf-8') as base_file:
    text_format.PrintMessage(base_settings, base_file, as_utf8=True)
255
256
def add_new_carriers(cnames, carrier_list_file):
  """Append new carriers to a CarrierList textpb file and re-sort it.

  Each new entry gets a single carrier_id derived from its cname through
  to_carrier_id, which assumes the cname was produced by to_string. All
  entries, old and new, are then ordered by canonical_name and the file is
  rewritten.

  Args:
    cnames: a list of canonical_name of new carriers
    carrier_list_file: the path to the CarrierList textpb file

  Returns:
    None
  """
  carriers = carrier_list_pb2.CarrierList()
  with open(carrier_list_file, 'r', encoding='utf-8') as list_file:
    text_format.ParseLines(list_file, carriers)

  for cname in cnames:
    added = carriers.entry.add()
    added.canonical_name = cname
    added.carrier_id.extend([to_carrier_id(cname)])

  def by_canonical_name(entry):
    return entry.canonical_name

  ordered = sorted(carriers.entry, key=by_canonical_name)
  del carriers.entry[:]
  carriers.entry.extend(ordered)

  with open(carrier_list_file, 'w', encoding='utf-8') as list_file:
    text_format.PrintMessage(carriers, list_file, as_utf8=True)
285
286
def add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers):
  """Add APNs to carriers in others.textpb that have none, matching by mccmnc.

  If a carrier defined as mccmnc + mvno_data doesn't have APNs, it should use
  the APNs of the carrier defined by that mccmnc alone.

  Modifies others.textpb file in-place.

  Args:
    apns: a list of CarrierSettings message with APNs only.
    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
      A dict, key is to_string(carrier_id), value is cname.
    other_carriers: parsed other carriers list; must not contain new carriers. A
      dict, key is to_string(carrier_id), value is cname.
  """
  # Index the incoming settings by canonical_name.
  apns_dict = {}
  for carrier_settings in apns:
    apns_dict[carrier_settings.canonical_name] = carrier_settings

  others_textpb = '%s/setting/others.textpb' % FLAGS.data_dir
  others = carrier_settings_pb2.MultiCarrierSettings()
  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
    text_format.ParseLines(others_textpb_file, others)

  for setting in others.setting:
    if setting.HasField('apns'):
      continue
    carrier_id = to_carrier_id(setting.canonical_name)
    if not carrier_id.HasField('mvno_data'):
      continue
    # Strip the mvno part to obtain the key of the plain-mccmnc carrier.
    carrier_id.ClearField('mvno_data')
    mccmnc_key = to_string(carrier_id)
    cname_of_mccmnc = (
        tier1_carriers.get(mccmnc_key) or other_carriers.get(mccmnc_key))
    if not cname_of_mccmnc:
      continue
    donor = apns_dict.get(cname_of_mccmnc)
    if donor:
      setting.apns.CopyFrom(donor.apns)

  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
329
330
def add_carrierconfig_for_new_carriers(cnames, tier1_carriers, other_carriers):
  """Add carrier configs for new (non-tier-1) carriers.

  For new carriers, i.e. carriers existing in APN but not CarrierConfig:
    - for <mccmnc>: copy carrier config of <mcc>.
    - for <mccmnc>(GID1|SPN|IMSI)=<mvnodata>: copy carrier config of <mccmnc>,
      or <mcc>.

  Modifies others.textpb file in-place.

  Args:
    cnames: a list of canonical_name of new carriers.
    tier1_carriers: parsed tier-1 carriers list; must not contain new carriers.
      A dict, key is to_string(carrier_id), value is cname.
    other_carriers: parsed other carriers list; must not contain new carriers. A
      dict, key is to_string(carrier_id), value is cname.
  """
  setting_dir = os.path.join(FLAGS.data_dir, 'setting')
  others_textpb = os.path.join(setting_dir, 'others.textpb')
  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
    others = text_format.ParseLines(others_textpb_file,
                                    carrier_settings_pb2.MultiCarrierSettings())

  # Configs available for copying, keyed by carrier id string.
  carrier_configs_map = {}
  for setting in others.setting:
    # Only entries whose canonical_name is itself a key of other_carriers
    # (i.e. looks like a carrier id string) are registered.
    if setting.canonical_name in other_carriers:
      carrier_configs_map[setting.canonical_name] = setting.configs

  # Several carrier ids can map to the same tier-1 carrier; parse each
  # tier-1 settings file only once and share its configs between those ids.
  tier1_configs_by_cname = {}
  for cid_str, cname in tier1_carriers.items():
    if cname not in tier1_configs_by_cname:
      tier1_textpb = os.path.join(setting_dir, '%s.textpb' % cname)
      with open(tier1_textpb, 'r', encoding='utf-8') as tier1_textpb_file:
        tier1 = text_format.ParseLines(tier1_textpb_file,
                                       carrier_settings_pb2.CarrierSettings())
      tier1_configs_by_cname[cname] = tier1.configs
    carrier_configs_map[cid_str] = tier1_configs_by_cname[cname]

  new_cnames = set(cnames)
  for setting in others.setting:
    if setting.canonical_name not in new_cnames:
      continue
    mccmnc = to_carrier_id(setting.canonical_name).mcc_mnc
    mcc = mccmnc[:3]
    # Prefer the config of the exact mccmnc, then fall back to the mcc.
    if mccmnc in carrier_configs_map:
      setting.configs.config.extend(carrier_configs_map[mccmnc].config[:])
    elif mcc in carrier_configs_map:
      setting.configs.config.extend(carrier_configs_map[mcc].config[:])

  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
376
377
def cleanup_mcc_only_carriers():
  """Removes mcc-only carriers from other_carriers.textpb & others.textpb.

  A carrier id counts as mcc-only when its mcc_mnc is exactly 3 characters
  long. A carrier list entry is kept when at least one of its carrier ids is
  not mcc-only, and it is kept exactly once no matter how many such ids it
  has. Settings in others.textpb are dropped when their canonical_name
  belongs to an entry that has any mcc-only carrier id.

  Modifies other_carriers.textpb file & others.textpb file in-place. Neither
  file is touched when no mcc-only carrier is found.
  """
  with open(
      OTHER_CARRIERS_TEXTPB, 'r',
      encoding='utf-8') as other_carriers_textpb_file:
    other_carriers = text_format.ParseLines(other_carriers_textpb_file,
                                            carrier_list_pb2.CarrierList())

  mcc_only_carriers = set()
  other_carriers_entry_with_mccmnc = []
  for carrier in other_carriers.entry:
    has_mccmnc_id = False
    for carrier_id in carrier.carrier_id:
      if len(carrier_id.mcc_mnc) == 3:
        mcc_only_carriers.add(carrier.canonical_name)
      else:
        has_mccmnc_id = True
    # Append per entry, not per carrier id, so that an entry with several
    # carrier ids is not written back multiple times.
    if has_mccmnc_id:
      other_carriers_entry_with_mccmnc.append(carrier)

  # Finish early if no mcc_only_carriers; that means no file modification
  # required.
  if not mcc_only_carriers:
    return

  del other_carriers.entry[:]
  other_carriers.entry.extend(other_carriers_entry_with_mccmnc)
  with open(
      OTHER_CARRIERS_TEXTPB, 'w',
      encoding='utf-8') as other_carriers_textpb_file:
    text_format.PrintMessage(
        other_carriers, other_carriers_textpb_file, as_utf8=True)

  others_textpb = os.path.join(FLAGS.data_dir, 'setting', 'others.textpb')
  with open(others_textpb, 'r', encoding='utf-8') as others_textpb_file:
    others = text_format.ParseLines(others_textpb_file,
                                    carrier_settings_pb2.MultiCarrierSettings())
  kept_settings = [
      setting for setting in others.setting
      if setting.canonical_name not in mcc_only_carriers
  ]
  del others.setting[:]
  others.setting.extend(kept_settings)

  with open(others_textpb, 'w', encoding='utf-8') as others_textpb_file:
    text_format.PrintMessage(others, others_textpb_file, as_utf8=True)
425
426
def main():
  """Merge the APNs from --in_file into the carrier data under --data_dir.

  Tier-1 carriers are updated in their own setting/<cname>.textpb file; all
  other carriers are updated in setting/others.textpb. Carriers that appear
  in the input but in neither carrier list are registered as new carriers in
  the non-tier-1 carrier list.
  """
  apns = get_input(FLAGS.in_file).setting
  tier1_carriers = get_knowncarriers([TIER1_CARRIERS_TEXTPB])
  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])
  setting_dir = os.path.join(FLAGS.data_dir, 'setting')

  # Canonical names are looked up once per input setting; build the sets up
  # front instead of scanning dict values on every iteration.
  tier1_cnames = set(tier1_carriers.values())
  known_other_cnames = set(other_carriers.values())

  # Step 1a: merge APNs into CarrierConfigs by canonical name.
  # Also find out "new carriers" existing in APNs but not in CarrierConfigs.
  new_carriers = []
  other_carriers_patch = []
  for carrier_settings in apns:
    carrier_settings = clear_apn_fields_in_default_value(carrier_settings)

    cname = carrier_settings.canonical_name
    if cname in tier1_cnames:
      merge_carrier_settings(carrier_settings,
                             os.path.join(setting_dir, '%s.textpb' % cname))
      continue

    other_carriers_patch.append(carrier_settings)
    if cname not in known_other_cnames:
      # Remember the name so that a canonical_name repeated in the input is
      # registered as a new carrier only once.
      known_other_cnames.add(cname)
      new_carriers.append(cname)

  merge_multi_carrier_settings(other_carriers_patch,
                               os.path.join(setting_dir, 'others.textpb'))

  # Step 1b: populate carrier configs for new carriers.
  add_carrierconfig_for_new_carriers(new_carriers, tier1_carriers,
                                     other_carriers)

  # Step 2: merge new carriers into non-tier1 carrier list.
  add_new_carriers(new_carriers, OTHER_CARRIERS_TEXTPB)
  # Update other_carriers map
  other_carriers = get_knowncarriers([OTHER_CARRIERS_TEXTPB])

  # Step 3: merge APNs into CarrierConfigs by mccmnc: for a carrier defined
  # as mccmnc + gid/spn/imsi, if it doesn't have any APNs, it should use APNs
  # from carrier defined as mccmnc only.
  # Only handle non-tier1 carriers, as tier1 carriers are assumed to be better
  # maintained and are already having APNs defined.
  add_apns_for_other_carriers_by_mccmnc(apns, tier1_carriers, other_carriers)

  # Step 4: clean up mcc-only carriers; they're used in step 3 but should not
  # be in final carrier settings to avoid confusing CarrierSettings app.
  cleanup_mcc_only_carriers()
470
471
472if __name__ == '__main__':
473  main()
474