Commits

Mirko Chialastri committed f8664a7

Creati files per ogni Job

  • Participants
  • Parent commits 9348953

Comments (0)

Files changed (10)

File Libs/APIRequest.php

+<?php
+
+Class APIRequestTimeoutException 	extends Exception {}
+Class APIRequestNotFoundedException extends Exception {}
+Class APIRequestBadRequestException extends Exception {}
+Class APIRequestGenericException  extends Exception {}
+
+
+
+Class APIRequest {
+	public  $url 		= null;
+
+
+	protected 	$ch 		= null;
+	protected 	$response 	= array(
+		'headers' 		=> array(),
+		'body'			=> ''
+	);
+	private $errno 			= 0;
+	private $errmsg 		= null;
+
+	public function send() {
+		$this->ch = curl_init();
+
+		// set URL and other appropriate options
+		curl_setopt($this->ch, CURLOPT_URL, $this->url);
+		curl_setopt($this->ch, CURLOPT_HEADER, 0);
+		curl_setopt($this->ch, CURLOPT_RETURNTRANSFER, 1);
+
+		curl_setopt($this->ch, CURLOPT_TIMEOUT_MS, 		50000);
+		curl_setopt($this->ch, CURLOPT_CONNECTTIMEOUT_MS, 50000);
+		curl_setopt($this->ch, CURLOPT_NOSIGNAL, 1);
+
+		// grab URL and pass it to the browser
+		$this->response['body'] 			= curl_exec($this->ch);
+		$this->response['headers']['code']	= curl_getinfo($this->ch, CURLINFO_HTTP_CODE);
+
+		$this->errno  	= curl_errno($this->ch);
+		$this->errmsg 	= curl_error($this->ch);
+
+		curl_close($this->ch);
+		$this->_checkResponse();
+	}
+
+	public function getResponse() {
+		return $this->response;
+	}
+
+	protected function _checkResponse() {
+		switch((int) $this->errno) {
+			case 0:
+			break;
+
+			case 28:  // TIMEOUT: RITENTARE DOPO 10MIN (pauseTube)
+				throw new APIRequestTimeoutException();
+			break; 
+
+			default:
+				throw new APIRequestGenericException($this->errmsg, $this->errno);
+				Log::info(sprintf('CURL ERROR %d (%s) ... re-try', curl_errno($ch), curl_error($ch)), array('prefix' => $job_encoded->class));
+			break;
+		}
+	}
+
+
+}
+
+Interface APIRequestInterface {
+	public function parse();
+}
+
+
+

File Libs/Job.php

 	}
 	
 }
