From 5272f75587abb4cb396059ecbf1a6130bb2e69d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20Th=C3=A1i=20Ng=E1=BB=8Dc=20Duy?= Date: Sun, 6 May 2012 19:31:54 +0700 Subject: index-pack: restructure pack processing into three main functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The second pass in parse_pack_objects() are split into resolve_deltas(). The final phase, fixing thin pack or just seal the pack, is now in conclude_pack() function. Main pack processing is now a sequence of these functions: - parse_pack_objects() reads through the input pack - resolve_deltas() makes sure all deltas can be resolved - conclude_pack() seals the output pack - write_idx_file() writes companion index file - final() moves the pack/index to proper place Signed-off-by: Nguyễn Thái Ngọc Duy Signed-off-by: Junio C Hamano --- builtin/index-pack.c | 128 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 75 insertions(+), 53 deletions(-) (limited to 'builtin/index-pack.c') diff --git a/builtin/index-pack.c b/builtin/index-pack.c index dd1c5c961..a4be4a656 100644 --- a/builtin/index-pack.c +++ b/builtin/index-pack.c @@ -682,19 +682,26 @@ static int compare_delta_entry(const void *a, const void *b) objects[delta_b->obj_no].type); } -/* Parse all objects and return the pack content SHA1 hash */ +static void resolve_base(struct object_entry *obj) +{ + struct base_data *base_obj = alloc_base_data(); + base_obj->obj = obj; + base_obj->data = NULL; + find_unresolved_deltas(base_obj); +} + +/* + * First pass: + * - find locations of all objects; + * - calculate SHA1 of all non-delta objects; + * - remember base (SHA1 or offset) for all deltas. + */ static void parse_pack_objects(unsigned char *sha1) { int i; struct delta_entry *delta = deltas; struct stat st; - /* - * First pass: - * - find locations of all objects; - * - calculate SHA1 of all non-delta objects; - * - remember base (SHA1 or offset) for all deltas. - */ if (verbose) progress = start_progress( from_stdin ? "Receiving objects" : "Indexing objects", @@ -728,6 +735,19 @@ static void parse_pack_objects(unsigned char *sha1) if (S_ISREG(st.st_mode) && lseek(input_fd, 0, SEEK_CUR) - input_len != st.st_size) die("pack has junk at the end"); +} + +/* + * Second pass: + * - for all non-delta objects, look if it is used as a base for + * deltas; + * - if used as a base, uncompress the object and apply all deltas, + * recursively checking if the resulting object is used as a base + * for some more deltas. + */ +static void resolve_deltas(void) +{ + int i; if (!nr_deltas) return; @@ -736,29 +756,63 @@ static void parse_pack_objects(unsigned char *sha1) qsort(deltas, nr_deltas, sizeof(struct delta_entry), compare_delta_entry); - /* - * Second pass: - * - for all non-delta objects, look if it is used as a base for - * deltas; - * - if used as a base, uncompress the object and apply all deltas, - * recursively checking if the resulting object is used as a base - * for some more deltas. - */ if (verbose) progress = start_progress("Resolving deltas", nr_deltas); for (i = 0; i < nr_objects; i++) { struct object_entry *obj = &objects[i]; - struct base_data *base_obj = alloc_base_data(); if (is_delta_type(obj->type)) continue; - base_obj->obj = obj; - base_obj->data = NULL; - find_unresolved_deltas(base_obj); + resolve_base(obj); display_progress(progress, nr_resolved_deltas); } } +/* + * Third pass: + * - append objects to convert thin pack to full pack if required + * - write the final 20-byte SHA-1 + */ +static void fix_unresolved_deltas(struct sha1file *f, int nr_unresolved); +static void conclude_pack(int fix_thin_pack, const char *curr_pack, unsigned char *pack_sha1) +{ + if (nr_deltas == nr_resolved_deltas) { + stop_progress(&progress); + /* Flush remaining pack final 20-byte SHA1. */ + flush(); + return; + } + + if (fix_thin_pack) { + struct sha1file *f; + unsigned char read_sha1[20], tail_sha1[20]; + char msg[48]; + int nr_unresolved = nr_deltas - nr_resolved_deltas; + int nr_objects_initial = nr_objects; + if (nr_unresolved <= 0) + die("confusion beyond insanity"); + objects = xrealloc(objects, + (nr_objects + nr_unresolved + 1) + * sizeof(*objects)); + f = sha1fd(output_fd, curr_pack); + fix_unresolved_deltas(f, nr_unresolved); + sprintf(msg, "completed with %d local objects", + nr_objects - nr_objects_initial); + stop_progress_msg(&progress, msg); + sha1close(f, tail_sha1, 0); + hashcpy(read_sha1, pack_sha1); + fixup_pack_header_footer(output_fd, pack_sha1, + curr_pack, nr_objects, + read_sha1, consumed_bytes-20); + if (hashcmp(read_sha1, tail_sha1) != 0) + die("Unexpected tail checksum for %s " + "(disk corruption?)", curr_pack); + } + if (nr_deltas != nr_resolved_deltas) + die("pack has %d unresolved deltas", + nr_deltas - nr_resolved_deltas); +} + static int write_compressed(struct sha1file *f, void *in, unsigned int size) { git_zstream stream; @@ -1196,40 +1250,8 @@ int cmd_index_pack(int argc, const char **argv, const char *prefix) objects = xcalloc(nr_objects + 1, sizeof(struct object_entry)); deltas = xcalloc(nr_objects, sizeof(struct delta_entry)); parse_pack_objects(pack_sha1); - if (nr_deltas == nr_resolved_deltas) { - stop_progress(&progress); - /* Flush remaining pack final 20-byte SHA1. */ - flush(); - } else { - if (fix_thin_pack) { - struct sha1file *f; - unsigned char read_sha1[20], tail_sha1[20]; - char msg[48]; - int nr_unresolved = nr_deltas - nr_resolved_deltas; - int nr_objects_initial = nr_objects; - if (nr_unresolved <= 0) - die("confusion beyond insanity"); - objects = xrealloc(objects, - (nr_objects + nr_unresolved + 1) - * sizeof(*objects)); - f = sha1fd(output_fd, curr_pack); - fix_unresolved_deltas(f, nr_unresolved); - sprintf(msg, "completed with %d local objects", - nr_objects - nr_objects_initial); - stop_progress_msg(&progress, msg); - sha1close(f, tail_sha1, 0); - hashcpy(read_sha1, pack_sha1); - fixup_pack_header_footer(output_fd, pack_sha1, - curr_pack, nr_objects, - read_sha1, consumed_bytes-20); - if (hashcmp(read_sha1, tail_sha1) != 0) - die("Unexpected tail checksum for %s " - "(disk corruption?)", curr_pack); - } - if (nr_deltas != nr_resolved_deltas) - die("pack has %d unresolved deltas", - nr_deltas - nr_resolved_deltas); - } + resolve_deltas(); + conclude_pack(fix_thin_pack, curr_pack, pack_sha1); free(deltas); if (strict) check_objects(); -- cgit v1.2.1 From b8a2486f1524947f232f657e9f2ebf44e3e7a243 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20Th=C3=A1i=20Ng=E1=BB=8Dc=20Duy?= Date: Sun, 6 May 2012 19:31:55 +0700 Subject: index-pack: support multithreaded delta resolving MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This puts delta resolving on each base on a separate thread, one base cache per thread. Per-thread data is grouped in struct thread_local. When running with nr_threads == 1, no pthreads calls are made. The system essentially runs in non-thread mode. An experiment on a Xeon 24 core machine with git.git shows that performance does not increase proportional to the number of cores. So by default, we use maximum 3 cores. Some numbers with --threads from 1 to 16: 1..4 real 0m8.003s 0m5.307s 0m4.321s 0m3.830s user 0m7.720s 0m8.009s 0m8.133s 0m8.305s sys 0m0.224s 0m0.372s 0m0.360s 0m0.360s 5..8 real 0m3.727s 0m3.604s 0m3.332s 0m3.369s user 0m9.361s 0m9.817s 0m9.525s 0m9.769s sys 0m0.584s 0m0.624s 0m0.540s 0m0.560s 9..12 real 0m3.036s 0m3.139s 0m3.177s 0m2.961s user 0m8.977s 0m10.205s 0m9.737s 0m10.073s sys 0m0.596s 0m0.680s 0m0.684s 0m0.680s 13..16 real 0m2.985s 0m2.894s 0m2.975s 0m2.971s user 0m9.825s 0m10.573s 0m10.833s 0m11.361s sys 0m0.788s 0m0.732s 0m0.904s 0m1.016s On an Intel dual core and linux-2.6.git 1..4 real 2m37.789s 2m7.963s 2m0.920s 1m58.213s user 2m28.415s 2m52.325s 2m50.176s 2m41.187s sys 0m7.808s 0m11.181s 0m11.224s 0m10.731s Thanks Ramsay Jones for troubleshooting and support on MinGW platform. Signed-off-by: Nguyễn Thái Ngọc Duy Signed-off-by: Junio C Hamano --- builtin/index-pack.c | 204 ++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 193 insertions(+), 11 deletions(-) (limited to 'builtin/index-pack.c') diff --git a/builtin/index-pack.c b/builtin/index-pack.c index a4be4a656..d4685c50d 100644 --- a/builtin/index-pack.c +++ b/builtin/index-pack.c @@ -9,6 +9,7 @@ #include "progress.h" #include "fsck.h" #include "exec_cmd.h" +#include "thread-utils.h" static const char index_pack_usage[] = "git index-pack [-v] [-o ] [--keep | --keep=] [--verify] [--strict] ( | --stdin [--fix-thin] [])"; @@ -38,6 +39,14 @@ struct base_data { int ofs_first, ofs_last; }; +struct thread_local { +#ifndef NO_PTHREADS + pthread_t thread; +#endif + struct base_data *base_cache; + size_t base_cache_used; +}; + /* * Even if sizeof(union delta_base) == 24 on 64-bit archs, we really want * to memcmp() only the first 20 bytes. @@ -54,11 +63,11 @@ struct delta_entry { static struct object_entry *objects; static struct delta_entry *deltas; -static struct base_data *base_cache; -static size_t base_cache_used; +static struct thread_local nothread_data; static int nr_objects; static int nr_deltas; static int nr_resolved_deltas; +static int nr_threads; static int from_stdin; static int strict; @@ -75,6 +84,77 @@ static git_SHA_CTX input_ctx; static uint32_t input_crc32; static int input_fd, output_fd, pack_fd; +#ifndef NO_PTHREADS + +static struct thread_local *thread_data; +static int nr_dispatched; +static int threads_active; + +static pthread_mutex_t read_mutex; +#define read_lock() lock_mutex(&read_mutex) +#define read_unlock() unlock_mutex(&read_mutex) + +static pthread_mutex_t counter_mutex; +#define counter_lock() lock_mutex(&counter_mutex) +#define counter_unlock() unlock_mutex(&counter_mutex) + +static pthread_mutex_t work_mutex; +#define work_lock() lock_mutex(&work_mutex) +#define work_unlock() unlock_mutex(&work_mutex) + +static pthread_key_t key; + +static inline void lock_mutex(pthread_mutex_t *mutex) +{ + if (threads_active) + pthread_mutex_lock(mutex); +} + +static inline void unlock_mutex(pthread_mutex_t *mutex) +{ + if (threads_active) + pthread_mutex_unlock(mutex); +} + +/* + * Mutex and conditional variable can't be statically-initialized on Windows. + */ +static void init_thread(void) +{ + init_recursive_mutex(&read_mutex); + pthread_mutex_init(&counter_mutex, NULL); + pthread_mutex_init(&work_mutex, NULL); + pthread_key_create(&key, NULL); + thread_data = xcalloc(nr_threads, sizeof(*thread_data)); + threads_active = 1; +} + +static void cleanup_thread(void) +{ + if (!threads_active) + return; + threads_active = 0; + pthread_mutex_destroy(&read_mutex); + pthread_mutex_destroy(&counter_mutex); + pthread_mutex_destroy(&work_mutex); + pthread_key_delete(key); + free(thread_data); +} + +#else + +#define read_lock() +#define read_unlock() + +#define counter_lock() +#define counter_unlock() + +#define work_lock() +#define work_unlock() + +#endif + + static int mark_link(struct object *obj, int type, void *data) { if (!obj) @@ -223,6 +303,25 @@ static NORETURN void bad_object(unsigned long offset, const char *format, ...) die("pack has bad object at offset %lu: %s", offset, buf); } +static inline struct thread_local *get_thread_data(void) +{ +#ifndef NO_PTHREADS + if (threads_active) + return pthread_getspecific(key); + assert(!threads_active && + "This should only be reached when all threads are gone"); +#endif + return ¬hread_data; +} + +#ifndef NO_PTHREADS +static void set_thread_data(struct thread_local *data) +{ + if (threads_active) + pthread_setspecific(key, data); +} +#endif + static struct base_data *alloc_base_data(void) { struct base_data *base = xmalloc(sizeof(struct base_data)); @@ -237,15 +336,16 @@ static void free_base_data(struct base_data *c) if (c->data) { free(c->data); c->data = NULL; - base_cache_used -= c->size; + get_thread_data()->base_cache_used -= c->size; } } static void prune_base_data(struct base_data *retain) { struct base_data *b; - for (b = base_cache; - base_cache_used > delta_base_cache_limit && b; + struct thread_local *data = get_thread_data(); + for (b = data->base_cache; + data->base_cache_used > delta_base_cache_limit && b; b = b->child) { if (b->data && b != retain) free_base_data(b); @@ -257,12 +357,12 @@ static void link_base_data(struct base_data *base, struct base_data *c) if (base) base->child = c; else - base_cache = c; + get_thread_data()->base_cache = c; c->base = base; c->child = NULL; if (c->data) - base_cache_used += c->size; + get_thread_data()->base_cache_used += c->size; prune_base_data(c); } @@ -272,7 +372,7 @@ static void unlink_base_data(struct base_data *c) if (base) base->child = NULL; else - base_cache = NULL; + get_thread_data()->base_cache = NULL; free_base_data(c); } @@ -461,19 +561,24 @@ static void sha1_object(const void *data, unsigned long size, enum object_type type, unsigned char *sha1) { hash_sha1_file(data, size, typename(type), sha1); + read_lock(); if (has_sha1_file(sha1)) { void *has_data; enum object_type has_type; unsigned long has_size; has_data = read_sha1_file(sha1, &has_type, &has_size); + read_unlock(); if (!has_data) die("cannot read existing object %s", sha1_to_hex(sha1)); if (size != has_size || type != has_type || memcmp(data, has_data, size) != 0) die("SHA1 COLLISION FOUND WITH %s !", sha1_to_hex(sha1)); free(has_data); - } + } else + read_unlock(); + if (strict) { + read_lock(); if (type == OBJ_BLOB) { struct blob *blob = lookup_blob(sha1); if (blob) @@ -507,6 +612,7 @@ static void sha1_object(const void *data, unsigned long size, } obj->flags |= FLAG_CHECKED; } + read_unlock(); } } @@ -552,7 +658,7 @@ static void *get_base_data(struct base_data *c) if (!delta_nr) { c->data = get_data_from_pack(obj); c->size = obj->size; - base_cache_used += c->size; + get_thread_data()->base_cache_used += c->size; prune_base_data(c); } for (; delta_nr > 0; delta_nr--) { @@ -568,7 +674,7 @@ static void *get_base_data(struct base_data *c) free(raw); if (!c->data) bad_object(obj->idx.offset, "failed to apply delta"); - base_cache_used += c->size; + get_thread_data()->base_cache_used += c->size; prune_base_data(c); } free(delta); @@ -596,7 +702,9 @@ static void resolve_delta(struct object_entry *delta_obj, bad_object(delta_obj->idx.offset, "failed to apply delta"); sha1_object(result->data, result->size, delta_obj->real_type, delta_obj->idx.sha1); + counter_lock(); nr_resolved_deltas++; + counter_unlock(); } static struct base_data *find_unresolved_deltas_1(struct base_data *base, @@ -690,6 +798,30 @@ static void resolve_base(struct object_entry *obj) find_unresolved_deltas(base_obj); } +#ifndef NO_PTHREADS +static void *threaded_second_pass(void *data) +{ + set_thread_data(data); + for (;;) { + int i; + work_lock(); + display_progress(progress, nr_resolved_deltas); + while (nr_dispatched < nr_objects && + is_delta_type(objects[nr_dispatched].type)) + nr_dispatched++; + if (nr_dispatched >= nr_objects) { + work_unlock(); + break; + } + i = nr_dispatched++; + work_unlock(); + + resolve_base(&objects[i]); + } + return NULL; +} +#endif + /* * First pass: * - find locations of all objects; @@ -758,6 +890,24 @@ static void resolve_deltas(void) if (verbose) progress = start_progress("Resolving deltas", nr_deltas); + +#ifndef NO_PTHREADS + nr_dispatched = 0; + if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) { + init_thread(); + for (i = 0; i < nr_threads; i++) { + int ret = pthread_create(&thread_data[i].thread, NULL, + threaded_second_pass, thread_data + i); + if (ret) + die("unable to create thread: %s", strerror(ret)); + } + for (i = 0; i < nr_threads; i++) + pthread_join(thread_data[i].thread, NULL); + cleanup_thread(); + return; + } +#endif + for (i = 0; i < nr_objects; i++) { struct object_entry *obj = &objects[i]; @@ -1016,6 +1166,18 @@ static int git_index_pack_config(const char *k, const char *v, void *cb) die("bad pack.indexversion=%"PRIu32, opts->version); return 0; } + if (!strcmp(k, "pack.threads")) { + nr_threads = git_config_int(k, v); + if (nr_threads < 0) + die("invalid number of threads specified (%d)", + nr_threads); +#ifdef NO_PTHREADS + if (nr_threads != 1) + warning("no threads support, ignoring %s", k); + nr_threads = 1; +#endif + return 0; + } return git_default_config(k, v, cb); } @@ -1174,6 +1336,17 @@ int cmd_index_pack(int argc, const char **argv, const char *prefix) keep_msg = ""; } else if (!prefixcmp(arg, "--keep=")) { keep_msg = arg + 7; + } else if (!prefixcmp(arg, "--threads=")) { + char *end; + nr_threads = strtoul(arg+10, &end, 0); + if (!arg[10] || *end || nr_threads < 0) + usage(index_pack_usage); +#ifdef NO_PTHREADS + if (nr_threads != 1) + warning("no threads support, " + "ignoring %s", arg); + nr_threads = 1; +#endif } else if (!prefixcmp(arg, "--pack_header=")) { struct pack_header *hdr; char *c; @@ -1245,6 +1418,15 @@ int cmd_index_pack(int argc, const char **argv, const char *prefix) if (strict) opts.flags |= WRITE_IDX_STRICT; +#ifndef NO_PTHREADS + if (!nr_threads) { + nr_threads = online_cpus(); + /* An experiment showed that more threads does not mean faster */ + if (nr_threads > 3) + nr_threads = 3; + } +#endif + curr_pack = open_pack_file(pack_name); parse_pack_header(); objects = xcalloc(nr_objects + 1, sizeof(struct object_entry)); -- cgit v1.2.1 From b038a61020573b1be6e5fc5b2be4485b63a9cbf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20Th=C3=A1i=20Ng=E1=BB=8Dc=20Duy?= Date: Sun, 6 May 2012 19:31:56 +0700 Subject: index-pack: disable threading if NO_PREAD is defined MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NO_PREAD simulates pread() as a sequence of seek, read, seek in compat/pread.c. The simulation is not thread-safe because another thread could move the file offset away in the middle of pread operation. Do not allow threading in that case. Signed-off-by: Nguyễn Thái Ngọc Duy Signed-off-by: Junio C Hamano --- builtin/index-pack.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'builtin/index-pack.c') diff --git a/builtin/index-pack.c b/builtin/index-pack.c index d4685c50d..807ee56f7 100644 --- a/builtin/index-pack.c +++ b/builtin/index-pack.c @@ -39,6 +39,11 @@ struct base_data { int ofs_first, ofs_last; }; +#if !defined(NO_PTHREADS) && defined(NO_PREAD) +/* NO_PREAD uses compat/pread.c, which is not thread-safe. Disable threading. */ +#define NO_PTHREADS +#endif + struct thread_local { #ifndef NO_PTHREADS pthread_t thread; -- cgit v1.2.1