Convert to sqlite3 module, from pysqlite2.
[cumulus.git] / localdb.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2007-2008 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* When creating backup snapshots, maintain a local database of data blocks and
21  * checksums, in addition to the data contents (which may be stored remotely).
22  * This database is consulted when attempting to build incremental snapshots,
23  * as it says which objects can be reused.
24  *
25  * The database is implemented as an SQLite3 database, but this implementation
26  * detail is kept internal to this file, so that the storage format may be
27  * changed later. */
28
29 #include <assert.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sqlite3.h>
33
34 #include <algorithm>
35 #include <string>
36
37 #include "localdb.h"
38 #include "store.h"
39 #include "util.h"
40
41 using std::max;
42 using std::min;
43 using std::set;
44 using std::string;
45
46 static const int SCHEMA_MAJOR = 0;
47 static const int SCHEMA_MINOR = 11;
48
49 /* Helper function to prepare a statement for execution in the current
50  * database. */
51 sqlite3_stmt *LocalDb::Prepare(const char *sql)
52 {
53     sqlite3_stmt *stmt;
54     int rc;
55     const char *tail;
56
57     rc = sqlite3_prepare_v2(db, sql, strlen(sql), &stmt, &tail);
58     if (rc != SQLITE_OK) {
59         ReportError(rc);
60         fatal(string("Error preparing statement: ") + sql);
61     }
62
63     return stmt;
64 }
65
66 void LocalDb::ReportError(int rc)
67 {
68     fprintf(stderr, "Result code: %d\n", rc);
69     fprintf(stderr, "Error message: %s\n", sqlite3_errmsg(db));
70 }
71
72 void LocalDb::Open(const char *path, const char *snapshot_name,
73                    const char *snapshot_scheme)
74 {
75     int rc;
76
77     rc = sqlite3_open(path, &db);
78     if (rc) {
79         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
80         sqlite3_close(db);
81         fatal("Error opening local database");
82     }
83
84     rc = sqlite3_exec(db, "begin", NULL, NULL, NULL);
85     if (rc) {
86         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
87         sqlite3_close(db);
88         fatal("Error starting transaction");
89     }
90
91     sqlite3_extended_result_codes(db, 1);
92
93     /* Check that the local database format is the correct version; if not,
94      * report an error. */
95     sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
96
97     rc = sqlite3_step(stmt);
98     if (rc != SQLITE_ROW) {
99         fatal("Unable to read local database version from database");
100     } else if (rc == SQLITE_ROW) {
101         int major = sqlite3_column_int(stmt, 0);
102         int minor = sqlite3_column_int(stmt, 1);
103         if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
104             fprintf(stderr,
105                     "Local database does not have required schema version!\n"
106                     "  expected: %d.%d, found: %d.%d\n",
107                     SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
108             fatal("Unable to continue.");
109         }
110     }
111     sqlite3_finalize(stmt);
112
113     if (snapshot_scheme == NULL)
114         snapshot_scheme = "";
115
116     /* Insert this snapshot into the database, and determine the integer key
117      * which will be used to identify it. */
118     stmt = Prepare("insert into snapshots(name, scheme, timestamp) "
119                    "values (?, ?, julianday('now'))");
120     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
121                       SQLITE_TRANSIENT);
122     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
123                       SQLITE_TRANSIENT);
124
125     rc = sqlite3_step(stmt);
126     if (rc != SQLITE_DONE) {
127         ReportError(rc);
128         sqlite3_close(db);
129         fatal("Database execution error!");
130     }
131
132     snapshotid = sqlite3_last_insert_rowid(db);
133     sqlite3_finalize(stmt);
134     if (snapshotid == 0) {
135         ReportError(rc);
136         sqlite3_close(db);
137         fatal("Find snapshot id");
138     }
139
140     /* Create a temporary table which will be used to keep track of the objects
141      * used by this snapshot.  When the database is closed, we will summarize
142      * the results of this table into segments_used. */
143     rc = sqlite3_exec(db,
144                       "create temporary table snapshot_refs ("
145                       "    segmentid integer not null,"
146                       "    object text not null,"
147                       "    size integer not null"
148                       ")", NULL, NULL, NULL);
149     if (rc != SQLITE_OK) {
150         ReportError(rc);
151         sqlite3_close(db);
152         fatal("Database initialization");
153     }
154     rc = sqlite3_exec(db,
155                       "create unique index snapshot_refs_index "
156                       "on snapshot_refs(segmentid, object)",
157                       NULL, NULL, NULL);
158     if (rc != SQLITE_OK) {
159         ReportError(rc);
160         sqlite3_close(db);
161         fatal("Database initialization");
162     }
163 }
164
165 void LocalDb::Close()
166 {
167     int rc;
168
169     /* Summarize the snapshot_refs table into segment_utilization. */
170     sqlite3_stmt *stmt = Prepare(
171         "insert or replace into segment_utilization "
172         "select ? as snapshotid, segmentid, sum(size) "
173         "from snapshot_refs group by segmentid"
174     );
175     sqlite3_bind_int64(stmt, 1, snapshotid);
176     rc = sqlite3_step(stmt);
177     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
178         ReportError(rc);
179         sqlite3_close(db);
180         fprintf(stderr, "DATABASE ERROR: Unable to create segment summary!\n");
181     }
182     sqlite3_finalize(stmt);
183
184     /* Commit changes to the database and close. */
185     rc = sqlite3_exec(db, "commit", NULL, NULL, NULL);
186     if (rc != SQLITE_OK) {
187         fprintf(stderr, "DATABASE ERROR: Can't commit database!\n");
188         ReportError(rc);
189     }
190     sqlite3_close(db);
191 }
192
193 int64_t LocalDb::SegmentToId(const string &segment)
194 {
195     int rc;
196     sqlite3_stmt *stmt;
197     int64_t result;
198
199     stmt = Prepare("insert or ignore into segments(segment) values (?)");
200     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
201                       SQLITE_TRANSIENT);
202     rc = sqlite3_step(stmt);
203     if (rc != SQLITE_DONE) {
204         fatal("Could not execute INSERT statement!");
205     }
206     sqlite3_finalize(stmt);
207
208     stmt = Prepare("select segmentid from segments where segment = ?");
209     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
210                       SQLITE_TRANSIENT);
211
212     rc = sqlite3_step(stmt);
213     if (rc == SQLITE_DONE) {
214         fatal("No segment found by id");
215     } else if (rc == SQLITE_ROW) {
216         result = sqlite3_column_int64(stmt, 0);
217     } else {
218         fatal("Error executing find segment by id query");
219     }
220
221     sqlite3_finalize(stmt);
222
223     return result;
224 }
225
226 string LocalDb::IdToSegment(int64_t segmentid)
227 {
228     int rc;
229     sqlite3_stmt *stmt;
230     string result;
231
232     stmt = Prepare("select segment from segments where segmentid = ?");
233     sqlite3_bind_int64(stmt, 1, segmentid);
234
235     rc = sqlite3_step(stmt);
236     if (rc == SQLITE_DONE) {
237         fatal("No segment found by id");
238     } else if (rc == SQLITE_ROW) {
239         result = (const char *)sqlite3_column_text(stmt, 0);
240     } else {
241         fatal("Error executing find segment by id query");
242     }
243
244     sqlite3_finalize(stmt);
245
246     return result;
247 }
248
249 void LocalDb::StoreObject(const ObjectReference& ref, double age)
250 {
251     int rc;
252     sqlite3_stmt *stmt;
253
254     assert(ref.has_checksum());
255     string checksum = ref.get_checksum();
256     assert(ref.range_is_exact());
257     int64_t size = ref.get_range_length();
258
259     if (age == 0.0) {
260         stmt = Prepare("insert into block_index("
261                        "segmentid, object, checksum, size, timestamp) "
262                        "values (?, ?, ?, ?, julianday('now'))");
263     } else {
264         stmt = Prepare("insert into block_index("
265                        "segmentid, object, checksum, size, timestamp) "
266                        "values (?, ?, ?, ?, ?)");
267     }
268
269     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
270     string obj = ref.get_sequence();
271     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
272     sqlite3_bind_text(stmt, 3, checksum.c_str(), checksum.size(),
273                       SQLITE_TRANSIENT);
274     sqlite3_bind_int64(stmt, 4, size);
275     if (age != 0.0)
276         sqlite3_bind_double(stmt, 5, age);
277
278     rc = sqlite3_step(stmt);
279     if (rc != SQLITE_DONE) {
280         fprintf(stderr, "Could not execute INSERT statement!\n");
281         ReportError(rc);
282     }
283
284     sqlite3_finalize(stmt);
285 }
286
287 ObjectReference LocalDb::FindObject(const string &checksum, int64_t size)
288 {
289     int rc;
290     sqlite3_stmt *stmt;
291     ObjectReference ref;
292
293     stmt = Prepare("select segmentid, object from block_index "
294                    "where checksum = ? and size = ? and expired is null");
295     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
296                       SQLITE_TRANSIENT);
297     sqlite3_bind_int64(stmt, 2, size);
298
299     rc = sqlite3_step(stmt);
300     if (rc == SQLITE_DONE) {
301     } else if (rc == SQLITE_ROW) {
302         ref = ObjectReference(IdToSegment(sqlite3_column_int64(stmt, 0)),
303                               (const char *)sqlite3_column_text(stmt, 1));
304         ref.set_range(0, size, true);
305     } else {
306         fprintf(stderr, "Could not execute SELECT statement!\n");
307         ReportError(rc);
308     }
309
310     sqlite3_finalize(stmt);
311
312     return ref;
313 }
314
315 bool LocalDb::IsOldObject(const string &checksum, int64_t size, double *age,
316                           int *group)
317 {
318     int rc;
319     sqlite3_stmt *stmt;
320     bool found = false;
321
322     stmt = Prepare("select segmentid, object, timestamp, expired "
323                    "from block_index where checksum = ? and size = ?");
324     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
325                       SQLITE_TRANSIENT);
326     sqlite3_bind_int64(stmt, 2, size);
327
328     rc = sqlite3_step(stmt);
329     if (rc == SQLITE_DONE) {
330         found = false;
331     } else if (rc == SQLITE_ROW) {
332         found = true;
333         *age = sqlite3_column_double(stmt, 2);
334         *group = sqlite3_column_int(stmt, 3);
335     } else {
336         fprintf(stderr, "Could not execute SELECT statement!\n");
337         ReportError(rc);
338     }
339
340     sqlite3_finalize(stmt);
341
342     return found;
343 }
344
345 /* Does this object still exist in the database (and not expired)? */
346 bool LocalDb::IsAvailable(const ObjectReference &ref)
347 {
348     int rc;
349     sqlite3_stmt *stmt;
350     bool found = false;
351
352     // Special objects (such as the zero object) aren't stored in segments, and
353     // so are always available.
354     if (!ref.is_normal())
355         return true;
356
357     stmt = Prepare("select count(*) from block_index "
358                    "where segmentid = ? and object = ? and expired is null");
359     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
360     sqlite3_bind_text(stmt, 2, ref.get_sequence().c_str(),
361                       ref.get_sequence().size(), SQLITE_TRANSIENT);
362
363     rc = sqlite3_step(stmt);
364     if (rc == SQLITE_DONE) {
365         found = false;
366     } else if (rc == SQLITE_ROW) {
367         if (sqlite3_column_int(stmt, 0) > 0)
368             found = true;
369     } else {
370         fprintf(stderr, "Could not execute SELECT statement!\n");
371         ReportError(rc);
372     }
373
374     sqlite3_finalize(stmt);
375
376     return found;
377 }
378
379 set<string> LocalDb::GetUsedSegments()
380 {
381     int rc;
382     sqlite3_stmt *stmt;
383     set<string> result;
384
385     stmt = Prepare("select segment from segments "
386                    "where segmentid in (select segmentid from snapshot_refs)");
387
388     while (true) {
389         rc = sqlite3_step(stmt);
390         if (rc == SQLITE_ROW) {
391             const char *segment
392                 = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
393             result.insert(segment);
394         } else if (rc == SQLITE_DONE) {
395             break;
396         } else {
397             ReportError(rc);
398         }
399     }
400
401     sqlite3_finalize(stmt);
402
403     return result;
404 }
405
406 void LocalDb::UseObject(const ObjectReference& ref)
407 {
408     int rc;
409     sqlite3_stmt *stmt;
410
411     if (!ref.is_normal())
412         return;
413
414     int64_t old_size = 0;
415     stmt = Prepare("select size from snapshot_refs "
416                    "where segmentid = ? and object = ?");
417     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
418     string obj = ref.get_sequence();
419     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
420     rc = sqlite3_step(stmt);
421     if (rc == SQLITE_ROW) {
422         old_size = sqlite3_column_int64(stmt, 0);
423     }
424     sqlite3_finalize(stmt);
425
426     // Attempt to determine the underlying size of the object.  This may
427     // require a database lookup if the length is not encoded into the object
428     // reference already.
429     int64_t object_size = 0;
430     if (ref.range_is_exact()) {
431         object_size = ref.get_range_length();
432     } else {
433         stmt = Prepare("select size from block_index "
434                        "where segmentid = ? and object = ?");
435         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
436         obj = ref.get_sequence();
437         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
438         rc = sqlite3_step(stmt);
439         if (rc == SQLITE_ROW) {
440             object_size = sqlite3_column_int64(stmt, 0);
441         } else {
442             fprintf(stderr, "Warning: No block found in block_index for %s\n",
443                     ref.to_string().c_str());
444         }
445         sqlite3_finalize(stmt);
446     }
447
448     // Possibly mark additional bytes as being referenced.  The number of bytes
449     // referenced can only be increased (up to the object size).  The bytes
450     // referenced will be set to the object size only if the entire object is
451     // referenced at once: a series of partial ranges that add up to the total
452     // size will have a reference size capped at just less than the full object
453     // size (we can't tell if some bytes were referenced multiple times, and
454     // thus we conservatively assume some bytes might still be unreferenced).
455     int64_t new_refs = old_size;
456     if (ref.has_range()) {
457         new_refs = ref.get_range_length();
458     } else {
459         new_refs = object_size;
460     }
461     int64_t new_size = old_size + new_refs;
462     if (old_size < object_size && new_refs < object_size)
463         new_size = min(new_size, object_size - 1);
464     new_size = min(object_size, new_size);
465     new_size = max(new_size, (int64_t)0);
466
467     if (new_size != old_size) {
468         stmt = Prepare("insert or replace "
469                        "into snapshot_refs(segmentid, object, size) "
470                        "values (?, ?, ?)");
471         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
472         obj = ref.get_sequence();
473         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
474         sqlite3_bind_int64(stmt, 3, new_size);
475
476         rc = sqlite3_step(stmt);
477         if (rc != SQLITE_DONE) {
478             fprintf(stderr, "Could not execute INSERT statement!\n");
479             ReportError(rc);
480         }
481
482         sqlite3_finalize(stmt);
483     }
484 }
485
486 void LocalDb::SetSegmentMetadata(const std::string &segment,
487                                  const std::string &path,
488                                  const std::string &checksum,
489                                  const std::string &type,
490                                  int data_size, int disk_size)
491 {
492     int rc;
493     sqlite3_stmt *stmt;
494
495     stmt = Prepare("update segments set path = ?, checksum = ?, "
496                    "type = ?, data_size = ?, disk_size = ?, "
497                    "mtime = coalesce(mtime, julianday('now')) "
498                    "where segmentid = ?");
499     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
500                       SQLITE_TRANSIENT);
501     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
502                       SQLITE_TRANSIENT);
503     sqlite3_bind_text(stmt, 3, type.c_str(), type.size(),
504                       SQLITE_TRANSIENT);
505     sqlite3_bind_int64(stmt, 4, data_size);
506     sqlite3_bind_int64(stmt, 5, disk_size);
507     sqlite3_bind_int64(stmt, 6, SegmentToId(segment));
508
509     rc = sqlite3_step(stmt);
510     if (rc != SQLITE_DONE) {
511         fprintf(stderr, "Could not update segment checksum in database!\n");
512         ReportError(rc);
513     }
514
515     sqlite3_finalize(stmt);
516 }
517
518 bool LocalDb::GetSegmentMetadata(const string &segment,
519                                  string *seg_path,
520                                  string *seg_checksum)
521 {
522     int rc;
523     sqlite3_stmt *stmt;
524     ObjectReference ref;
525     int found = false;
526
527     stmt = Prepare("select path, checksum from segments where segment = ?");
528     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
529                       SQLITE_TRANSIENT);
530
531     rc = sqlite3_step(stmt);
532     if (rc == SQLITE_DONE) {
533     } else if (rc == SQLITE_ROW) {
534         found = true;
535         const char *val;
536
537         val = (const char *)sqlite3_column_text(stmt, 0);
538         if (val == NULL)
539             found = false;
540         else
541             *seg_path = val;
542
543         val = (const char *)sqlite3_column_text(stmt, 1);
544         if (val == NULL)
545             found = false;
546         else
547             *seg_checksum = val;
548     } else {
549         fprintf(stderr, "Could not execute SELECT statement!\n");
550         ReportError(rc);
551     }
552
553     sqlite3_finalize(stmt);
554
555     return found;
556 }
557
558 /* Look up and return the packed representation of the subblock chunk
559  * signatures.  Returns true if signatures were found for the specified object,
560  * and if so sets *buf to point at a buffer of memory (allocated with malloc;
561  * the caller should free it), and *len to the length of the buffer. */
562 bool LocalDb::LoadChunkSignatures(ObjectReference ref,
563                                   void **buf, size_t *len,
564                                   string *algorithm)
565 {
566     int rc;
567     sqlite3_stmt *stmt;
568     int found = false;
569
570     stmt = Prepare("select signatures, algorithm from subblock_signatures "
571                    "where blockid = (select blockid from block_index "
572                    "                 where segmentid = ? and object = ?)");
573     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
574     string obj = ref.get_sequence();
575     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
576
577     rc = sqlite3_step(stmt);
578     if (rc == SQLITE_DONE) {
579     } else if (rc == SQLITE_ROW) {
580         const void *data = sqlite3_column_blob(stmt, 0);
581         *len = sqlite3_column_bytes(stmt, 0);
582
583         if (*len > 0) {
584             *buf = malloc(*len);
585             if (*buf != NULL) {
586                 memcpy(*buf, data, *len);
587                 *algorithm = (const char *)sqlite3_column_text(stmt, 1);
588                 found = true;
589             }
590         }
591     } else {
592         fprintf(stderr, "Could not execute SELECT statement!\n");
593         ReportError(rc);
594     }
595
596     sqlite3_finalize(stmt);
597
598     return found;
599 }
600
601 /* Store the subblock chunk signatures for a specified object.  The object
602  * itself must have already been indexed in the database. */
603 void LocalDb::StoreChunkSignatures(ObjectReference ref,
604                                    const void *buf, size_t len,
605                                    const string& algorithm)
606 {
607     int rc;
608     sqlite3_stmt *stmt;
609
610     stmt = Prepare("select blockid from block_index "
611                    "where segmentid = ? and object = ?");
612     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
613     string obj = ref.get_sequence();
614     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
615
616     rc = sqlite3_step(stmt);
617     if (rc != SQLITE_ROW) {
618         fprintf(stderr,
619                 "Could not determine blockid in StoreChunkSignatures!\n");
620         ReportError(rc);
621         fatal("Error getting blockid");
622     }
623     int64_t blockid = sqlite3_column_int64(stmt, 0);
624     sqlite3_finalize(stmt);
625
626     stmt = Prepare("insert or replace "
627                    "into subblock_signatures(blockid, algorithm, signatures) "
628                    "values (?, ?, ?)");
629     sqlite3_bind_int64(stmt, 1, blockid);
630     sqlite3_bind_text(stmt, 2, algorithm.c_str(), algorithm.size(),
631                       SQLITE_TRANSIENT);
632     sqlite3_bind_blob(stmt, 3, buf, len, SQLITE_TRANSIENT);
633
634     rc = sqlite3_step(stmt);
635     if (rc != SQLITE_DONE) {
636         fprintf(stderr, "Could not insert sub-block checksums!\n");
637         ReportError(rc);
638     }
639
640     sqlite3_finalize(stmt);
641 }