Commits

Yuriy Syrovetskiy committed 5ff8329

[+] it works!

  • Participants

Comments (0)

Files changed (5)

File CMakeLists.txt

+cmake_minimum_required(VERSION 2.6)
+
+
+add_definitions( "-Wall -Wextra" )
+add_definitions( "-std=c++0x" )
+
+
+################################################################################
+## Libraries ###################################################################
+
+find_package( Qt4
+    COMPONENTS
+        QtCore
+    REQUIRED
+)
+include( ${QT_USE_FILE} )
+
+
+################################################################################
+## test ########################################################################
+
+add_executable( test
+    test.cpp
+)
+
+target_link_libraries( test
+    ${QT_LIBRARIES}
+)
+#ifndef advq_Future_h
+#define advq_Future_h
+
+
+#include <QtCore>
+
+
+/// \interface
+template <class T>
+class IterableFuture
+{
+
+public:
+
+    virtual
+    bool waitForReadyResult() = 0;
+
+    virtual
+    T takeReadyResult() = 0;
+
+}; // class IterableFuture
+
+
+template <class T>
+class FutureIterator
+{
+
+    QSharedPointer<IterableFuture<T>> future;
+
+public:
+
+    FutureIterator(QSharedPointer<IterableFuture<T>> future):
+        future(future)
+    {
+    }
+
+    /// blocking!
+    bool hasNext()
+    {
+        return future->waitForReadyResult();
+    }
+
+    /// blocking!
+    T next()
+    {
+        return future->takeReadyResult();
+    }
+
+}; // class FutureIterator
+
+
+#endif // advq_Future_h

File ThreadPool.h

+#ifndef advq_ThreadPool_h
+#define advq_ThreadPool_h
+
+
+#include <QtCore>
+
+#include "Future.h"
+
+
+class ThreadPool:
+    public QThreadPool
+{
+
+    template <typename SourceIterator, typename ArgumentType, typename ResultType>
+    struct MapFuture:
+        public IterableFuture<ResultType>
+    {
+
+        typedef ResultType (*MapFunction)(ArgumentType);
+
+        struct Worker:
+            public QRunnable
+        {
+
+            typedef ResultType (*MapFunction)(ArgumentType);
+
+            MapFunction f;
+            ArgumentType arg;
+            ResultType result;
+            QMutex resultReadyMutex;
+            bool resultReady;
+
+            Worker(MapFunction f, ArgumentType arg):
+                f(f),
+                arg(arg),
+                resultReady(false)
+            {
+                setAutoDelete(false);
+            }
+
+            void run()
+            {
+                QMutexLocker locker(&resultReadyMutex);
+                result = f(arg);
+                resultReady = true;
+            }
+
+            void waitForReadyResult()
+            {
+                forever
+                {
+                    if ( resultReady )
+                        return;
+                    QMutexLocker locker(&resultReadyMutex); // wait for result
+                }
+            }
+        };
+
+        QThreadPool & pool;
+
+        SourceIterator sourceIterator;
+        MapFunction f;
+        QQueue<Worker*> workers;
+
+        MapFuture(QThreadPool & pool, SourceIterator sourceIterator, MapFunction f):
+            pool(pool),
+            sourceIterator(sourceIterator),
+            f(f)
+        {
+            for ( int i = 0; i < pool.maxThreadCount(); ++i )
+            {
+                if ( sourceIterator.hasNext() )
+                {
+                    Worker * worker = new Worker(f, sourceIterator.next());
+                    pool.start(worker);
+                    workers.enqueue(worker);
+                }
+                else
+                {
+                    break;
+                }
+            }
+        }
+
+        /// blocking!
+        /// \returns if there is some results
+        virtual
+        bool waitForReadyResult()
+        {
+            if ( workers.isEmpty() )
+                return false;
+            Worker & firstWorker = *workers.first();
+            firstWorker.waitForReadyResult();
+            return true;
+        }
+
+        /// blocking!
+        virtual
+        ResultType takeReadyResult()
+        {
+            Q_ASSERT ( not workers.isEmpty() );
+
+            Worker * firstWorker = workers.dequeue();
+
+            // wait for result ready
+            firstWorker->waitForReadyResult();
+
+            // take the result
+            ResultType result = firstWorker->result;
+
+            // run another worker
+            delete firstWorker;
+            if ( sourceIterator.hasNext() )
+            {
+                Worker * worker = new Worker(f, sourceIterator.next());
+                pool.start(worker);
+                workers.enqueue(worker);
+            }
+
+            return result;
+        }
+
+    };
+
+public:
+
+    template <typename SourceIterator, typename ArgumentType, typename ResultType>
+    FutureIterator<ResultType> map(SourceIterator sourceIterator, ResultType (f)(ArgumentType))
+    {
+        QSharedPointer<IterableFuture<ResultType>> future(
+            new MapFuture<SourceIterator, ArgumentType, ResultType>(
+                *this, sourceIterator, f
+            )
+        );
+        return FutureIterator<ResultType>(future);
+    }
+
+}; // class ThreadPool
+
+
+#endif // advq_ThreadPool_h
+#include <QtCore>
+
+#include "Future.h"
+#include "ThreadPool.h"
+#include "util.h"
+
+
+const uint N = 10000;
+const QString Salt = "5417";
+
+
+QByteArray md5_N_times(QString line)
+{
+    QByteArray data = line.toUtf8();
+    for ( uint i = 0; i < N; ++i )
+        data = QCryptographicHash::hash(Salt.toUtf8() + data, QCryptographicHash::Md5);
+    return data;
+}
+
+
+int main()
+{
+    QTextStream input(stdin);
+    QTextStream output(stdout);
+
+    /*
+
+        // Sequential
+
+        while ( not input.atEnd() )
+            output << md5_N_times(input.readLine()).toHex() << endl;
+
+    /*/
+
+        // Parallel
+
+        ThreadPool pool;
+
+        FutureIterator<QByteArray> results = pool.map(
+            QTextStreamIterator(input),
+            md5_N_times
+        );
+
+        while ( results.hasNext() )
+        {
+            output << results.next().toHex() << endl;
+        }
+
+    //*/
+
+    return 0;
+}
+#ifndef advq_util_h
+#define advq_util_h
+
+
+#include <QtCore>
+
+
+inline
+QStringList sorted(const QStringList & orig)
+{
+    QStringList _r = orig;
+    _r.sort();
+    return _r;
+}
+
+
+class QTextStreamIterator
+{
+
+    QTextStream & ts;
+
+public:
+
+    QTextStreamIterator(QTextStream & ts):
+        ts(ts)
+    {
+    }
+
+    bool hasNext() const
+    {
+        return not ts.atEnd();
+    }
+
+    QString next()
+    {
+        return ts.readLine();
+    }
+
+}; // class QTextStreamIterator
+
+
+#endif // advq_util_h