1package repositories
2
3import (
4	"database/sql"
5	"fmt"
6
7	"github.com/pkg/errors"
8
9	ent "repodiff/entities"
10	"repodiff/mappers"
11	repoSQL "repodiff/persistence/sql"
12)
13
14type GlobalDenormalizer struct {
15	db *sql.DB
16}
17
18type ScopedDenormalizer struct {
19	db           *sql.DB
20	target       ent.DiffTarget
21	mappedTarget ent.MappedDiffTarget
22}
23
24func (g GlobalDenormalizer) DenormalizeToTopCommitter() error {
25	table := "denormalized_view_top_committer"
26	if _, err := g.db.Exec(
27		fmt.Sprintf(
28			"TRUNCATE TABLE %s",
29			table,
30		),
31	); err != nil {
32		return err
33	}
34	_, err := g.db.Exec(
35		fmt.Sprintf(
36			`INSERT INTO %s (
37					upstream_target_id,
38					downstream_target_id,
39					surrogate_id,
40					committer,
41					commits,
42					line_changes,
43					tech_area,
44					upstream_url,
45					upstream_branch,
46					downstream_url,
47					downstream_branch
48				) (
49					SELECT
50						upstream_target_id,
51						downstream_target_id,
52						@rn:=@rn+1 AS surrogate_id,
53						committer,
54						commits,
55						line_changes,
56						tech_area,
57						upstream_url,
58						upstream_branch,
59						downstream_url,
60						downstream_branch
61					FROM (
62						SELECT upstream_target_id,
63							downstream_target_id,
64							author as committer,
65							tech_area,
66							COUNT(*) AS commits,
67							SUM(0) AS line_changes,
68							upstream_url,
69							upstream_branch,
70							downstream_url,
71							downstream_branch
72						FROM denormalized_view_recent_commit GROUP BY
73							author,
74							tech_area,
75							upstream_target_id,
76							downstream_target_id,
77							upstream_url,
78							upstream_branch,
79							downstream_url,
80							downstream_branch ORDER BY upstream_target_id,
81							downstream_target_id
82						) t1,
83						(SELECT @rn:=0) t2
84					)`,
85			table,
86		),
87	)
88	return err
89}
90
91func (g GlobalDenormalizer) DenormalizeToTopTechArea() error {
92	table := "denormalized_view_top_tech_area"
93	if _, err := g.db.Exec(
94		fmt.Sprintf(
95			"TRUNCATE TABLE %s",
96			table,
97		),
98	); err != nil {
99		return err
100	}
101	_, err := g.db.Exec(
102		fmt.Sprintf(
103			`INSERT INTO %s (
104					upstream_target_id,
105					downstream_target_id,
106					surrogate_id,
107					tech_area,
108					commits,
109					line_changes,
110					upstream_url,
111					upstream_branch,
112					downstream_url,
113					downstream_branch
114				) (
115					SELECT
116						upstream_target_id,
117						downstream_target_id,
118						@rn:=@rn+1 AS surrogate_id,
119						tech_area,
120						commits,
121						line_changes,
122						upstream_url,
123						upstream_branch,
124						downstream_url,
125						downstream_branch FROM (
126							SELECT
127								upstream_target_id,
128								downstream_target_id,
129								tech_area,
130								COUNT(*) AS commits,
131								SUM(0) AS line_changes,
132								upstream_url,
133								upstream_branch,
134								downstream_url,
135								downstream_branch
136							FROM denormalized_view_recent_commit GROUP BY
137								tech_area,
138								upstream_target_id,
139								downstream_target_id,
140								upstream_url,
141								upstream_branch,
142								downstream_url,
143								downstream_branch
144							ORDER BY
145								upstream_target_id,
146								downstream_target_id
147						) t1,
148						(SELECT @rn:=0) t2
149					)`,
150			table,
151		),
152	)
153	return err
154}
155
156func (s ScopedDenormalizer) DenormalizeToRecentView(diffRows []ent.AnalyzedDiffRow) error {
157	table := "denormalized_view_recent_project"
158	if err := s.deleteExistingView(table); err != nil {
159		return err
160	}
161	return errors.Wrap(
162		repoSQL.SingleTransactionInsert(
163			s.db,
164			fmt.Sprintf(
165				`INSERT INTO %s (
166					upstream_target_id,
167					downstream_target_id,
168					row_index,
169					date,
170					downstream_project,
171					upstream_project,
172					status,
173					files_changed,
174					line_insertions,
175					line_deletions,
176					line_changes,
177					commits_not_upstreamed,
178					project_type,
179					upstream_url,
180					upstream_branch,
181					downstream_url,
182					downstream_branch
183				) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
184				table,
185			),
186			s.rowsWithScopedIndices(
187				mappers.DiffRowsToDenormalizedCols(diffRows),
188			),
189		),
190		errorMessageForTable(table),
191	)
192}
193
194func NewGlobalDenormalizerRepository() (GlobalDenormalizer, error) {
195	db, err := repoSQL.GetDBConnectionPool()
196	return GlobalDenormalizer{
197		db: db,
198	}, errors.Wrap(err, "Could not establish a database connection")
199}
200
201func NewScopedDenormalizerRepository(target ent.DiffTarget, mappedTarget ent.MappedDiffTarget) (ScopedDenormalizer, error) {
202	db, err := repoSQL.GetDBConnectionPool()
203	return ScopedDenormalizer{
204		db:           db,
205		target:       cleanedDiffTarget(target),
206		mappedTarget: mappedTarget,
207	}, errors.Wrap(err, "Could not establish a database connection")
208}
209
210func (s ScopedDenormalizer) DenormalizeToChangesOverTime(diffRows []ent.AnalyzedDiffRow) error {
211	// This query only inserts a single row into the database.  If it becomes problematic, this
212	// could become more efficient without the prepared statement embedded in the SingleTransactionInsert
213	// function
214	table := "denormalized_view_changes_over_time"
215	return errors.Wrap(
216		repoSQL.SingleTransactionInsert(
217			s.db,
218			fmt.Sprintf(
219				`INSERT IGNORE INTO %s (
220					upstream_target_id,
221					downstream_target_id,
222					datastudio_datetime,
223					modified_projects,
224					line_changes,
225					files_changed,
226					upstream_url,
227					upstream_branch,
228					downstream_url,
229					downstream_branch
230				) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
231				table,
232			),
233			s.rowsWithScopedIndices(
234				mappers.DiffRowsToAggregateChangesOverTime(diffRows),
235			),
236		),
237		errorMessageForTable(table),
238	)
239}
240
241func (s ScopedDenormalizer) DenormalizeToRecentCommits(commitRows []ent.AnalyzedCommitRow, commitToTimestamp map[string]ent.RepoTimestamp) error {
242	table := "denormalized_view_recent_commit"
243	if err := s.deleteExistingView(table); err != nil {
244		return err
245	}
246	return errors.Wrap(
247		repoSQL.SingleTransactionInsert(
248			s.db,
249			fmt.Sprintf(
250				`INSERT INTO %s (
251					upstream_target_id,
252					downstream_target_id,
253					row_index,
254					commit_,
255					downstream_project,
256					author,
257					subject,
258					tech_area,
259					project_type,
260					first_seen_datastudio_datetime,
261					upstream_url,
262					upstream_branch,
263					downstream_url,
264					downstream_branch
265				) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
266				table,
267			),
268			s.rowsWithScopedIndices(
269				mappers.CommitRowsToDenormalizedCols(commitRows, commitToTimestamp),
270			),
271		),
272		errorMessageForTable(table),
273	)
274}
275
276func (s ScopedDenormalizer) deleteExistingView(tableName string) error {
277	_, err := s.db.Exec(
278		fmt.Sprintf(
279			`DELETE FROM %s
280			WHERE
281				upstream_target_id = ?
282				AND downstream_target_id = ?`,
283			tableName,
284		),
285		s.mappedTarget.UpstreamTarget,
286		s.mappedTarget.DownstreamTarget,
287	)
288	return err
289}
290
291func (s ScopedDenormalizer) rowsWithScopedIndices(rowsOfCols [][]interface{}) [][]interface{} {
292	return mappers.PrependMappedDiffTarget(
293		s.mappedTarget,
294		mappers.AppendDiffTarget(
295			s.target,
296			rowsOfCols,
297		),
298	)
299}
300
// errorMessageForTable returns the context message used when wrapping a
// failed insert into tableName.
func errorMessageForTable(tableName string) string {
	const template = "Error inserting rows into %s"
	return fmt.Sprintf(template, tableName)
}
304