// threaded benchmark
#include <complex.h>
#include <errno.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <liquid/liquid.h>

// print usage/help message (option list and defaults) to stdout
void usage(void)
{
    printf("%s [options]\n", __FILE__);
    printf(" -h           : print help\n");
    printf(" -v           : verbose\n");
    printf(" -q           : quiet\n");
    printf(" -n <threads> : number of threads,  default:   4\n");
    printf(" -b <length>  : buffer length,      default: 256\n");
    printf(" -f <length>  : filter length,      default:  57\n");
    printf(" -t <time>    : run time [seconds], default:   3\n");
    printf(" -m <mode>    : mode {threads, buffer, filter}, default: threads\n");
}

// processor object: one worker thread repeatedly filtering a pair of
// ping-pong buffers.
// is_running and num_buffers are touched by both the controlling thread and
// the worker thread, so they are _Atomic: ordinary reads, writes and ++ on
// these fields are then atomic operations and the worker is guaranteed to
// observe the stop request (a plain int load could be hoisted out of the
// worker loop by the optimizer).
typedef struct process_s * process;
struct process_s {
    unsigned int         id;          // process id
    unsigned int         buf_len;     // buffer length
    float complex *      buf_0;       // ping pong buffer 0
    float complex *      buf_1;       // ping pong buffer 1
    _Atomic int          is_running;  // is object running? (set by controller, polled by worker)
    int                  verbose;     // is object verbose?
    unsigned int         h_len;       // filter length
    firfilt_crcf         filter;      // filter object
    _Atomic unsigned int num_buffers; // number of buffers processed (written by worker)
    pthread_t            thread;      // threading object
};

// allocate and initialize a processor object (buffers and filter included)
process process_create(unsigned int _id, unsigned int _buf_len, unsigned int _h_len);

// stop the worker if it is running, then release everything the object owns
void process_destroy(process _q);

// turn status messages for this object on
void process_verbose(process _q)
{
    _q->verbose = 1;
}

// turn status messages for this object off
void process_quiet(process _q)
{
    _q->verbose = 0;
}

// samples processed so far: completed buffers times buffer length
unsigned int process_get_num_samples(process _q);

// launch / halt the worker thread
void process_start(process _q);
void process_stop (process _q);

// body executed by the worker thread
int process_worker(void * _q);

// run one benchmark configuration; yields the total samples processed by all threads
unsigned long int benchmark(unsigned int _num_threads, unsigned int _buf_len, unsigned int _filter_len, float _runtime, int _verbose);

// Parse an unsigned decimal option argument. Exits with an error message if
// the text is empty, has trailing characters, is negative, or does not fit
// in an unsigned int.
static unsigned int parse_uint(int _opt, const char * _str)
{
    char * end = NULL;
    errno = 0;
    unsigned long int value = strtoul(_str, &end, 10);
    // strtoul() silently negates a leading '-', so reject signs explicitly
    if (end == _str || *end != '\0' || errno == ERANGE ||
        value > UINT_MAX || strchr(_str, '-') != NULL)
    {
        fprintf(stderr,"error: %s, invalid value '%s' for option -%c\n", __FILE__, _str, _opt);
        exit(EXIT_FAILURE);
    }
    return (unsigned int)value;
}

// Parse a floating-point option argument. Exits with an error message if the
// text is empty, has trailing characters, or is out of range for a float.
static float parse_float(int _opt, const char * _str)
{
    char * end = NULL;
    errno = 0;
    float value = strtof(_str, &end);
    if (end == _str || *end != '\0' || errno == ERANGE) {
        fprintf(stderr,"error: %s, invalid value '%s' for option -%c\n", __FILE__, _str, _opt);
        exit(EXIT_FAILURE);
    }
    return value;
}

