fupermod: Functional Performance Models of heterogeneous processors

Jacobi method for solving a system of liner equations on a heterogeneous cluster

#include "config.h"
#include "fupermod/fupermod.h"
#include "libjacobi.h"
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdlib.h>
#include <string.h>

#define ZERO 1e-13

// todo: Dave, if you like, you can use this code here

typedef enum fupermod_algorithm {
    partial,
    full,
    constant1,
    constant2,
    homogeneous,
    manual
} fupermod_algorithm;

void fupermod_balance_init(MPI_Comm comm, int root, fupermod_algorithm algorithm, int D, char* info);

void fupermod_balance_finalise(int size, fupermod_model** models);

// todo: Dave, all these variables can easily be created by user in their application.
// For example, user is able to count the number of iterations himself.

FILE* balance_times = NULL;
int iter;
int balance;
int continue_balancing = 1;
double threshold;
fupermod_algorithm algorithm = -1;
int D;
struct timeval total_start;

void fupermod_balance_init(MPI_Comm comm, int root, fupermod_algorithm _algorithm, int _D, char* info) {
    algorithm = _algorithm;
    D = _D;
    int rank;
    MPI_Comm_rank(comm, &rank);
    int size;
    MPI_Comm_size(comm, &size);
    char hostname[MPI_MAX_PROCESSOR_NAME];
    int len;
    MPI_Get_processor_name(hostname, &len);
    char* hostnames = (rank == root ? (char*)malloc(sizeof(char) * size * MPI_MAX_PROCESSOR_NAME) : NULL);
    MPI_Gather(hostname, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, hostnames, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, root, comm);
    if (rank == root) {
        char filename[1024];
        char sizes[16];
        if (size <= 9)
            sprintf(sizes, "0%d", size);
        else
            sprintf(sizes, "%d", size);
        sprintf(filename, "balance_times.%d.%d.%s.dat", algorithm, D, sizes);
        balance_times = fopen(filename, "w");
        time_t now = time(NULL);
        struct tm *tm_now;
        char datetime[25];
        tm_now = localtime(&now);
        strftime(datetime, sizeof datetime, "%d-%m-%y %H:%M", tm_now);

        iter = 0;
        balance = 0;
        fprintf(balance_times, "#Fupermod Balance iteration times. Num machines:%d\tAt:%s\n", size, datetime);
        fprintf(balance_times, "# %s \n", info);
        fprintf(balance_times, "#host\t");
        int i;
        for (i = 0; i < size; i++) {
            fprintf(balance_times, "        \t%12s\t", (hostnames + i * MPI_MAX_PROCESSOR_NAME));
        }
        fprintf(balance_times, "\n");
        fprintf(balance_times, "#itt \t");
        for (i = 0; i < size; i++) {
            fprintf(balance_times, "dist%4d\ttime%3d     \t", i, i);
        }
        fprintf(balance_times, "\n");
        free(hostnames); hostnames = NULL;
        gettimeofday(&total_start, NULL);
    }
}

void fupermod_balance_finalise(int size, fupermod_model** models) {
    struct timeval end;
    gettimeofday(&end, NULL);
    double time = ((end.tv_sec + end.tv_usec / 1000000.) - (total_start.tv_sec + total_start.tv_usec / 1000000.));
    fprintf(balance_times, "#total time: %le\n", time);
    fclose(balance_times);
    char filename[1024];
    int i;
    for (i = 0; i < size; i++){
        sprintf(filename, "datapoints.%d.%d.%d.dat", algorithm, D, i);
        FILE* datapoints = fopen(filename, "w");
        fprintf(datapoints, "# Datapoints for rank:%d, algorithm:%d problem size:%d\n", i, algorithm, D);
        fprintf(datapoints, "#d\ttime\tspeed\n");
        int j, d;
        double t;
        for (j = 0; j < models[i]->data->count; j++){
            d = models[i]->data->points[j].d;
            t = models[i]->data->points[j].t;
            fprintf(datapoints, "%d\t%le\t%le\n", d, t, models[i]->complexity(d) / t);
        }
        fclose(datapoints);
    }
}

// todo: Dave, all user printouts should be in the user application!

/*
#if PRINT_MODELS
        if (balance){
            char filename[256];
            sprintf(filename, "models.%d.dat", iter);
            FILE* modelf = fopen(filename, "w");
            fupermod_print_models(modelf, size, models, D);
            fclose(modelf);

            sprintf(filename, "dist_actual.%d.dat", iter);
            FILE* dist = fopen(filename, "w");
            fprintf(dist, "#d\ts\n");
            for (i = 0; i < size; i++) {
                fprintf(dist, "%d\t%le\n", distr->d[i],  models[i]->kernel->complexity(distr->d[i]) / times[i]);

            }
            fclose(dist);
        }
#endif
#if PRINT_MODELS
            {
                char filename[256];
                sprintf(filename, "dist_predicted.%d.dat", iter);
                FILE* dist = fopen(filename, "w");
                fprintf(dist, "#d\ts\n");
                for (i = 0; i < size; i++) {
                    fprintf(dist, "%d\t%le\n", distr->d[i], distr->s[i]);
                }
                fclose(dist);
            }
#endif
 */

