Fix a bug in computing the size of a segment that led to utilization > 1.0.
[cumulus.git] / localdb.cc
1 /* LBS: An LFS-inspired filesystem backup system
2  * Copyright (C) 2007  Michael Vrable
3  *
4  * When creating backup snapshots, maintain a local database of data blocks and
5  * checksums, in addition to the data contents (which may be stored remotely).
6  * This database is consulted when attempting to build incremental snapshots,
7  * as it says which objects can be reused.
8  *
9  * The database is implemented as an SQLite3 database, but this implementation
10  * detail is kept internal to this file, so that the storage format may be
11  * changed later. */
12
13 #include <assert.h>
14 #include <stdio.h>
15 #include <string.h>
16 #include <sqlite3.h>
17
18 #include <string>
19
20 #include "localdb.h"
21 #include "store.h"
22
23 using std::string;
24
25 /* Helper function to prepare a statement for execution in the current
26  * database. */
27 sqlite3_stmt *LocalDb::Prepare(const char *sql)
28 {
29     sqlite3_stmt *stmt;
30     int rc;
31     const char *tail;
32
33     rc = sqlite3_prepare_v2(db, sql, strlen(sql), &stmt, &tail);
34     if (rc != SQLITE_OK) {
35         ReportError(rc);
36         throw IOException(string("Error preparing statement: ") + sql);
37     }
38
39     return stmt;
40 }
41
42 void LocalDb::ReportError(int rc)
43 {
44     fprintf(stderr, "Result code: %d\n", rc);
45     fprintf(stderr, "Error message: %s\n", sqlite3_errmsg(db));
46 }
47
48 void LocalDb::Open(const char *path, const char *snapshot_name,
49                    const char *snapshot_scheme, double intent)
50 {
51     int rc;
52
53     rc = sqlite3_open(path, &db);
54     if (rc) {
55         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
56         sqlite3_close(db);
57         throw IOException("Error opening local database");
58     }
59
60     rc = sqlite3_exec(db, "begin", NULL, NULL, NULL);
61     if (rc) {
62         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
63         sqlite3_close(db);
64         throw IOException("Error starting transaction");
65     }
66
67     sqlite3_extended_result_codes(db, 1);
68
69     /* Insert this snapshot into the database, and determine the integer key
70      * which will be used to identify it. */
71     sqlite3_stmt *stmt = Prepare("insert into "
72                                  "snapshots(name, scheme, timestamp, intent) "
73                                  "values (?, ?, julianday('now'), ?)");
74     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
75                       SQLITE_TRANSIENT);
76     if (snapshot_scheme == NULL)
77         sqlite3_bind_null(stmt, 2);
78     else
79         sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
80                           SQLITE_TRANSIENT);
81     sqlite3_bind_double(stmt, 3, intent);
82
83     rc = sqlite3_step(stmt);
84     if (rc != SQLITE_DONE) {
85         ReportError(rc);
86         sqlite3_close(db);
87         throw IOException("Database execution error!");
88     }
89
90     snapshotid = sqlite3_last_insert_rowid(db);
91     sqlite3_finalize(stmt);
92     if (snapshotid == 0) {
93         ReportError(rc);
94         sqlite3_close(db);
95         throw IOException("Find snapshot id");
96     }
97
98     /* Create a temporary table which will be used to keep track of the objects
99      * used by this snapshot.  When the database is closed, we will summarize
100      * the results of this table into segments_used. */
101     rc = sqlite3_exec(db,
102                       "create temporary table snapshot_refs ("
103                       "    segmentid integer not null,"
104                       "    object text not null,"
105                       "    size integer not null"
106                       ")", NULL, NULL, NULL);
107     if (rc != SQLITE_OK) {
108         ReportError(rc);
109         sqlite3_close(db);
110         throw IOException("Database initialization");
111     }
112     rc = sqlite3_exec(db,
113                       "create unique index snapshot_refs_index "
114                       "on snapshot_refs(segmentid, object)",
115                       NULL, NULL, NULL);
116     if (rc != SQLITE_OK) {
117         ReportError(rc);
118         sqlite3_close(db);
119         throw IOException("Database initialization");
120     }
121 }
122
123 void LocalDb::Close()
124 {
125     int rc;
126
127     /* Summarize the snapshot_refs table into segments_used. */
128     sqlite3_stmt *stmt = Prepare("insert into segments_used "
129                                  "select ? as snapshotid, segmentid, "
130                                  "cast(used as real) / size as utilization "
131                                  "from "
132                                  "(select segmentid, sum(size) as used "
133                                  "from snapshot_refs group by segmentid) "
134                                  "join segments using (segmentid)");
135     sqlite3_bind_int64(stmt, 1, snapshotid);
136     rc = sqlite3_step(stmt);
137     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
138         ReportError(rc);
139         sqlite3_close(db);
140         fprintf(stderr, "DATABASE ERROR: Unable to create segment summary!\n");
141     }
142     sqlite3_finalize(stmt);
143
144     /* Commit changes to the database and close. */
145     rc = sqlite3_exec(db, "commit", NULL, NULL, NULL);
146     if (rc != SQLITE_OK) {
147         fprintf(stderr, "DATABASE ERROR: Can't commit database!\n");
148         ReportError(rc);
149     }
150     sqlite3_close(db);
151 }
152
153 int64_t LocalDb::SegmentToId(const string &segment)
154 {
155     int rc;
156     sqlite3_stmt *stmt;
157     int64_t result;
158
159     stmt = Prepare("insert or ignore into segments(segment) values (?)");
160     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
161                       SQLITE_TRANSIENT);
162     rc = sqlite3_step(stmt);
163     if (rc != SQLITE_DONE) {
164         throw IOException("Could not execute INSERT statement!");
165     }
166     sqlite3_finalize(stmt);
167
168     stmt = Prepare("select segmentid from segments where segment = ?");
169     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
170                       SQLITE_TRANSIENT);
171
172     rc = sqlite3_step(stmt);
173     if (rc == SQLITE_DONE) {
174         throw IOException("No segment found by id");
175     } else if (rc == SQLITE_ROW) {
176         result = sqlite3_column_int64(stmt, 0);
177     } else {
178         throw IOException("Error executing find segment by id query");
179     }
180
181     sqlite3_finalize(stmt);
182
183     return result;
184 }
185
186 string LocalDb::IdToSegment(int64_t segmentid)
187 {
188     int rc;
189     sqlite3_stmt *stmt;
190     string result;
191
192     stmt = Prepare("select segment from segments where segmentid = ?");
193     sqlite3_bind_int64(stmt, 1, segmentid);
194
195     rc = sqlite3_step(stmt);
196     if (rc == SQLITE_DONE) {
197         throw IOException("No segment found by id");
198     } else if (rc == SQLITE_ROW) {
199         result = (const char *)sqlite3_column_text(stmt, 0);
200     } else {
201         throw IOException("Error executing find segment by id query");
202     }
203
204     sqlite3_finalize(stmt);
205
206     return result;
207 }
208
209 void LocalDb::StoreObject(const ObjectReference& ref,
210                           const string &checksum, int64_t size,
211                           double age)
212 {
213     int rc;
214     sqlite3_stmt *stmt;
215
216     if (age == 0.0) {
217         stmt = Prepare("insert into block_index("
218                        "segmentid, object, checksum, size, timestamp) "
219                        "values (?, ?, ?, ?, julianday('now'))");
220     } else {
221         stmt = Prepare("insert into block_index("
222                        "segmentid, object, checksum, size, timestamp) "
223                        "values (?, ?, ?, ?, ?)");
224     }
225
226     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
227     string obj = ref.get_sequence();
228     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
229     sqlite3_bind_text(stmt, 3, checksum.c_str(), checksum.size(),
230                       SQLITE_TRANSIENT);
231     sqlite3_bind_int64(stmt, 4, size);
232     if (age != 0.0)
233         sqlite3_bind_double(stmt, 5, age);
234
235     rc = sqlite3_step(stmt);
236     if (rc != SQLITE_DONE) {
237         fprintf(stderr, "Could not execute INSERT statement!\n");
238         ReportError(rc);
239     }
240
241     sqlite3_finalize(stmt);
242 }
243
244 ObjectReference LocalDb::FindObject(const string &checksum, int64_t size)
245 {
246     int rc;
247     sqlite3_stmt *stmt;
248     ObjectReference ref;
249
250     stmt = Prepare("select segmentid, object from block_index "
251                    "where checksum = ? and size = ? and expired is null");
252     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
253                       SQLITE_TRANSIENT);
254     sqlite3_bind_int64(stmt, 2, size);
255
256     rc = sqlite3_step(stmt);
257     if (rc == SQLITE_DONE) {
258     } else if (rc == SQLITE_ROW) {
259         ref = ObjectReference(IdToSegment(sqlite3_column_int64(stmt, 0)),
260                               (const char *)sqlite3_column_text(stmt, 1));
261         ref.set_range(0, size);
262     } else {
263         fprintf(stderr, "Could not execute SELECT statement!\n");
264         ReportError(rc);
265     }
266
267     sqlite3_finalize(stmt);
268
269     return ref;
270 }
271
272 bool LocalDb::IsOldObject(const string &checksum, int64_t size, double *age,
273                           int *group)
274 {
275     int rc;
276     sqlite3_stmt *stmt;
277     bool found = false;
278
279     stmt = Prepare("select segmentid, object, timestamp, expired "
280                    "from block_index where checksum = ? and size = ?");
281     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
282                       SQLITE_TRANSIENT);
283     sqlite3_bind_int64(stmt, 2, size);
284
285     rc = sqlite3_step(stmt);
286     if (rc == SQLITE_DONE) {
287         found = false;
288     } else if (rc == SQLITE_ROW) {
289         found = true;
290         *age = sqlite3_column_double(stmt, 2);
291         *group = sqlite3_column_int(stmt, 3);
292     } else {
293         fprintf(stderr, "Could not execute SELECT statement!\n");
294         ReportError(rc);
295     }
296
297     sqlite3_finalize(stmt);
298
299     return found;
300 }
301
302 /* Does this object still exist in the database (and not expired)? */
303 bool LocalDb::IsAvailable(const ObjectReference &ref)
304 {
305     int rc;
306     sqlite3_stmt *stmt;
307     bool found = false;
308
309     // Special objects (such as the zero object) aren't stored in segments, and
310     // so are always available.
311     if (!ref.is_normal())
312         return true;
313
314     stmt = Prepare("select count(*) from block_index "
315                    "where segmentid = ? and object = ? and expired is null");
316     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
317     sqlite3_bind_text(stmt, 2, ref.get_sequence().c_str(),
318                       ref.get_sequence().size(), SQLITE_TRANSIENT);
319
320     rc = sqlite3_step(stmt);
321     if (rc == SQLITE_DONE) {
322         found = false;
323     } else if (rc == SQLITE_ROW) {
324         if (sqlite3_column_int(stmt, 0) > 0)
325             found = true;
326     } else {
327         fprintf(stderr, "Could not execute SELECT statement!\n");
328         ReportError(rc);
329     }
330
331     sqlite3_finalize(stmt);
332
333     return found;
334 }
335
336 void LocalDb::UseObject(const ObjectReference& ref)
337 {
338     int rc;
339     sqlite3_stmt *stmt;
340
341     if (!ref.is_normal())
342         return;
343
344     stmt = Prepare("insert or ignore into snapshot_refs "
345                    "select segmentid, object, size from block_index "
346                    "where segmentid = ? and object = ?");
347     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
348     string obj = ref.get_sequence();
349     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
350
351     rc = sqlite3_step(stmt);
352     if (rc != SQLITE_DONE) {
353         fprintf(stderr, "Could not execute INSERT statement!\n");
354         ReportError(rc);
355     }
356
357     sqlite3_finalize(stmt);
358 }
359
360 void LocalDb::SetSegmentChecksum(const std::string &segment,
361                                  const std::string &path,
362                                  const std::string &checksum,
363                                  int size)
364 {
365     int rc;
366     sqlite3_stmt *stmt;
367
368     stmt = Prepare("update segments set path = ?, checksum = ?, size = ? "
369                    "where segmentid = ?");
370     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
371                       SQLITE_TRANSIENT);
372     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
373                       SQLITE_TRANSIENT);
374     sqlite3_bind_int64(stmt, 3, size);
375     sqlite3_bind_int64(stmt, 4, SegmentToId(segment));
376
377     rc = sqlite3_step(stmt);
378     if (rc != SQLITE_DONE) {
379         fprintf(stderr, "Could not update segment checksum in database!\n");
380         ReportError(rc);
381     }
382
383     sqlite3_finalize(stmt);
384 }
385
386 bool LocalDb::GetSegmentChecksum(const string &segment,
387                                  string *seg_path,
388                                  string *seg_checksum)
389 {
390     int rc;
391     sqlite3_stmt *stmt;
392     ObjectReference ref;
393     int found = false;
394
395     stmt = Prepare("select path, checksum from segments where segment = ?");
396     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
397                       SQLITE_TRANSIENT);
398
399     rc = sqlite3_step(stmt);
400     if (rc == SQLITE_DONE) {
401     } else if (rc == SQLITE_ROW) {
402         found = true;
403         const char *val;
404
405         val = (const char *)sqlite3_column_text(stmt, 0);
406         if (val == NULL)
407             found = false;
408         else
409             *seg_path = val;
410
411         val = (const char *)sqlite3_column_text(stmt, 1);
412         if (val == NULL)
413             found = false;
414         else
415             *seg_checksum = val;
416     } else {
417         fprintf(stderr, "Could not execute SELECT statement!\n");
418         ReportError(rc);
419     }
420
421     sqlite3_finalize(stmt);
422
423     return found;
424 }