1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Compare_contacts accepts 2 vcf files, extracts full name, email, and
17telephone numbers from each and reports how many unique cards it finds across
18the two files.
19"""
20
21from mmap import ACCESS_READ
22from mmap import mmap
23import logging
24import re
25import random
26import string
27import time
28from acts.utils import exe_cmd
29import queue
30
31# CallLog types
32INCOMMING_CALL_TYPE = "1"
33OUTGOING_CALL_TYPE = "2"
34MISSED_CALL_TYPE = "3"
35
36# Callback strings.
37CONTACTS_CHANGED_CALLBACK = "ContactsChanged"
38CALL_LOG_CHANGED = "CallLogChanged"
39CONTACTS_ERASED_CALLBACK = "ContactsErased"
40
41# URI for contacts database on Nexus.
42CONTACTS_URI = "content://com.android.contacts/data/phones"
43
44# Path for temporary file storage on device.
45STORAGE_PATH = "/storage/emulated/0/Download/"
46
47PBAP_SYNC_TIME = 30
48
49log = logging
50
51
52def parse_contacts(file_name):
53    """Read vcf file and generate a list of contacts.
54
55    Contacts full name, prefered email, and all phone numbers are extracted.
56    """
57
58    vcard_regex = re.compile(b"^BEGIN:VCARD((\n*?.*?)*?)END:VCARD",
59                             re.MULTILINE)
60    fullname_regex = re.compile(b"^FN:(.*)", re.MULTILINE)
61    email_regex = re.compile(b"^EMAIL;PREF:(.*)", re.MULTILINE)
62    tel_regex = re.compile(b"^TEL;(.*):(.*)", re.MULTILINE)
63
64    with open(file_name, "r") as contacts_file:
65        contacts = []
66        contacts_map = mmap(
67            contacts_file.fileno(), length=0, access=ACCESS_READ)
68        new_contact = None
69
70        # Find all VCARDs in the input file, then extract the first full name,
71        # first email address, and all phone numbers from it.  If there is at
72        # least a full name add it to the contact list.
73        for current_vcard in vcard_regex.findall(contacts_map):
74            new_contact = VCard()
75
76            fullname = fullname_regex.search(current_vcard[0])
77            if fullname is not None:
78                new_contact.name = fullname.group(1)
79
80            email = email_regex.search(current_vcard[0])
81            if email is not None:
82                new_contact.email = email.group(1)
83
84            for phone_number in tel_regex.findall(current_vcard[0]):
85                new_contact.add_phone_number(
86                    PhoneNumber(phone_number[0], phone_number[1]))
87
88            contacts.append(new_contact)
89
90        return contacts
91
92
93def phone_number_count(destination_path, file_name):
94    """Counts number of phone numbers in a VCF.
95    """
96    tel_regex = re.compile(b"^TEL;(.*):(.*)", re.MULTILINE)
97    with open("{}{}".format(destination_path, file_name),
98              "r") as contacts_file:
99        contacts_map = mmap(
100            contacts_file.fileno(), length=0, access=ACCESS_READ)
101        numbers = tel_regex.findall(contacts_map)
102        return len(numbers)
103
104
105def count_contacts_with_differences(destination_path,
106                                    pce_contacts_vcf_file_name,
107                                    pse_contacts_vcf_file_name):
108    """Compare two contact files and report the number of differences.
109
110    Difference count is returned, and the differences are logged, this is order
111    independent.
112    """
113
114    pce_contacts = parse_contacts("{}{}".format(destination_path,
115                                                pce_contacts_vcf_file_name))
116    pse_contacts = parse_contacts("{}{}".format(destination_path,
117                                                pse_contacts_vcf_file_name))
118
119    differences = set(pce_contacts).symmetric_difference(set(pse_contacts))
120    if not differences:
121        log.info("All {} contacts in the phonebooks match".format(
122            str(len(pce_contacts))))
123    else:
124        log.info("{} contacts match, but ".format(
125            str(len(set(pce_contacts).intersection(set(pse_contacts))))))
126        log.info("the following {} entries don't match:".format(
127            str(len(differences))))
128        for current_vcard in differences:
129            log.info(current_vcard)
130    return len(differences)
131
132
133class PhoneNumber(object):
134    """Simple class for maintaining a phone number entry and type with only the
135    digits.
136    """
137
138    def __init__(self, phone_type, phone_number):
139        self.phone_type = phone_type
140        # remove non digits from phone_number
141        self.phone_number = re.sub(r"\D", "", str(phone_number))
142
143    def __eq__(self, other):
144        return (self.phone_type == other.phone_type and
145                self.phone_number == other.phone_number)
146
147    def __hash__(self):
148        return hash(self.phone_type) ^ hash(self.phone_number)
149
150
151class VCard(object):
152    """Contains name, email, and phone numbers.
153    """
154
155    def __init__(self):
156        self.name = None
157        self.first_name = None
158        self.last_name = None
159        self.email = None
160        self.phone_numbers = []
161        self.photo = None
162
163    def __lt__(self, other):
164        return self.name < other.name
165
166    def __hash__(self):
167        result = hash(self.name) ^ hash(self.email) ^ hash(self.photo == None)
168        for number in self.phone_numbers:
169            result ^= hash(number)
170        return result
171
172    def __eq__(self, other):
173        return hash(self) == hash(other)
174
175    def __ne__(self, other):
176        return not self.__eq__(other)
177
178    def __str__(self):
179        vcard_strings = ["BEGIN:VCARD\n", "VERSION:2.1\n"]
180
181        if self.first_name or self.last_name:
182            vcard_strings.append("N:{};{};;;\nFN:{} {}\n".format(
183                self.last_name, self.first_name, self.first_name,
184                self.last_name))
185        elif self.name:
186            vcard_strings.append("FN:{}\n".format(self.name))
187
188        if self.phone_numbers:
189            for phone in self.phone_numbers:
190                vcard_strings.append("TEL;{}:{}\n".format(
191                    str(phone.phone_type), phone.phone_number))
192
193        if self.email:
194            vcard_strings.append("EMAIL;PREF:{}\n".format(self.email))
195
196        vcard_strings.append("END:VCARD\n")
197        return "".join(vcard_strings)
198
199    def add_phone_number(self, phone_number):
200        if phone_number not in self.phone_numbers:
201            self.phone_numbers.append(phone_number)
202
203
204def generate_random_phone_number():
205    """Generate a random phone number/type
206    """
207    return PhoneNumber("CELL",
208                       "+{0:010d}".format(random.randint(0, 9999999999)))
209
210
211def generate_random_string(length=8,
212                           charset="{}{}{}".format(string.digits,
213                                                   string.ascii_letters,
214                                                   string.punctuation)):
215    """Generate a random string of specified length from the characterset
216    """
217    # Remove ; since that would make 2 words.
218    charset = charset.replace(";", "")
219    name = []
220    for i in range(length):
221        name.append(random.choice(charset))
222    return "".join(name)
223
224
225def generate_contact_list(destination_path,
226                          file_name,
227                          contact_count,
228                          phone_number_count=1):
229    """Generate a simple VCF file for count contacts with basic content.
230
231    An example with count = 1 and local_number = 2]
232
233    BEGIN:VCARD
234    VERSION:2.1
235    N:Person;1;;;
236    FN:1 Person
237    TEL;CELL:+1-555-555-1234
238    TEL;CELL:+1-555-555-4321
239    EMAIL;PREF:person1@gmail.com
240    END:VCARD
241    """
242    vcards = []
243    for i in range(contact_count):
244        current_contact = VCard()
245        current_contact.first_name = generate_random_string(
246            random.randint(1, 19))
247        current_contact.last_name = generate_random_string(
248            random.randint(1, 19))
249        current_contact.email = "{}{}@{}.{}".format(
250            current_contact.last_name, current_contact.first_name,
251            generate_random_string(random.randint(1, 19)),
252            generate_random_string(random.randint(1, 4)))
253        for number in range(phone_number_count):
254            current_contact.add_phone_number(generate_random_phone_number())
255        vcards.append(current_contact)
256    create_new_contacts_vcf_from_vcards(destination_path, file_name, vcards)
257
258
259def create_new_contacts_vcf_from_vcards(destination_path, vcf_file_name,
260                                        vcards):
261    """Create a new file with filename
262    """
263    contact_file = open("{}{}".format(destination_path, vcf_file_name), "w+")
264    for card in vcards:
265        contact_file.write(str(card))
266    contact_file.close()
267
268
269def get_contact_count(device):
270    """Returns the number of name:phone number pairs.
271    """
272    contact_list = device.droid.contactsQueryContent(
273        CONTACTS_URI, ["display_name", "data1"], "", [], "display_name")
274    return len(contact_list)
275
276
277def import_device_contacts_from_vcf(device, destination_path, vcf_file, timeout=10):
278    """Uploads and import vcf file to device.
279    """
280    number_count = phone_number_count(destination_path, vcf_file)
281    device.log.info("Trying to add {} phone numbers.".format(number_count))
282    local_phonebook_path = "{}{}".format(destination_path, vcf_file)
283    phone_phonebook_path = "{}{}".format(STORAGE_PATH, vcf_file)
284    device.adb.push("{} {}".format(local_phonebook_path, phone_phonebook_path))
285    device.droid.importVcf("file://{}{}".format(STORAGE_PATH, vcf_file))
286    start_time = time.time()
287    while time.time() < start_time + timeout:
288        #TODO: use unattended way to bypass contact import module instead of keyevent
289        if "ImportVCardActivity" in device.get_my_current_focus_window():
290            # keyevent to allow contacts import from vcf file
291            for key in ["DPAD_RIGHT", "DPAD_RIGHT", "ENTER"]:
292                device.adb.shell("input keyevent KEYCODE_{}".format(key))
293            break
294        time.sleep(1)
295    if wait_for_phone_number_update_complete(device, number_count):
296        return number_count
297    else:
298        return 0
299
300
301def export_device_contacts_to_vcf(device, destination_path, vcf_file):
302    """Export and download vcf file from device.
303    """
304    path_on_phone = "{}{}".format(STORAGE_PATH, vcf_file)
305    device.droid.exportVcf("{}".format(path_on_phone))
306    # Download and then remove file from device
307    device.adb.pull("{} {}".format(path_on_phone, destination_path))
308    return True
309
310
311def delete_vcf_files(device):
312    """Deletes all files with .vcf extension
313    """
314    files = device.adb.shell("ls {}".format(STORAGE_PATH))
315    for file_name in files.split():
316        if ".vcf" in file_name:
317            device.adb.shell("rm -f {}{}".format(STORAGE_PATH, file_name))
318
319
320def erase_contacts(device):
321    """Erase all contacts out of devices contact database.
322    """
323    device.log.info("Erasing contacts.")
324    if get_contact_count(device) > 0:
325        device.droid.contactsEraseAll()
326        try:
327            device.ed.pop_event(CONTACTS_ERASED_CALLBACK, PBAP_SYNC_TIME)
328        except queue.Empty:
329            log.error("Phone book not empty.")
330            return False
331    return True
332
333
334def wait_for_phone_number_update_complete(device, expected_count):
335    """Check phone_number count on device and wait for updates until it has the
336    expected number of phone numbers in its contact database.
337    """
338    update_completed = True
339    try:
340        while (expected_count != get_contact_count(device) and
341               device.ed.pop_event(CONTACTS_CHANGED_CALLBACK, PBAP_SYNC_TIME)):
342            pass
343    except queue.Empty:
344        log.error("Contacts failed to update.")
345        update_completed = False
346    device.log.info("Found {} out of the expected {} contacts.".format(
347        get_contact_count(device), expected_count))
348    return update_completed
349
350
351def wait_for_call_log_update_complete(device, expected_count):
352    """Check call log count on device and wait for updates until it has the
353    expected number of calls in its call log database.
354    """
355    update_completed = True
356    try:
357        while (expected_count != device.droid.callLogGetCount() and
358               device.ed.pop_event(CALL_LOG_CHANGED, PBAP_SYNC_TIME)):
359            pass
360    except queue.Empty:
361        log.error("Call Log failed to update.")
362        update_completed = False
363    device.log.info("Found {} out of the expected {} call logs.".format(
364        device.droid.callLogGetCount(), expected_count))
365    return
366
367
368def add_call_log(device, call_log_type, phone_number, call_time):
369    """Add call number and time to specified log.
370    """
371    new_call_log = {}
372    new_call_log["type"] = str(call_log_type)
373    new_call_log["number"] = phone_number
374    new_call_log["time"] = str(call_time)
375    device.droid.callLogsPut(new_call_log)
376
377
378def get_and_compare_call_logs(pse, pce, call_log_type):
379    """Gather and compare call logs from PSE and PCE for the specified type.
380    """
381    pse_call_log = pse.droid.callLogsGet(call_log_type)
382    pce_call_log = pce.droid.callLogsGet(call_log_type)
383    return compare_call_logs(pse_call_log, pce_call_log)
384
385
386def normalize_phonenumber(phone_number):
387    """Remove all non-digits from phone_number
388    """
389    return re.sub(r"\D", "", phone_number)
390
391
392def compare_call_logs(pse_call_log, pce_call_log):
393    """Gather and compare call logs from PSE and PCE for the specified type.
394    """
395    call_logs_match = True
396    if len(pse_call_log) == len(pce_call_log):
397        for i in range(len(pse_call_log)):
398            # Compare the phone number
399            if normalize_phonenumber(pse_call_log[i][
400                    "number"]) != normalize_phonenumber(pce_call_log[i][
401                         "number"]):
402                log.warning("Call Log numbers differ")
403                call_logs_match = False
404
405            # Compare which log it was taken from (Incomming, Outgoing, Missed
406            if pse_call_log[i]["type"] != pce_call_log[i]["type"]:
407                log.warning("Call Log types differ")
408                call_logs_match = False
409
410            # Compare time to truncated second.
411            if int(pse_call_log[i]["date"]) // 1000 != int(pce_call_log[i][
412                    "date"]) // 1000:
413                log.warning("Call log times don't match, check timezone.")
414                call_logs_match = False
415
416    else:
417        log.warning("Call Log lengths differ {}:{}".format(
418            len(pse_call_log), len(pce_call_log)))
419        call_logs_match = False
420
421    if not call_logs_match:
422        log.info("PSE Call Log:")
423        log.info(pse_call_log)
424        log.info("PCE Call Log:")
425        log.info(pce_call_log)
426
427    return call_logs_match
428
429