int main(int argc, char **argv){
    MPI_Init(&argc,&argv);
    MPI_Comm comm = MPI_COMM_WORLD;
    int rank;
    MPI_Comm_rank(comm, &rank);
    int size;
    MPI_Comm_size(comm, &size);
    int root = 0;

// if (!rank) getc(stdin); MPI_Barrier(comm);

    // Options
    int seed = 2;
    int D = 100;
    int max_itt = 20;
    fupermod_algorithm algorithm = 0;
    double threshold = 0.01;
    int verbose = 0;
    int exit = 0;

    if(rank == root){
        int ret;
        while ((ret = getopt(argc, argv, "hD:i:a:vt:")) >=0) {
            switch(ret) {
                case 'h': // help
                    printf("jacobi.c help\n"
                            ""
                            "Description: Jacobi method for solving a system of liner "
                            "equations on a heterogeneous cluster. \n"
                            "\n"
                            "Usage:\n"
                            "-D I Size of matrix\n"
                            "-i I Number of Iterations\n"
                            "-a I   Balancing algorithm. (default: %d)\n"
                            "\t\t0: Partial functional performance model\n"
                            "\t\t1: Full functional performance model\n"
                            "\t\t2: Constant performance model 1 (small benchmark)\n"
                            "\t\t3: Constant performance model 2 (homogeneous benchmark)\n"
                            "\t\t4: Homogeneous distribution\n"
                            "\t\t5: Manual distribution\n"
                            "-t D Relative threshold between 0 and 1, above which balancing is done\n"
                            "-v I Verbose mode\n"
                            , algorithm);
                    exit = 1;
                    break;
                case 'D': // size on matrix
                    D = atoi(optarg);
                    break;
                case 'i': //Number of iterations
                    max_itt = atoi(optarg);
                    break;
                case 'a': // type: homogeneous, heterogeneous
                    algorithm = atoi(optarg);
                    break;
                case 'v':
                    verbose=1;
                    break;
                case 't': // time threshold
                    threshold = atof(optarg);
                    break;
            }
        }
    }
    MPI_Bcast(&exit, 1, MPI_INT, root, comm);
    if (exit) {
        MPI_Finalize();
        return 0;
    }

    MPI_Bcast(&D, 1, MPI_INT, root, comm);
    MPI_Bcast(&max_itt, 1, MPI_INT, root, comm);
    MPI_Bcast(&algorithm, 1, MPI_INT, root, comm);
    MPI_Bcast(&threshold, 1, MPI_DOUBLE, root, comm);

    // TODO: If n < size, computation should still work.
    if (D < size){
        fprintf(stderr,"Error!\nMatrix smaller then number of processors\n");
        MPI_Finalize();
        return FUPERMOD_FAIL;
    }

    fupermod_dist* distr = ((rank == root) ? fupermod_dist_alloc(size, D) : NULL);

    int* d = NULL; // ((rank == root) ? distr->d : (int *)malloc(size * sizeof(int))); todo: what to do here?
    int* old_d = (int *)malloc(size * sizeof(int));
    int* offset = (int *)malloc(size * sizeof(int));
    jacobi_set_D(D);

    fupermod_data** data = (fupermod_data**)malloc(sizeof(fupermod_data*) * size);
    fupermod_model** models = (fupermod_model**)malloc(sizeof(fupermod_model*) * size);
    int i;
    for (i = 0; i < size; i++) {
        data[i] = fupermod_data_alloc();
        models[i] = fupermod_model_interp_alloc(data[i], jacobi_complexity, D, 0);
    }

    double *a, *b, *x;

    //  double diff = 1e10;
    // int balance = 0;
    //  int balance_work = 0; // A measure of how much balance passing done
    char info[1024];
    sprintf(info, "Jacobi. D:%d threshold:%le iterations:%d algorithm:%d", D, threshold, max_itt, algorithm);
    fupermod_balance_init(comm, root, algorithm, D, info);

    int itt;
    for (itt = 0; itt < max_itt; itt++){ // Main iteration loop
        if (rank == root)
            fprintf(stderr, "P%d starting itt: %d\n", rank, itt);
        //  double diff_1=diff;
        double diff = 0.0;

        MPI_Bcast(d, size, MPI_INT, root, comm);
        int i;
        for (i=0; i<size; i++) {
            offset[i] = i == 0 ? 0 : (offset[i-1] + d[i-1]);
        }
        if (itt == 0) {
            // Initialise a, b and x.
            jacobi_fill_matrix(&a, &b, &x, D, d[rank], offset[rank], seed, rank);
            MPI_Bcast(b, D, MPI_DOUBLE, root, comm);
        } else {
            a = jacobi_redistribute(comm, a, D, old_d, d);
        }
        memcpy(old_d, d, sizeof(int) * size);

        fupermod_dynamic balancer = {
                fupermod_partition_multiroot,
                size,
                models,
                distr
        };
        struct timeval start;
        gettimeofday(&start, NULL);
        double* new_x = jacobi_compute(a, b, x, D, d[rank], offset[rank], &diff);
        fupermod_balancer_iterate(&balancer, comm, root, start);

        if(verbose) printf("P%d x before: %f %f %f %f\n",rank, x[0], x[1], x[2], x[3]);
        MPI_Allgatherv(new_x, old_d[rank], MPI_DOUBLE, x, old_d, offset, MPI_DOUBLE, comm);
        if(verbose) printf("P%d x after: %f %f %f %f\n",rank, x[0], x[1], x[2], x[3]);

        free(new_x);
        if (rank == root && verbose > 0) {  
            fprintf(stderr, "dist: ");
            for (i=0; i<size; i++) {
                fprintf(stderr, "%d ", d[i]);
            }
            fprintf(stderr, "\n");
        }
        //
        // if (rank == root) {
        // FILE *outfile = fopen("jacobi_balance.dat", "w");
        // }
        //          if (itt == 0){ // Print header
        //              fprintf(outfile,"# itt ");
        //              for(i=0; i<size; i++)
        //                  fprintf(outfile,"count[%d], AS[%d], Time[%d], ", i, i, i);
        //              fprintf(outfile,"\n");
        //          }
        //          if (!balance){ // ignore time on first iteration after balance
        //              fprintf(outfile,"%d ", itt);
        //              for(i=0; i<size; i++)
        //                  fprintf(outfile,"%d %f %f ", count[i], AS[i], times[i]);
        //              fprintf(outfile,"\n");
        //          }
        //  }
        //  // BEGIN LOAD BALANCING
        //  if (rank == 0){ // do only on root proc
        //      //rank 0
        //      double max,min;
        //      max = min = times[0];
        //      for (i=1; i<size; i++) {
        //          if (times[i] > max)
        //              max = times[i];
        //          if (times[i]<min)
        //              min = times[i];
        //      }
        //      if (!balance && type == heterogeneous){ // only balance if didn't in previous step (avoid timing error due to reading from swap)
        //          if ((max - min)/max > threshold){
        //              fprintf(stderr,"min:%f max:%f diff:%f threshold:%f\n",min,max,max-min,threshold);
        //              balance = 1;
        //          }
        //      } else
        //          balance = 0;
        //  }
        //
        //  MPI_Bcast(RS, size, MPI_DOUBLE, 0, comm);
        //  MPI_Bcast(&balance, 1, MPI_INT, 0, comm);
        //  int j;
        //  // Unbalanced and Heterogeneous - do load balancing.
        //  if (balance && type == heterogeneous){
        //      if (verbose){
        //          printf("\nP%d  A before balance:\n",rank);
        //          for (i = 0; i < count[rank]; i++) {
        //              for (j = 0; j < D; j++)
        //                  printf(" %4.5f ", a[i * D + j]);
        //              printf("\n");
        //          }
        //      }
        // 
        //      a = do_balancing(a, D, count, offset, RS, &balance_work);
        //      if (verbose){
        //          printf("\nP%d  A after balance:\n",rank);
        //          for (i = 0; i < count[rank]; i++) {
        //              for (j = 0; j < D; j++)
        //                  printf(" %4.5f ", a[i * D + j]);
        //              printf("\n");
        //          }
        //      }
        //  }
        // 
        //  double tmp=0.0;
        //  MPI_Allreduce(&diff, &tmp, 1, MPI_DOUBLE, MPI_SUM, comm);
        //  diff=tmp;
        //  if (rank == 0)
        //      fprintf(stderr,"Diff %e %e\n", diff, diff_1);
        // 
        //  if (itt>D && diff>diff_1 && diff>ZERO) {
        //      if (rank == 0)
        //          fprintf(stderr,"Error: Jacobi doesn't converge, use a different seed (-r) %e %e\n", diff, diff_1);
        //      //MPI_Finalize();
        //      //exit(1);
        //  }
        // 
    } // end main loop

    if (rank == root) 
        fupermod_balance_finalise(size, models);
    // 
    //  if (rank == 0 && verbose){
    //      int i;
    //      fprintf(stderr,"Final x: ");
    //      for (i=0;i<D;i++)
    //          fprintf(stderr, "%5.3f ",x[i]);
    //      fprintf(stderr, "\n");
    //  }
    // 
    //  fflush(stdout);
    jacobi_final_test(comm, a, x, b, D, old_d, offset);
    // 
    //  int balance_work_sum;
    //  MPI_Reduce (&balance_work, &balance_work_sum, 1, MPI_INT, MPI_SUM, 0, comm);
    //  if (rank == 0){
    //      fprintf(stderr, "P%d: Completed successfully with %d iterations and %d balances\n", rank, itt, balance_work_sum);fflush(stdout);
    //  }
    //      if(rank == root){
    //          fclose(outfile);
    //      }
    fupermod_dist_free(distr);
    if (rank != root) {
        free(d); d = NULL;
    }
    free(x);
    free(b);
    free(a);

    MPI_Finalize();
    return FUPERMOD_SUCCESS;
}