Improve tracking of segments and segment utilization.
[cumulus.git] / localdb.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2007-2008 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* When creating backup snapshots, maintain a local database of data blocks and
21  * checksums, in addition to the data contents (which may be stored remotely).
22  * This database is consulted when attempting to build incremental snapshots,
23  * as it says which objects can be reused.
24  *
25  * The database is implemented as an SQLite3 database, but this implementation
26  * detail is kept internal to this file, so that the storage format may be
27  * changed later. */
28
29 #include <assert.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sqlite3.h>
33
34 #include <algorithm>
35 #include <string>
36
37 #include "localdb.h"
38 #include "store.h"
39 #include "util.h"
40
41 using std::max;
42 using std::min;
43 using std::set;
44 using std::string;
45
46 static const int SCHEMA_MAJOR = 0;
47 static const int SCHEMA_MINOR = 11;
48
49 /* Helper function to prepare a statement for execution in the current
50  * database. */
51 sqlite3_stmt *LocalDb::Prepare(const char *sql)
52 {
53     sqlite3_stmt *stmt;
54     int rc;
55     const char *tail;
56
57     rc = sqlite3_prepare_v2(db, sql, strlen(sql), &stmt, &tail);
58     if (rc != SQLITE_OK) {
59         ReportError(rc);
60         fatal(string("Error preparing statement: ") + sql);
61     }
62
63     return stmt;
64 }
65
66 void LocalDb::ReportError(int rc)
67 {
68     fprintf(stderr, "Result code: %d\n", rc);
69     fprintf(stderr, "Error message: %s\n", sqlite3_errmsg(db));
70 }
71
72 void LocalDb::Open(const char *path, const char *snapshot_name,
73                    const char *snapshot_scheme, double intent)
74 {
75     int rc;
76
77     rc = sqlite3_open(path, &db);
78     if (rc) {
79         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
80         sqlite3_close(db);
81         fatal("Error opening local database");
82     }
83
84     rc = sqlite3_exec(db, "begin", NULL, NULL, NULL);
85     if (rc) {
86         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
87         sqlite3_close(db);
88         fatal("Error starting transaction");
89     }
90
91     sqlite3_extended_result_codes(db, 1);
92
93     /* Check that the local database format is the correct version; if not,
94      * report an error. */
95     sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
96
97     rc = sqlite3_step(stmt);
98     if (rc != SQLITE_ROW) {
99         fatal("Unable to read local database version from database");
100     } else if (rc == SQLITE_ROW) {
101         int major = sqlite3_column_int(stmt, 0);
102         int minor = sqlite3_column_int(stmt, 1);
103         if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
104             fprintf(stderr,
105                     "Local database does not have required schema version!\n"
106                     "  expected: %d.%d, found: %d.%d\n",
107                     SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
108             fatal("Unable to continue.");
109         }
110     }
111     sqlite3_finalize(stmt);
112
113     if (snapshot_scheme == NULL)
114         snapshot_scheme = "";
115
116     /* Insert this snapshot into the database, and determine the integer key
117      * which will be used to identify it. */
118     stmt = Prepare("insert into snapshots(name, scheme, timestamp, intent) "
119                    "values (?, ?, julianday('now'), ?)");
120     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
121                       SQLITE_TRANSIENT);
122     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
123                       SQLITE_TRANSIENT);
124     sqlite3_bind_double(stmt, 3, intent);
125
126     rc = sqlite3_step(stmt);
127     if (rc != SQLITE_DONE) {
128         ReportError(rc);
129         sqlite3_close(db);
130         fatal("Database execution error!");
131     }
132
133     snapshotid = sqlite3_last_insert_rowid(db);
134     sqlite3_finalize(stmt);
135     if (snapshotid == 0) {
136         ReportError(rc);
137         sqlite3_close(db);
138         fatal("Find snapshot id");
139     }
140
141     /* Create a temporary table which will be used to keep track of the objects
142      * used by this snapshot.  When the database is closed, we will summarize
143      * the results of this table into segments_used. */
144     rc = sqlite3_exec(db,
145                       "create temporary table snapshot_refs ("
146                       "    segmentid integer not null,"
147                       "    object text not null,"
148                       "    size integer not null"
149                       ")", NULL, NULL, NULL);
150     if (rc != SQLITE_OK) {
151         ReportError(rc);
152         sqlite3_close(db);
153         fatal("Database initialization");
154     }
155     rc = sqlite3_exec(db,
156                       "create unique index snapshot_refs_index "
157                       "on snapshot_refs(segmentid, object)",
158                       NULL, NULL, NULL);
159     if (rc != SQLITE_OK) {
160         ReportError(rc);
161         sqlite3_close(db);
162         fatal("Database initialization");
163     }
164 }
165
166 void LocalDb::Close()
167 {
168     int rc;
169
170     /* Summarize the snapshot_refs table into segment_utilization. */
171     sqlite3_stmt *stmt = Prepare(
172         "insert or replace into segment_utilization "
173         "select ? as snapshotid, segmentid, sum(size) "
174         "from snapshot_refs group by segmentid"
175     );
176     sqlite3_bind_int64(stmt, 1, snapshotid);
177     rc = sqlite3_step(stmt);
178     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
179         ReportError(rc);
180         sqlite3_close(db);
181         fprintf(stderr, "DATABASE ERROR: Unable to create segment summary!\n");
182     }
183     sqlite3_finalize(stmt);
184
185     /* Commit changes to the database and close. */
186     rc = sqlite3_exec(db, "commit", NULL, NULL, NULL);
187     if (rc != SQLITE_OK) {
188         fprintf(stderr, "DATABASE ERROR: Can't commit database!\n");
189         ReportError(rc);
190     }
191     sqlite3_close(db);
192 }
193
194 int64_t LocalDb::SegmentToId(const string &segment)
195 {
196     int rc;
197     sqlite3_stmt *stmt;
198     int64_t result;
199
200     stmt = Prepare("insert or ignore into segments(segment) values (?)");
201     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
202                       SQLITE_TRANSIENT);
203     rc = sqlite3_step(stmt);
204     if (rc != SQLITE_DONE) {
205         fatal("Could not execute INSERT statement!");
206     }
207     sqlite3_finalize(stmt);
208
209     stmt = Prepare("select segmentid from segments where segment = ?");
210     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
211                       SQLITE_TRANSIENT);
212
213     rc = sqlite3_step(stmt);
214     if (rc == SQLITE_DONE) {
215         fatal("No segment found by id");
216     } else if (rc == SQLITE_ROW) {
217         result = sqlite3_column_int64(stmt, 0);
218     } else {
219         fatal("Error executing find segment by id query");
220     }
221
222     sqlite3_finalize(stmt);
223
224     return result;
225 }
226
227 string LocalDb::IdToSegment(int64_t segmentid)
228 {
229     int rc;
230     sqlite3_stmt *stmt;
231     string result;
232
233     stmt = Prepare("select segment from segments where segmentid = ?");
234     sqlite3_bind_int64(stmt, 1, segmentid);
235
236     rc = sqlite3_step(stmt);
237     if (rc == SQLITE_DONE) {
238         fatal("No segment found by id");
239     } else if (rc == SQLITE_ROW) {
240         result = (const char *)sqlite3_column_text(stmt, 0);
241     } else {
242         fatal("Error executing find segment by id query");
243     }
244
245     sqlite3_finalize(stmt);
246
247     return result;
248 }
249
250 void LocalDb::StoreObject(const ObjectReference& ref, double age)
251 {
252     int rc;
253     sqlite3_stmt *stmt;
254
255     assert(ref.has_checksum());
256     string checksum = ref.get_checksum();
257     assert(ref.range_is_exact());
258     int64_t size = ref.get_range_length();
259
260     if (age == 0.0) {
261         stmt = Prepare("insert into block_index("
262                        "segmentid, object, checksum, size, timestamp) "
263                        "values (?, ?, ?, ?, julianday('now'))");
264     } else {
265         stmt = Prepare("insert into block_index("
266                        "segmentid, object, checksum, size, timestamp) "
267                        "values (?, ?, ?, ?, ?)");
268     }
269
270     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
271     string obj = ref.get_sequence();
272     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
273     sqlite3_bind_text(stmt, 3, checksum.c_str(), checksum.size(),
274                       SQLITE_TRANSIENT);
275     sqlite3_bind_int64(stmt, 4, size);
276     if (age != 0.0)
277         sqlite3_bind_double(stmt, 5, age);
278
279     rc = sqlite3_step(stmt);
280     if (rc != SQLITE_DONE) {
281         fprintf(stderr, "Could not execute INSERT statement!\n");
282         ReportError(rc);
283     }
284
285     sqlite3_finalize(stmt);
286
287     if (age != 0.0) {
288         stmt = Prepare("update segments "
289                        "set mtime = coalesce(max(mtime, ?), ?) "
290                        "where segmentid = ?");
291         sqlite3_bind_double(stmt, 1, age);
292         sqlite3_bind_double(stmt, 2, age);
293         sqlite3_bind_int64(stmt, 3, SegmentToId(ref.get_segment()));
294         rc = sqlite3_step(stmt);
295         sqlite3_finalize(stmt);
296     }
297 }
298
299 ObjectReference LocalDb::FindObject(const string &checksum, int64_t size)
300 {
301     int rc;
302     sqlite3_stmt *stmt;
303     ObjectReference ref;
304
305     stmt = Prepare("select segmentid, object from block_index "
306                    "where checksum = ? and size = ? and expired is null");
307     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
308                       SQLITE_TRANSIENT);
309     sqlite3_bind_int64(stmt, 2, size);
310
311     rc = sqlite3_step(stmt);
312     if (rc == SQLITE_DONE) {
313     } else if (rc == SQLITE_ROW) {
314         ref = ObjectReference(IdToSegment(sqlite3_column_int64(stmt, 0)),
315                               (const char *)sqlite3_column_text(stmt, 1));
316         ref.set_range(0, size, true);
317     } else {
318         fprintf(stderr, "Could not execute SELECT statement!\n");
319         ReportError(rc);
320     }
321
322     sqlite3_finalize(stmt);
323
324     return ref;
325 }
326
327 bool LocalDb::IsOldObject(const string &checksum, int64_t size, double *age,
328                           int *group)
329 {
330     int rc;
331     sqlite3_stmt *stmt;
332     bool found = false;
333
334     stmt = Prepare("select segmentid, object, timestamp, expired "
335                    "from block_index where checksum = ? and size = ?");
336     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
337                       SQLITE_TRANSIENT);
338     sqlite3_bind_int64(stmt, 2, size);
339
340     rc = sqlite3_step(stmt);
341     if (rc == SQLITE_DONE) {
342         found = false;
343     } else if (rc == SQLITE_ROW) {
344         found = true;
345         *age = sqlite3_column_double(stmt, 2);
346         *group = sqlite3_column_int(stmt, 3);
347     } else {
348         fprintf(stderr, "Could not execute SELECT statement!\n");
349         ReportError(rc);
350     }
351
352     sqlite3_finalize(stmt);
353
354     return found;
355 }
356
357 /* Does this object still exist in the database (and not expired)? */
358 bool LocalDb::IsAvailable(const ObjectReference &ref)
359 {
360     int rc;
361     sqlite3_stmt *stmt;
362     bool found = false;
363
364     // Special objects (such as the zero object) aren't stored in segments, and
365     // so are always available.
366     if (!ref.is_normal())
367         return true;
368
369     stmt = Prepare("select count(*) from block_index "
370                    "where segmentid = ? and object = ? and expired is null");
371     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
372     sqlite3_bind_text(stmt, 2, ref.get_sequence().c_str(),
373                       ref.get_sequence().size(), SQLITE_TRANSIENT);
374
375     rc = sqlite3_step(stmt);
376     if (rc == SQLITE_DONE) {
377         found = false;
378     } else if (rc == SQLITE_ROW) {
379         if (sqlite3_column_int(stmt, 0) > 0)
380             found = true;
381     } else {
382         fprintf(stderr, "Could not execute SELECT statement!\n");
383         ReportError(rc);
384     }
385
386     sqlite3_finalize(stmt);
387
388     return found;
389 }
390
391 set<string> LocalDb::GetUsedSegments()
392 {
393     int rc;
394     sqlite3_stmt *stmt;
395     set<string> result;
396
397     stmt = Prepare("select segment from segments "
398                    "where segmentid in (select segmentid from snapshot_refs)");
399
400     while (true) {
401         rc = sqlite3_step(stmt);
402         if (rc == SQLITE_ROW) {
403             const char *segment
404                 = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
405             result.insert(segment);
406         } else if (rc == SQLITE_DONE) {
407             break;
408         } else {
409             ReportError(rc);
410         }
411     }
412
413     sqlite3_finalize(stmt);
414
415     return result;
416 }
417
418 void LocalDb::UseObject(const ObjectReference& ref)
419 {
420     int rc;
421     sqlite3_stmt *stmt;
422
423     if (!ref.is_normal())
424         return;
425
426     int64_t old_size = 0;
427     stmt = Prepare("select size from snapshot_refs "
428                    "where segmentid = ? and object = ?");
429     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
430     string obj = ref.get_sequence();
431     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
432     rc = sqlite3_step(stmt);
433     if (rc == SQLITE_ROW) {
434         old_size = sqlite3_column_int64(stmt, 0);
435     }
436     sqlite3_finalize(stmt);
437
438     // Attempt to determine the underlying size of the object.  This may
439     // require a database lookup if the length is not encoded into the object
440     // reference already.
441     int64_t object_size = 0;
442     if (ref.range_is_exact()) {
443         object_size = ref.get_range_length();
444     } else {
445         stmt = Prepare("select size from block_index "
446                        "where segmentid = ? and object = ?");
447         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
448         obj = ref.get_sequence();
449         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
450         rc = sqlite3_step(stmt);
451         if (rc == SQLITE_ROW) {
452             object_size = sqlite3_column_int64(stmt, 0);
453         } else {
454             fprintf(stderr, "Warning: No block found in block_index for %s\n",
455                     ref.to_string().c_str());
456         }
457         sqlite3_finalize(stmt);
458     }
459
460     // Possibly mark additional bytes as being referenced.  The number of bytes
461     // referenced can only be increased (up to the object size).  The bytes
462     // referenced will be set to the object size only if the entire object is
463     // referenced at once: a series of partial ranges that add up to the total
464     // size will have a reference size capped at just less than the full object
465     // size (we can't tell if some bytes were referenced multiple times, and
466     // thus we conservatively assume some bytes might still be unreferenced).
467     int64_t new_refs = old_size;
468     if (ref.has_range()) {
469         new_refs = ref.get_range_length();
470     } else {
471         new_refs = object_size;
472     }
473     int64_t new_size = old_size + new_refs;
474     if (old_size < object_size && new_refs < object_size)
475         new_size = min(new_size, object_size - 1);
476     new_size = min(object_size, new_size);
477     new_size = max(new_size, (int64_t)0);
478
479     if (new_size != old_size) {
480         stmt = Prepare("insert or replace "
481                        "into snapshot_refs(segmentid, object, size) "
482                        "values (?, ?, ?)");
483         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
484         obj = ref.get_sequence();
485         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
486         sqlite3_bind_int64(stmt, 3, new_size);
487
488         rc = sqlite3_step(stmt);
489         if (rc != SQLITE_DONE) {
490             fprintf(stderr, "Could not execute INSERT statement!\n");
491             ReportError(rc);
492         }
493
494         sqlite3_finalize(stmt);
495     }
496 }
497
498 void LocalDb::SetSegmentChecksum(const std::string &segment,
499                                  const std::string &path,
500                                  const std::string &checksum,
501                                  int data_size, int disk_size)
502 {
503     int rc;
504     sqlite3_stmt *stmt;
505
506     stmt = Prepare("update segments set path = ?, checksum = ?, "
507                    "data_size = ?, disk_size = ?, "
508                    "mtime = coalesce(mtime, julianday('now')) "
509                    "where segmentid = ?");
510     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
511                       SQLITE_TRANSIENT);
512     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
513                       SQLITE_TRANSIENT);
514     sqlite3_bind_int64(stmt, 3, data_size);
515     sqlite3_bind_int64(stmt, 4, disk_size);
516     sqlite3_bind_int64(stmt, 5, SegmentToId(segment));
517
518     rc = sqlite3_step(stmt);
519     if (rc != SQLITE_DONE) {
520         fprintf(stderr, "Could not update segment checksum in database!\n");
521         ReportError(rc);
522     }
523
524     sqlite3_finalize(stmt);
525 }
526
527 bool LocalDb::GetSegmentChecksum(const string &segment,
528                                  string *seg_path,
529                                  string *seg_checksum)
530 {
531     int rc;
532     sqlite3_stmt *stmt;
533     ObjectReference ref;
534     int found = false;
535
536     stmt = Prepare("select path, checksum from segments where segment = ?");
537     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
538                       SQLITE_TRANSIENT);
539
540     rc = sqlite3_step(stmt);
541     if (rc == SQLITE_DONE) {
542     } else if (rc == SQLITE_ROW) {
543         found = true;
544         const char *val;
545
546         val = (const char *)sqlite3_column_text(stmt, 0);
547         if (val == NULL)
548             found = false;
549         else
550             *seg_path = val;
551
552         val = (const char *)sqlite3_column_text(stmt, 1);
553         if (val == NULL)
554             found = false;
555         else
556             *seg_checksum = val;
557     } else {
558         fprintf(stderr, "Could not execute SELECT statement!\n");
559         ReportError(rc);
560     }
561
562     sqlite3_finalize(stmt);
563
564     return found;
565 }
566
567 /* Look up and return the packed representation of the subblock chunk
568  * signatures.  Returns true if signatures were found for the specified object,
569  * and if so sets *buf to point at a buffer of memory (allocated with malloc;
570  * the caller should free it), and *len to the length of the buffer. */
571 bool LocalDb::LoadChunkSignatures(ObjectReference ref,
572                                   void **buf, size_t *len,
573                                   string *algorithm)
574 {
575     int rc;
576     sqlite3_stmt *stmt;
577     int found = false;
578
579     stmt = Prepare("select signatures, algorithm from subblock_signatures "
580                    "where blockid = (select blockid from block_index "
581                    "                 where segmentid = ? and object = ?)");
582     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
583     string obj = ref.get_sequence();
584     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
585
586     rc = sqlite3_step(stmt);
587     if (rc == SQLITE_DONE) {
588     } else if (rc == SQLITE_ROW) {
589         const void *data = sqlite3_column_blob(stmt, 0);
590         *len = sqlite3_column_bytes(stmt, 0);
591
592         if (*len > 0) {
593             *buf = malloc(*len);
594             if (*buf != NULL) {
595                 memcpy(*buf, data, *len);
596                 *algorithm = (const char *)sqlite3_column_text(stmt, 1);
597                 found = true;
598             }
599         }
600     } else {
601         fprintf(stderr, "Could not execute SELECT statement!\n");
602         ReportError(rc);
603     }
604
605     sqlite3_finalize(stmt);
606
607     return found;
608 }
609
610 /* Store the subblock chunk signatures for a specified object.  The object
611  * itself must have already been indexed in the database. */
612 void LocalDb::StoreChunkSignatures(ObjectReference ref,
613                                    const void *buf, size_t len,
614                                    const string& algorithm)
615 {
616     int rc;
617     sqlite3_stmt *stmt;
618
619     stmt = Prepare("select blockid from block_index "
620                    "where segmentid = ? and object = ?");
621     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
622     string obj = ref.get_sequence();
623     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
624
625     rc = sqlite3_step(stmt);
626     if (rc != SQLITE_ROW) {
627         fprintf(stderr,
628                 "Could not determine blockid in StoreChunkSignatures!\n");
629         ReportError(rc);
630         fatal("Error getting blockid");
631     }
632     int64_t blockid = sqlite3_column_int64(stmt, 0);
633     sqlite3_finalize(stmt);
634
635     stmt = Prepare("insert or replace "
636                    "into subblock_signatures(blockid, algorithm, signatures) "
637                    "values (?, ?, ?)");
638     sqlite3_bind_int64(stmt, 1, blockid);
639     sqlite3_bind_text(stmt, 2, algorithm.c_str(), algorithm.size(),
640                       SQLITE_TRANSIENT);
641     sqlite3_bind_blob(stmt, 3, buf, len, SQLITE_TRANSIENT);
642
643     rc = sqlite3_step(stmt);
644     if (rc != SQLITE_DONE) {
645         fprintf(stderr, "Could not insert sub-block checksums!\n");
646         ReportError(rc);
647     }
648
649     sqlite3_finalize(stmt);
650 }