Some more refactoring of the IPC code. Moved "core" functions into a separate class.

This commit is contained in:
LoRd_MuldeR 2014-01-29 23:59:03 +01:00
parent 34a1cfff51
commit 05026c9c78
5 changed files with 158 additions and 87 deletions

View File

@ -25,8 +25,13 @@
#include <QSharedMemory> #include <QSharedMemory>
#include <QSystemSemaphore> #include <QSystemSemaphore>
#include <QMutexLocker>
#include <QStringList> #include <QStringList>
///////////////////////////////////////////////////////////////////////////////
// Constants
///////////////////////////////////////////////////////////////////////////////
static const size_t MAX_STR_LEN = 1024; static const size_t MAX_STR_LEN = 1024;
static const size_t MAX_ARG_CNT = 3; static const size_t MAX_ARG_CNT = 3;
static const size_t MAX_ENTRIES = 16; static const size_t MAX_ENTRIES = 16;
@ -36,6 +41,7 @@ static const char *s_key_sema_wr = "{B595F47C-0F0F-4B52-9F45-FF524BC5EEBD}";
static const char *s_key_sema_rd = "{D331CBB5-8BCD-4127-9105-E22281130C77}"; static const char *s_key_sema_rd = "{D331CBB5-8BCD-4127-9105-E22281130C77}";
static const wchar_t *EMPTY_STRING = L""; static const wchar_t *EMPTY_STRING = L"";
static unsigned long TIMEOUT_MS = 12000;
typedef struct typedef struct
{ {
@ -53,11 +59,43 @@ x264_ipc_t;
#define IS_FIRST_INSTANCE(X) ((X) > 0) #define IS_FIRST_INSTANCE(X) ((X) > 0)
///////////////////////////////////////////////////////////////////////////////
// IPC Base Class
///////////////////////////////////////////////////////////////////////////////
class IPCCore : public QObject
{
friend class IPC;
friend class IPCReceiveThread;
friend class IPCSendThread;
public:
bool initialize(bool &firstInstance);
inline bool isInitialized(void)
{
return (m_initialized >= 0);
}
protected:
IPCCore(void);
~IPCCore(void);
bool popCommand(int &command, QStringList &args);
bool pushCommand(const int &command, const QStringList *args);
volatile int m_initialized;
QSharedMemory *m_sharedMemory;
QSystemSemaphore *m_semaphoreRd;
QSystemSemaphore *m_semaphoreWr;
};
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// Send Thread // Send Thread
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
IPCSendThread::IPCSendThread(IPC *ipc, const int &command, const QStringList &args) IPCSendThread::IPCSendThread(IPCCore *ipc, const int &command, const QStringList &args)
: :
m_ipc(ipc), m_command(command), m_args(new QStringList(args)) m_ipc(ipc), m_command(command), m_args(new QStringList(args))
{ {
@ -69,7 +107,6 @@ IPCSendThread::~IPCSendThread(void)
X264_DELETE(m_args); X264_DELETE(m_args);
} }
void IPCSendThread::run(void) void IPCSendThread::run(void)
{ {
try try
@ -87,7 +124,7 @@ void IPCSendThread::run(void)
// Receive Thread // Receive Thread
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
IPCReceiveThread::IPCReceiveThread(IPC *ipc) IPCReceiveThread::IPCReceiveThread(IPCCore *ipc)
: :
m_ipc(ipc) m_ipc(ipc)
{ {
@ -112,9 +149,11 @@ void IPCReceiveThread::receiveLoop(void)
{ {
QStringList args; QStringList args;
int command; int command;
if(m_ipc->popCommand(command, args, &m_stopped)) if(m_ipc->popCommand(command, args))
{ {
if((command >= 0) && (command < IPC::IPC_OPCODE_MAX)) if(!m_stopped)
{
if((command >= 0) && (command < IPC_OPCODE_MAX))
{ {
emit receivedCommand(command, args); emit receivedCommand(command, args);
} }
@ -124,37 +163,34 @@ void IPCReceiveThread::receiveLoop(void)
} }
} }
} }
else
{
m_stopped = true;
qWarning("IPC: Receive operation has failed -> stopping thread!");
}
}
} }
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// IPC Class // IPC Core Class
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
IPC::IPC(void) IPCCore::IPCCore(void)
{ {
m_initialized = -1; m_initialized = -1;
m_sharedMemory = NULL; m_sharedMemory = NULL;
m_semaphoreWr = NULL; m_semaphoreWr = NULL;
m_semaphoreRd = NULL; m_semaphoreRd = NULL;
m_recvThread = NULL;
} }
IPC::~IPC(void) IPCCore::~IPCCore(void)
{ {
if(m_recvThread && m_recvThread->isRunning())
{
qWarning("Receive thread still running -> terminating!");
m_recvThread->terminate();
m_recvThread->wait();
}
X264_DELETE(m_recvThread);
X264_DELETE(m_sharedMemory); X264_DELETE(m_sharedMemory);
X264_DELETE(m_semaphoreWr); X264_DELETE(m_semaphoreWr);
X264_DELETE(m_semaphoreRd); X264_DELETE(m_semaphoreRd);
} }
bool IPC::initialize(bool &firstInstance) bool IPCCore::initialize(bool &firstInstance)
{ {
firstInstance = false; firstInstance = false;
@ -198,12 +234,11 @@ bool IPC::initialize(bool &firstInstance)
return false; return false;
} }
bool IPC::pushCommand(const int &command, const QStringList *args) bool IPCCore::pushCommand(const int &command, const QStringList *args)
{ {
if(m_initialized < 0) if(m_initialized < 0)
{ {
qWarning("Error: IPC not initialized yet!"); throw std::runtime_error("IPC not initialized!");
return false;
} }
if(!m_semaphoreWr->acquire()) if(!m_semaphoreWr->acquire())
@ -228,7 +263,7 @@ bool IPC::pushCommand(const int &command, const QStringList *args)
memory->data[memory->posWr].command = command; memory->data[memory->posWr].command = command;
for(int i = 0; i < MAX_ARG_CNT; i++) for(int i = 0; i < MAX_ARG_CNT; i++)
{ {
const wchar_t *current = (i < args->count()) ? ((const wchar_t*)((*args)[i].utf16())) : EMPTY_STRING; const wchar_t *current = (args && (i < args->count())) ? ((const wchar_t*)((*args)[i].utf16())) : EMPTY_STRING;
wcsncpy_s(memory->data[memory->posWr].args[i], MAX_STR_LEN, current, _TRUNCATE); wcsncpy_s(memory->data[memory->posWr].args[i], MAX_STR_LEN, current, _TRUNCATE);
} }
memory->posWr = (memory->posWr + 1) % MAX_ENTRIES; memory->posWr = (memory->posWr + 1) % MAX_ENTRIES;
@ -255,15 +290,14 @@ bool IPC::pushCommand(const int &command, const QStringList *args)
return success; return success;
} }
bool IPC::popCommand(int &command, QStringList &args, volatile bool *abortFlag) bool IPCCore::popCommand(int &command, QStringList &args)
{ {
command = -1; command = -1;
args.clear(); args.clear();
if(m_initialized < 0) if(m_initialized < 0)
{ {
qWarning("Error: IPC not initialized yet!"); throw std::runtime_error("IPC not initialized!");
return false;
} }
if(!m_semaphoreRd->acquire()) if(!m_semaphoreRd->acquire())
@ -296,11 +330,8 @@ bool IPC::popCommand(int &command, QStringList &args, volatile bool *abortFlag)
memory->counter--; memory->counter--;
} }
else else
{
if(!abortFlag)
{ {
qWarning("IPC: Shared memory is empty -> cannot pop string!"); qWarning("IPC: Shared memory is empty -> cannot pop string!");
}
success = false; success = false;
} }
} }
@ -319,18 +350,50 @@ bool IPC::popCommand(int &command, QStringList &args, volatile bool *abortFlag)
return success; return success;
} }
bool IPC::sendAsync(const int &command, const QStringList &args, const int timeout) ///////////////////////////////////////////////////////////////////////////////
// IPC Handler Class
///////////////////////////////////////////////////////////////////////////////
IPC::IPC(void)
:
m_mutex(QMutex::Recursive)
{ {
if(m_initialized < 0) m_ipcCore = new IPCCore();
m_recvThread = NULL;
}
IPC::~IPC(void)
{
if(m_recvThread && m_recvThread->isRunning())
{
qWarning("Receive thread still running -> terminating!");
m_recvThread->terminate();
m_recvThread->wait();
}
X264_DELETE(m_recvThread);
X264_DELETE(m_ipcCore);
}
bool IPC::initialize(bool &firstInstance)
{
QMutexLocker lock(&m_mutex);
return m_ipcCore->initialize(firstInstance);
}
bool IPC::sendAsync(const int &command, const QStringList &args)
{
QMutexLocker lock(&m_mutex);
if(!m_ipcCore->isInitialized())
{ {
qWarning("Error: IPC not initialized yet!"); qWarning("Error: IPC not initialized yet!");
return false; return false;
} }
IPCSendThread sendThread(this, command, args); IPCSendThread sendThread(m_ipcCore, command, args);
sendThread.start(); sendThread.start();
if(!sendThread.wait(timeout)) if(!sendThread.wait(TIMEOUT_MS))
{ {
qWarning("IPC send operation encountered timeout!"); qWarning("IPC send operation encountered timeout!");
sendThread.terminate(); sendThread.terminate();
@ -343,7 +406,9 @@ bool IPC::sendAsync(const int &command, const QStringList &args, const int timeo
bool IPC::startListening(void) bool IPC::startListening(void)
{ {
if(m_initialized < 0) QMutexLocker lock(&m_mutex);
if(!m_ipcCore->isInitialized())
{ {
qWarning("Error: IPC not initialized yet!"); qWarning("Error: IPC not initialized yet!");
return false; return false;
@ -351,7 +416,7 @@ bool IPC::startListening(void)
if(!m_recvThread) if(!m_recvThread)
{ {
m_recvThread = new IPCReceiveThread(this); m_recvThread = new IPCReceiveThread(m_ipcCore);
connect(m_recvThread, SIGNAL(receivedCommand(int,QStringList)), this, SIGNAL(receivedCommand(int,QStringList)), Qt::QueuedConnection); connect(m_recvThread, SIGNAL(receivedCommand(int,QStringList)), this, SIGNAL(receivedCommand(int,QStringList)), Qt::QueuedConnection);
} }
@ -369,7 +434,9 @@ bool IPC::startListening(void)
bool IPC::stopListening(void) bool IPC::stopListening(void)
{ {
if(m_initialized < 0) QMutexLocker lock(&m_mutex);
if(!m_ipcCore->isInitialized())
{ {
qWarning("Error: IPC not initialized yet!"); qWarning("Error: IPC not initialized yet!");
return false; return false;
@ -378,9 +445,9 @@ bool IPC::stopListening(void)
if(m_recvThread && m_recvThread->isRunning()) if(m_recvThread && m_recvThread->isRunning())
{ {
m_recvThread->stop(); m_recvThread->stop();
m_semaphoreRd->release(); sendAsync(IPC_OPCODE_MAX, QStringList()); //push dummy command to unblock thread!
if(!m_recvThread->wait(5000)) if(!m_recvThread->wait(TIMEOUT_MS))
{ {
qWarning("Receive thread seems deadlocked -> terminating!"); qWarning("Receive thread seems deadlocked -> terminating!");
m_recvThread->terminate(); m_recvThread->terminate();
@ -394,3 +461,15 @@ bool IPC::stopListening(void)
return true; return true;
} }
bool IPC::isInitialized(void)
{
QMutexLocker lock(&m_mutex);
return m_ipcCore->isInitialized();
}
bool IPC::isListening(void)
{
QMutexLocker lock(&m_mutex);
return (isInitialized() && m_recvThread && m_recvThread->isRunning());
}

View File

@ -22,33 +22,38 @@
#pragma once #pragma once
#include <QThread> #include <QThread>
#include <QMutex>
class QSharedMemory; class QSharedMemory;
class QStringList; class QStringList;
class QSystemSemaphore; class QSystemSemaphore;
class IPCSendThread;
class IPCCore;
class IPCReceiveThread; class IPCReceiveThread;
class IPCSendThread;
//IPC Commands
static const int IPC_OPCODE_PING = 0;
static const int IPC_OPCODE_ADD_FILE = 1;
static const int IPC_OPCODE_ADD_JOB = 2;
static const int IPC_OPCODE_MAX = 3;
///////////////////////////////////////////////////////////////////////////////
// IPC Handler Class
///////////////////////////////////////////////////////////////////////////////
class IPC : public QObject class IPC : public QObject
{ {
Q_OBJECT Q_OBJECT
friend class IPCReceiveThread;
friend class IPCSendThread;
public: public:
IPC(void); IPC(void);
~IPC(void); ~IPC(void);
static const int IPC_OPCODE_PING = 0;
static const int IPC_OPCODE_ADD_FILE = 1;
static const int IPC_OPCODE_ADD_JOB = 2;
static const int IPC_OPCODE_MAX = 3;
bool initialize(bool &firstInstance); bool initialize(bool &firstInstance);
bool sendAsync(const int &command, const QStringList &args, const int timeout = 5000); bool sendAsync(const int &command, const QStringList &args);
bool isInitialized(void);
inline bool isInitialized(void) { return (m_initialized >= 0); } bool isListening(void);
inline bool isListening(void);
public slots: public slots:
bool startListening(void); bool startListening(void);
@ -58,15 +63,9 @@ signals:
void receivedCommand(const int &command, const QStringList &args); void receivedCommand(const int &command, const QStringList &args);
protected: protected:
bool popCommand(int &command, QStringList &args, volatile bool *abortFlag); IPCCore *m_ipcCore;
bool pushCommand(const int &command, const QStringList *args);
int m_initialized;
QSharedMemory *m_sharedMemory;
QSystemSemaphore *m_semaphoreRd;
QSystemSemaphore *m_semaphoreWr;
IPCReceiveThread *m_recvThread; IPCReceiveThread *m_recvThread;
QMutex m_mutex;
}; };
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
@ -79,16 +78,15 @@ class IPCSendThread : public QThread
friend class IPC; friend class IPC;
protected: protected:
IPCSendThread(IPC *ipc, const int &command, const QStringList &args); IPCSendThread(IPCCore *ipc, const int &command, const QStringList &args);
IPCSendThread::~IPCSendThread(void); IPCSendThread::~IPCSendThread(void);
inline bool result(void) { return m_result; } inline bool result(void) { return m_result; }
virtual void run(void); virtual void run(void);
private: private:
volatile bool m_result; volatile bool m_result;
IPC *const m_ipc; IPCCore *const m_ipc;
const int m_command; const int m_command;
const QStringList *m_args; const QStringList *m_args;
}; };
@ -99,9 +97,9 @@ class IPCReceiveThread : public QThread
friend class IPC; friend class IPC;
protected: protected:
IPCReceiveThread(IPC *ipc); IPCReceiveThread(IPCCore *ipc);
inline void stop(void) { m_stopped = true; }
inline void stop(void) { m_stopped = true; }
virtual void run(void); virtual void run(void);
signals: signals:
@ -110,14 +108,5 @@ signals:
private: private:
void receiveLoop(void); void receiveLoop(void);
volatile bool m_stopped; volatile bool m_stopped;
IPC *const m_ipc; IPCCore *const m_ipc;
}; };
///////////////////////////////////////////////////////////////////////////////
// Inline Functions
///////////////////////////////////////////////////////////////////////////////
inline bool IPC::isListening(void)
{
return (m_recvThread && m_recvThread->isRunning());
}

View File

@ -150,7 +150,7 @@ void handleMultipleInstances(QStringList args, IPC *ipc)
commandSent = true; commandSent = true;
if(!args.isEmpty()) if(!args.isEmpty())
{ {
if(!ipc->sendAsync(IPC::IPC_OPCODE_ADD_FILE, QStringList() << args.takeFirst())) if(!ipc->sendAsync(IPC_OPCODE_ADD_FILE, QStringList() << args.takeFirst()))
{ {
break; break;
} }
@ -170,7 +170,7 @@ void handleMultipleInstances(QStringList args, IPC *ipc)
{ {
lst << args.takeFirst(); lst << args.takeFirst();
} }
if(!ipc->sendAsync(IPC::IPC_OPCODE_ADD_JOB, lst)) if(!ipc->sendAsync(IPC_OPCODE_ADD_JOB, lst))
{ {
break; break;
} }
@ -194,7 +194,7 @@ void handleMultipleInstances(QStringList args, IPC *ipc)
//If no argument has been sent yet, send a ping! //If no argument has been sent yet, send a ping!
if(!commandSent) if(!commandSent)
{ {
ipc->sendAsync(IPC::IPC_OPCODE_PING, QStringList()); ipc->sendAsync(IPC_OPCODE_PING, QStringList());
} }
} }

View File

@ -26,7 +26,7 @@
#define VER_X264_MAJOR 2 #define VER_X264_MAJOR 2
#define VER_X264_MINOR 3 #define VER_X264_MINOR 3
#define VER_X264_PATCH 0 #define VER_X264_PATCH 0
#define VER_X264_BUILD 734 #define VER_X264_BUILD 738
#define VER_X264_MINIMUM_REV 2380 #define VER_X264_MINIMUM_REV 2380
#define VER_X264_CURRENT_API 142 #define VER_X264_CURRENT_API 142

View File

@ -168,6 +168,7 @@ MainWindow::MainWindow(const x264_cpu_t *const cpuFeatures, IPC *ipc)
connect(ui->actionWebJEEB, SIGNAL(triggered()), this, SLOT(showWebLink())); connect(ui->actionWebJEEB, SIGNAL(triggered()), this, SLOT(showWebLink()));
connect(ui->actionWebAvisynth32, SIGNAL(triggered()), this, SLOT(showWebLink())); connect(ui->actionWebAvisynth32, SIGNAL(triggered()), this, SLOT(showWebLink()));
connect(ui->actionWebAvisynth64, SIGNAL(triggered()), this, SLOT(showWebLink())); connect(ui->actionWebAvisynth64, SIGNAL(triggered()), this, SLOT(showWebLink()));
connect(ui->actionWebAvisynthPlus, SIGNAL(triggered()), this, SLOT(showWebLink()));
connect(ui->actionWebVapourSynth, SIGNAL(triggered()), this, SLOT(showWebLink())); connect(ui->actionWebVapourSynth, SIGNAL(triggered()), this, SLOT(showWebLink()));
connect(ui->actionWebVapourSynthDocs, SIGNAL(triggered()), this, SLOT(showWebLink())); connect(ui->actionWebVapourSynthDocs, SIGNAL(triggered()), this, SLOT(showWebLink()));
connect(ui->actionWebWiki, SIGNAL(triggered()), this, SLOT(showWebLink())); connect(ui->actionWebWiki, SIGNAL(triggered()), this, SLOT(showWebLink()));
@ -573,6 +574,7 @@ void MainWindow::showWebLink(void)
if(QObject::sender() == ui->actionWebJEEB) QDesktopServices::openUrl(QUrl("http://x264.fushizen.eu/")); if(QObject::sender() == ui->actionWebJEEB) QDesktopServices::openUrl(QUrl("http://x264.fushizen.eu/"));
if(QObject::sender() == ui->actionWebAvisynth32) QDesktopServices::openUrl(QUrl("http://sourceforge.net/projects/avisynth2/files/AviSynth%202.5/")); if(QObject::sender() == ui->actionWebAvisynth32) QDesktopServices::openUrl(QUrl("http://sourceforge.net/projects/avisynth2/files/AviSynth%202.5/"));
if(QObject::sender() == ui->actionWebAvisynth64) QDesktopServices::openUrl(QUrl("http://code.google.com/p/avisynth64/downloads/list")); if(QObject::sender() == ui->actionWebAvisynth64) QDesktopServices::openUrl(QUrl("http://code.google.com/p/avisynth64/downloads/list"));
if(QObject::sender() == ui->actionWebAvisynthPlus) QDesktopServices::openUrl(QUrl("http://www.avs-plus.net/"));
if(QObject::sender() == ui->actionWebVapourSynth) QDesktopServices::openUrl(QUrl("http://www.vapoursynth.com/")); if(QObject::sender() == ui->actionWebVapourSynth) QDesktopServices::openUrl(QUrl("http://www.vapoursynth.com/"));
if(QObject::sender() == ui->actionWebVapourSynthDocs) QDesktopServices::openUrl(QUrl("http://www.vapoursynth.com/doc/")); if(QObject::sender() == ui->actionWebVapourSynthDocs) QDesktopServices::openUrl(QUrl("http://www.vapoursynth.com/doc/"));
if(QObject::sender() == ui->actionWebWiki) QDesktopServices::openUrl(QUrl("http://mewiki.project357.com/wiki/X264_Settings")); if(QObject::sender() == ui->actionWebWiki) QDesktopServices::openUrl(QUrl("http://mewiki.project357.com/wiki/X264_Settings"));
@ -1011,11 +1013,11 @@ void MainWindow::handleCommand(const int &command, const QStringList &args)
switch(command) switch(command)
{ {
case IPC::IPC_OPCODE_PING: case IPC_OPCODE_PING:
qDebug("Received a PING request from another instance!"); qDebug("Received a PING request from another instance!");
x264_blink_window(this, 5, 125); x264_blink_window(this, 5, 125);
break; break;
case IPC::IPC_OPCODE_ADD_FILE: case IPC_OPCODE_ADD_FILE:
if(!args.isEmpty()) if(!args.isEmpty())
{ {
if(QFileInfo(args[0]).exists() && QFileInfo(args[0]).isFile()) if(QFileInfo(args[0]).exists() && QFileInfo(args[0]).isFile())
@ -1033,7 +1035,7 @@ void MainWindow::handleCommand(const int &command, const QStringList &args)
} }
} }
break; break;
case IPC::IPC_OPCODE_ADD_JOB: case IPC_OPCODE_ADD_JOB:
if(args.size() >= 3) if(args.size() >= 3)
{ {
if(QFileInfo(args[0]).exists() && QFileInfo(args[0]).isFile()) if(QFileInfo(args[0]).exists() && QFileInfo(args[0]).isFile())
@ -1121,6 +1123,7 @@ void MainWindow::closeEvent(QCloseEvent *e)
{ {
if((m_status != STATUS_IDLE) && (m_status != STATUS_EXITTING)) if((m_status != STATUS_IDLE) && (m_status != STATUS_EXITTING))
{ {
e->ignore();
qWarning("Cannot close window at this time!"); qWarning("Cannot close window at this time!");
return; return;
} }
@ -1504,7 +1507,7 @@ bool MainWindow::parseCommandLineArgs(void)
bCommandAccepted = true; bCommandAccepted = true;
if(!args.isEmpty()) if(!args.isEmpty())
{ {
handleCommand(IPC::IPC_OPCODE_ADD_FILE, QStringList() << args.takeFirst()); handleCommand(IPC_OPCODE_ADD_FILE, QStringList() << args.takeFirst());
} }
else else
{ {
@ -1517,7 +1520,7 @@ bool MainWindow::parseCommandLineArgs(void)
if(args.size() >= 3) if(args.size() >= 3)
{ {
const QStringList list = args.mid(0, 3); const QStringList list = args.mid(0, 3);
handleCommand(IPC::IPC_OPCODE_ADD_JOB, list); handleCommand(IPC_OPCODE_ADD_JOB, list);
args.erase(args.begin(), args.begin() + 3); args.erase(args.begin(), args.begin() + 3);
} }
else else