Add a cache around getpwuid/getgrgid to avoid repeated calls.
[cumulus.git] / localdb.cc
1 /* Cumulus: Smart Filesystem Backup to Dumb Servers
2  *
3  * Copyright (C) 2007-2008  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 /* When creating backup snapshots, maintain a local database of data blocks and
22  * checksums, in addition to the data contents (which may be stored remotely).
23  * This database is consulted when attempting to build incremental snapshots,
24  * as it says which objects can be reused.
25  *
26  * The database is implemented as an SQLite3 database, but this implementation
27  * detail is kept internal to this file, so that the storage format may be
28  * changed later. */
29
30 #include <assert.h>
31 #include <stdio.h>
32 #include <string.h>
33 #include <sqlite3.h>
34
35 #include <algorithm>
36 #include <string>
37
38 #include "localdb.h"
39 #include "store.h"
40 #include "util.h"
41
42 using std::min;
43 using std::string;
44
45 /* Helper function to prepare a statement for execution in the current
46  * database. */
47 sqlite3_stmt *LocalDb::Prepare(const char *sql)
48 {
49     sqlite3_stmt *stmt;
50     int rc;
51     const char *tail;
52
53     rc = sqlite3_prepare_v2(db, sql, strlen(sql), &stmt, &tail);
54     if (rc != SQLITE_OK) {
55         ReportError(rc);
56         fatal(string("Error preparing statement: ") + sql);
57     }
58
59     return stmt;
60 }
61
62 void LocalDb::ReportError(int rc)
63 {
64     fprintf(stderr, "Result code: %d\n", rc);
65     fprintf(stderr, "Error message: %s\n", sqlite3_errmsg(db));
66 }
67
68 void LocalDb::Open(const char *path, const char *snapshot_name,
69                    const char *snapshot_scheme, double intent)
70 {
71     int rc;
72
73     rc = sqlite3_open(path, &db);
74     if (rc) {
75         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
76         sqlite3_close(db);
77         fatal("Error opening local database");
78     }
79
80     rc = sqlite3_exec(db, "begin", NULL, NULL, NULL);
81     if (rc) {
82         fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
83         sqlite3_close(db);
84         fatal("Error starting transaction");
85     }
86
87     sqlite3_extended_result_codes(db, 1);
88
89     if (snapshot_scheme == NULL)
90         snapshot_scheme = "";
91
92     /* Insert this snapshot into the database, and determine the integer key
93      * which will be used to identify it. */
94     sqlite3_stmt *stmt = Prepare("insert into "
95                                  "snapshots(name, scheme, timestamp, intent) "
96                                  "values (?, ?, julianday('now'), ?)");
97     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
98                       SQLITE_TRANSIENT);
99     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
100                       SQLITE_TRANSIENT);
101     sqlite3_bind_double(stmt, 3, intent);
102
103     rc = sqlite3_step(stmt);
104     if (rc != SQLITE_DONE) {
105         ReportError(rc);
106         sqlite3_close(db);
107         fatal("Database execution error!");
108     }
109
110     snapshotid = sqlite3_last_insert_rowid(db);
111     sqlite3_finalize(stmt);
112     if (snapshotid == 0) {
113         ReportError(rc);
114         sqlite3_close(db);
115         fatal("Find snapshot id");
116     }
117
118     /* Create a temporary table which will be used to keep track of the objects
119      * used by this snapshot.  When the database is closed, we will summarize
120      * the results of this table into segments_used. */
121     rc = sqlite3_exec(db,
122                       "create temporary table snapshot_refs ("
123                       "    segmentid integer not null,"
124                       "    object text not null,"
125                       "    size integer not null"
126                       ")", NULL, NULL, NULL);
127     if (rc != SQLITE_OK) {
128         ReportError(rc);
129         sqlite3_close(db);
130         fatal("Database initialization");
131     }
132     rc = sqlite3_exec(db,
133                       "create unique index snapshot_refs_index "
134                       "on snapshot_refs(segmentid, object)",
135                       NULL, NULL, NULL);
136     if (rc != SQLITE_OK) {
137         ReportError(rc);
138         sqlite3_close(db);
139         fatal("Database initialization");
140     }
141 }
142
143 void LocalDb::Close()
144 {
145     int rc;
146
147     /* Summarize the snapshot_refs table into segments_used. */
148     sqlite3_stmt *stmt = Prepare(
149         "insert or replace into segments_used "
150         "select ? as snapshotid, segmentid, max(utilization) from ("
151         "    select segmentid, cast(used as real) / size as utilization "
152         "    from "
153         "    (select segmentid, sum(size) as used from snapshot_refs "
154         "       group by segmentid) "
155         "    join segments using (segmentid) "
156         "  union "
157         "    select segmentid, utilization from segments_used "
158         "    where snapshotid = ? "
159         ") group by segmentid"
160     );
161     sqlite3_bind_int64(stmt, 1, snapshotid);
162     sqlite3_bind_int64(stmt, 2, snapshotid);
163     rc = sqlite3_step(stmt);
164     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
165         ReportError(rc);
166         sqlite3_close(db);
167         fprintf(stderr, "DATABASE ERROR: Unable to create segment summary!\n");
168     }
169     sqlite3_finalize(stmt);
170
171     /* Commit changes to the database and close. */
172     rc = sqlite3_exec(db, "commit", NULL, NULL, NULL);
173     if (rc != SQLITE_OK) {
174         fprintf(stderr, "DATABASE ERROR: Can't commit database!\n");
175         ReportError(rc);
176     }
177     sqlite3_close(db);
178 }
179
180 int64_t LocalDb::SegmentToId(const string &segment)
181 {
182     int rc;
183     sqlite3_stmt *stmt;
184     int64_t result;
185
186     stmt = Prepare("insert or ignore into segments(segment) values (?)");
187     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
188                       SQLITE_TRANSIENT);
189     rc = sqlite3_step(stmt);
190     if (rc != SQLITE_DONE) {
191         fatal("Could not execute INSERT statement!");
192     }
193     sqlite3_finalize(stmt);
194
195     stmt = Prepare("select segmentid from segments where segment = ?");
196     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
197                       SQLITE_TRANSIENT);
198
199     rc = sqlite3_step(stmt);
200     if (rc == SQLITE_DONE) {
201         fatal("No segment found by id");
202     } else if (rc == SQLITE_ROW) {
203         result = sqlite3_column_int64(stmt, 0);
204     } else {
205         fatal("Error executing find segment by id query");
206     }
207
208     sqlite3_finalize(stmt);
209
210     return result;
211 }
212
213 string LocalDb::IdToSegment(int64_t segmentid)
214 {
215     int rc;
216     sqlite3_stmt *stmt;
217     string result;
218
219     stmt = Prepare("select segment from segments where segmentid = ?");
220     sqlite3_bind_int64(stmt, 1, segmentid);
221
222     rc = sqlite3_step(stmt);
223     if (rc == SQLITE_DONE) {
224         fatal("No segment found by id");
225     } else if (rc == SQLITE_ROW) {
226         result = (const char *)sqlite3_column_text(stmt, 0);
227     } else {
228         fatal("Error executing find segment by id query");
229     }
230
231     sqlite3_finalize(stmt);
232
233     return result;
234 }
235
236 void LocalDb::StoreObject(const ObjectReference& ref,
237                           const string &checksum, int64_t size,
238                           double age)
239 {
240     int rc;
241     sqlite3_stmt *stmt;
242
243     if (age == 0.0) {
244         stmt = Prepare("insert into block_index("
245                        "segmentid, object, checksum, size, timestamp) "
246                        "values (?, ?, ?, ?, julianday('now'))");
247     } else {
248         stmt = Prepare("insert into block_index("
249                        "segmentid, object, checksum, size, timestamp) "
250                        "values (?, ?, ?, ?, ?)");
251     }
252
253     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
254     string obj = ref.get_sequence();
255     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
256     sqlite3_bind_text(stmt, 3, checksum.c_str(), checksum.size(),
257                       SQLITE_TRANSIENT);
258     sqlite3_bind_int64(stmt, 4, size);
259     if (age != 0.0)
260         sqlite3_bind_double(stmt, 5, age);
261
262     rc = sqlite3_step(stmt);
263     if (rc != SQLITE_DONE) {
264         fprintf(stderr, "Could not execute INSERT statement!\n");
265         ReportError(rc);
266     }
267
268     sqlite3_finalize(stmt);
269
270     if (age != 0.0) {
271         stmt = Prepare("update segments "
272                        "set mtime = coalesce(max(mtime, ?), ?) "
273                        "where segmentid = ?");
274         sqlite3_bind_double(stmt, 1, age);
275         sqlite3_bind_double(stmt, 2, age);
276         sqlite3_bind_int64(stmt, 3, SegmentToId(ref.get_segment()));
277         rc = sqlite3_step(stmt);
278         sqlite3_finalize(stmt);
279     }
280 }
281
282 ObjectReference LocalDb::FindObject(const string &checksum, int64_t size)
283 {
284     int rc;
285     sqlite3_stmt *stmt;
286     ObjectReference ref;
287
288     stmt = Prepare("select segmentid, object from block_index "
289                    "where checksum = ? and size = ? and expired is null");
290     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
291                       SQLITE_TRANSIENT);
292     sqlite3_bind_int64(stmt, 2, size);
293
294     rc = sqlite3_step(stmt);
295     if (rc == SQLITE_DONE) {
296     } else if (rc == SQLITE_ROW) {
297         ref = ObjectReference(IdToSegment(sqlite3_column_int64(stmt, 0)),
298                               (const char *)sqlite3_column_text(stmt, 1));
299         ref.set_range(0, size, true);
300     } else {
301         fprintf(stderr, "Could not execute SELECT statement!\n");
302         ReportError(rc);
303     }
304
305     sqlite3_finalize(stmt);
306
307     return ref;
308 }
309
310 bool LocalDb::IsOldObject(const string &checksum, int64_t size, double *age,
311                           int *group)
312 {
313     int rc;
314     sqlite3_stmt *stmt;
315     bool found = false;
316
317     stmt = Prepare("select segmentid, object, timestamp, expired "
318                    "from block_index where checksum = ? and size = ?");
319     sqlite3_bind_text(stmt, 1, checksum.c_str(), checksum.size(),
320                       SQLITE_TRANSIENT);
321     sqlite3_bind_int64(stmt, 2, size);
322
323     rc = sqlite3_step(stmt);
324     if (rc == SQLITE_DONE) {
325         found = false;
326     } else if (rc == SQLITE_ROW) {
327         found = true;
328         *age = sqlite3_column_double(stmt, 2);
329         *group = sqlite3_column_int(stmt, 3);
330     } else {
331         fprintf(stderr, "Could not execute SELECT statement!\n");
332         ReportError(rc);
333     }
334
335     sqlite3_finalize(stmt);
336
337     return found;
338 }
339
340 /* Does this object still exist in the database (and not expired)? */
341 bool LocalDb::IsAvailable(const ObjectReference &ref)
342 {
343     int rc;
344     sqlite3_stmt *stmt;
345     bool found = false;
346
347     // Special objects (such as the zero object) aren't stored in segments, and
348     // so are always available.
349     if (!ref.is_normal())
350         return true;
351
352     stmt = Prepare("select count(*) from block_index "
353                    "where segmentid = ? and object = ? and expired is null");
354     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
355     sqlite3_bind_text(stmt, 2, ref.get_sequence().c_str(),
356                       ref.get_sequence().size(), SQLITE_TRANSIENT);
357
358     rc = sqlite3_step(stmt);
359     if (rc == SQLITE_DONE) {
360         found = false;
361     } else if (rc == SQLITE_ROW) {
362         if (sqlite3_column_int(stmt, 0) > 0)
363             found = true;
364     } else {
365         fprintf(stderr, "Could not execute SELECT statement!\n");
366         ReportError(rc);
367     }
368
369     sqlite3_finalize(stmt);
370
371     return found;
372 }
373
374 void LocalDb::UseObject(const ObjectReference& ref)
375 {
376     int rc;
377     sqlite3_stmt *stmt;
378
379     if (!ref.is_normal())
380         return;
381
382     int64_t old_size = 0;
383     stmt = Prepare("select size from snapshot_refs "
384                    "where segmentid = ? and object = ?");
385     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
386     string obj = ref.get_sequence();
387     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
388     rc = sqlite3_step(stmt);
389     if (rc == SQLITE_ROW) {
390         old_size = sqlite3_column_int64(stmt, 0);
391     }
392     sqlite3_finalize(stmt);
393
394     int64_t block_size = 0;
395     stmt = Prepare("select size from block_index "
396                    "where segmentid = ? and object = ?");
397     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
398     obj = ref.get_sequence();
399     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
400     rc = sqlite3_step(stmt);
401     if (rc == SQLITE_ROW) {
402         block_size = sqlite3_column_int64(stmt, 0);
403     } else {
404         string refstr = ref.to_string();
405         fprintf(stderr, "No block found in block_index for %s\n",
406                 refstr.c_str());
407         sqlite3_finalize(stmt);
408         return;
409     }
410     sqlite3_finalize(stmt);
411
412     int64_t new_size = old_size;
413     if (ref.has_range()) {
414         new_size += ref.get_range_length();
415         new_size = min(new_size, block_size);
416     } else {
417         new_size = block_size;
418     }
419
420     if (new_size != old_size) {
421         stmt = Prepare("insert or replace "
422                        "into snapshot_refs(segmentid, object, size) "
423                        "values (?, ?, ?)");
424         sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
425         obj = ref.get_sequence();
426         sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
427         sqlite3_bind_int64(stmt, 3, new_size);
428
429         rc = sqlite3_step(stmt);
430         if (rc != SQLITE_DONE) {
431             fprintf(stderr, "Could not execute INSERT statement!\n");
432             ReportError(rc);
433         }
434
435         sqlite3_finalize(stmt);
436     }
437 }
438
439 void LocalDb::UseSegment(const std::string &segment, double utilization)
440 {
441     int rc;
442     sqlite3_stmt *stmt;
443
444     stmt = Prepare("insert or replace "
445                    "into segments_used(snapshotid, segmentid, utilization) "
446                    "values (?, ?, ?)");
447     sqlite3_bind_int64(stmt, 1, snapshotid);
448     sqlite3_bind_int64(stmt, 2, SegmentToId(segment));
449     sqlite3_bind_double(stmt, 3, utilization);
450
451     rc = sqlite3_step(stmt);
452     if (rc != SQLITE_DONE) {
453         fprintf(stderr, "Could not insert segment use record!\n");
454         ReportError(rc);
455     }
456
457     sqlite3_finalize(stmt);
458 }
459
460 void LocalDb::SetSegmentChecksum(const std::string &segment,
461                                  const std::string &path,
462                                  const std::string &checksum,
463                                  int size)
464 {
465     int rc;
466     sqlite3_stmt *stmt;
467
468     stmt = Prepare("update segments set path = ?, checksum = ?, size = ?, "
469                    "mtime = coalesce(mtime, julianday('now')) "
470                    "where segmentid = ?");
471     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
472                       SQLITE_TRANSIENT);
473     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
474                       SQLITE_TRANSIENT);
475     sqlite3_bind_int64(stmt, 3, size);
476     sqlite3_bind_int64(stmt, 4, SegmentToId(segment));
477
478     rc = sqlite3_step(stmt);
479     if (rc != SQLITE_DONE) {
480         fprintf(stderr, "Could not update segment checksum in database!\n");
481         ReportError(rc);
482     }
483
484     sqlite3_finalize(stmt);
485 }
486
487 bool LocalDb::GetSegmentChecksum(const string &segment,
488                                  string *seg_path,
489                                  string *seg_checksum)
490 {
491     int rc;
492     sqlite3_stmt *stmt;
493     ObjectReference ref;
494     int found = false;
495
496     stmt = Prepare("select path, checksum from segments where segment = ?");
497     sqlite3_bind_text(stmt, 1, segment.c_str(), segment.size(),
498                       SQLITE_TRANSIENT);
499
500     rc = sqlite3_step(stmt);
501     if (rc == SQLITE_DONE) {
502     } else if (rc == SQLITE_ROW) {
503         found = true;
504         const char *val;
505
506         val = (const char *)sqlite3_column_text(stmt, 0);
507         if (val == NULL)
508             found = false;
509         else
510             *seg_path = val;
511
512         val = (const char *)sqlite3_column_text(stmt, 1);
513         if (val == NULL)
514             found = false;
515         else
516             *seg_checksum = val;
517     } else {
518         fprintf(stderr, "Could not execute SELECT statement!\n");
519         ReportError(rc);
520     }
521
522     sqlite3_finalize(stmt);
523
524     return found;
525 }
526
527 /* Look up and return the packed representation of the subblock chunk
528  * signatures.  Returns true if signatures were found for the specified object,
529  * and if so sets *buf to point at a buffer of memory (allocated with malloc;
530  * the caller should free it), and *len to the length of the buffer. */
531 bool LocalDb::LoadChunkSignatures(ObjectReference ref,
532                                   void **buf, size_t *len,
533                                   string *algorithm)
534 {
535     int rc;
536     sqlite3_stmt *stmt;
537     int found = false;
538
539     stmt = Prepare("select signatures, algorithm from subblock_signatures "
540                    "where blockid = (select blockid from block_index "
541                    "                 where segmentid = ? and object = ?)");
542     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
543     string obj = ref.get_sequence();
544     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
545
546     rc = sqlite3_step(stmt);
547     if (rc == SQLITE_DONE) {
548     } else if (rc == SQLITE_ROW) {
549         const void *data = sqlite3_column_blob(stmt, 0);
550         *len = sqlite3_column_bytes(stmt, 0);
551
552         if (*len > 0) {
553             *buf = malloc(*len);
554             if (*buf != NULL) {
555                 memcpy(*buf, data, *len);
556                 *algorithm = (const char *)sqlite3_column_text(stmt, 1);
557                 found = true;
558             }
559         }
560     } else {
561         fprintf(stderr, "Could not execute SELECT statement!\n");
562         ReportError(rc);
563     }
564
565     sqlite3_finalize(stmt);
566
567     return found;
568 }
569
570 /* Store the subblock chunk signatures for a specified object.  The object
571  * itself must have already been indexed in the database. */
572 void LocalDb::StoreChunkSignatures(ObjectReference ref,
573                                    const void *buf, size_t len,
574                                    const string& algorithm)
575 {
576     int rc;
577     sqlite3_stmt *stmt;
578
579     stmt = Prepare("select blockid from block_index "
580                    "where segmentid = ? and object = ?");
581     sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
582     string obj = ref.get_sequence();
583     sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
584
585     rc = sqlite3_step(stmt);
586     if (rc != SQLITE_ROW) {
587         fprintf(stderr,
588                 "Could not determine blockid in StoreChunkSignatures!\n");
589         ReportError(rc);
590         fatal("Error getting blockid");
591     }
592     int64_t blockid = sqlite3_column_int64(stmt, 0);
593     sqlite3_finalize(stmt);
594
595     stmt = Prepare("insert or replace "
596                    "into subblock_signatures(blockid, algorithm, signatures) "
597                    "values (?, ?, ?)");
598     sqlite3_bind_int64(stmt, 1, blockid);
599     sqlite3_bind_text(stmt, 2, algorithm.c_str(), algorithm.size(),
600                       SQLITE_TRANSIENT);
601     sqlite3_bind_blob(stmt, 3, buf, len, SQLITE_TRANSIENT);
602
603     rc = sqlite3_step(stmt);
604     if (rc != SQLITE_DONE) {
605         fprintf(stderr, "Could not insert sub-block checksums!\n");
606         ReportError(rc);
607     }
608
609     sqlite3_finalize(stmt);
610 }