-
-
-
-
-Class APIRequestTimeoutException 	extends Exception {}
-Class APIRequestNotFoundedException extends Exception {}
-Class APIRequestBadRequestException extends Exception {}
-Class APIRequestGenericException extends Exception {}
-
-
-Class APIRequest {
-	public  $url 		= null;
-
-
-	protected 	$ch 		= null;
-	protected 	$response 	= array(
-		'headers' 		=> array(),
-		'body'			=> ''
-	);
-	private $errno 			= 0;
-	private $errmsg 		= null;
-
-	public function send() {
-		$this->ch = curl_init();
-
-		// set URL and other appropriate options
-		curl_setopt($this->ch, CURLOPT_URL, $this->url);
-		curl_setopt($this->ch, CURLOPT_HEADER, 0);
-		curl_setopt($this->ch, CURLOPT_RETURNTRANSFER, 1);
-
-		curl_setopt($this->ch, CURLOPT_TIMEOUT_MS, 		50000);
-		curl_setopt($this->ch, CURLOPT_CONNECTTIMEOUT_MS, 50000);
-		curl_setopt($this->ch, CURLOPT_NOSIGNAL, 1);
-
-		// grab URL and pass it to the browser
-		$this->response['body'] 			= curl_exec($this->ch);
-		$this->response['headers']['code']	= curl_getinfo($this->ch, CURLINFO_HTTP_CODE);
-
-		$this->errno  	= curl_errno($this->ch);
-		$this->errmsg 	= curl_error($this->ch);
-
-		curl_close($this->ch);
-		$this->_checkResponse();
-	}
-
-	public function getResponse() {
-		return $this->response;
-	}
-
-	protected function _checkResponse() {
-		switch((int) $this->errno) {
-			case 0:
-			break;
-
-			case 28:  // TIMEOUT: RITENTARE DOPO 10MIN (pauseTube)
-				throw new APIRequestTimeoutException();
-			break; 
-
-			default:
-				throw new APIRequestGenericException($this->errmsg, $this->errno);
-				Log::info(sprintf('CURL ERROR %d (%s) ... re-try', curl_errno($ch), curl_error($ch)), array('prefix' => $job_encoded->class));
-			break;
-		}
-	}
-
-
-}
-
-Interface APIRequestInterface {
-	public function parse();
-}
-
-
-
-// API
-
-Class APIGraph extends APIRequest implements APIRequestInterface {
-	public $url 	= 'http://graph.facebook.com/%d';
-	public $class 	= null;
-
-
-	public $id 		= null;
-	public $created = null;
-
-	public function __construct() {
-		$this->class 		= get_class($this);
-
-		$this->id 			= rand();
-		$this->url 			= sprintf($this->url, $this->id);
-		$this->created 		= date('Y-m-d H:i:s', time());
-	}
-
-	public function parse() {
-		// ERRORE CHIAMATA API: BLOCCARE CODA
-		$status 	= $this->response['body'] !== FALSE;
-		$result 	= json_decode($this->response['body'], true, 500);
-
-		if ($status == true && !isset($result['error'])) {
-			Log::success(sprintf('Trovato avatar utente Facebook <img src="http://graph.facebook.com/%d/picture" /> (http://graph.facebook.com/%d/picture)', $result['id'], $result['id']), array('prefix' => 'APIGraph.request.parse'));
-			return true;
-		} else {
-			Log::error(sprintf('Impossibile trovare avatar per questo facebook user id', array('prefix' => 'APIGraph.request.parse')));
-			return false;
-		} 
-	}
-
-}
-
-
-Class APITimeout extends APIRequest {
-	public $url 	= 'http://localhost/test/beanstalkd/test_api_timeout.php';
-	public $class 	= null;
-
-	public $id 		= null;
-	public $created = null;
-
-	public function __construct() {
-		$this->class 		= get_class($this);
-
-		$this->id 			= rand();
-		$this->created 		= date('Y-m-d H:i:s', time());
-	}
-
-	public function send() {
-		$this->ch = curl_init();
-
-		// set URL and other appropriate options
-		curl_setopt($this->ch, CURLOPT_URL, $this->url);
-		curl_setopt($this->ch, CURLOPT_HEADER, 0);
-		curl_setopt($this->ch, CURLOPT_RETURNTRANSFER, 1);
-
-		curl_setopt($this->ch, CURLOPT_TIMEOUT_MS, 		10);
-		curl_setopt($this->ch, CURLOPT_CONNECTTIMEOUT_MS, 10);
-		curl_setopt($this->ch, CURLOPT_NOSIGNAL, 1);
-
-		// grab URL and pass it to the browser
-		$this->response['body'] 			= curl_exec($this->ch);
-		$this->response['headers']['code']	= curl_getinfo($this->ch, CURLINFO_HTTP_CODE);
-
-		$this->errno  	= curl_errno($this->ch);
-		$this->errmsg 	= curl_error($this->ch);
-
-		curl_close($this->ch);
-		$this->_checkResponse();
-	}
-
-	public function parse() {
-		return false;
-	}
-	
-}
-
-Class APINotFounded extends APIRequest {
-	public $url 	= 'http://localhost/test/beanstalkd/test_api_404.php';
-	public $class 	= null;
-
-	public $id 		= null;
-	public $created = null;
-
-	public function __construct() {
-		$this->class 		= get_class($this);
-
-		$this->id 			= rand();
-		$this->created 		= date('Y-m-d H:i:s', time());
-	}
-	
-	public function parse() {
-		var_dump( $this->respone->headers['code'] );
-		return $this->respone->headers['code'] == 404;
-	}
-}
-
-
-Class APIBadRequest extends APIRequest {
-	public $url 	= 'http://localhost/test/beanstalkd/test_api_500.php';
-
-	public $id 		= null;
-	public $created = null;
-
-	public function __construct() {
-		$this->id 			= rand();
-		$this->created 		= date('Y-m-d H:i:s', time());
-	}
-	
-}
-
-
-

File Libs/Services/APIBadRequest.php

+<?php
+require_once __DIR__ .'/../APIRequest.php';
+
+Class APIBadRequest extends APIRequest {
+	public $url 	= 'http://localhost/test/beanstalkd/test_api_500.php';
+
+	public $id 		= null;
+	public $created = null;
+
+	public function __construct() {
+		$this->id 			= rand();
+		$this->created 		= date('Y-m-d H:i:s', time());
+	}
+	
+}

File Libs/Services/APIGraph.php

