1# Copyright 2016 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16from google import protobuf
17from gcloud import bigtable
18
19_COLUMN_FAMILY_ID = 'cf1'
20
21
22class BigTableClient(object):
23    """Defines the big table client that connects to the big table.
24
25    Attributes:
26        _column_family_id: A String for family of columns.
27        _client: An instance of Client which is project specific.
28        _client_instance: Representation of a Google Cloud Bigtable Instance.
29        _start_index: Start index for the row key. It gets incremented as we
30            dequeue.
31        _end_index : End index for row key. This is incremented as we Enqueue.
32        _table_name: A string that represents the big table.
33        _table_instance: An instance of the Table that represents the big table.
34    """
35
36    def __init__(self, table, project_id):
37        self._column_family_id = _COLUMN_FAMILY_ID
38        self._client = bigtable.Client(project=project_id, admin=True)
39        self._client_instance = None
40        self._start_index = 0
41        self._end_index = 0
42        self._table_name = table
43        self._table_instance = None
44        # Start client to enable receiving requests
45        self.StartClient()
46
47    def StartClient(self, instance_id):
48        """Starts client to prepare it to make requests."""
49
50        # Start the client
51        if not self._client.is_started():
52            self._client.start()
53        self._client_instance = self._client.instance(instance_id)
54        if self._table_instance is None:
55            self._table_instance = self._client_instance.table(
56                self._table_name)
57
58    def StopClient(self):
59        """Stop client to close all the open gRPC clients."""
60
61        # stop client
62        self._client.stop()
63
64    def CreateTable(self):
65        """Creates a table in which read/write operations are performed.
66
67        Raises:
68            AbortionError: Error occurred when creating table is not successful.
69                This could be due to creating a table with a duplicate name.
70        """
71
72        # Create a table
73        logging.debug('Creating the table %s', self._table_name)
74
75        self._table_instance.create()
76        cf1 = self._table_instance.column_family(self._column_family_id)
77        cf1.create()
78
79    def Enqueue(self, messages, column_id):
80        """Writes new rows to the given table.
81
82        Args:
83            messages: An array of strings that represents the message to be
84                written to a new row in the table. Each message is writte to a
85                new row
86            column_id: A string that represents the name of the column to which
87                data is to be written.
88        """
89
90        # Start writing rows
91        logging.debug('Writing to the table : %s, column : %s',
92                      self._table_name, column_id)
93        for value in messages:
94            row_key = str(self._end_index)
95            self._end_index = self._end_index + 1
96            row = self._table_instance.row(row_key)
97            row.set_cell(self._column_family_id,
98                         column_id.encode('utf-8'), value.encode('utf-8'))
99            row.commit()
100        # End writing rows
101
102    def Dequeue(self):
103        """Removes and returns the first row from the table.
104
105        Returns:
106            row: A row object that represents the top most row.
107        """
108
109        if self._end_index < self._start_index:
110            return
111
112        logging.debug('Getting a single row by row key.')
113        key = str(self._start_index)
114        row_cond = self._table_instance.row(key)
115        top_row = row_cond
116        row_cond.delete()
117        self._start_index = self._start_index + 1
118
119        return top_row
120
121    def DeleteTable(self):
122        """Performs delete operation for a given table."""
123
124        # Delete the table
125        logging.debug('Deleting the table : %s', self._table_name)
126        self._table_instance.delete()
127