int main(int argc, char*argv[])
{
    // parameters
    unsigned int num_threads =   4;     // number of threads
    unsigned int buf_len     = 256;     // buffer length
    unsigned int h_len       =  57;     // filter length
    float        runtime     =   3.0f;  // number of seconds to run
    int          verbose     =   0;     // verbose output?
    // parameter swept by the loop below; the sweep overrides whatever value
    // was given for that parameter on the command line
    enum {
        MODE_THREADS=0,
        MODE_BUFFER,
        MODE_FILTER} mode = MODE_THREADS;

    // read command-line options
    int dopt;
    while ((dopt = getopt(argc,argv,"hvqn:b:f:t:m:")) != -1) {
        switch (dopt) {
        case 'h':   usage();                                 return 0;
        case 'v':   verbose     = 1;                         break;
        case 'q':   verbose     = 0;                         break;
        case 'n':   num_threads = parse_uint (dopt, optarg); break;
        case 'b':   buf_len     = parse_uint (dopt, optarg); break;
        case 'f':   h_len       = parse_uint (dopt, optarg); break;
        case 't':   runtime     = parse_float(dopt, optarg); break;
        case 'm':
            if      (strcmp(optarg,"threads")==0) mode = MODE_THREADS;
            else if (strcmp(optarg,"buffer" )==0) mode = MODE_BUFFER;
            else if (strcmp(optarg,"filter" )==0) mode = MODE_FILTER;
            else {
                fprintf(stderr,"error, %s, invalid mode '%s'\n", __FILE__, optarg);
                return EXIT_FAILURE;
            }
            break;
        default:
            exit(EXIT_FAILURE);
        }
    }

    // benchmark() sleeps in 100 ms ticks; quantize the run time the same way
    // so the samples/second figure below divides by the time actually used.
    // A NaN run time stays NaN here and is rejected by the range check below.
    runtime = roundf(10.0f*runtime) / 10.0f;

    // validate input
    if (num_threads < 1 || num_threads > 1000) {
        fprintf(stderr,"error: %s, number of threads must be in [1,1000]\n", __FILE__);
        exit(EXIT_FAILURE);
    } else if (buf_len < 1 || buf_len > 65536) {
        fprintf(stderr,"error: %s, buffer length must be within [1,65536]\n", __FILE__);
        exit(EXIT_FAILURE);
    } else if (!(runtime >= 0.1f && runtime <= 86400.0f)) {
        fprintf(stderr,"error: %s, runtime must be within [0.1,86400] seconds\n", __FILE__);
        exit(EXIT_FAILURE);
    } else if (h_len < 4 || h_len > 1000) {
        fprintf(stderr,"error: %s, filter length must be within [4,1000]\n", __FILE__);
        exit(EXIT_FAILURE);
    }

    // sweep range for the selected parameter
    unsigned int vmin=0, vmax=0;
    switch (mode) {
    case MODE_THREADS: vmin=1; vmax=  16; break;
    case MODE_BUFFER:  vmin=2; vmax=8192; break;
    case MODE_FILTER:  vmin=4; vmax= 500; break;
    default:;
    }

    // print header (including the command line that produced this output)
    unsigned int v = vmin;
    printf("# auto-generated file\n");
    printf("#");
    int i;
    for (i=0; i<argc; i++)
        printf(" %s", argv[i]);
    printf("\n");
    printf("# %12s %12s %12s %12s %20s %12s\n",
        "threads", "buffer len", "filter len", "runtime [s]", "samples", "samples/s");

    // run one benchmark per value of the swept parameter
    while (v <= vmax) {
        switch (mode) {
        case MODE_THREADS: num_threads = v; v++;               break;
        case MODE_BUFFER:  buf_len     = v; v = ceilf(1.01*v); break; // ~1% geometric steps
        case MODE_FILTER:  h_len       = v; v += (v<100)?2:4;  break;
        default:;
        }
        unsigned long int num_total_samples =
            benchmark(num_threads, buf_len, h_len, runtime, verbose);

        float speed = (float)num_total_samples / runtime;

        printf("  %12u %12u %12u %12.4f %20lu %12.4e\n",
            num_threads, buf_len, h_len, runtime, num_total_samples, speed);
    }
    return 0;
}

// create process
process process_create(unsigned int _id,
                       unsigned int _buf_len,
                       unsigned int _h_len)
{
    // create object
    process q = (process) malloc(sizeof(struct process_s));

    q->id      = _id;
    q->buf_len = _buf_len;
    q->h_len   = _h_len;

    q->buf_0 = (float complex*) malloc(q->buf_len*sizeof(float complex));
    q->buf_1 = (float complex*) malloc(q->buf_len*sizeof(float complex));

    q->filter = firfilt_crcf_create_kaiser(q->h_len, 0.25f, 60.0f, 0.0f);
    firfilt_crcf_set_scale(q->filter, 0.5f);
    
    q->is_running  = 0;
    q->verbose     = 1;
    q->num_buffers = 0;

    // fill input buffer with random samples
    unsigned int i;
    for (i=0; i<q->buf_len; i++)
        q->buf_0[i] = randnf() + randnf()*_Complex_I;

    return q;
}

// destroy process: halt the worker thread (if any) first, because it still
// uses the filter and buffers, then release the filter, both ping-pong
// buffers and the object itself
void process_destroy(process _q)
{
    process_stop(_q);

    float complex * ping = _q->buf_0;
    float complex * pong = _q->buf_1;

    firfilt_crcf_destroy(_q->filter);
    free(ping);
    free(pong);
    free(_q);
}

// samples processed so far = completed buffers x samples per buffer
// NOTE(review): the product is formed in unsigned int and wraps on very long runs
unsigned int process_get_num_samples(process _q)
{
    unsigned int buffers_done       = _q->num_buffers;
    unsigned int samples_per_buffer = _q->buf_len;
    return buffers_done * samples_per_buffer;
}

// Thread entry point with the exact signature pthread_create() requires.
// process_worker() returns int, so having pthreads call it through a cast
// void *(*)(void *) pointer is undefined behavior; this adapter avoids that.
static void * process_worker_entry(void * _userdata)
{
    process_worker(_userdata);
    return NULL;
}