+<?php
+require_once __DIR__ .'/../APIRequest.php';
+
+Class APIGraph extends APIRequest implements APIRequestInterface {
+	public $url 	= 'http://graph.facebook.com/%d';
+	public $class 	= null;
+
+
+	public $id 		= null;
+	public $created = null;
+
+	public function __construct() {
+		$this->class 		= get_class($this);
+
+		$this->id 			= rand();
+		$this->url 			= sprintf($this->url, $this->id);
+		$this->created 		= date('Y-m-d H:i:s', time());
+	}
+
+	public function parse() {
+		// ERRORE CHIAMATA API: BLOCCARE CODA
+		$status 	= $this->response['body'] !== FALSE;
+		$result 	= json_decode($this->response['body'], true, 500);
+
+		if ($status == true && !isset($result['error'])) {
+			Log::success(sprintf('Trovato avatar utente Facebook <img src="http://graph.facebook.com/%d/picture" /> (http://graph.facebook.com/%d/picture)', $result['id'], $result['id']), array('prefix' => 'APIGraph.request.parse'));
+			return true;
+		} else {
+			Log::error(sprintf('Impossibile trovare avatar per questo facebook user id', array('prefix' => 'APIGraph.request.parse')));
+			return false;
+		} 
+	}
+
+}

File Libs/Services/APINotFounded.php

+<?php
+require_once __DIR__ .'/../APIRequest.php';
+
+Class APINotFounded extends APIRequest {
+	public $url 	= 'http://localhost/test/beanstalkd/test_api_404.php';
+	public $class 	= null;
+
+	public $id 		= null;
+	public $created = null;
+
+	public function __construct() {
+		$this->class 		= get_class($this);
+
+		$this->id 			= rand();
+		$this->created 		= date('Y-m-d H:i:s', time());
+	}
+	
+	public function parse() {
+		var_dump( $this->respone->headers['code'] );
+		return $this->respone->headers['code'] == 404;
+	}
+}

File Libs/Services/APITimeout.php

+<?php
+require_once __DIR__ .'/../APIRequest.php';
+
+Class APITimeout extends APIRequest {
+	public $url 	= 'http://localhost/test/beanstalkd/test_api_timeout.php';
+	public $class 	= null;
+
+	public $id 		= null;
+	public $created = null;
+
+	public function __construct() {
+		$this->class 		= get_class($this);
+
+		$this->id 			= rand();
+		$this->created 		= date('Y-m-d H:i:s', time());
+	}
+
+	public function send() {
+		$this->ch = curl_init();
+
+		// set URL and other appropriate options
+		curl_setopt($this->ch, CURLOPT_URL, $this->url);
+		curl_setopt($this->ch, CURLOPT_HEADER, 0);
+		curl_setopt($this->ch, CURLOPT_RETURNTRANSFER, 1);
+
+		curl_setopt($this->ch, CURLOPT_TIMEOUT_MS, 		10);
+		curl_setopt($this->ch, CURLOPT_CONNECTTIMEOUT_MS, 10);
+		curl_setopt($this->ch, CURLOPT_NOSIGNAL, 1);
+
+		// grab URL and pass it to the browser
+		$this->response['body'] 			= curl_exec($this->ch);
+		$this->response['headers']['code']	= curl_getinfo($this->ch, CURLINFO_HTTP_CODE);
+
+		$this->errno  	= curl_errno($this->ch);
+		$this->errmsg 	= curl_error($this->ch);
+
+		curl_close($this->ch);
+		$this->_checkResponse();
+	}
+
+	public function parse() {
+		return false;
+	}
+	
+}

