Ensure metadata file is written prior to calculating checksum.
[cumulus.git] / localdb.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2007-2008 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* When creating backup snapshots, maintain a local database of data blocks and
21  * checksums, in addition to the data contents (which may be stored remotely).
22  * This database is consulted when attempting to build incremental snapshots,
23  * as it says which objects can be reused.
24  *
25  * The database is implemented as an SQLite3 database, but this implementation
26  * detail is kept internal to this file, so that the storage format may be
27  * changed later. */
28
29 #include <assert.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sqlite3.h>
33
34 #include <algorithm>
35 #include <map>
36 #include <string>
37
38 #include "localdb.h"
39 #include "store.h"
40 #include "util.h"
41
42 using std::map;
43 using std::max;
44 using std::min;
45 using std::set;
46 using std::string;
47
48 static const int SCHEMA_MAJOR = 0;
49 static const int SCHEMA_MINOR = 11;
50
51 /* Helper function to prepare a statement for execution in the current
52  * database. */
53 sqlite3_stmt *LocalDb::Prepare(const char *sql)
54 {
55     sqlite3_stmt *stmt;
56     int rc;
57     const char *tail;
58
59     rc = sqlite3_prepare_v2(db, sql, strlen(sql), &stmt, &tail);
60     if (rc != SQLITE_OK) {
61         ReportError(rc);
62         fatal(string("Error preparing statement: ") + sql);
63     }
64
65     return stmt;
66 }
67
68 void LocalDb::ReportError(int rc)
69 {
70     fprintf(stderr, "Result code: %d\n", rc);
71     fprintf(stderr, "Error message: %s\n", sqlite3_errmsg(db));
72 }
73
74 void LocalDb::Open(const char *path, const char *snapshot_name,
75                    const char *snapshot_scheme)
76 {
77     int rc;
78
79     rc = sqlite3_open(path, &db);
80     if (rc) {
81         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
82         sqlite3_close(db);
83         fatal("Error opening local database");
84     }
85
86     rc = sqlite3_exec(db, "begin", NULL, NULL, NULL);
87     if (rc) {
88         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
89         sqlite3_close(db);
90         fatal("Error starting transaction");
91     }
92
93     sqlite3_extended_result_codes(db, 1);
94
95     /* Check that the local database format is the correct version; if not,
96      * report an error. */
97     sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
98
99     rc = sqlite3_step(stmt);
100     if (rc != SQLITE_ROW) {
101         fatal("Unable to read local database version from database");
102     } else if (rc == SQLITE_ROW) {
103         int major = sqlite3_column_int(stmt, 0);
104         int minor = sqlite3_column_int(stmt, 1);
105         if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
106             fprintf(stderr,
107                     "Local database does not have required schema version!\n"
108                     "  expected: %d.%d, found: %d.%d\n",
109                     SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
110             fatal("Unable to continue.");
111         }
112     }
113     sqlite3_finalize(stmt);
114
115     if (snapshot_scheme == NULL)
116         snapshot_scheme = "";
117
118     /* Insert this snapshot into the database, and determine the integer key
119      * which will be used to identify it. */
120     stmt = Prepare("insert into snapshots(name, scheme, timestamp) "
121                    "values (?, ?, julianday('now'))");
122     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
123                       SQLITE_TRANSIENT);
124     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
125                       SQLITE_TRANSIENT);
126
127     rc = sqlite3_step(stmt);
128     if (rc != SQLITE_DONE) {
129         ReportError(rc);
130         sqlite3_close(db);
131         fatal("Database execution error!");
132     }
133
134     snapshotid = sqlite3_last_insert_rowid(db);
135     sqlite3_finalize(stmt);
136     if (snapshotid == 0) {
137         ReportError(rc);
138         sqlite3_close(db);
139         fatal("Find snapshot id");
140     }
141
142     /* Create a temporary table which will be used to keep track of the objects
143      * used by this snapshot.  When the database is closed, we will summarize
144      * the results of this table into segments_used. */
145     rc = sqlite3_exec(db,
146                       "create temporary table snapshot_refs ("
147                       "    segmentid integer not null,"
148                       "    object text not null,"
149                       "    size integer not null"
150                       ")", NULL, NULL, NULL);
151     if (rc != SQLITE_OK) {
152         ReportError(rc);
153         sqlite3_close(db);
154         fatal("Database initialization");
155     }
156     rc = sqlite3_exec(db,
157                       "create unique index snapshot_refs_index "
158                       "on snapshot_refs(segmentid, object)",
159                       NULL, NULL, NULL);
160     if (rc != SQLITE_OK) {
161         ReportError(rc);
162         sqlite3_close(db);
163         fatal("Database initialization");
164     }
165 }
166
167 void LocalDb::Close()
168 {
169     int rc;
170
171     /* Summarize the snapshot_refs table into segment_utilization. */
172     sqlite3_stmt *stmt = Prepare(
173         "insert or replace into segment_utilization "
174         "select ? as snapshotid, segmentid, sum(size) "
175         "from snapshot_refs group by segmentid"
176     );
177     sqlite3_bind_int64(stmt, 1, snapshotid);
178     rc = sqlite3_step(stmt);
179     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
180         ReportError(rc);
181         sqlite3_close(db);
182         fprintf(stderr, "DATABASE ERROR: Unable to create segment summary!\n");
183     }
184     sqlite3_finalize(stmt);
185
186     /* Commit changes to the database and close. */
187     rc = sqlite3_exec(db, "commit", NULL, NULL, NULL);
188     if (rc != SQLITE_OK) {
189         fprintf(stderr, "DATABASE ERROR: Can't commit database!\n");
190         ReportError(rc);
191     }
192     sqlite3_close(db);
193 }
194
195 int64_t LocalDb::SegmentToId(const string &segment)
196 {
197     int rc;
198     sqlite3_stmt *stmt;
199     int64_t result;
200
201     stmt = Prepare("insert or ignore into segments(segment) values (?)");
202     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
203                       SQLITE_TRANSIENT);
204     rc = sqlite3_step(stmt);
205     if (rc != SQLITE_DONE) {
206         fatal("Could not execute INSERT statement!");
207     }
208     sqlite3_finalize(stmt);
209
210     stmt = Prepare("select segmentid from segments where segment = ?");
211     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
212                       SQLITE_TRANSIENT);
213
214     rc = sqlite3_step(stmt);
215     if (rc == SQLITE_DONE) {
216         fatal("No segment found by id");
217     } else if (rc == SQLITE_ROW) {
218         result = sqlite3_column_int64(stmt, 0);
219     } else {
220         fatal("Error executing find segment by id query");
221     }
222
223     sqlite3_finalize(stmt);
224
225     return result;
226 }
227
228 string LocalDb::IdToSegment(int64_t segmentid)
229 {
230     int rc;
231     sqlite3_stmt *stmt;
232     string result;
233
234     stmt = Prepare("select segment from segments where segmentid = ?");
235     sqlite3_bind_int64(stmt, 1, segmentid);
236
237     rc = sqlite3_step(stmt);
238     if (rc == SQLITE_DONE) {
239         fatal("No segment found by id");
240     } else if (rc == SQLITE_ROW) {
241         result = (const char *)sqlite3_column_text(stmt, 0);
242     } else {
243         fatal("Error executing find segment by id query");
244     }
245
246     sqlite3_finalize(stmt);
247
248     return result;
249 }
250
251 void LocalDb::StoreObject(const ObjectReference& ref, double age)
252 {
253     int rc;
254     sqlite3_stmt *stmt;
255
256     assert(ref.has_checksum());
257     string checksum = ref.get_checksum();
258     assert(ref.range_is_exact());
259     int64_t size = ref.get_range_length();
260
261     if (age == 0.0) {
262         stmt = Prepare("insert into block_index("
263                        "segmentid, object, checksum, size, timestamp) "
264                        "values (?, ?, ?, ?, julianday('now'))");
265     } else {
266         stmt = Prepare("insert into block_index("
267                        "segmentid, object, checksum, size, timestamp) "
268                        "values (?, ?, ?, ?, ?)");
269     }
270
271     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
272     string obj = ref.get_sequence();
273     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
274     sqlite3_bind_text(stmt, 3, checksum.c_str(), checksum.size(),
275                       SQLITE_TRANSIENT);
276     sqlite3_bind_int64(stmt, 4, size);
277     if (age != 0.0)
278         sqlite3_bind_double(stmt, 5, age);
279
280     rc = sqlite3_step(stmt);
281     if (rc != SQLITE_DONE) {
282         fprintf(stderr, "Could not execute INSERT statement!\n");
283         ReportError(rc);
284     }
285
286     sqlite3_finalize(stmt);
287 }
288
289 ObjectReference LocalDb::FindObject(const string &checksum, int64_t size)
290 {
291     int rc;
292     sqlite3_stmt *stmt;
293     ObjectReference ref;
294
295     stmt = Prepare("select segmentid, object from block_index "
296                    "where checksum = ? and size = ? and expired is null");
297     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
298                       SQLITE_TRANSIENT);
299     sqlite3_bind_int64(stmt, 2, size);
300
301     rc = sqlite3_step(stmt);
302     if (rc == SQLITE_DONE) {
303     } else if (rc == SQLITE_ROW) {
304         ref = ObjectReference(IdToSegment(sqlite3_column_int64(stmt, 0)),
305                               (const char *)sqlite3_column_text(stmt, 1));
306         ref.set_range(0, size, true);
307     } else {
308         fprintf(stderr, "Could not execute SELECT statement!\n");
309         ReportError(rc);
310     }
311
312     sqlite3_finalize(stmt);
313
314     return ref;
315 }
316
317 bool LocalDb::IsOldObject(const string &checksum, int64_t size, double *age,
318                           int *group)
319 {
320     int rc;
321     sqlite3_stmt *stmt;
322     bool found = false;
323
324     stmt = Prepare("select segmentid, object, julianday(timestamp), expired "
325                    "from block_index where checksum = ? and size = ?");
326     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
327                       SQLITE_TRANSIENT);
328     sqlite3_bind_int64(stmt, 2, size);
329
330     rc = sqlite3_step(stmt);
331     if (rc == SQLITE_DONE) {
332         found = false;
333     } else if (rc == SQLITE_ROW) {
334         found = true;
335         *age = sqlite3_column_double(stmt, 2);
336         *group = sqlite3_column_int(stmt, 3);
337     } else {
338         fprintf(stderr, "Could not execute SELECT statement!\n");
339         ReportError(rc);
340     }
341
342     sqlite3_finalize(stmt);
343
344     return found;
345 }
346
347 /* Does this object still exist in the database (and not expired)? */
348 bool LocalDb::IsAvailable(const ObjectReference &ref)
349 {
350     int rc;
351     sqlite3_stmt *stmt;
352     bool found = false;
353
354     // Special objects (such as the zero object) aren't stored in segments, and
355     // so are always available.
356     if (!ref.is_normal())
357         return true;
358
359     stmt = Prepare("select count(*) from block_index "
360                    "where segmentid = ? and object = ? and expired is null");
361     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
362     sqlite3_bind_text(stmt, 2, ref.get_sequence().c_str(),
363                       ref.get_sequence().size(), SQLITE_TRANSIENT);
364
365     rc = sqlite3_step(stmt);
366     if (rc == SQLITE_DONE) {
367         found = false;
368     } else if (rc == SQLITE_ROW) {
369         if (sqlite3_column_int(stmt, 0) > 0)
370             found = true;
371     } else {
372         fprintf(stderr, "Could not execute SELECT statement!\n");
373         ReportError(rc);
374     }
375
376     sqlite3_finalize(stmt);
377
378     return found;
379 }
380
381 set<string> LocalDb::GetUsedSegments()
382 {
383     int rc;
384     sqlite3_stmt *stmt;
385     set<string> result;
386
387     stmt = Prepare("select segment from segments "
388                    "where segmentid in (select segmentid from snapshot_refs)");
389
390     while (true) {
391         rc = sqlite3_step(stmt);
392         if (rc == SQLITE_ROW) {
393             const char *segment
394                 = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
395             result.insert(segment);
396         } else if (rc == SQLITE_DONE) {
397             break;
398         } else {
399             ReportError(rc);
400         }
401     }
402
403     sqlite3_finalize(stmt);
404
405     return result;
406 }
407
408 void LocalDb::UseObject(const ObjectReference& ref)
409 {
410     int rc;
411     sqlite3_stmt *stmt;
412
413     if (!ref.is_normal())
414         return;
415
416     int64_t old_size = 0;
417     stmt = Prepare("select size from snapshot_refs "
418                    "where segmentid = ? and object = ?");
419     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
420     string obj = ref.get_sequence();
421     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
422     rc = sqlite3_step(stmt);
423     if (rc == SQLITE_ROW) {
424         old_size = sqlite3_column_int64(stmt, 0);
425     }
426     sqlite3_finalize(stmt);
427
428     // Attempt to determine the underlying size of the object.  This may
429     // require a database lookup if the length is not encoded into the object
430     // reference already.
431     int64_t object_size = 0;
432     if (ref.range_is_exact()) {
433         object_size = ref.get_range_length();
434     } else {
435         stmt = Prepare("select size from block_index "
436                        "where segmentid = ? and object = ?");
437         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
438         obj = ref.get_sequence();
439         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
440         rc = sqlite3_step(stmt);
441         if (rc == SQLITE_ROW) {
442             object_size = sqlite3_column_int64(stmt, 0);
443         } else {
444             fprintf(stderr, "Warning: No block found in block_index for %s\n",
445                     ref.to_string().c_str());
446         }
447         sqlite3_finalize(stmt);
448     }
449
450     // Possibly mark additional bytes as being referenced.  The number of bytes
451     // referenced can only be increased (up to the object size).  The bytes
452     // referenced will be set to the object size only if the entire object is
453     // referenced at once: a series of partial ranges that add up to the total
454     // size will have a reference size capped at just less than the full object
455     // size (we can't tell if some bytes were referenced multiple times, and
456     // thus we conservatively assume some bytes might still be unreferenced).
457     int64_t new_refs;
458     if (ref.has_range()) {
459         new_refs = ref.get_range_length();
460     } else {
461         new_refs = object_size;
462     }
463     int64_t new_size = old_size + new_refs;
464     if (old_size < object_size && new_refs < object_size)
465         new_size = min(new_size, object_size - 1);
466     new_size = min(object_size, new_size);
467     new_size = max(new_size, (int64_t)0);
468
469     if (new_size != old_size) {
470         stmt = Prepare("insert or replace "
471                        "into snapshot_refs(segmentid, object, size) "
472                        "values (?, ?, ?)");
473         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
474         obj = ref.get_sequence();
475         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
476         sqlite3_bind_int64(stmt, 3, new_size);
477
478         rc = sqlite3_step(stmt);
479         if (rc != SQLITE_DONE) {
480             fprintf(stderr, "Could not execute INSERT statement!\n");
481             ReportError(rc);
482         }
483
484         sqlite3_finalize(stmt);
485     }
486 }
487
488 void LocalDb::SetSegmentMetadata(const std::string &segment,
489                                  const std::string &path,
490                                  const std::string &checksum,
491                                  const std::string &type,
492                                  int data_size, int disk_size)
493 {
494     int rc;
495     sqlite3_stmt *stmt;
496
497     stmt = Prepare("update segments set path = ?, checksum = ?, "
498                    "type = ?, data_size = ?, disk_size = ?, "
499                    "timestamp = coalesce(julianday(timestamp), "
500                    "                     julianday('now')) "
501                    "where segmentid = ?");
502     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
503                       SQLITE_TRANSIENT);
504     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
505                       SQLITE_TRANSIENT);
506     sqlite3_bind_text(stmt, 3, type.c_str(), type.size(),
507                       SQLITE_TRANSIENT);
508     sqlite3_bind_int64(stmt, 4, data_size);
509     sqlite3_bind_int64(stmt, 5, disk_size);
510     sqlite3_bind_int64(stmt, 6, SegmentToId(segment));
511
512     rc = sqlite3_step(stmt);
513     if (rc != SQLITE_DONE) {
514         fprintf(stderr, "Could not update segment checksum in database!\n");
515         ReportError(rc);
516     }
517
518     sqlite3_finalize(stmt);
519 }
520
521 map<string, string> LocalDb::GetSegmentMetadata(const string &segment)
522 {
523     int rc;
524     sqlite3_stmt *stmt;
525     map<string, string> info;
526
527     // Names in the returned map, in the order returned from the select
528     // statement below.
529     static const char *fields[] = {
530         "datetime", "path", "checksum", "data_size", "disk_size", "type", NULL
531     };
532
533     stmt = Prepare("select datetime(timestamp), path, checksum, "
534                    "    data_size, disk_size, type "
535                    "from segments where segment = ?");
536     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
537                       SQLITE_TRANSIENT);
538
539     rc = sqlite3_step(stmt);
540     if (rc == SQLITE_DONE) {
541     } else if (rc == SQLITE_ROW) {
542         info["segment"] = segment;
543         for (int i = 0; fields[i] != NULL; i++) {
544             const char *val = (const char *)sqlite3_column_text(stmt, i);
545             if (val != NULL)
546                 info[fields[i]] = val;
547         }
548     } else {
549         fprintf(stderr, "Could not execute SELECT statement!\n");
550         ReportError(rc);
551     }
552
553     sqlite3_finalize(stmt);
554
555     return info;
556 }
557
558 /* Look up and return the packed representation of the subblock chunk
559  * signatures.  Returns true if signatures were found for the specified object,
560  * and if so sets *buf to point at a buffer of memory (allocated with malloc;
561  * the caller should free it), and *len to the length of the buffer. */
562 bool LocalDb::LoadChunkSignatures(ObjectReference ref,
563                                   void **buf, size_t *len,
564                                   string *algorithm)
565 {
566     int rc;
567     sqlite3_stmt *stmt;
568     int found = false;
569
570     stmt = Prepare("select signatures, algorithm from subblock_signatures "
571                    "where blockid = (select blockid from block_index "
572                    "                 where segmentid = ? and object = ?)");
573     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
574     string obj = ref.get_sequence();
575     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
576
577     rc = sqlite3_step(stmt);
578     if (rc == SQLITE_DONE) {
579     } else if (rc == SQLITE_ROW) {
580         const void *data = sqlite3_column_blob(stmt, 0);
581         *len = sqlite3_column_bytes(stmt, 0);
582
583         if (*len > 0) {
584             *buf = malloc(*len);
585             if (*buf != NULL) {
586                 memcpy(*buf, data, *len);
587                 *algorithm = (const char *)sqlite3_column_text(stmt, 1);
588                 found = true;
589             }
590         }
591     } else {
592         fprintf(stderr, "Could not execute SELECT statement!\n");
593         ReportError(rc);
594     }
595
596     sqlite3_finalize(stmt);
597
598     return found;
599 }
600
601 /* Store the subblock chunk signatures for a specified object.  The object
602  * itself must have already been indexed in the database. */
603 void LocalDb::StoreChunkSignatures(ObjectReference ref,
604                                    const void *buf, size_t len,
605                                    const string& algorithm)
606 {
607     int rc;
608     sqlite3_stmt *stmt;
609
610     stmt = Prepare("select blockid from block_index "
611                    "where segmentid = ? and object = ?");
612     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
613     string obj = ref.get_sequence();
614     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
615
616     rc = sqlite3_step(stmt);
617     if (rc != SQLITE_ROW) {
618         fprintf(stderr,
619                 "Could not determine blockid in StoreChunkSignatures!\n");
620         ReportError(rc);
621         fatal("Error getting blockid");
622     }
623     int64_t blockid = sqlite3_column_int64(stmt, 0);
624     sqlite3_finalize(stmt);
625
626     stmt = Prepare("insert or replace "
627                    "into subblock_signatures(blockid, algorithm, signatures) "
628                    "values (?, ?, ?)");
629     sqlite3_bind_int64(stmt, 1, blockid);
630     sqlite3_bind_text(stmt, 2, algorithm.c_str(), algorithm.size(),
631                       SQLITE_TRANSIENT);
632     sqlite3_bind_blob(stmt, 3, buf, len, SQLITE_TRANSIENT);
633
634     rc = sqlite3_step(stmt);
635     if (rc != SQLITE_DONE) {
636         fprintf(stderr, "Could not insert sub-block checksums!\n");
637         ReportError(rc);
638     }
639
640     sqlite3_finalize(stmt);
641 }