// start threading: launch a joinable worker thread for this object.
// Does nothing if it is already running. If pthread_create() fails the
// object is left stopped and an error is printed to stderr.
void process_start(process _q)
{
    if (_q->is_running)
        return;

    pthread_attr_t attributes;
    pthread_attr_init(&attributes);
    pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE);

    // set before the thread exists so the worker's loop condition holds
    _q->is_running = 1;
    int rc = pthread_create(&_q->thread, &attributes, process_worker_entry, _q);
    pthread_attr_destroy(&attributes);

    if (rc != 0) {
        // no thread was created: clear the flag so process_stop() does not
        // try to join an invalid thread handle
        _q->is_running = 0;
        fprintf(stderr,"[%3u] : error from pthread_create(): %s\n", _q->id, strerror(rc));
        return;
    }
    if (_q->verbose)
        printf("[%3u] started\n", _q->id);
}

// stop threading: clear the running flag, then wait for the worker thread to
// exit; does nothing if the object is not running
void process_stop(process _q)
{
    if (_q->is_running == 0)
        return;

    _q->is_running = 0;
    void * exit_status = NULL;
    int join_failed = pthread_join(_q->thread, &exit_status) != 0;

    if (_q->verbose) {
        if (join_failed)
            fprintf(stderr,"[%3u] : error from pthread_join()\n", _q->id);
        printf("[%3u] joined\n", _q->id);
    }
}

// worker thread body: filter one buffer per pass, alternating buffer roles,
// until the running flag is cleared; then terminate via pthread_exit()
int process_worker(void * _userdata)
{
    process q = (process) _userdata;

    for (;;) {
        if (!q->is_running)
            break;

        // even passes read buf_0 and write buf_1; odd passes swap the roles
        int odd_pass = (q->num_buffers & 1) != 0;
        float complex * src = odd_pass ? q->buf_1 : q->buf_0;
        float complex * dst = odd_pass ? q->buf_0 : q->buf_1;

        firfilt_crcf_execute_block(q->filter, src, q->buf_len, dst);
        q->num_buffers++;
    }
    pthread_exit(NULL);
}

// Samples a worker has processed, computed in unsigned long so that long
// runs do not wrap the unsigned int product of buffers x buffer length.
static unsigned long int benchmark_num_samples(process _q)
{
    return (unsigned long int)_q->num_buffers * _q->buf_len;
}

// run benchmark
//  _num_threads : number of processor objects (one worker thread each)
//  _buf_len     : samples per buffer for every worker
//  _filter_len  : filter length for every worker
//  _runtime     : seconds to let the workers run, quantized to 100 ms ticks
//  _verbose     : print per-thread status, progress and results
// Returns the total number of samples processed by all workers, or 0 when no
// workers are requested or they cannot be created.
unsigned long int benchmark(unsigned int _num_threads,
                            unsigned int _buf_len,
                            unsigned int _filter_len,
                            float        _runtime,
                            int          _verbose)
{
    unsigned int i;

    if (_num_threads == 0)
        return 0;

    // create processes (heap array: the thread count comes from the caller,
    // so a variable-length array could overflow the stack)
    process * threads = calloc(_num_threads, sizeof *threads);
    if (threads == NULL) {
        fprintf(stderr,"error: %s, could not allocate %u processes\n", __FILE__, _num_threads);
        return 0;
    }
    for (i=0; i<_num_threads; i++) {
        threads[i] = process_create(i, _buf_len, _filter_len);
        if (threads[i] == NULL) {
            fprintf(stderr,"error: %s, could not create process %u\n", __FILE__, i);
            while (i > 0)
                process_destroy(threads[--i]);
            free(threads);
            return 0;
        }
        if (_verbose)
            process_verbose(threads[i]);
        else
            process_quiet(threads[i]);
    }

    // start processes
    for (i=0; i<_num_threads; i++)
        process_start(threads[i]);

    // sleep in 100 ms ticks; non-positive or NaN run times give zero ticks and
    // very large ones saturate rather than overflow the float->unsigned conversion
    float ticks = roundf(10.0f*_runtime);
    unsigned int num_iterations = 0;
    if (ticks > 0.0f)
        num_iterations = ticks < (float)UINT_MAX ? (unsigned int)ticks : UINT_MAX;
    for (i=0; i<num_iterations; i++) {
        usleep(100000);
        if (_verbose) {
            // i+1 ticks have elapsed after this sleep; using i would report one
            // tick late and divide by zero on the first pass
            float elapsed = (float)(i+1) / 10.0f;
            printf("  working... runtime = %6.1fs, processed about %12.3f k samples/second\r",
                    elapsed,
                    benchmark_num_samples(threads[0]) / elapsed * _num_threads * 1e-3f);
            fflush(stdout);
        }
    }
    if (_verbose)
        printf("\n");

    // stop processes
    for (i=0; i<_num_threads; i++)
        process_stop(threads[i]);

    // accumulate results
    unsigned long int num_total_samples = 0;
    for (i=0; i<_num_threads; i++) {
        unsigned long int num_samples = benchmark_num_samples(threads[i]);
        if (_verbose)
            printf("thread %2u processed %6lu samples\n", i, num_samples);
        num_total_samples += num_samples;
    }

    // clean up
    for (i=0; i<_num_threads; i++)
        process_destroy(threads[i]);
    free(threads);

    // finish benchmark
    return num_total_samples;
}