File Libs/Worker.php

 
 
 Class Worker {
-	public $queue = null;
+	public 	$queue = null;
+	private $tries = array();
 
+	/**
+	 *	Connect to Beanstalkd
+	 */
 	public function __construct($host, $port) {
 		Log::out(sprintf('Connecting to %s %p', $host, $port), array('prefix' => 'Worker::construct'));
 		$this->queue = new Pheanstalk_Pheanstalk($host, $port);
 	}
 
+	/**
+	 *	Exit Worker
+	 */
   	public function __destruct() {
 		Log::out('Quit', array('prefix' => 'Worker::construct'));
   	}
 
 
+	/**
+	 *	Loop
+	 * 	@param 	str 	$tube 	Tube name
+	 */
 	public function loop($tube = 'defaultTube') {
 		if ($this->queue->getConnection()->isServiceListening() == false) {
-			Log::error('BeanStalkd server is offline... cannot run queue', array('prefix' => 'Beanstalkd::connection'));
+			Log::error('Server is offline... cannot run queue', array('prefix' => 'Beanstalkd::connection'));
 			exit(1);
 		}
 
 
 		while( true /*$this->queue->getConnection()->isServiceListening() == true && ($job = $this->queue->reserve(60))*/ ) {
 			$job	= $this->queue->reserve(WORKER_LOOP_DELAY);
+
 			if (!$job) {
 				$stats = $this->queue->statsTube($tube);
 				if ($stats['pause'] > 0) {
 			}
 
 			$job_encoded 	= json_decode($job->getData(), false);
+
 			if ($retries == JOBS_RETRIES) { 
 				Log::out("<hr />");
 				Log::bold('New job founded ... ', array('newline' => false, 'timestamp' => false, 'prefix' => ''));
 				Log::code(print_r($job_encoded, true), array('timestamp' => false));
 			}
 
+			if (!class_exists($job_encoded->class)) {
+				$classFile = sprintf('Libs/Services/%s.php',  $job_encoded->class);
+				require_once $classFile;
+			}
 
 			$JObject = new $job_encoded->class;
 			for($retries = JOBS_RETRIES; $retries > 0; $retries--) {
 					$JObject->send();
 					// Callback per parsare il risultato, ed eventualmente fare operazioni su database
 					if ($JObject->parse()) {
+						$response 	= $JObject->getResponse();
 						$this->queue->delete($job);
-						$retry = false;
 
-						$response = $JObject->getResponse();
 						Log::success(sprintf('API REQUEST - %s', $response['headers']['code']), array('prefix' => $job_encoded->class));
 						Log::out('Removed from queue', array('prefix'  => 'Job::delete'));
 						break;
 					} else {	// Chiamata API OK, ma il risultato non è quello sperato 
 						Log::info('Risultato non valido... tento di nuovo');
 						$this->queue->release($job);
+						break;
 					}
 				} catch(APIRequestTimeoutException $e) {
 					Log::error('CURL timeout error .... pauseTube (3 minutes) !', array('prefix' => $job_encoded->class));
 		}
 	}
 
+	/**
+	 *	Check memory limit
+	 */
 	public function checkProcessMemoryLimit() {
 		// PHP (god)
 		$memory = memory_get_usage(true);
 		}
 	}
 
+	/**
+	 *	Loop worker
+	 *
+	 * 	@param 	str 	$tube 		Nome tube Beanstalkd
+	 * 	@param 	bool 	$return 	if true return stats in array, otherwise print stats
+	 */
 	public function stats($tube = false, $return = false) {
 		if (!$tube) {
 			$stats 	= $this->queue->statsTube();

File client.jobfactory.php

+<?php
+error_reporting(E_ALL);
+ini_set('display_errors', '1');
+
+define('JOBS_TOTAL', isset($argv[1]) ? $argv[1] : 1);
+define('JOBS_RETRIES', 3);
+
+
+// If you aren't using composer, register Pheanstalk class loader
+require_once 'Vendors/Pheanstalk/pheanstalk_init.php';
+require_once 'Libs/Log.php';
+require_once 'Libs/Worker.php';
+require_once 'Libs/Job.php';
+?>
+
+<html>
+	<body>
+		<h1>BeanStalkd test</h1>
+		<div>
+		<?php 
+			Log::out('Jobs Factory ... ', array('prefix' => 'Beanstalkd::init'));
+
+			$tube = 'defaultTube';
+			$BeanStalkWorker = new Worker('127.0.0.1', 6666);
+
+			// JobFactory
+			$API = array(/*'APINotFounded', 'APITimeout',*/ 'APIGraph');
+			for($i=0; $i< JOBS_TOTAL ; $i++) {
+				$apiIndex 	= array_rand($API);
+				$apiClass 	= $API[$apiIndex];
+
+				if (!class_exists($apiClass)) {
+					require_once sprintf('Libs/Services/%s.php', $apiClass);
+				}
+
+				$job 		= new $apiClass;
+				$jobData 	= json_encode($job);
+
+				Job::push($BeanStalkWorker->queue, $jobData, $tube);
+			}
+
+			Log::info(sprintf('%d nuovi jobs sono stati inseriti nel tube %s', JOBS_TOTAL, $tube), array('prefix' => ''));
+		?>
+		</div>
+	</body>
+</html>

File client.worker.php

+<?php
+error_reporting(E_ALL);
+ini_set('display_errors', '1');
+define('WORKER_LOOP_DELAY', 20);
+
+
+// If you aren't using composer, register Pheanstalk class loader
+require_once 'Vendors/Pheanstalk/pheanstalk_init.php';
+require_once 'Libs/Log.php';
+require_once 'Libs/Worker.php';
+?>
+
+<html>
+	<body>
+		<h1>BeanStalkd test</h1>
+		<div>
+		<?php 
+			$tube = 'defaultTube';
+			Log::out('Beanstalk::Worker test...', array('prefix' => 'Benstalk::client'));
+			$BeanStalkWorker = new Worker('127.0.0.1', 6666);
+			$BeanStalkWorker->loop($tube);
+		?>
+		</div>
+	</body>
+</html>

File start-server.sh

 pkill=$(which pkill)
 $pkill beanstealkd
 
-beanstalkd -l 127.0.0.1 -p 6666 -V
+beanstalkd -l 127.0.0.1 -p 6666 -V -b beanstalkd-logs