Add write throttling based on the size of the uncommitted journal
[bluesky.git] / bluesky / bluesky-private.h
index f945a93..2595367 100644 (file)
@@ -14,6 +14,7 @@
 #define _BLUESKY_PRIVATE_H
 
 #include "bluesky.h"
+#include <stdlib.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -78,11 +79,13 @@ BlueSkyRCStr *bluesky_crypt_decrypt(BlueSkyRCStr *in, const uint8_t *key);
 void bluesky_crypt_block_encrypt(gchar *cloud_block, size_t len,
                                  BlueSkyCryptKeys *keys);
 gboolean bluesky_crypt_block_decrypt(gchar *cloud_block, size_t len,
-                                     BlueSkyCryptKeys *keys);
+                                     BlueSkyCryptKeys *keys,
+                                     gboolean allow_unauth);
 void bluesky_cloudlog_encrypt(GString *segment, BlueSkyCryptKeys *keys);
 void bluesky_cloudlog_decrypt(char *segment, size_t len,
                               BlueSkyCryptKeys *keys,
-                              BlueSkyRangeset *items);
+                              BlueSkyRangeset *items,
+                              gboolean allow_unauth);
 
 /* Storage layer.  Requests can be performed asynchronously, so these objects
  * help keep track of operations in progress. */
@@ -195,7 +198,7 @@ void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier,
 
 void bluesky_inode_start_sync(BlueSkyInode *inode);
 
-void bluesky_block_touch(BlueSkyInode *inode, uint64_t i);
+void bluesky_block_touch(BlueSkyInode *inode, uint64_t i, gboolean preserve);
 void bluesky_block_fetch(BlueSkyInode *inode, BlueSkyBlock *block,
                          BlueSkyStoreAsync *barrier);
 void bluesky_block_flush(BlueSkyInode *inode, BlueSkyBlock *block,
@@ -207,6 +210,12 @@ void bluesky_file_drop_cached(BlueSkyInode *inode);
  * various pieces of data (both where in the cloud and where cached locally).
  * */
 
+/* Eventually we'll want to support multiple writers.  But for now, hard-code
+ * separate namespaces in the cloud for the proxy and the cleaner to write to.
+ * */
+#define BLUESKY_CLOUD_DIR_PRIMARY 0
+#define BLUESKY_CLOUD_DIR_CLEANER 1
+
 typedef struct {
     char bytes[16];
 } BlueSkyCloudID;
@@ -259,6 +268,10 @@ struct cloudlog_header {
     uint32_t size1, size2, size3;
 } __attribute__((packed));
 
+// Rough size limit for a log segment.  This is not a firm limit and there are
+// no absolute guarantees on the size of a log segment.
+#define LOG_SEGMENT_SIZE (1 << 22)
+
 #define JOURNAL_MAGIC "\nLog"
 #define CLOUDLOG_MAGIC "AgI-"
 #define CLOUDLOG_MAGIC_ENCRYPTED "AgI="     // CLOUDLOG_MAGIC[3] ^= 0x10
@@ -319,6 +332,16 @@ struct BlueSkyCloudLogState {
     GList *inode_list;
     GSList *writeback_list;     // Items which are being serialized right now
     GList *pending_segments;    // Segments which are being uploaded now
+
+    int uploads_pending;        // Count of uploads in progress, not completed
+    GMutex *uploads_pending_lock;
+    GCond *uploads_pending_cond;
+
+    /* What is the most recent sequence number written by the cleaner which we
+     * have processed and incorporated into our own log?  This gets
+     * incorporated into the version vector written out with our checkpoint
+     * records. */
+    int latest_cleaner_seq_seen;
 };
 
 gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b);
@@ -338,6 +361,7 @@ void bluesky_cloudlog_insert_locked(BlueSkyCloudLog *log);
 BlueSkyCloudLog *bluesky_cloudlog_get(BlueSkyFS *fs, BlueSkyCloudID id);
 void bluesky_cloudlog_prefetch(BlueSkyCloudLog *log);
 void bluesky_cloudlog_fetch(BlueSkyCloudLog *log);
+void bluesky_cloudlog_background_fetch(BlueSkyCloudLog *item);
 BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log,
                                                BlueSkyFS *fs);
 void bluesky_cloudlog_flush(BlueSkyFS *fs);
@@ -390,7 +414,8 @@ struct BlueSkyCacheFile {
     int disk_used;
     BlueSkyFS *fs;
     BlueSkyLog *log;
-    gboolean fetching, ready;   // Cloud data: downloading or ready for use
+    gboolean fetching;          // Cloud data: downloading or ready for use
+    gboolean complete;          // Complete file has been fetched from cloud
     int64_t atime;              // Access time, for cache management
     BlueSkyRangeset *items;     // Locations of valid items
     BlueSkyRangeset *prefetches;// Locations we have been requested to prefetch
@@ -402,6 +427,8 @@ void bluesky_log_finish_all(GList *log_items);
 BlueSkyCloudLog *bluesky_log_get_commit_point(BlueSkyFS *fs);
 void bluesky_log_write_commit_point(BlueSkyFS *fs, BlueSkyCloudLog *marker);
 
+BlueSkyRCStr *bluesky_cachefile_map_raw(BlueSkyCacheFile *cachefile,
+                                        off_t offset, size_t size);
 BlueSkyRCStr *bluesky_log_map_object(BlueSkyCloudLog *item, gboolean map_data);
 void bluesky_mmap_unref(BlueSkyCacheFile *mmap);
 void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile);
@@ -415,7 +442,10 @@ void bluesky_replay(BlueSkyFS *fs);
 
 /* Used to track log segments that are being written to the cloud. */
 typedef struct {
-    BlueSkyRCStr *data;
+    BlueSkyFS *fs;
+    char *key;                  /* File name for log segment in backend */
+    GString *raw_data;          /* Data before encryption */
+    BlueSkyRCStr *data;         /* Data after encryption */
     GSList *items;
     GMutex *lock;
     GCond *cond;
@@ -457,6 +487,10 @@ void bluesky_inode_map_minimize(BlueSkyFS *fs);
 
 gboolean bluesky_checkpoint_load(BlueSkyFS *fs);
 
+/* Merging of log state with the work of the cleaner. */
+void bluesky_cleaner_merge(BlueSkyFS *fs);
+void bluesky_cleaner_thread_launch(BlueSkyFS *fs);
+
 #ifdef __cplusplus
 }
 #endif