Changes to the Cumulus backup format and tools.
[cumulus.git] / localdb.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2007-2008 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* When creating backup snapshots, maintain a local database of data blocks and
21  * checksums, in addition to the data contents (which may be stored remotely).
22  * This database is consulted when attempting to build incremental snapshots,
23  * as it says which objects can be reused.
24  *
25  * The database is implemented as an SQLite3 database, but this implementation
26  * detail is kept internal to this file, so that the storage format may be
27  * changed later. */
28
29 #include <assert.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sqlite3.h>
33
34 #include <algorithm>
35 #include <string>
36
37 #include "localdb.h"
38 #include "store.h"
39 #include "util.h"
40
41 using std::max;
42 using std::min;
43 using std::set;
44 using std::string;
45
46 static const int SCHEMA_MAJOR = 0;
47 static const int SCHEMA_MINOR = 11;
48
49 /* Helper function to prepare a statement for execution in the current
50  * database. */
51 sqlite3_stmt *LocalDb::Prepare(const char *sql)
52 {
53     sqlite3_stmt *stmt;
54     int rc;
55     const char *tail;
56
57     rc = sqlite3_prepare_v2(db, sql, strlen(sql), &stmt, &tail);
58     if (rc != SQLITE_OK) {
59         ReportError(rc);
60         fatal(string("Error preparing statement: ") + sql);
61     }
62
63     return stmt;
64 }
65
66 void LocalDb::ReportError(int rc)
67 {
68     fprintf(stderr, "Result code: %d\n", rc);
69     fprintf(stderr, "Error message: %s\n", sqlite3_errmsg(db));
70 }
71
72 void LocalDb::Open(const char *path, const char *snapshot_name,
73                    const char *snapshot_scheme, double intent)
74 {
75     int rc;
76
77     rc = sqlite3_open(path, &db);
78     if (rc) {
79         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
80         sqlite3_close(db);
81         fatal("Error opening local database");
82     }
83
84     rc = sqlite3_exec(db, "begin", NULL, NULL, NULL);
85     if (rc) {
86         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
87         sqlite3_close(db);
88         fatal("Error starting transaction");
89     }
90
91     sqlite3_extended_result_codes(db, 1);
92
93     /* Check that the local database format is the correct version; if not,
94      * report an error. */
95     sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
96
97     rc = sqlite3_step(stmt);
98     if (rc != SQLITE_ROW) {
99         fatal("Unable to read local database version from database");
100     } else if (rc == SQLITE_ROW) {
101         int major = sqlite3_column_int(stmt, 0);
102         int minor = sqlite3_column_int(stmt, 1);
103         if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
104             fprintf(stderr,
105                     "Local database does not have required schema version!\n"
106                     "  expected: %d.%d, found: %d.%d\n",
107                     SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
108             fatal("Unable to continue.");
109         }
110     }
111     sqlite3_finalize(stmt);
112
113     if (snapshot_scheme == NULL)
114         snapshot_scheme = "";
115
116     /* Insert this snapshot into the database, and determine the integer key
117      * which will be used to identify it. */
118     stmt = Prepare("insert into snapshots(name, scheme, timestamp, intent) "
119                    "values (?, ?, julianday('now'), ?)");
120     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
121                       SQLITE_TRANSIENT);
122     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
123                       SQLITE_TRANSIENT);
124     sqlite3_bind_double(stmt, 3, intent);
125
126     rc = sqlite3_step(stmt);
127     if (rc != SQLITE_DONE) {
128         ReportError(rc);
129         sqlite3_close(db);
130         fatal("Database execution error!");
131     }
132
133     snapshotid = sqlite3_last_insert_rowid(db);
134     sqlite3_finalize(stmt);
135     if (snapshotid == 0) {
136         ReportError(rc);
137         sqlite3_close(db);
138         fatal("Find snapshot id");
139     }
140
141     /* Create a temporary table which will be used to keep track of the objects
142      * used by this snapshot.  When the database is closed, we will summarize
143      * the results of this table into segments_used. */
144     rc = sqlite3_exec(db,
145                       "create temporary table snapshot_refs ("
146                       "    segmentid integer not null,"
147                       "    object text not null,"
148                       "    size integer not null"
149                       ")", NULL, NULL, NULL);
150     if (rc != SQLITE_OK) {
151         ReportError(rc);
152         sqlite3_close(db);
153         fatal("Database initialization");
154     }
155     rc = sqlite3_exec(db,
156                       "create unique index snapshot_refs_index "
157                       "on snapshot_refs(segmentid, object)",
158                       NULL, NULL, NULL);
159     if (rc != SQLITE_OK) {
160         ReportError(rc);
161         sqlite3_close(db);
162         fatal("Database initialization");
163     }
164 }
165
166 void LocalDb::Close()
167 {
168     int rc;
169
170     /* Summarize the snapshot_refs table into segment_utilization. */
171     sqlite3_stmt *stmt = Prepare(
172         "insert or replace into segment_utilization "
173         "select ? as snapshotid, segmentid, sum(size) "
174         "from snapshot_refs group by segmentid"
175     );
176     sqlite3_bind_int64(stmt, 1, snapshotid);
177     rc = sqlite3_step(stmt);
178     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
179         ReportError(rc);
180         sqlite3_close(db);
181         fprintf(stderr, "DATABASE ERROR: Unable to create segment summary!\n");
182     }
183     sqlite3_finalize(stmt);
184
185     /* Commit changes to the database and close. */
186     rc = sqlite3_exec(db, "commit", NULL, NULL, NULL);
187     if (rc != SQLITE_OK) {
188         fprintf(stderr, "DATABASE ERROR: Can't commit database!\n");
189         ReportError(rc);
190     }
191     sqlite3_close(db);
192 }
193
194 int64_t LocalDb::SegmentToId(const string &segment)
195 {
196     int rc;
197     sqlite3_stmt *stmt;
198     int64_t result;
199
200     stmt = Prepare("insert or ignore into segments(segment) values (?)");
201     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
202                       SQLITE_TRANSIENT);
203     rc = sqlite3_step(stmt);
204     if (rc != SQLITE_DONE) {
205         fatal("Could not execute INSERT statement!");
206     }
207     sqlite3_finalize(stmt);
208
209     stmt = Prepare("select segmentid from segments where segment = ?");
210     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
211                       SQLITE_TRANSIENT);
212
213     rc = sqlite3_step(stmt);
214     if (rc == SQLITE_DONE) {
215         fatal("No segment found by id");
216     } else if (rc == SQLITE_ROW) {
217         result = sqlite3_column_int64(stmt, 0);
218     } else {
219         fatal("Error executing find segment by id query");
220     }
221
222     sqlite3_finalize(stmt);
223
224     return result;
225 }
226
227 string LocalDb::IdToSegment(int64_t segmentid)
228 {
229     int rc;
230     sqlite3_stmt *stmt;
231     string result;
232
233     stmt = Prepare("select segment from segments where segmentid = ?");
234     sqlite3_bind_int64(stmt, 1, segmentid);
235
236     rc = sqlite3_step(stmt);
237     if (rc == SQLITE_DONE) {
238         fatal("No segment found by id");
239     } else if (rc == SQLITE_ROW) {
240         result = (const char *)sqlite3_column_text(stmt, 0);
241     } else {
242         fatal("Error executing find segment by id query");
243     }
244
245     sqlite3_finalize(stmt);
246
247     return result;
248 }
249
250 void LocalDb::StoreObject(const ObjectReference& ref, double age)
251 {
252     int rc;
253     sqlite3_stmt *stmt;
254
255     assert(ref.has_checksum());
256     string checksum = ref.get_checksum();
257     assert(ref.range_is_exact());
258     int64_t size = ref.get_range_length();
259
260     if (age == 0.0) {
261         stmt = Prepare("insert into block_index("
262                        "segmentid, object, checksum, size, timestamp) "
263                        "values (?, ?, ?, ?, julianday('now'))");
264     } else {
265         stmt = Prepare("insert into block_index("
266                        "segmentid, object, checksum, size, timestamp) "
267                        "values (?, ?, ?, ?, ?)");
268     }
269
270     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
271     string obj = ref.get_sequence();
272     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
273     sqlite3_bind_text(stmt, 3, checksum.c_str(), checksum.size(),
274                       SQLITE_TRANSIENT);
275     sqlite3_bind_int64(stmt, 4, size);
276     if (age != 0.0)
277         sqlite3_bind_double(stmt, 5, age);
278
279     rc = sqlite3_step(stmt);
280     if (rc != SQLITE_DONE) {
281         fprintf(stderr, "Could not execute INSERT statement!\n");
282         ReportError(rc);
283     }
284
285     sqlite3_finalize(stmt);
286 }
287
288 ObjectReference LocalDb::FindObject(const string &checksum, int64_t size)
289 {
290     int rc;
291     sqlite3_stmt *stmt;
292     ObjectReference ref;
293
294     stmt = Prepare("select segmentid, object from block_index "
295                    "where checksum = ? and size = ? and expired is null");
296     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
297                       SQLITE_TRANSIENT);
298     sqlite3_bind_int64(stmt, 2, size);
299
300     rc = sqlite3_step(stmt);
301     if (rc == SQLITE_DONE) {
302     } else if (rc == SQLITE_ROW) {
303         ref = ObjectReference(IdToSegment(sqlite3_column_int64(stmt, 0)),
304                               (const char *)sqlite3_column_text(stmt, 1));
305         ref.set_range(0, size, true);
306     } else {
307         fprintf(stderr, "Could not execute SELECT statement!\n");
308         ReportError(rc);
309     }
310
311     sqlite3_finalize(stmt);
312
313     return ref;
314 }
315
316 bool LocalDb::IsOldObject(const string &checksum, int64_t size, double *age,
317                           int *group)
318 {
319     int rc;
320     sqlite3_stmt *stmt;
321     bool found = false;
322
323     stmt = Prepare("select segmentid, object, timestamp, expired "
324                    "from block_index where checksum = ? and size = ?");
325     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
326                       SQLITE_TRANSIENT);
327     sqlite3_bind_int64(stmt, 2, size);
328
329     rc = sqlite3_step(stmt);
330     if (rc == SQLITE_DONE) {
331         found = false;
332     } else if (rc == SQLITE_ROW) {
333         found = true;
334         *age = sqlite3_column_double(stmt, 2);
335         *group = sqlite3_column_int(stmt, 3);
336     } else {
337         fprintf(stderr, "Could not execute SELECT statement!\n");
338         ReportError(rc);
339     }
340
341     sqlite3_finalize(stmt);
342
343     return found;
344 }
345
346 /* Does this object still exist in the database (and not expired)? */
347 bool LocalDb::IsAvailable(const ObjectReference &ref)
348 {
349     int rc;
350     sqlite3_stmt *stmt;
351     bool found = false;
352
353     // Special objects (such as the zero object) aren't stored in segments, and
354     // so are always available.
355     if (!ref.is_normal())
356         return true;
357
358     stmt = Prepare("select count(*) from block_index "
359                    "where segmentid = ? and object = ? and expired is null");
360     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
361     sqlite3_bind_text(stmt, 2, ref.get_sequence().c_str(),
362                       ref.get_sequence().size(), SQLITE_TRANSIENT);
363
364     rc = sqlite3_step(stmt);
365     if (rc == SQLITE_DONE) {
366         found = false;
367     } else if (rc == SQLITE_ROW) {
368         if (sqlite3_column_int(stmt, 0) > 0)
369             found = true;
370     } else {
371         fprintf(stderr, "Could not execute SELECT statement!\n");
372         ReportError(rc);
373     }
374
375     sqlite3_finalize(stmt);
376
377     return found;
378 }
379
380 set<string> LocalDb::GetUsedSegments()
381 {
382     int rc;
383     sqlite3_stmt *stmt;
384     set<string> result;
385
386     stmt = Prepare("select segment from segments "
387                    "where segmentid in (select segmentid from snapshot_refs)");
388
389     while (true) {
390         rc = sqlite3_step(stmt);
391         if (rc == SQLITE_ROW) {
392             const char *segment
393                 = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
394             result.insert(segment);
395         } else if (rc == SQLITE_DONE) {
396             break;
397         } else {
398             ReportError(rc);
399         }
400     }
401
402     sqlite3_finalize(stmt);
403
404     return result;
405 }
406
407 void LocalDb::UseObject(const ObjectReference& ref)
408 {
409     int rc;
410     sqlite3_stmt *stmt;
411
412     if (!ref.is_normal())
413         return;
414
415     int64_t old_size = 0;
416     stmt = Prepare("select size from snapshot_refs "
417                    "where segmentid = ? and object = ?");
418     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
419     string obj = ref.get_sequence();
420     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
421     rc = sqlite3_step(stmt);
422     if (rc == SQLITE_ROW) {
423         old_size = sqlite3_column_int64(stmt, 0);
424     }
425     sqlite3_finalize(stmt);
426
427     // Attempt to determine the underlying size of the object.  This may
428     // require a database lookup if the length is not encoded into the object
429     // reference already.
430     int64_t object_size = 0;
431     if (ref.range_is_exact()) {
432         object_size = ref.get_range_length();
433     } else {
434         stmt = Prepare("select size from block_index "
435                        "where segmentid = ? and object = ?");
436         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
437         obj = ref.get_sequence();
438         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
439         rc = sqlite3_step(stmt);
440         if (rc == SQLITE_ROW) {
441             object_size = sqlite3_column_int64(stmt, 0);
442         } else {
443             fprintf(stderr, "Warning: No block found in block_index for %s\n",
444                     ref.to_string().c_str());
445         }
446         sqlite3_finalize(stmt);
447     }
448
449     // Possibly mark additional bytes as being referenced.  The number of bytes
450     // referenced can only be increased (up to the object size).  The bytes
451     // referenced will be set to the object size only if the entire object is
452     // referenced at once: a series of partial ranges that add up to the total
453     // size will have a reference size capped at just less than the full object
454     // size (we can't tell if some bytes were referenced multiple times, and
455     // thus we conservatively assume some bytes might still be unreferenced).
456     int64_t new_refs = old_size;
457     if (ref.has_range()) {
458         new_refs = ref.get_range_length();
459     } else {
460         new_refs = object_size;
461     }
462     int64_t new_size = old_size + new_refs;
463     if (old_size < object_size && new_refs < object_size)
464         new_size = min(new_size, object_size - 1);
465     new_size = min(object_size, new_size);
466     new_size = max(new_size, (int64_t)0);
467
468     if (new_size != old_size) {
469         stmt = Prepare("insert or replace "
470                        "into snapshot_refs(segmentid, object, size) "
471                        "values (?, ?, ?)");
472         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
473         obj = ref.get_sequence();
474         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
475         sqlite3_bind_int64(stmt, 3, new_size);
476
477         rc = sqlite3_step(stmt);
478         if (rc != SQLITE_DONE) {
479             fprintf(stderr, "Could not execute INSERT statement!\n");
480             ReportError(rc);
481         }
482
483         sqlite3_finalize(stmt);
484     }
485 }
486
487 void LocalDb::SetSegmentMetadata(const std::string &segment,
488                                  const std::string &path,
489                                  const std::string &checksum,
490                                  const std::string &type,
491                                  int data_size, int disk_size)
492 {
493     int rc;
494     sqlite3_stmt *stmt;
495
496     stmt = Prepare("update segments set path = ?, checksum = ?, "
497                    "type = ?, data_size = ?, disk_size = ?, "
498                    "mtime = coalesce(mtime, julianday('now')) "
499                    "where segmentid = ?");
500     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
501                       SQLITE_TRANSIENT);
502     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
503                       SQLITE_TRANSIENT);
504     sqlite3_bind_text(stmt, 3, type.c_str(), type.size(),
505                       SQLITE_TRANSIENT);
506     sqlite3_bind_int64(stmt, 4, data_size);
507     sqlite3_bind_int64(stmt, 5, disk_size);
508     sqlite3_bind_int64(stmt, 6, SegmentToId(segment));
509
510     rc = sqlite3_step(stmt);
511     if (rc != SQLITE_DONE) {
512         fprintf(stderr, "Could not update segment checksum in database!\n");
513         ReportError(rc);
514     }
515
516     sqlite3_finalize(stmt);
517 }
518
519 bool LocalDb::GetSegmentMetadata(const string &segment,
520                                  string *seg_path,
521                                  string *seg_checksum)
522 {
523     int rc;
524     sqlite3_stmt *stmt;
525     ObjectReference ref;
526     int found = false;
527
528     stmt = Prepare("select path, checksum from segments where segment = ?");
529     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
530                       SQLITE_TRANSIENT);
531
532     rc = sqlite3_step(stmt);
533     if (rc == SQLITE_DONE) {
534     } else if (rc == SQLITE_ROW) {
535         found = true;
536         const char *val;
537
538         val = (const char *)sqlite3_column_text(stmt, 0);
539         if (val == NULL)
540             found = false;
541         else
542             *seg_path = val;
543
544         val = (const char *)sqlite3_column_text(stmt, 1);
545         if (val == NULL)
546             found = false;
547         else
548             *seg_checksum = val;
549     } else {
550         fprintf(stderr, "Could not execute SELECT statement!\n");
551         ReportError(rc);
552     }
553
554     sqlite3_finalize(stmt);
555
556     return found;
557 }
558
559 /* Look up and return the packed representation of the subblock chunk
560  * signatures.  Returns true if signatures were found for the specified object,
561  * and if so sets *buf to point at a buffer of memory (allocated with malloc;
562  * the caller should free it), and *len to the length of the buffer. */
563 bool LocalDb::LoadChunkSignatures(ObjectReference ref,
564                                   void **buf, size_t *len,
565                                   string *algorithm)
566 {
567     int rc;
568     sqlite3_stmt *stmt;
569     int found = false;
570
571     stmt = Prepare("select signatures, algorithm from subblock_signatures "
572                    "where blockid = (select blockid from block_index "
573                    "                 where segmentid = ? and object = ?)");
574     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
575     string obj = ref.get_sequence();
576     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
577
578     rc = sqlite3_step(stmt);
579     if (rc == SQLITE_DONE) {
580     } else if (rc == SQLITE_ROW) {
581         const void *data = sqlite3_column_blob(stmt, 0);
582         *len = sqlite3_column_bytes(stmt, 0);
583
584         if (*len > 0) {
585             *buf = malloc(*len);
586             if (*buf != NULL) {
587                 memcpy(*buf, data, *len);
588                 *algorithm = (const char *)sqlite3_column_text(stmt, 1);
589                 found = true;
590             }
591         }
592     } else {
593         fprintf(stderr, "Could not execute SELECT statement!\n");
594         ReportError(rc);
595     }
596
597     sqlite3_finalize(stmt);
598
599     return found;
600 }
601
602 /* Store the subblock chunk signatures for a specified object.  The object
603  * itself must have already been indexed in the database. */
604 void LocalDb::StoreChunkSignatures(ObjectReference ref,
605                                    const void *buf, size_t len,
606                                    const string& algorithm)
607 {
608     int rc;
609     sqlite3_stmt *stmt;
610
611     stmt = Prepare("select blockid from block_index "
612                    "where segmentid = ? and object = ?");
613     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
614     string obj = ref.get_sequence();
615     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
616
617     rc = sqlite3_step(stmt);
618     if (rc != SQLITE_ROW) {
619         fprintf(stderr,
620                 "Could not determine blockid in StoreChunkSignatures!\n");
621         ReportError(rc);
622         fatal("Error getting blockid");
623     }
624     int64_t blockid = sqlite3_column_int64(stmt, 0);
625     sqlite3_finalize(stmt);
626
627     stmt = Prepare("insert or replace "
628                    "into subblock_signatures(blockid, algorithm, signatures) "
629                    "values (?, ?, ?)");
630     sqlite3_bind_int64(stmt, 1, blockid);
631     sqlite3_bind_text(stmt, 2, algorithm.c_str(), algorithm.size(),
632                       SQLITE_TRANSIENT);
633     sqlite3_bind_blob(stmt, 3, buf, len, SQLITE_TRANSIENT);
634
635     rc = sqlite3_step(stmt);
636     if (rc != SQLITE_DONE) {
637         fprintf(stderr, "Could not insert sub-block checksums!\n");
638         ReportError(rc);
639     }
640
641     sqlite3_finalize(stmt);
642 }