Commits

Thejesh GN committed 0b7a794

added web application and modified the folder paths.

Comments (0)

Files changed (100)

pp-web.sublime-project

+{
+	"auto_complete":
+	{
+		"selected_items":
+		[
+			[
+				"award_cat",
+				"award_category_personal"
+			],
+			[
+				"award_",
+				"award_category_album"
+			],
+			[
+				"award_c",
+				"award_category_song"
+			]
+		]
+	},
+	"buffers":
+	[
+	],
+	"build_system": "",
+	"command_palette":
+	{
+		"height": 0,
+		"selected_items":
+		[
+		],
+		"width": 0
+	},
+	"console":
+	{
+		"height": 0
+	},
+	"distraction_free":
+	{
+		"menu_visible": true,
+		"show_minimap": false,
+		"show_tabs": false,
+		"side_bar_visible": false,
+		"status_bar_visible": false
+	},
+	"file_history":
+	[
+		"/home/thej/Documents/code/pp-web/src/start.py",
+		"/home/thej/Documents/code/pp-web/src/model/server.py",
+		"/home/thej/Documents/code/pp-web/src/model/worker.py",
+		"/home/thej/Documents/code/pp-web/src/templates/worker.html",
+		"/home/thej/Documents/code/pp-web/start.py",
+		"/home/thej/Documents/code/pp-web/config.py",
+		"/home/thej/Documents/code/pp-web/webapp/model/server.py",
+		"/home/thej/Documents/code/pp-web/webapp/templates/server.html",
+		"/home/thej/Documents/code/pp-web/webapp/model/worker.py",
+		"/home/thej/Documents/code/janaagraha/attendance/templates/index.html",
+		"/home/thej/Documents/code/pp-web/webapp/templates/worker.html",
+		"/home/thej/Documents/code/janaagraha/attendance/start.py",
+		"/home/thej/Documents/code/janaagraha/attendance/templates/header.html",
+		"/home/thej/Documents/code/pp-web/pp/README",
+		"/home/thej/Documents/code/pp-web/webapp/templates/client.html",
+		"/home/thej/Documents/code/pp-web/webapp/model/client.py",
+		"/home/thej/Documents/code/web-up/src/index.py",
+		"/home/thej/Documents/code/pp-web/app.config",
+		"/home/thej/Documents/code/pp-web/pp/doc/example.config",
+		"/home/thej/Documents/code/pp-web/pp/doc/ppserver.1",
+		"/home/thej/Documents/code/pp-web/pp/MANIFEST.in",
+		"/home/thej/Documents/code/pp-web/index.py",
+		"/home/thej/Documents/code/janaagraha/attendance/model/home.py",
+		"/home/thej/Documents/code/pp-web/client/client.py",
+		"/home/thej/Documents/code/pp-web/server/server.py",
+		"/home/thej/Documents/code/pp-web/callback.py",
+		"/home/thej/Documents/code/pp-web/pp/ppserver.py",
+		"/home/thej/Documents/code/pp-web/pp_examples/callback.py",
+		"/home/thej/Documents/code/src/index.py",
+		"/home/thej/Documents/code/pp-web/sum_primes.py",
+		"/home/thej/Documents/code/pp-web/examples/sum_primes.py",
+		"/home/thej/.ssh/known_hosts",
+		"/home/thej/Documents/code/pp-web/pp/examples/sum_primes.py",
+		"/home/thej/Documents/code/pp-web/pp/pp.py",
+		"/home/thej/Documents/code/quake/readme.md",
+		"/home/thej/Documents/code/web-up/src/infra/db.py",
+		"/home/thej/Documents/code/quake/original-readme.txt",
+		"/home/thej/Documents/code/thejeshgn.com/html/presentations/processing101/index.html",
+		"/home/thej/Documents/code/thejeshgn.com/html/presentations/processing101/module1/0.html",
+		"/home/thej/Documents/code/thejeshgn.com/html/presentations/processing101/module3/7.html",
+		"/home/thej/Documents/code/thejeshgn.com/html/presentations/processing101/module4/0.html",
+		"/home/thej/Documents/code/thejeshgn.com/html/presentations/index.html",
+		"/home/thej/Documents/code/mavrix/web/src/infra/external.py",
+		"/home/thej/Documents/code/pycco/pycco/main.py",
+		"/home/thej/Documents/code/quake/readme.txt",
+		"/home/thej/Documents/code/thejeshgn.com/html/presentations/data-visualization-javascript/index.html",
+		"/home/thej/Documents/code/pycco/pycco/AUTHORS",
+		"/home/thej/Documents/web-up/src/algo/recommendations.py",
+		"/home/thej/Documents/web-up/.hgignore",
+		"/home/thej/Documents/web-up/src/index.py",
+		"/media/SONY/web-up/.hgignore",
+		"/home/thej/Documents/code/mavrix/web/README",
+		"/home/thej/Documents/code/thejeshgn.com/html/blog/wp-config.php",
+		"/home/thej/Documents/code/janaagraha/data/README",
+		"/home/thej/Documents/code/processing101/presentation/module3/1.html",
+		"/home/thej/Documents/code/janaagraha/data/README.html",
+		"/home/thej/Documents/code/janaagraha/data/terms.html",
+		"/home/thej/Documents/code/janaagraha/data/README.txt",
+		"/home/thej/Documents/code/mavrix/web/build/fabfile.py",
+		"/home/thej/Documents/code/mavrix/web/src/index.py",
+		"/home/thej/Documents/code/mavrix/web/.hgignore",
+		"/home/thej/Desktop/anju",
+		"/home/thej/Documents/docs/thej/ascii",
+		"/home/thej/Documents/code/mavrix/web/src/functions/general.py",
+		"/home/thej/Documents/code/mavrix/web/src/config/webMsg.py",
+		"/home/thej/.config/sublime-text-2/Packages/User/Base File.sublime-settings",
+		"/home/thej/.config/sublime-text-2/Packages/User/Default (Linux).sublime-keymap",
+		"/home/thej/.config/sublime-text-2/Packages/Default/Default (Linux).sublime-keymap",
+		"/home/thej/.config/sublime-text-2/Packages/Default/Base File.sublime-settings",
+		"/home/thej/.config/sublime-text-2/Packages/User/Distraction Free.sublime-settings",
+		"/home/thej/Documents/code/treemap/src/libraries/treemap/library/treemap.jar"
+	],
+	"find":
+	{
+		"height": 39
+	},
+	"find_in_files":
+	{
+		"height": 87,
+		"include_history":
+		[
+			""
+		],
+		"location_history":
+		[
+			"<open folders>"
+		]
+	},
+	"find_state":
+	{
+		"case_sensitive": false,
+		"find_history":
+		[
+			"static",
+			"stattic",
+			"confirmEmail",
+			"email",
+			"confirm",
+			"USER_PASSWORD_SALT"
+		],
+		"highlight": true,
+		"in_selection": false,
+		"preserve_case": false,
+		"regex": false,
+		"replace_history":
+		[
+		],
+		"reverse": false,
+		"show_context": true,
+		"use_buffer": true,
+		"whole_word": false,
+		"wrap": true
+	},
+	"folders":
+	{
+		"mount_points":
+		[
+			"/home/thej/Documents/code/pp-web"
+		]
+	},
+	"groups":
+	[
+		{
+			"sheets":
+			[
+			]
+		}
+	],
+	"incremental_find":
+	{
+		"height": 0
+	},
+	"input":
+	{
+		"height": 0
+	},
+	"layout":
+	{
+		"cells":
+		[
+			[
+				0,
+				0,
+				1,
+				1
+			]
+		],
+		"cols":
+		[
+			0,
+			1
+		],
+		"rows":
+		[
+			0,
+			1
+		]
+	},
+	"menu_visible": true,
+	"replace":
+	{
+		"height": 0
+	},
+	"save_all_on_build": true,
+	"select_file":
+	{
+		"height": 0,
+		"selected_items":
+		[
+			[
+				"wor",
+				"webapp/templates/worker.html"
+			],
+			[
+				"serv",
+				"webapp/templates/server.html"
+			],
+			[
+				"client",
+				"webapp/templates/client.html"
+			],
+			[
+				"serve",
+				"server/server.py"
+			],
+			[
+				"in",
+				"index.py"
+			],
+			[
+				"sess",
+				"src/infra/session.py"
+			],
+			[
+				"confi",
+				"src/config/settings.py"
+			],
+			[
+				"user",
+				"src/infra/userMgmt.py"
+			],
+			[
+				"we",
+				"config/webMsg.py"
+			]
+		],
+		"width": 0
+	},
+	"select_project":
+	{
+		"height": 500,
+		"selected_items":
+		[
+			[
+				"",
+				"/home/thej/Documents/code/projects/treemap.sublime-project"
+			],
+			[
+				"/home/thej/Documents/docs/projects/",
+				"/home/thej/Documents/docs/projects/mavrix-web.sublime-project"
+			]
+		],
+		"width": 380
+	},
+	"show_minimap": true,
+	"show_tabs": true,
+	"side_bar_visible": true,
+	"side_bar_width": 198,
+	"status_bar_visible": true
+}

pp/AUTHORS

-Vitalii Vanovschi - support@parallelpython.com

pp/CHANGELOG

-pp-1.6.1:
-    1) Fixed struct.unpack("!Q", size_packed) bug which started to happen with Python 2.7 on certain platforms.
-    2) Fixed bug with auto-discovery not working after ppserver is restarted.
-    3) Added full support of python import statements. 
-       For instance "from numpy.linalg import det as determinant" is now supported.
-       For compatibility old module name imports will continue to work.
-    4) Exposed more detailed network error messages in ppserver.py.
-
-pp-1.6.0:
-    1) Changed logging mechanism. Now logger is obtained as logging.getLogger('pp').
-    2) Modified ppworker to use exec instead of eval.
-    3) Modified exception handling on destruction. Now if server was destroyed,
-       uncompleted jobs throw DestroyedServerError exception on call.
-    4) Fixed issue with submitting a method of an instance of a class inherited from another.
-    5) Added timeouts to all socket operations.
-    6) Changed default proto type to 2.
-    7) Moved from thread module to threading. Made all pp threads daemons.
-    8 ) Refactored ppserver.py to improve testability
-    9) Fixed bug with ppsecret in user module
-Changes w.r.t RC1:
-    10) Fixed issue with argument which is an instance of an imported class
-Changes w.r.t RC2:
-    11) Fixed DEBUG logging in ppserver.
-    12) Added a flag (-f) to ppserver to set a custom log format. Changed default log format.
-    13) Made printing of the expected exceptions optional and improved the way they are handled.
-    14) Removed default logging handler from pp module (to improve logging flexibility).
-Changes w.r.t RC3:
-    15) Created a common module ppcommon.py and moved common functions there.
-    16) Fixed issue with pipes not being closed.
-Changes w.r.t. RC4:
-    17) Fixed issues with ppserver exiting on first connection.
-    18) Fixed deadlock when using ppworker restart option.
-    19) Enables support for submodule importing.
-
-pp-1.5.7:
-    1) Added ppworker restart after task completion functionality
-    2) Added pickle protocol option  
-    3) Merged patch for Python 2.6 compatibility (contributed by mrtss)
-    4) Merged patch for config file support (contributed by stevemilner)
-    5) Documentation has been moved to doc folder
-
-pp-1.5.6
-    1) Fixed problem with autodiscovery service on Windows XP and Vista
-    2) Merged new code quality improvement patches (contributed by stevemilner)
-
-pp-1.5.5
-    1) Fixed bug which caused segmentation fault when calling destroy() method. 
-    2) Merged performance and quality improvement patches (contributed by stevemilner)
-
-pp-1.5.4
-    1) Fixed bug with unindented comments
-    2) easy_install functionality repaired
-    3) Code quality improved (many small changes)
-
-pp-1.5.3
-    1) Added support for methods of new-style classes.
-    2) Added ability to read secret key from pp_secret variable of .pythonrc.py
-    3) ppdoc.html and ppserver.1 are included in the distribution
-    4) examples bundled with the distribution
-CHANGELOG started
-
-* - nicknames of the contributors refer to the PP forum profile login names.

pp/COPYING

-Parallel Python Software: http://www.parallelpython.com
-Copyright (c) 2005-2010, Vitalii Vanovschi
-All rights reserved.
-Redistribution and use in source and binary forms, with or without 
-modification, are permitted provided that the following conditions are met:
-   * Redistributions of source code must retain the above copyright notice, 
-     this list of conditions and the following disclaimer.
-   * Redistributions in binary form must reproduce the above copyright notice,
-     this list of conditions and the following disclaimer in the documentation 
-     and/or other materials provided with the distribution.
-   * Neither the name of the author nor the names of its contributors 
-     may be used to endorse or promote products derived from this software 
-     without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
-ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
-LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
-CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
-SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
-CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
-ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF 
-THE POSSIBILITY OF SUCH DAMAGE.

pp/MANIFEST.in

-include AUTHORS
-include COPYING
-include MANIFEST.in
-include CHANGELOG
-include PKG-INFO
-include README
-include python-restlib.spec
-include examples/auto_diff.py
-include examples/callback.py
-include examples/dynamic_ncpus.py
-include examples/quicksort.py
-include examples/reverse_md5.py
-include examples/sum_primes.py
-include examples/sum_primes_functor.py
-recursive-include doc/ *

pp/PKG-INFO

-Metadata-Version: 1.0
-Name: pp
-Version: 1.6.1
-Summary: Parallel and distributed programming for Python
-Home-page: http://www.parallelpython.com
-Author: Vitalii Vanovschi
-Author-email: support@parallelpython.com
-License: BSD-like
-Download-URL: http://www.parallelpython.com/downloads/pp/pp-1.6.1.zip
-Description: 
-        Parallel Python module (PP) provides an easy and efficient way to create parallel-enabled applications for SMP computers and clusters. PP module features cross-platform portability and dynamic load balancing. Thus application written with PP will parallelize efficiently even on heterogeneous and multi-platform clusters (including clusters running other application with variable CPU loads). Visit http://www.parallelpython.com for further information.
-        
-Platform: Windows
-Platform: Linux
-Platform: Unix
-Classifier: Topic :: Software Development
-Classifier: Topic :: System :: Distributed Computing
-Classifier: Programming Language :: Python
-Classifier: Operating System :: OS Independent
-Classifier: License :: OSI Approved :: BSD License
-Classifier: Natural Language :: English
-Classifier: Intended Audience :: Developers
-Classifier: Development Status :: 5 - Production/Stable

pp/README

-Visit http://www.parallelpython.com for up-to-date documentation, examples and support forums
-
-INSTALLATION: 
-    python setup.py install
-
-LOCAL DOCUMENTATION:
-    <htmlviewer> pydoc.html

pp/__init__.py

Empty file removed.

pp/__init__.pyc

Binary file removed.

pp/doc/example.config

-[general]
-debug = True
-workers = 2
-secret = epo20pdosl;dksldkmm
-proto = 0
-restart = False
-
-[network]
-autodiscovery = False
-interface = 0.0.0.0
-broadcast = 255.255.255.255
-port = 60000
-timeout = 10

pp/doc/ppdoc.html

-<?xml version="1.0" encoding="iso-8859-1"?><!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
-<head>
-<title>Parallel Python - Parallel Python documentation</title>
-</head>
-<body>
-&nbsp;&nbsp;Please visit <a href='http://www.parallelpython.com'>http://www.parallelpython.com</a> for extended up-to-date
-documentation, examples and support forums	
-
-		<div class="mainbody">
-						<table class="contentpaneopen">
-			<tr>
-								<td class="contentheading" width="100%">
-			<h1>		Parallel Python documentation	</h1>								</td>
-						</tr>
-			</table>
-
-		<table class="contentpaneopen">
-				<tr>
-			<td valign="top" colspan="2">
-				<p>&nbsp;<a href="#API">Module API</a> <br />&nbsp;<a href="#QUICKSMP">Quick start guide, SMP</a><br />&nbsp;<a href="#QUICKCLUSTERS">Quick start guide, clusters<br />&nbsp;</a><a href="#QUICKCLUSTERSAUTO">Quick start guide, clusters with auto-discovery</a><br /> &nbsp;<a href="#ADVANCEDCLUSTERS">Advanced guide, clusters</a><br /> &nbsp;<a href="#COMMANDLINE">Command line arguments, ppserver.py</a><br />&nbsp;<a href="#SECURITY">Security and secret key</a>  </p><hr /> <p>&nbsp;</p><h1 id="API">&nbsp; pp 1.6.0 module API</h1>   <p> <table border="0" cellspacing="0" cellpadding="2" width="100%" summary="section"> <tbody><tr bgcolor="#ffc8d8"> <td colspan="3" valign="bottom">&nbsp;<br /> <font face="helvetica, arial" color="#000000"><a name="Server" title="Server"></a>class <strong>Server</strong></font></td></tr>      <tr bgcolor="#ffc8d8"><td rowspan="2">&nbsp;&nbsp;&nbsp;</td> <td colspan="2">Parallel&nbsp;Python&nbsp;SMP&nbsp;execution&nbsp;server&nbsp;class<br />&nbsp;</td></tr> <tr><td>&nbsp;</td> <td width="100%">Methods defined here:<br />  <dl><dt><a name="Server-__init__" title="Server-__init__"></a><strong>__init__</strong>(self, ncpus<font color="#909090">=&#39;autodetect&#39;</font>, ppservers<font color="#909090">=()</font>, secret<font color="#909090">=None</font>, restart<font color="#909090">=False</font>, proto<font color="#909090">=0</font>)</dt><dd>Creates&nbsp;<a href="#Server">Server</a>&nbsp;instance<br />  &nbsp;<br /> ncpus&nbsp;-&nbsp;the&nbsp;number&nbsp;of&nbsp;worker&nbsp;processes&nbsp;to&nbsp;start&nbsp;on&nbsp;the&nbsp;local&nbsp;<br /> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;computer,&nbsp;if&nbsp;parameter&nbsp;is&nbsp;omitted&nbsp;it&nbsp;will&nbsp;be&nbsp;set&nbsp;to&nbsp;<br />  &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;the&nbsp;number&nbsp;of&nbsp;processors&nbsp;in&nbsp;the&nbsp;system<br /> ppservers&nbsp;-&nbsp;list&nbsp;of&nbsp;active&nbsp;parallel&nbsp;python&nbsp;execution&nbsp;servers&nbsp;<br />  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;to&nbsp;connect&nbsp;with<br /> secret&nbsp;-&nbsp;passphrase&nbsp;for&nbsp;network&nbsp;connections,&nbsp;if&nbsp;omitted&nbsp;a&nbsp;default<br /> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;passphrase&nbsp;will&nbsp;be&nbsp;used.&nbsp;It&#39;s&nbsp;highly&nbsp;recommended&nbsp;to&nbsp;use&nbsp;a&nbsp;<br />  &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;custom&nbsp;passphrase&nbsp;for&nbsp;all&nbsp;network&nbsp;connections.<br /> restart&nbsp;-&nbsp;whether&nbsp;to&nbsp;restart&nbsp;worker&nbsp;process&nbsp;after&nbsp;each&nbsp;task&nbsp;completion&nbsp;<br/> proto&nbsp;-&nbsp;protocol&nbsp;number&nbsp;for&nbsp;pickle&nbsp;module&nbsp;<br /><br /> With&nbsp;ncpus&nbsp;=&nbsp;1&nbsp;all&nbsp;tasks&nbsp;are&nbsp;executed&nbsp;consequently<br /> For&nbsp;the&nbsp;best&nbsp;performance&nbsp;either&nbsp;use&nbsp;the&nbsp;default&nbsp;&quot;autodetect&quot;&nbsp;value<br />  or&nbsp;set&nbsp;ncpus&nbsp;to&nbsp;the&nbsp;total&nbsp;number&nbsp;of&nbsp;processors&nbsp;in&nbsp;the&nbsp;system</dd></dl><dl><dt><a name="Server-destroy" title="Server-destroy"></a><strong>destroy</strong>(self)</dt><dd>Kills&nbsp;local&nbsp;ppworkers&nbsp;and&nbsp;closes&nbsp;open&nbsp;files</dd></dl>  <dl><dt><a name="Server-get_active_nodes" title="Server-get_active_nodes"></a><strong>get_active_nodes</strong>(self)</dt><dd>Returns&nbsp;active&nbsp;nodes&nbsp;as&nbsp;a&nbsp;dictionary&nbsp;<br />  [keys&nbsp;-&nbsp;nodes,&nbsp;values&nbsp;-&nbsp;ncpus]</dd></dl>  <dl><dt><a name="Server-get_ncpus" title="Server-get_ncpus"></a><strong>get_ncpus</strong>(self)</dt><dd>Returns&nbsp;the&nbsp;number&nbsp;of&nbsp;local&nbsp;worker&nbsp;processes&nbsp;(ppworkers)</dd></dl>  <dl><dt><a name="Server-get_stats" title="Server-get_stats"></a><strong>get_stats</strong>(self)</dt><dd>Returns&nbsp;job&nbsp;execution&nbsp;statistics&nbsp;as&nbsp;a&nbsp;dictionary</dd></dl><dl><dt><a name="Server-print_stats" 
title="Server-print_stats"></a><strong>print_stats</strong>(self)</dt><dd>Prints&nbsp;job&nbsp;execution&nbsp;statistics.&nbsp;Useful&nbsp;for&nbsp;benchmarking&nbsp;on&nbsp;<br />  clusters</dd></dl>  <dl><dt><a name="Server-set_ncpus" title="Server-set_ncpus"></a><strong>set_ncpus</strong>(self, ncpus<font color="#909090">=&#39;autodetect&#39;</font>)</dt><dd>Sets&nbsp;the&nbsp;number&nbsp;of&nbsp;local&nbsp;worker&nbsp;processes&nbsp;(ppworkers)<br /> &nbsp;<br />  ncpus&nbsp;-&nbsp;the&nbsp;number&nbsp;of&nbsp;worker&nbsp;processes,&nbsp;if&nbsp;parammeter&nbsp;is&nbsp;omitted<br /> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;it&nbsp;will&nbsp;be&nbsp;set&nbsp;to&nbsp;the&nbsp;number&nbsp;of&nbsp;processors&nbsp;in&nbsp;the&nbsp;system</dd></dl>  <dl><dt><a name="Server-submit" title="Server-submit"></a><strong>submit</strong>(self, func, args<font color="#909090">=()</font>, depfuncs<font color="#909090">=()</font>, modules<font color="#909090">=()</font>, callback<font color="#909090">=None</font>, callbackargs<font color="#909090">=()</font>, group<font color="#909090">=&#39;default&#39;</font>, globals<font color="#909090">=None</font>)</dt><dd>Submits&nbsp;function&nbsp;to&nbsp;the&nbsp;execution&nbsp;queue<br />  &nbsp;<br /> func&nbsp;-&nbsp;function&nbsp;to&nbsp;be&nbsp;executed<br /> args&nbsp;-&nbsp;tuple&nbsp;with&nbsp;arguments&nbsp;of&nbsp;the&nbsp;&#39;func&#39;<br />  depfuncs&nbsp;-&nbsp;tuple&nbsp;with&nbsp;functions&nbsp;which&nbsp;might&nbsp;be&nbsp;called&nbsp;from&nbsp;&#39;func&#39;<br /> modules&nbsp;-&nbsp;tuple&nbsp;with&nbsp;module&nbsp;names&nbsp;to&nbsp;import<br />  callback&nbsp;-&nbsp;callback&nbsp;function&nbsp;which&nbsp;will&nbsp;be&nbsp;called&nbsp;with&nbsp;argument&nbsp;<br /> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;list&nbsp;equal&nbsp;to&nbsp;callbackargs+(result,)&nbsp;<br /> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;as&nbsp;soon&nbsp;as&nbsp;calculation&nbsp;is&nbsp;done<br />  
callbackargs&nbsp;-&nbsp;additional&nbsp;arguments&nbsp;for&nbsp;callback&nbsp;function<br /> group&nbsp;-&nbsp;job&nbsp;group,&nbsp;is&nbsp;used&nbsp;when&nbsp;<a href="#Server-wait">wait</a>(group)&nbsp;is&nbsp;called&nbsp;to&nbsp;wait&nbsp;for<br />  jobs&nbsp;in&nbsp;a&nbsp;given&nbsp;group&nbsp;to&nbsp;finish<br /> globals&nbsp;-&nbsp;dictionary&nbsp;from&nbsp;which&nbsp;all&nbsp;modules,&nbsp;functions&nbsp;and&nbsp;classes<br />  will&nbsp;be&nbsp;imported,&nbsp;for&nbsp;instance:&nbsp;globals=globals()</dd></dl>  <dl><dt><a name="Server-wait" title="Server-wait"></a><strong>wait</strong>(self, group<font color="#909090">=None</font>)</dt><dd>Waits&nbsp;for&nbsp;all&nbsp;jobs&nbsp;in&nbsp;a&nbsp;given&nbsp;group&nbsp;to&nbsp;finish.<br />  If&nbsp;group&nbsp;is&nbsp;omitted&nbsp;waits&nbsp;for&nbsp;all&nbsp;jobs&nbsp;to&nbsp;finish</dd></dl>  <dl><dt><strong>default_port</strong> = 60000</dt></dl>  <dl><dt><strong>default_secret</strong> = &#39;epo20pdosl;dksldkmm&#39;</dt></dl>  </td></tr></tbody></table> </p>
-
-
-<p> <table border="0" cellspacing="0" cellpadding="2" width="100%" summary="section"> <tbody><tr bgcolor="#ffc8d8"> <td colspan="3" valign="bottom">&nbsp;<br /> <font face="helvetica, arial" color="#000000"><a name="Template" title="Template"></a>class <strong>Template</strong></font></td></tr>      <tr bgcolor="#ffc8d8"><td rowspan="2">&nbsp;&nbsp;&nbsp;</td> <td colspan="2"><a href="#Template">Template</a>&nbsp;class<br />&nbsp;</td></tr>  <tr><td>&nbsp;</td> <td width="100%">Methods defined here:<br /> <dl><dt><a name="Template-__init__" title="Template-__init__"></a><strong>__init__</strong>(self, job_server, func, depfuncs<font color="#909090">=()</font>, modules<font color="#909090">=()</font>, callback<font color="#909090">=None</font>, callbackargs<font color="#909090">=()</font>, group<font color="#909090">=&#39;default&#39;</font>, globals<font color="#909090">=None</font>)</dt><dd>Creates&nbsp;<a href="#Template">Template</a>&nbsp;instance<br />  &nbsp;<br /> jobs_server&nbsp;-&nbsp;pp&nbsp;server&nbsp;for&nbsp;submitting&nbsp;jobs<br /> func&nbsp;-&nbsp;function&nbsp;to&nbsp;be&nbsp;executed<br /> depfuncs&nbsp;-&nbsp;tuple&nbsp;with&nbsp;functions&nbsp;which&nbsp;might&nbsp;be&nbsp;called&nbsp;from&nbsp;&#39;func&#39;<br />  modules&nbsp;-&nbsp;tuple&nbsp;with&nbsp;module&nbsp;names&nbsp;to&nbsp;import<br /> callback&nbsp;-&nbsp;callback&nbsp;function&nbsp;which&nbsp;will&nbsp;be&nbsp;called&nbsp;with&nbsp;argument&nbsp;<br />  &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;list&nbsp;equal&nbsp;to&nbsp;callbackargs+(result,)&nbsp;<br /> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;as&nbsp;soon&nbsp;as&nbsp;calculation&nbsp;is&nbsp;done<br /> callbackargs&nbsp;-&nbsp;additional&nbsp;arguments&nbsp;for&nbsp;callback&nbsp;function<br />  group&nbsp;-&nbsp;job&nbsp;group,&nbsp;is&nbsp;used&nbsp;when&nbsp;wait(group)&nbsp;is&nbsp;called&nbsp;to&nbsp;wait&nbsp;for<br /> jobs&nbsp;in&nbsp;a&nbsp;given&nbsp;group&nbsp;to&nbsp;finish<br />  
globals&nbsp;-&nbsp;dictionary&nbsp;from&nbsp;which&nbsp;all&nbsp;modules,&nbsp;functions&nbsp;and&nbsp;classes<br /> will&nbsp;be&nbsp;imported,&nbsp;for&nbsp;instance:&nbsp;globals=globals()</dd></dl>  <dl><dt><a name="Template-submit" title="Template-submit"></a><strong>submit</strong>(self, *args)</dt><dd>Submits&nbsp;function&nbsp;with&nbsp;*arg&nbsp;arguments&nbsp;to&nbsp;the&nbsp;execution&nbsp;queue</dd></dl>   </td></tr></tbody></table></p>
-
-
-<p> <table border="0" cellspacing="0" cellpadding="2" width="100%" summary="section">  <tbody><tr bgcolor="#55aa55"> <td colspan="3" valign="bottom">&nbsp;<br /> <font face="helvetica, arial" color="#ffffff"><strong>Data</strong></font></td></tr>      <tr><td bgcolor="#55aa55">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;</td><td>&nbsp;</td> <td width="100%"><strong>copyright</strong> = &#39;Copyright (c) 2005-2009 Vitalii Vanovschi. All rights reserved&#39;<br /> <strong>version</strong> = &#39;1.6.0&#39;</td></tr></tbody></table>      </p><hr /><h1 id="QUICKSMP">&nbsp; Quick start guide, SMP<br /></h1> <p>1) Import pp module:</p><p><strong>&nbsp;&nbsp;&nbsp; import pp</strong></p><p>2) Start pp execution server with the number of workers set to&nbsp;the&nbsp;number&nbsp;of&nbsp;processors&nbsp;in&nbsp;the&nbsp;system </p><p><strong>&nbsp;&nbsp;&nbsp; job_server = pp.Server()&nbsp;</strong></p><p>3) Submit all the tasks for parallel execution:</p><p><strong>&nbsp;&nbsp;&nbsp; f1 = job_server.submit(func1, args1, depfuncs1, modules1)</strong></p><p><strong>&nbsp;&nbsp;&nbsp; f2 = job_server.submit(func1, args2, depfuncs1, modules1) </strong></p><p><strong>&nbsp;&nbsp;&nbsp; f3 = job_server.submit(func2, args3, depfuncs2, modules2) </strong><br /> </p><p>&nbsp;&nbsp; ...etc...<br /></p><p>4) Retrieve the results as needed:</p><p><strong>&nbsp;&nbsp;&nbsp; r1 = f1()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r2 = f2()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r3 = f3()&nbsp;</strong> </p><p>&nbsp;&nbsp;&nbsp; ...etc...</p><p>&nbsp;To find out how to achieve efficient parallelization with pp please take a look at <a href="http://www.parallelpython.com/content/view/17/31/" title="Parallel Python Implementation Examples">examples</a> </p> <hr /><h1 id="QUICKCLUSTERS">&nbsp; Quick start guide, clusters&nbsp; </h1><p><em><strong>On the nodes</strong></em> <br /></p><p>1) Start parallel python execution server on all your remote computational nodes:</p><p><strong>&nbsp;&nbsp;&nbsp; 
node-1&gt; ./ppserver.py </strong></p><p><strong>&nbsp;&nbsp;&nbsp; node-2&gt; ./ppserver.py</strong></p><p><strong>&nbsp;&nbsp;&nbsp; node-3&gt; ./ppserver.py</strong></p><p><em><strong>On the client</strong></em> <br /></p><p>2) Import pp module:</p><p><strong>&nbsp;&nbsp;&nbsp; import pp</strong></p><p>3)&nbsp; Create a list of all the nodes in your cluster (computers where you&#39;ve run ppserver.py) </p><p><strong>&nbsp;&nbsp;&nbsp; ppservers=(&quot;node-1&quot;, &quot;node-2&quot;, &quot;node-3&quot;)</strong><br /></p><p>4) Start pp execution server with the number of workers set to&nbsp;the&nbsp;number&nbsp;of&nbsp;processors&nbsp;in&nbsp;the&nbsp;system and list of ppservers to connect with :</p><p><strong>&nbsp;&nbsp;&nbsp; job_server = pp.Server(</strong><strong>ppservers=</strong><strong>ppservers</strong><strong>)&nbsp;</strong></p><p>5) Submit all the tasks for parallel execution:</p><p><strong>&nbsp;&nbsp;&nbsp; f1 = job_server.submit(func1, args1, depfuncs1, modules1)</strong></p><p><strong>&nbsp;&nbsp;&nbsp; f2 = job_server.submit(func1, args2, depfuncs1, modules1) </strong></p><p><strong>&nbsp;&nbsp;&nbsp; f3 = job_server.submit(func2, args3, depfuncs2, modules2) </strong><br /> </p><p>&nbsp;&nbsp; ...etc...<br /></p><p>6) Retrieve the results as needed:</p><p><strong>&nbsp;&nbsp;&nbsp; r1 = f1()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r2 = f2()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r3 = f3()&nbsp;</strong> </p><p>&nbsp;&nbsp;&nbsp; ...etc...</p><p>&nbsp;To find out how to achieve efficient parallelization with pp please take a look at <a href="http://www.parallelpython.com/content/view/17/31/" title="Parallel Python Implementation Examples">examples</a></p> <hr /><h1 id="QUICKCLUSTERSAUTO">&nbsp; Quick start guide, clusters with autodiscovery<br /> </h1><p><em><strong>On the nodes</strong></em>&nbsp;</p><p>1) Start parallel python execution server on all your remote computational nodes:</p><p><strong>&nbsp;&nbsp;&nbsp; node-1&gt; 
./ppserver.py -a<br /> </strong></p><p><strong>&nbsp;&nbsp;&nbsp; node-2&gt; ./ppserver.py -a</strong></p><p><strong>&nbsp;&nbsp;&nbsp; node-3&gt; ./ppserver.py -a<br /></strong></p><p><em><strong>On the client</strong></em></p><p>2) Import pp module:</p><p><strong>&nbsp;&nbsp;&nbsp; import pp</strong></p><p>3)&nbsp; Set ppservers list to auto-discovery: </p><p><strong>&nbsp;&nbsp;&nbsp; ppservers=(&quot;*&quot;,)</strong><br /></p><p>4) Start pp execution server with the number of workers set to&nbsp;the&nbsp;number&nbsp;of&nbsp;processors&nbsp;in&nbsp;the&nbsp;system and list of ppservers to connect with :</p><p><strong>&nbsp;&nbsp;&nbsp; job_server = pp.Server(</strong><strong>ppservers=</strong><strong>ppservers</strong><strong>)&nbsp;</strong></p><p>5) Submit all the tasks for parallel execution:</p><p><strong>&nbsp;&nbsp;&nbsp; f1 = job_server.submit(func1, args1, depfuncs1, modules1)</strong></p><p><strong>&nbsp;&nbsp;&nbsp; f2 = job_server.submit(func1, args2, depfuncs1, modules1) </strong></p><p><strong>&nbsp;&nbsp;&nbsp; f3 = job_server.submit(func2, args3, depfuncs2, modules2) </strong><br /> </p><p>&nbsp;&nbsp; ...etc...<br /></p><p>6) Retrieve the results as needed:</p><p><strong>&nbsp;&nbsp;&nbsp; r1 = f1()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r2 = f2()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r3 = f3()&nbsp;</strong> </p><p>&nbsp;&nbsp;&nbsp; ...etc...</p><p>&nbsp;To find out how to achieve efficient parallelization with pp please take a look at <a href="http://www.parallelpython.com/content/view/17/31/" title="Parallel Python Implementation Examples">examples</a>&nbsp; </p><hr /><h1 id="ADVANCEDCLUSTERS">&nbsp;&nbsp;&nbsp; Advanced guide, clusters&nbsp; </h1> <p><em><strong>On the nodes</strong></em> &nbsp;</p><p>1) Start parallel python execution server on all your remote computational nodes (listen to a given port 35000,<br /> and local network interface only, accept only connections which know correct 
secret):</p><p><strong>&nbsp;&nbsp;&nbsp; node-1&gt; ./ppserver.py -p 35000 -i 192.168.0.101 -s &quot;mysecret&quot;<br /></strong></p><p><strong>&nbsp;&nbsp;&nbsp; node-2&gt; ./ppserver.py -p 35000 -i 192.168.0.102</strong><strong> -s &quot;mysecret&quot;</strong></p><p><strong>&nbsp;&nbsp;&nbsp; node-3&gt; ./ppserver.py -p 35000 -i 192.168.0.103</strong><strong> -s &quot;mysecret&quot;</strong></p><p><em><strong>On the client</strong></em> <br /></p> <p>2) Import pp module:</p><p><strong>&nbsp;&nbsp;&nbsp; import pp</strong></p><p>3)&nbsp; Create a list of all the nodes in your cluster (computers where you&#39;ve run ppserver.py) </p><p><strong>&nbsp;&nbsp;&nbsp; ppservers=(&quot;node-1:35000&quot;, &quot;node-2:</strong><strong>35000</strong><strong>&quot;, &quot;node-3:</strong><strong>35000</strong><strong>&quot;)</strong><br /></p><p>4) Start pp execution server with the number of workers set to&nbsp;the&nbsp;number&nbsp;of&nbsp;processors&nbsp;in&nbsp;the&nbsp;system, <br />list of ppservers to connect with and secret key to authorize the connection:</p><p><strong>&nbsp;&nbsp;&nbsp; job_server = pp.Server(</strong><strong>ppservers=</strong><strong>ppservers</strong><strong>, secret=&quot;</strong><strong>mysecret</strong><strong>&quot;)&nbsp;</strong></p><p>5) Submit all the tasks for parallel execution:</p><p><strong>&nbsp;&nbsp;&nbsp; f1 = job_server.submit(func1, args1, depfuncs1, modules1)</strong></p><p><strong>&nbsp;&nbsp;&nbsp; f2 = job_server.submit(func1, args2, depfuncs1, modules1) </strong></p><p><strong>&nbsp;&nbsp;&nbsp; f3 = job_server.submit(func2, args3, depfuncs2, modules2) </strong><br /> </p><p>&nbsp;&nbsp; ...etc...<br /></p><p>6) Retrieve the results as needed:</p><p><strong>&nbsp;&nbsp;&nbsp; r1 = f1()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r2 = f2()</strong></p><p><strong>&nbsp;&nbsp;&nbsp; r3 = f3()&nbsp;</strong> </p><p>&nbsp;&nbsp;&nbsp; ...etc...</p><p>&nbsp;7) Print the execution statistics:<br 
/></p><p><strong>&nbsp;&nbsp;&nbsp; job_server.print_stats()</strong></p><p>To find out how to achieve efficient parallelization with pp please take a look at <a href="http://www.parallelpython.com/content/view/17/31/" title="Parallel Python Implementation Examples">examples</a> </p><hr /><h1 id="COMMANDLINE">&nbsp; Command line options, ppserver.py </h1> 
-<pre>
-Usage: ppserver.py [-hdar] [-f format] [-n proto] [-c config_path] [-i interface] [-b broadcast] [-p port] [-w nworkers] [-s secret] [-t seconds]
-
-Options:
--h                 : this help message
--d                 : set log level to debug
--f format          : log format
--a                 : enable auto-discovery service
--r                 : restart worker process after each task completion
--n proto           : protocol number for pickle module
--c path            : path to config file
--i interface       : interface to listen
--b broadcast       : broadcast address for auto-discovery service
--p port            : port to listen
--w nworkers        : number of workers to start
--s secret          : secret for authentication
--t seconds         : timeout to exit if no connections with clients exist
-</pre>
-<hr /><h1 id="SECURITY">&nbsp; Security and secret key<a name="SECURITY" title="SECURITY"></a></h1><p>&nbsp;Due to the security concerns it is highly recommended to run ppserver.py with a non-trivial secret key (-s command line argument) which should be paired with the matching <em>secret</em> keyword of PP Server class constructor. Since PP 1.5.3 it is possible to set secret key by assigning <strong>pp_secret</strong> variable in the configuration file <strong>.pythonrc.py</strong> which should be located in the user home directory (please make this file readable and writable only by user). The secret key set in .pythonrc.py could be overridden by command line argument (for ppserver.py) and <em>secret</em> keyword (for PP Server class constructor). </p>
-
-			</td>
-		</tr>
-				</table>
-		
-		<span class="article_seperator">&nbsp;</span>
-		
-				</div>
-	</div>
-						<div class="footer">
-						
-<div align="center">
-<font color="gray">Parallel Python Solutions | Parallel Python Forums | Parallel Python Community</font>
-</div>
-<br/>
-<div align="center">
-<font color="gray">Parallel Python	&copy; 2005 - 2009 Vitalii Vanovschi. All rights reserved</font>
-</div>
-		</div>
-				</div>
-</div>
-</body>
-</html><!-- 1202744006 -->

pp/doc/ppserver.1

-.\" It was generated by help2man 1.36.
-.TH PPSERVER "1" "February 2010" "Parallel Python Network Server" "User Commands"
-.SH NAME
-ppserver \- manual page for Parallel Python Network Server
-.SH SYNOPSIS
-.B ppserver
-[\fI-hda\fR] [\fI-i interface\fR] [\fI-b broadcast\fR] [\fI-p port\fR] [\fI-w nworkers\fR] [\fI-s secret\fR] [\fI-t seconds\fR]
-.SH DESCRIPTION
-Parallel Python Network Server
-.SH OPTIONS
-.TP
-\fB\-h\fR
-this help message
-.TP
-\fB\-d\fR
-debug
-.TP
-\fB\-a\fR
-enable auto\-discovery service
-.TP
-\fB\-r\fR
-restart worker process after each task completion
-.TP
-\fB\-n\fR <proto>
-protocol number for pickle module
-.TP
-\fB\-c\fR <path>
-path to config file
-.TP
-\fB\-i\fR <interface>
-interface to listen
-.TP
-\fB\-b\fR <broadcast>
-broadcast address for auto\-discovery service
-.TP
-\fB\-p\fR <port>
-port to listen
-.TP
-\fB\-w\fR <nworkers>
-number of workers to start
-.TP
-\fB\-s\fR <secret>
-secret for authentication
-.TP
-\fB\-t\fR <seconds>
-timeout to exit if no connections with clients exist
-.PP
-Please visit http://www.parallelpython.com for extended up\-to\-date
-documentation, examples and support forums
-.br
-.SH SECURITY
-Due to the security concerns it is highly recommended to run ppserver.py with a non-trivial secret key (-s command line argument) which should be paired with the matching secret keyword of PP Server class constructor. An alternative way to set a secret key is by assigning 
-.B pp_secret 
-variable in the configuration file 
-.B .pythonrc.py 
-which should be located in the user home directory (please make this file readable and writable only by user). 
-The secret key set in .pythonrc.py could be overridden by command line argument (for ppserver.py) and secret keyword (for PP Server class constructor).
-.SH AUTHOR
-This manual page was written by Sandro Tosi <matrixhasu@gmail.com>, 
-and Vitalii Vanovschi support@parallelpython.com

pp/pp.py

-# Parallel Python Software: http://www.parallelpython.com
-# Copyright (c) 2005-2011, Vitalii Vanovschi
-# All rights reserved.
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#    * Redistributions of source code must retain the above copyright notice,
-#      this list of conditions and the following disclaimer.
-#    * Redistributions in binary form must reproduce the above copyright
-#      notice, this list of conditions and the following disclaimer in the
-#      documentation and/or other materials provided with the distribution.
-#    * Neither the name of the author nor the names of its contributors
-#      may be used to endorse or promote products derived from this software
-#      without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
-# THE POSSIBILITY OF SUCH DAMAGE.
-"""
-Parallel Python Software, Execution Server
-
-http://www.parallelpython.com - updates, documentation, examples and support
-forums
-"""
-
-import os
-import threading
-import logging
-import inspect
-import sys
-import types
-import time
-import atexit
-import user
-import cPickle as pickle
-import pptransport
-import ppauto
-import ppcommon
-
# Library metadata exposed to users of the pp module.
copyright = "Copyright (c) 2005-2011 Vitalii Vanovschi. All rights reserved"
version = "1.6.1"

# Reconnect persistent rworkers in seconds.
RECONNECT_WAIT_TIME = 5

# Set timeout for socket operations in seconds.
SOCKET_TIMEOUT = 5

# If set to true prints out the exceptions which are expected.
SHOW_EXPECTED_EXCEPTIONS = False

# we need to have set even in Python 2.3
try:
    set
except NameError:
    from sets import Set as set 

# Prefer the subprocess module for spawning local workers; fall back to
# popen2 on very old Python versions that lack subprocess.
_USE_SUBPROCESS = False
try:
    import subprocess
    _USE_SUBPROCESS = True
except ImportError:
    import popen2
-
class _Task(object):
    """Class describing single task (job).

    The task is created in a "locked" state; the internal lock is released
    only by finalize(), so wait() blocks until the result has arrived.
    """

    def __init__(self, server, tid, callback=None,
            callbackargs=(), group='default'):
        """Initializes the task.

           server - owning Server instance
           tid - unique integer task id
           callback - optional function called with callbackargs+(result,)
           callbackargs - extra positional arguments for the callback
           group - job group name, used by Server.wait(group)"""
        # Acquired here and released in finalize(); acts as a completion event.
        self.lock = threading.Lock()
        self.lock.acquire()
        self.tid = tid
        self.server = server
        self.callback = callback
        self.callbackargs = callbackargs
        self.group = group
        self.finished = False
        self.unpickled = False

    def finalize(self, sresult):
        """Finalizes the task with the serialized (pickled) result.

           For internal use only"""
        self.sresult = sresult
        if self.callback:
            self.__unpickle()
        # Releasing the lock wakes every thread blocked in wait().
        self.lock.release()
        self.finished = True

    def __call__(self, raw_result=False):
        """Retrieves result of the task, blocking until it is available.

           raw_result - if True, return the still-pickled result instead of
           the deserialized value"""
        if not self.finished and self.server._exiting:
            raise DestroyedServerError("Server was destroyed before the job completion")
        self.wait()

        if not self.unpickled and not raw_result:
            self.__unpickle()

        if raw_result:
            return self.sresult
        else:
            return self.result

    def wait(self):
        """Waits for the task to finish"""
        if not self.finished:
            # Block on the completion lock, then release immediately so
            # other waiters can pass through as well.
            self.lock.acquire()
            self.lock.release()

    def __unpickle(self):
        """Unpickles the result of the task and invokes the callback, if any"""
        # The serialized payload is a (result, captured_stdout) pair.
        self.result, sout = pickle.loads(self.sresult)
        self.unpickled = True
        if len(sout) > 0:
            print sout,
        if self.callback:
            args = self.callbackargs + (self.result, )
            self.callback(*args)
-
-
class _Worker(object):
    """Local worker class

    Spawns and manages one ppworker.py child process communicating over a
    pipe transport."""
    # Command line used to spawn a ppworker.py child with unbuffered I/O.
    command = [sys.executable, "-u",
            os.path.dirname(os.path.abspath(__file__))
            + os.sep + "ppworker.py"]

    # NOTE(review): with subprocess(shell=False) this string is passed to
    # ppworker.py as a literal argv entry rather than acting as a shell
    # redirection of stderr - presumably ppworker ignores extra arguments;
    # confirm, and confirm the intent on the popen2 fallback path.
    command.append("2>/dev/null")

    def __init__(self, restart_on_free, pickle_proto):
        """Initializes local worker

           restart_on_free - restart the child process after each task
           pickle_proto - pickle protocol number used on the pipe"""
        self.restart_on_free = restart_on_free
        self.pickle_proto = pickle_proto
        self.start()

    def start(self):
        """Starts local worker process and performs the initial handshake"""
        if _USE_SUBPROCESS:
            proc = subprocess.Popen(self.command, stdin=subprocess.PIPE, \
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE, \
                    shell=False)
            self.t = pptransport.CPipeTransport(proc.stdout, proc.stdin)
        else:
            self.t = pptransport.CPipeTransport(\
                    *popen2.popen3(self.command)[:2])

        # Handshake: the child first sends its pid, then expects the pickle
        # protocol number to use for the session.
        self.pid = int(self.t.receive())
        self.t.send(str(self.pickle_proto))
        self.is_free = True

    def stop(self):
        """Stops local worker"""
        self.is_free = False
        self.t.send('EXIT') # can send any string - it will exit
        self.t.close()


    def restart(self):
        """Restarts local worker"""
        self.stop()
        self.start()

    def free(self):
        """Marks the worker as free again (or restarts it when configured)"""
        if self.restart_on_free:
            self.restart()
        else:
            self.is_free = True
-
-
class _RWorker(pptransport.CSocketTransport):
    """Remote worker class

    One socket connection to a slot on a remote ppserver. Persistent
    workers reconnect automatically after network failures."""

    def __init__(self, host, port, secret, server,  message=None, persistent=True):
        """Initializes remote worker

           host, port - address of the remote ppserver
           secret - passphrase used for authentication
           server - owning Server instance (used for logging and exit flag)
           message - optional command string sent right after connecting
           persistent - whether to keep retrying failed connections"""
        self.server = server
        self.persistent = persistent
        self.host = host
        self.port = port
        self.secret = secret
        self.address = (host, port)
        self.id = host + ":" + str(port)
        self.server.logger.debug("Creating Rworker id=%s persistent=%s"
                % (self.id, persistent))
        self.connect(message)

    def __del__(self):
        """Closes connection with remote server"""
        self.close()

    def connect(self, message=None):
        """Connects to a remote server; returns True on success.

           Non-persistent workers give up after the first failure;
           persistent workers retry every RECONNECT_WAIT_TIME seconds until
           the owning server starts exiting (then the loop falls through
           and returns None)."""
        # "True and" is redundant; the loop runs until the server exits.
        while True and not self.server._exiting:
            try:
                pptransport.SocketTransport.__init__(self)                
                self._connect(self.host, self.port)                
                if not self.authenticate(self.secret):
                    self.server.logger.error("Authentication failed for host=%s, port=%s"
                            % (self.host, self.port))
                    return False
                if message:
                    self.send(message)
                self.is_free = True
                return True
            except:
                if SHOW_EXPECTED_EXCEPTIONS:
                    self.server.logger.debug("Exception in connect method "
                            "(possibly expected)", exc_info=True)
                if not self.persistent:
                    self.server.logger.debug("Deleting from queue Rworker %s"
                            % (self.id, ))
                    return False
                self.server.logger.info("Failed to reconnect with " \
                        "(host=%s, port=%i), will try again in %i s"
                        % (self.host, self.port, RECONNECT_WAIT_TIME))
                time.sleep(RECONNECT_WAIT_TIME)
-
-
class _Statistics(object):
    """Per-node execution statistics (worker count, job count, total time)."""

    def __init__(self, ncpus, rworker=None):
        """Create a statistics record for one node.

           ncpus - number of worker processes on the node
           rworker - associated remote worker for remote nodes (None for
                     the local node)"""
        self.ncpus = ncpus
        self.rworker = rworker
        # Accumulated execution time and number of jobs run on this node.
        self.time = 0.0
        self.njobs = 0
-
-
class Template(object):
    """Reusable submission recipe.

    Binds a function together with its submission options to a job server,
    so that repeated submissions only need the call arguments.
    """

    def __init__(self, job_server, func, depfuncs=(), modules=(),
            callback=None, callbackargs=(), group='default', globals=None):
        """Creates Template instance

           job_server - pp server for submitting jobs
           func - function to be executed
           depfuncs - tuple with functions which might be called from 'func'
           modules - tuple with module names to import
           callback - callback function which will be called with argument
                   list equal to callbackargs+(result,)
                   as soon as calculation is done
           callbackargs - additional arguments for callback function
           group - job group, is used when wait(group) is called to wait for
           jobs in a given group to finish
           globals - dictionary from which all modules, functions and classes
           will be imported, for instance: globals=globals()"""
        (self.job_server, self.func, self.depfuncs, self.modules,
         self.callback, self.callbackargs, self.group, self.globals) = (
                job_server, func, depfuncs, modules,
                callback, callbackargs, group, globals)

    def submit(self, *args):
        """Submits self.func with *args to the execution queue"""
        return self.job_server.submit(self.func, args, self.depfuncs,
                self.modules, self.callback, self.callbackargs,
                self.group, self.globals)
-
-
-class Server(object):
-    """Parallel Python SMP execution server class
-    """
-
-    default_port = 60000
-    default_secret = "epo20pdosl;dksldkmm"
-
    def __init__(self, ncpus="autodetect", ppservers=(), secret=None,
            restart=False, proto=2):
        """Creates Server instance

           ncpus - the number of worker processes to start on the local
                   computer, if parameter is omitted it will be set to
                   the number of processors in the system
           ppservers - list of active parallel python execution servers
                   to connect with
           secret - passphrase for network connections, if omitted a default
                   passphrase will be used. It's highly recommended to use a
                   custom passphrase for all network connections.
           restart - whether to restart worker process after each task completion
           proto - protocol number for pickle module

           With ncpus = 1 all tasks are executed sequentially
           For the best performance either use the default "autodetect" value
           or set ncpus to the total number of processors in the system
        """

        if not isinstance(ppservers, tuple):
            raise TypeError("ppservers argument must be a tuple")

        self.logger = logging.getLogger('pp')
        self.logger.info("Creating server instance (pp-" + version+")")
        self.logger.info("Running on Python %s %s", sys.version.split(" ")[0],
                sys.platform)
        # Task bookkeeping: id counter, running-task count, pending queue,
        # and the locks protecting each structure.
        self.__tid = 0
        self.__active_tasks = 0
        self.__active_tasks_lock = threading.Lock()
        self.__queue = []
        self.__queue_lock = threading.Lock()
        # Worker pools: local processes, remote workers, and a reserve of
        # remote workers used only when the queue backs up.
        self.__workers = []
        self.__rworkers = []
        self.__rworkers_reserved = []
        # Memoization caches: function source text and serialized functions.
        self.__sourcesHM = {}
        self.__sfuncHM = {}
        self.__waittasks = []
        self.__waittasks_lock = threading.Lock()
        self._exiting = False
        self.__accurate_stats = True
        self.autopp_list = {}
        self.__active_rworkers_list_lock = threading.Lock()
        self.__restart_on_free = restart
        self.__pickle_proto = proto

        # add local directory and sys.path to PYTHONPATH
        pythondirs = [os.getcwd()] + sys.path

        if "PYTHONPATH" in os.environ and os.environ["PYTHONPATH"]:
            pythondirs += os.environ["PYTHONPATH"].split(os.pathsep)
        os.environ["PYTHONPATH"] = os.pathsep.join(set(pythondirs))

        atexit.register(self.destroy)
        self.__stats = {"local": _Statistics(0)}
        self.set_ncpus(ncpus)

        self.ppservers = []
        self.auto_ppservers = []

        # Split "host:port" entries; entries containing "*" are treated as
        # auto-discovery (interface, broadcast) address pairs.
        for ppserver in ppservers:
            ppserver = ppserver.split(":")
            host = ppserver[0]
            if len(ppserver)>1:
                port = int(ppserver[1])
            else:
                port = Server.default_port
            if host.find("*") == -1:
                self.ppservers.append((host, port))
            else:
                if host == "*":
                    host = "*.*.*.*"
                interface = host.replace("*", "0")
                broadcast = host.replace("*", "255")
                self.auto_ppservers.append(((interface, port),
                        (broadcast, port)))
        self.__stats_lock = threading.Lock()
        # Secret resolution order: explicit argument, then pp_secret from the
        # user's .pythonrc.py ("user" module), then the built-in default.
        # NOTE: types.StringType exists on Python 2 only.
        if secret is not None:
            if not isinstance(secret, types.StringType):
                raise TypeError("secret must be of a string type")
            self.secret = str(secret)
        elif hasattr(user, "pp_secret"):
            secret = getattr(user, "pp_secret")
            if not isinstance(secret, types.StringType):
                raise TypeError("secret must be of a string type")
            self.secret = str(secret)
        else:
            self.secret = Server.default_secret
        self.__connect()
        self.__creation_time = time.time()
        self.logger.info("pp local server started with %d workers"
                % (self.__ncpus, ))
-
    def submit(self, func, args=(), depfuncs=(), modules=(),
            callback=None, callbackargs=(), group='default', globals=None):
        """Submits function to the execution queue

            func - function to be executed
            args - tuple with arguments of the 'func'
            depfuncs - tuple with functions which might be called from 'func'
            modules - tuple with module names to import
            callback - callback function which will be called with argument
                    list equal to callbackargs+(result,)
                    as soon as calculation is done
            callbackargs - additional arguments for callback function
            group - job group, is used when wait(group) is called to wait for
            jobs in a given group to finish
            globals - dictionary from which all modules, functions and classes
            will be imported, for instance: globals=globals()

            Returns a _Task; calling the task blocks until its result
            is available.
        """

        # perform some checks for frequent mistakes
        if self._exiting:
            raise DestroyedServerError("Cannot submit jobs: server"\
                    " instance has been destroyed")

        if not isinstance(args, tuple):
            raise TypeError("args argument must be a tuple")

        if not isinstance(depfuncs, tuple):
            raise TypeError("depfuncs argument must be a tuple")

        if not isinstance(modules, tuple):
            raise TypeError("modules argument must be a tuple")

        if not isinstance(callbackargs, tuple):
            raise TypeError("callbackargs argument must be a tuple")

        if globals is not None and not isinstance(globals, dict):
            raise TypeError("globals argument must be a dictionary")

        for module in modules:
            # NOTE: types.StringType exists on Python 2 only.
            if not isinstance(module, types.StringType):
                raise TypeError("modules argument must be a list of strings")

        tid = self.__gentid()

        if globals:
            # Auto-import every module found in the caller's globals and
            # ship every function/class defined there as a dependency.
            modules += tuple(self.__find_modules("", globals))
            modules = tuple(set(modules))
            self.logger.debug("Task %i will autoimport next modules: %s" %
                    (tid, str(modules)))
            for object1 in globals.values():
                if isinstance(object1, types.FunctionType) \
                        or isinstance(object1, types.ClassType):
                    depfuncs += (object1, )

        task = _Task(self, tid, callback, callbackargs, group)

        self.__waittasks_lock.acquire()
        self.__waittasks.append(task)
        self.__waittasks_lock.release()

        # if the function is a method of a class add self to the arguments
        # list (im_self is the Python 2 spelling of __self__)
        if isinstance(func, types.MethodType) and func.im_self is not None:
            args = (func.im_self, ) + args

        # if there is an instance of a user defined class in the arguments
        # add the whole class hierarchy to dependencies
        for arg in args:
            # Checks for both classic or new class instances
            if isinstance(arg, types.InstanceType) \
                    or str(type(arg))[:6] == "<class":
                # do not include source for imported modules
                if ppcommon.is_not_imported(arg, modules):
                    depfuncs += tuple(ppcommon.get_class_hierarchy(arg.__class__))

        # if there is a function in the arguments add this
        # function to dependencies
        for arg in args:
            if isinstance(arg, types.FunctionType):
                depfuncs += (arg, )

        sfunc = self.__dumpsfunc((func, ) + depfuncs, modules)
        sargs = pickle.dumps(args, self.__pickle_proto)

        self.__queue_lock.acquire()
        self.__queue.append((task, sfunc, sargs))
        self.__queue_lock.release()

        # NOTE: the "submited" typo is kept - it is a runtime log string.
        self.logger.debug("Task %i submited, function='%s'" %
                (tid, func.func_name))
        self.__scheduler()
        return task
-
    def wait(self, group=None):
        """Waits for all jobs in a given group to finish.
           If group is omitted waits for all jobs to finish
        """
        # Repeatedly scan the waiting list under the lock; the lock is
        # released before blocking on an individual task so that finishing
        # tasks can remove themselves from the list in the meantime.
        while True:
            self.__waittasks_lock.acquire()
            for task in self.__waittasks:
                if not group or task.group == group:
                    self.__waittasks_lock.release()
                    task.wait()
                    break
            else:
                # No matching task left - done.
                self.__waittasks_lock.release()
                break
-
-    def get_ncpus(self):
-        """Returns the number of local worker processes (ppworkers)"""
-        return self.__ncpus
-
-    def set_ncpus(self, ncpus="autodetect"):
-        """Sets the number of local worker processes (ppworkers)
-
-        ncpus - the number of worker processes, if parammeter is omitted
-                it will be set to the number of processors in the system"""
-        if ncpus == "autodetect":
-            ncpus = self.__detect_ncpus()
-        if not isinstance(ncpus, int):
-            raise TypeError("ncpus must have 'int' type")
-        if ncpus < 0:
-            raise ValueError("ncpus must be an integer > 0")
-        if ncpus > len(self.__workers):
-            self.__workers.extend([_Worker(self.__restart_on_free, 
-                    self.__pickle_proto) for x in\
-                    range(ncpus - len(self.__workers))])
-        self.__stats["local"].ncpus = ncpus
-        self.__ncpus = ncpus
-
-    def get_active_nodes(self):
-        """Returns active nodes as a dictionary
-        [keys - nodes, values - ncpus]"""
-        active_nodes = {}
-        for node, stat in self.__stats.items():
-            if node == "local" or node in self.autopp_list \
-                    and self.autopp_list[node]:
-                active_nodes[node] = stat.ncpus
-        return active_nodes
-
    def get_stats(self):
        """Returns job execution statistics as a dictionary

           For remote nodes the accumulated time is refreshed over the
           node's control connection before returning."""
        for node, stat in self.__stats.items():
            if stat.rworker:
                try:
                    stat.rworker.send("TIME")
                    stat.time = float(stat.rworker.receive())
                except:
                    # Connection problem: totals can no longer be trusted.
                    self.__accurate_stats = False
                    stat.time = 0.0
        return self.__stats
-
    def print_stats(self):
        """Prints job execution statistics. Useful for benchmarking on
           clusters

           NOTE: written with Python 2 print statements."""

        print "Job execution statistics:"
        walltime = time.time() - self.__creation_time
        statistics = self.get_stats().items()
        # Float so the percentage division below is true division on Python 2.
        totaljobs = 0.0
        for ppserver, stat in statistics:
            totaljobs += stat.njobs
        print " job count | % of all jobs | job time sum | " \
                "time per job | job server"
        for ppserver, stat in statistics:
            if stat.njobs:
                print "    %6i |        %6.2f |     %8.4f |  %11.6f | %s" \
                        % (stat.njobs, 100.0*stat.njobs/totaljobs, stat.time,
                        stat.time/stat.njobs, ppserver, )
        print "Time elapsed since server creation", walltime

        if not self.__accurate_stats:
            print "WARNING: statistics provided above is not accurate" \
                  " due to job rescheduling"
        print
-
-    # all methods below are for internal use only
-
-    def insert(self, sfunc, sargs, task=None):
-        """Inserts function into the execution queue. It's intended for
-           internal use only (ppserver.py).
-        """
-        if not task:
-            tid = self.__gentid()
-            task = _Task(self, tid)
-        self.__queue_lock.acquire()
-        self.__queue.append((task, sfunc, sargs))
-        self.__queue_lock.release()
-
-        self.logger.debug("Task %i inserted" % (task.tid, ))
-        self.__scheduler()
-        return task
-
    def connect1(self, host, port, persistent=True):
        """Connects to a remote ppserver specified by host and port

           Opens one control ("STAT") connection to query the node's ncpus,
           then 2*ncpus "EXEC" connections - half go to the regular pool
           and half to the reserve."""
        try:
            rworker = _RWorker(host, port, self.secret, self, "STAT", persistent)
            ncpus = int(rworker.receive())
            hostid = host+":"+str(port)
            self.__stats[hostid] = _Statistics(ncpus, rworker)

            for x in range(ncpus):
                rworker = _RWorker(host, port, self.secret, self, "EXEC", persistent)
                self.__update_active_rworkers(rworker.id, 1)
                # append is atomic - no need to lock self.__rworkers
                self.__rworkers.append(rworker)
            #creating reserved rworkers
            for x in range(ncpus):
                rworker = _RWorker(host, port, self.secret, self, "EXEC", persistent)
                self.__update_active_rworkers(rworker.id, 1)
                self.__rworkers_reserved.append(rworker)
            self.logger.debug("Connected to ppserver (host=%s, port=%i) \
                    with %i workers" % (host, port, ncpus))
            self.__scheduler()
        except:
            # Connection failures are expected when a node is down; they are
            # only logged when SHOW_EXPECTED_EXCEPTIONS is enabled.
            if SHOW_EXPECTED_EXCEPTIONS:
                self.logger.debug("Exception in connect1 method (possibly expected)", exc_info=True)
-
    def __connect(self):
        """Connects to all remote ppservers

           Direct ppservers are contacted in background threads; a Discover
           instance handles the broadcast-based auto-discovery entries."""
        for ppserver in self.ppservers:
            ppcommon.start_thread("connect1",  self.connect1, ppserver)

        self.discover = ppauto.Discover(self, True)
        for ppserver in self.auto_ppservers:
            ppcommon.start_thread("discover.run", self.discover.run, ppserver)
-
-    def __detect_ncpus(self):
-        """Detects the number of effective CPUs in the system"""
-        #for Linux, Unix and MacOS
-        if hasattr(os, "sysconf"):
-            if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
-                #Linux and Unix
-                ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
-                if isinstance(ncpus, int) and ncpus > 0:
-                    return ncpus
-            else:
-                #MacOS X
-                return int(os.popen2("sysctl -n hw.ncpu")[1].read())
-        #for Windows
-        if "NUMBER_OF_PROCESSORS" in os.environ:
-            ncpus = int(os.environ["NUMBER_OF_PROCESSORS"])
-            if ncpus > 0:
-                return ncpus
-        #return the default value
-        return 1
-
-
    def __dumpsfunc(self, funcs, modules):
        """Serializes functions and modules, memoized by their joint hash"""
        hashs = hash(funcs + modules)
        if hashs not in self.__sfuncHM:
            sources = [self.__get_source(func) for func in funcs]
            # func_name is the Python 2 spelling of __name__; funcs[0] is
            # the entry-point function, the rest are its dependencies.
            self.__sfuncHM[hashs] = pickle.dumps(
                    (funcs[0].func_name, sources, modules),
                    self.__pickle_proto)
        return self.__sfuncHM[hashs]
-
-    def __find_modules(self, prefix, dict):
-        """recursively finds all the modules in dict"""
-        modules = []
-        for name, object in dict.items():
-            if isinstance(object, types.ModuleType) \
-                    and name not in ("__builtins__", "pp"):
-                if object.__name__ == prefix+name or prefix == "":
-                    modules.append(object.__name__)
-                    modules.extend(self.__find_modules(
-                            object.__name__+".", object.__dict__))
-        return modules
-
    def __scheduler(self):
        """Schedules jobs for execution

           Drains the queue under the queue lock: jobs go to free local
           workers first; once all local slots are busy they go to free
           remote workers, and - only when the backlog exceeds the local
           capacity - to the reserved remote workers."""
        self.__queue_lock.acquire()
        while self.__queue:
            if self.__active_tasks < self.__ncpus:
                #TODO: select a job number on the basis of heuristic
                task = self.__queue.pop(0)
                for worker in self.__workers:
                    if worker.is_free:
                        worker.is_free = False
                        break
                else:
                    # Should be unreachable: active_tasks < ncpus implies a
                    # free local worker exists.
                    self.logger.error("There are no free workers left")
                    raise RuntimeError("Error: No free workers")
                self.__add_to_active_tasks(1)
                try:
                    self.__stats["local"].njobs += 1
                    ppcommon.start_thread("run_local",  self._run_local, task+(worker, ))
                except:
                    pass
            else:
                for rworker in self.__rworkers:
                    if rworker.is_free:
                        rworker.is_free = False
                        task = self.__queue.pop(0)
                        self.__stats[rworker.id].njobs += 1
                        ppcommon.start_thread("run_remote",  self._run_remote, task+(rworker, ))
                        break
                else:
                    # No regular remote worker free; dip into the reserve
                    # only when the backlog exceeds local capacity.
                    if len(self.__queue) > self.__ncpus:
                        for rworker in self.__rworkers_reserved:
                            if rworker.is_free:
                                rworker.is_free = False
                                task = self.__queue.pop(0)
                                self.__stats[rworker.id].njobs += 1
                                ppcommon.start_thread("run_remote",  self._run_remote, task+(rworker, ))                                
                                break
                        else:
                                break
                    else:
                        break

        self.__queue_lock.release()
-
-    def __get_source(self, func):
-        """Fetches source of the function"""
-        hashf = hash(func)
-        if hashf not in self.__sourcesHM:
-            #get lines of the source and adjust indent
-            sourcelines = inspect.getsourcelines(func)[0]
-            #remove indentation from the first line
-            sourcelines[0] = sourcelines[0].lstrip()
-            self.__sourcesHM[hashf] = "".join(sourcelines)
-        return self.__sourcesHM[hashf]
-
    def _run_local(self,  job, sfunc, sargs, worker):
        """Runs a job locally on the given _Worker (called in its own thread)"""

        if self._exiting:
            return
        self.logger.info("Task %i started",  job.tid)

        start_time = time.time()

        try:
            worker.t.csend(sfunc)
            worker.t.send(sargs)
            sresult = worker.t.receive()
        except:
            if self._exiting:
                return
            if SHOW_EXPECTED_EXCEPTIONS:
                self.logger.debug("Exception in _run_local (possibly expected)", exc_info=True)
            # NOTE(review): if receive() failed and the server is not
            # exiting, sresult is unbound here and finalize() below raises
            # UnboundLocalError - confirm this is the intended behavior.

        job.finalize(sresult)

        # remove the job from the waiting list
        if self.__waittasks:
            self.__waittasks_lock.acquire()
            self.__waittasks.remove(job)
            self.__waittasks_lock.release()

        worker.free()

        self.__add_to_active_tasks(-1)
        if not self._exiting:
            self.__stat_add_time("local", time.time()-start_time)
        self.logger.debug("Task %i ended",  job.tid)
        self.__scheduler()
-
    def _run_remote(self, job, sfunc, sargs, rworker):
        """Run a job remotely on a connected ppserver worker."""
        self.logger.debug("Task (remote) %i started",  job.tid)

        try:
            rworker.csend(sfunc)
            rworker.send(sargs)
            sresult = rworker.receive()
            rworker.is_free = True
        except:
            # Broken network connection: put the task back on the queue,
            # drop this worker from the active list, then try to reconnect
            # it and reschedule.  NOTE(review): any exception lands here,
            # not only network errors — presumably intentional best-effort.
            self.logger.debug("Task %i failed due to broken network " \
                    "connection - rescheduling",  job.tid)
            self.insert(sfunc, sargs, job)
            self.__scheduler()
            self.__update_active_rworkers(rworker.id, -1)
            if rworker.connect("EXEC"):
                self.__update_active_rworkers(rworker.id, 1)
                self.__scheduler()
            return

        job.finalize(sresult)

        # remove the job from the waiting list
        if self.__waittasks:
            self.__waittasks_lock.acquire()
            self.__waittasks.remove(job)
            self.__waittasks_lock.release()

        self.logger.debug("Task (remote) %i ended",  job.tid)
        self.__scheduler()
-
-    def __add_to_active_tasks(self, num):
-        """Updates the number of active tasks"""
-        self.__active_tasks_lock.acquire()
-        self.__active_tasks += num
-        self.__active_tasks_lock.release()
-
-    def __stat_add_time(self, node, time_add):
-        """Updates total runtime on the node"""
-        self.__stats_lock.acquire()
-        self.__stats[node].time += time_add
-        self.__stats_lock.release()
-
-    def __stat_add_job(self, node):
-        """Increments job count on the node"""
-        self.__stats_lock.acquire()
-        self.__stats[node].njobs += 1
-        self.__stats_lock.release()
-
-    def __update_active_rworkers(self, id, count):
-        """Updates list of active rworkers"""
-        self.__active_rworkers_list_lock.acquire()
-
-        if id not in self.autopp_list:
-            self.autopp_list[id] = 0
-        self.autopp_list[id] += count
-
-        self.__active_rworkers_list_lock.release()
-
-    def __gentid(self):
-        """Generates a unique job ID number"""
-        self.__tid += 1
-        return self.__tid - 1
-
    def __del__(self):
        # Flag background threads to stop; worker-process cleanup is done
        # explicitly in destroy()
        self._exiting = True
-
-    def destroy(self):
-        """Kills ppworkers and closes open files"""
-        self._exiting = True
-        self.__queue_lock.acquire()
-        self.__queue = []
-        self.__queue_lock.release()
-
-        for worker in self.__workers:
-            try:
-                worker.t.close()
-                if sys.platform.startswith("win"):
-                    os.popen('TASKKILL /PID '+str(worker.pid)+' /F')
-                else:
-                    os.kill(worker.pid, 9)
-                    os.waitpid(worker.pid, 0)
-            except:
-                pass
-
-
class DestroyedServerError(RuntimeError):
    """Raised when a destroyed Server instance is used again."""
    pass
-    
-# Parallel Python Software: http://www.parallelpython.com

pp/pp.pyc

Binary file removed.

pp/ppauto.py

-# Parallel Python Software: http://www.parallelpython.com
-# Copyright (c) 2005-2011, Vitalii Vanovschi
-# All rights reserved.
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#    * Redistributions of source code must retain the above copyright notice,
-#      this list of conditions and the following disclaimer.
-#    * Redistributions in binary form must reproduce the above copyright
-#      notice, this list of conditions and the following disclaimer in the
-#      documentation and/or other materials provided with the distribution.
-#    * Neither the name of the author nor the names of its contributors
-#      may be used to endorse or promote products derived from this software
-#      without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
-# THE POSSIBILITY OF SUCH DAMAGE.
-"""Parallel Python Software, Auto-Discovery Service
-
-http://www.parallelpython.com - updates, documentation, examples and support
-forums
-"""
-
-import socket
-import sys
-import time
-import threading
-
-import ppcommon
-
-copyright = "Copyright (c) 2005-2011 Vitalii Vanovschi. All rights reserved"
-version = "1.6.1"
-
-# broadcast every 10 sec
-BROADCAST_INTERVAL = 10
-
-
class Discover(object):
    """Auto-discovery service: announces/discovers ppservers over UDP.

    Clients probe once with "C"; servers advertise periodically with "S".
    """

    def __init__(self, base, isclient=False):
        # base is the owning server/client object; it must provide
        # logger, _exiting, autopp_list and connect1 (used below)
        self.base = base
        self.hosts = []
        self.isclient = isclient

    def run(self, interface_addr, broadcast_addr):
        """Starts auto-discovery; both addresses are (host, port) tuples."""
        self.interface_addr = interface_addr
        self.broadcast_addr = broadcast_addr
        self.bsocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.bsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.bsocket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        try:
            self.listen()
        except:
            sys.excepthook(*sys.exc_info())

    def broadcast(self):
        """Sends a broadcast.

        A client announces itself once; a server re-advertises every
        BROADCAST_INTERVAL seconds until base._exiting is set.
        """
        if self.isclient:
            self.base.logger.debug("Client sends initial broadcast to (%s, %i)"
                    % self.broadcast_addr)
            self.bsocket.sendto("C", self.broadcast_addr)
        else:
            while True:
                if self.base._exiting:
                    return
                self.base.logger.debug("Server sends broadcast to (%s, %i)"
                        % self.broadcast_addr)
                self.bsocket.sendto("S", self.broadcast_addr)
                time.sleep(BROADCAST_INTERVAL)

    def listen(self):
        """Listens for broadcasts from other clients/servers"""
        self.base.logger.debug("Listening (%s, %i)" % self.interface_addr)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # short timeout so the loop below can poll base._exiting
        self.socket.settimeout(5)
        self.socket.bind(self.interface_addr)

        ppcommon.start_thread("broadcast",  self.broadcast)

        while True:
            try:
                if self.base._exiting:
                    return
                message, (host, port) = self.socket.recvfrom(1024)
                # contact peers on the advertised broadcast port, not on
                # the sender's ephemeral source port
                remote_address = (host, self.broadcast_addr[1])
                hostid = host + ":" + str(self.broadcast_addr[1])
                self.base.logger.debug("Discovered host (%s, %i) message=%c"
                        % (remote_address + (message[0], )))
                # client: connect to a newly seen server advertisement
                if not self.base.autopp_list.get(hostid, 0) and self.isclient \
                        and message[0] == 'S':
                    self.base.logger.debug("Connecting to host %s" % (hostid, ))
                    ppcommon.start_thread("ppauto_connect1",  self.base.connect1,
                            remote_address+(False, ))
                # server: answer a client probe with an advertisement
                if not self.isclient and message[0] == 'C':
                    self.base.logger.debug("Replying to host %s" % (hostid, ))
                    self.bsocket.sendto("S", self.broadcast_addr)
            except socket.timeout:
                pass
            except:
                self.base.logger.error("An error has occured during execution of "
                        "Discover.listen")
                sys.excepthook(*sys.exc_info())

pp/ppauto.pyc

Binary file removed.

pp/ppcommon.py

-# Parallel Python Software: http://www.parallelpython.com
-# Copyright (c) 2005-2011, Vitalii Vanovschi
-# All rights reserved.
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#    * Redistributions of source code must retain the above copyright notice,
-#      this list of conditions and the following disclaimer.
-#    * Redistributions in binary form must reproduce the above copyright
-#      notice, this list of conditions and the following disclaimer in the
-#      documentation and/or other materials provided with the distribution.
-#    * Neither the name of the author nor the names of its contributors
-#      may be used to endorse or promote products derived from this software
-#      without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
-# THE POSSIBILITY OF SUCH DAMAGE.
-"""
-Parallel Python Software, Execution Server
-
-http://www.parallelpython.com - updates, documentation, examples and support
-forums
-"""
-
-import threading
-
-copyright = "Copyright (c) 2005-2011 Vitalii Vanovschi. All rights reserved"
-version = "1.6.1"
-
def start_thread(name,  target,  args=(),  kwargs=None,  daemon=True):
    """Start and return a thread running *target*.

    name   -- thread name (for logging/debugging)
    target -- callable to run
    args   -- positional arguments for *target*
    kwargs -- keyword arguments for *target* (None means {})
    daemon -- whether the thread is a daemon (default True)
    """
    # kwargs default changed from a shared mutable {} to None;
    # threading.Thread treats kwargs=None as an empty dict
    thread = threading.Thread(name=name,  target=target, args=args,  kwargs=kwargs)
    # bug fix: the daemon argument was previously ignored (always True)
    thread.daemon = daemon
    thread.start()
    return thread
-
-
def get_class_hierarchy(clazz):
    """Return the ancestors of *clazz* (bases first, then the class itself).

    The root ``object`` type is excluded from the result.
    """
    if clazz is type(object()):
        return []
    hierarchy = []
    for parent in clazz.__bases__:
        hierarchy += get_class_hierarchy(parent)
    hierarchy.append(clazz)
    return hierarchy
-
-
def is_not_imported(arg, modules):
    """Return True if *arg*'s module is not in *modules* (or a submodule)."""
    owner = str(arg.__module__)
    return not any(owner == name or owner.startswith(name + ".")
                   for name in modules)
-
-# Parallel Python Software: http://www.parallelpython.com

pp/ppcommon.pyc

Binary file removed.

pp/ppserver.py

-#!/usr/bin/env python
-# Parallel Python Software: http://www.parallelpython.com
-# Copyright (c) 2005-2011, Vitalii Vanovschi
-# All rights reserved.
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#    * Redistributions of source code must retain the above copyright notice,
-#      this list of conditions and the following disclaimer.
-#    * Redistributions in binary form must reproduce the above copyright
-#      notice, this list of conditions and the following disclaimer in the
-#      documentation and/or other materials provided with the distribution.
-#    * Neither the name of the author nor the names of its contributors
-#      may be used to endorse or promote products derived from this software
-#      without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
-# THE POSSIBILITY OF SUCH DAMAGE.
-"""
-Parallel Python Software, Network Server
-
-http://www.parallelpython.com - updates, documentation, examples and support
-forums
-"""
-
-import logging
-import getopt
-import sys
-import socket
-import threading
-import random
-import string
-import time
-import os
-
-import pp
-import ppauto
-import ppcommon
-import pptransport
-
-copyright = "Copyright (c) 2005-2011 Vitalii Vanovschi. All rights reserved"
-version = "1.6.1"
-
-LISTEN_SOCKET_TIMEOUT = 20
-
-# compatibility with Python 2.6
-try:
-    import hashlib
-    sha_new = hashlib.sha1
-except ImportError:
-    import sha
-    sha_new = sha.new
-
-
class _NetworkServer(pp.Server):
    """Network Server Class

    Extends pp.Server with a TCP listener so remote clients can submit
    jobs, plus optional auto-discovery and idle-timeout shutdown.
    """

    def __init__(self, ncpus="autodetect", interface="0.0.0.0",
                broadcast="255.255.255.255", port=None, secret=None,
                timeout=None, restart=False, proto=2):
        # interface/port: listening address; broadcast: auto-discovery
        # address; timeout: exit after this many idle seconds (None = never)
        pp.Server.__init__(self, ncpus, secret=secret, restart=restart,
                proto=proto)
        self.host = interface
        self.bcast = broadcast
        if port is not None:
            self.port = port
        else:
            self.port = self.default_port
        self.timeout = timeout
        # connection count and last-activity time, guarded by ncon_lock
        self.ncon = 0
        self.last_con_time = time.time()
        self.ncon_lock = threading.Lock()

        self.logger.debug("Strarting network server interface=%s port=%i"
                % (self.host, self.port))
        if self.timeout is not None:
            self.logger.debug("ppserver will exit in %i seconds if no "\
                    "connections with clients exist" % (self.timeout))
            ppcommon.start_thread("timeout_check",  self.check_timeout)

    def ncon_add(self, val):
        """Keeps track of the number of connections and time of the last one"""
        self.ncon_lock.acquire()
        self.ncon += val
        self.last_con_time = time.time()
        self.ncon_lock.release()

    def check_timeout(self):
        """Checks if timeout happened and shutdowns server if it did"""
        while True:
            if self.ncon == 0:
                idle_time = time.time() - self.last_con_time
                if idle_time < self.timeout:
                    time.sleep(self.timeout - idle_time)
                else:
                    # os._exit skips cleanup deliberately: this runs on a
                    # daemon thread while other threads may be blocked
                    self.logger.debug("exiting ppserver due to timeout (no client"\
                            " connections in last %i sec)", self.timeout)
                    os._exit(0)
            else:
                time.sleep(self.timeout)

    def listen(self):
        """Initiate listening for incoming client connections (blocking)."""
        try:
            self.ssocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # following allows ppserver to restart faster on the same port
            self.ssocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # accept() times out periodically so _exiting can be polled
            self.ssocket.settimeout(LISTEN_SOCKET_TIMEOUT)
            self.ssocket.bind((self.host, self.port))
            self.ssocket.listen(5)
        except socket.error, e:
            self.logger.error("Cannot create socket for %s:%s, %s", self.host, self.port, e)

        try:
            while 1:
                csocket = None
                #accept connections from outside
                try:
                    (csocket, address) = self.ssocket.accept()
                except socket.timeout:
                    pass
                if self._exiting:
                    return
                #now do something with the clientsocket
                #in this case, we'll pretend this is a threaded server
                if csocket:
                    ppcommon.start_thread("client_socket",  self.crun, (csocket,  ))
        except:
            if pp.SHOW_EXPECTED_EXCEPTIONS:
                self.logger.debug("Exception in listen method (possibly expected)", exc_info=True)
            self.logger.debug("Closing server socket")
            self.ssocket.close()

    def crun(self, csocket):
        """Authenticates client and handles its jobs (one thread per client)."""
        mysocket = pptransport.CSocketTransport(csocket)
        #send PP version
        mysocket.send(version)
        #generate a random string (challenge for the shared-secret auth)
        srandom = "".join([random.choice(string.ascii_letters)
                for i in xrange(16)])
        mysocket.send(srandom)
        answer = sha_new(srandom+self.secret).hexdigest()
        clientanswer = mysocket.receive()
        if answer != clientanswer:
            self.logger.warning("Authentication failed, client host=%s, port=%i"
                    % csocket.getpeername())
            mysocket.send("FAILED")
            csocket.close()
            return
        else:
            mysocket.send("OK")

        # control message selects the session type: STAT (statistics
        # polling) or EXEC (job execution loop)
        ctype = mysocket.receive()
        self.logger.debug("Control message received: " + ctype)
        self.ncon_add(1)
        try:
            if ctype == "STAT":
                #reset time at each new connection
                self.get_stats()["local"].time = 0.0
                mysocket.send(str(self.get_ncpus()))
                while 1:
                    mysocket.receive()
                    mysocket.send(str(self.get_stats()["local"].time))
            elif ctype=="EXEC":
                # receive function+args, run locally, send back the result;
                # loops until the client disconnects (raises into except)
                while 1:
                    sfunc = mysocket.creceive()
                    sargs = mysocket.receive()
                    fun = self.insert(sfunc, sargs)
                    sresult = fun(True)
                    mysocket.send(sresult)
        except:
            if self._exiting:
                return
            if pp.SHOW_EXPECTED_EXCEPTIONS:
                self.logger.debug("Exception in crun method (possibly expected)", exc_info=True)
            self.logger.debug("Closing client socket")
            csocket.close()
            self.ncon_add(-1)

    def broadcast(self):
        """Initiates the auto-discovery mechanism on a background thread."""
        discover = ppauto.Discover(self)
        ppcommon.start_thread("server_broadcast",  discover.run,
                ((self.host, self.port), (self.bcast, self.port)))
-
-
-def parse_config(file_loc):
-    """
-    Parses a config file in a very forgiving way.
-    """
-    # If we don't have configobj installed then let the user know and exit
-    try:
-        from configobj import ConfigObj
-    except ImportError, ie:
-        print >> sys.stderr, ("ERROR: You must have config obj installed to use"
-                "configuration files. You can still use command line switches.")
-        sys.exit(1)
-
-    if not os.access(file_loc, os.F_OK):
-        print >> sys.stderr, "ERROR: Can not access %s." % arg
-        sys.exit(1)
-
-    # Load the configuration file
-    config = ConfigObj(file_loc)
-    # try each config item and use the result if it exists. If it doesn't
-    # then simply pass and move along
-    try:
-        args['secret'] = config['general'].get('secret')
-    except:
-        pass
-
-    try:
-        autodiscovery = config['network'].as_bool('autodiscovery')
-    except:
-        pass
-
-    try:
-        args['interface'] = config['network'].get('interface',
-                                                  default="0.0.0.0")
-    except:
-        pass
-
-    try:
-        args['broadcast'] = config['network'].get('broadcast')
-    except:
-        pass
-
-    try:
-        args['port'] = config['network'].as_int('port')
-    except:
-        pass
-
-    try:
-        args['loglevel'] = config['general'].as_bool('debug')
-    except:
-        pass
-
-    try:
-        args['ncpus'] = config['general'].as_int('workers')
-    except:
-        pass
-
-    try:
-        args['proto'] = config['general'].as_int('proto')
-    except:
-        pass
-
-    try:
-        args['restart'] = config['general'].as_bool('restart')
-    except:
-        pass
-
-    try:
-        args['timeout'] = config['network'].as_int('timeout')
-    except:
-        pass
-    # Return a tuple of the args dict and autodiscovery variable
-    return args, autodiscovery
-
-
def print_usage():
    """Print command-line help for ppserver.py to stdout."""
    # NOTE: keep the option list in sync with the getopt spec in
    # create_network_server ("hdarn:c:b:i:p:w:s:t:f:")
    print "Parallel Python Network Server (pp-" + version + ")"
    print "Usage: ppserver.py [-hdar] [-f format] [-n proto]"\
            " [-c config_path] [-i interface] [-b broadcast]"\
            " [-p port] [-w nworkers] [-s secret] [-t seconds]"
    print
    print "Options: "
    print "-h                 : this help message"
    print "-d                 : set log level to debug"
    print "-f format          : log format"
    print "-a                 : enable auto-discovery service"
    print "-r                 : restart worker process after each"\
            " task completion"
    print "-n proto           : protocol number for pickle module"
    print "-c path            : path to config file"
    print "-i interface       : interface to listen"
    print "-b broadcast       : broadcast address for auto-discovery service"
    print "-p port            : port to listen"
    print "-w nworkers        : number of workers to start"
    print "-s secret          : secret for authentication"
    print "-t seconds         : timeout to exit if no connections with "\
            "clients exist"
    print
    print "Due to the security concerns always use a non-trivial secret key."
    print "Secret key set by -s switch will override secret key assigned by"
    print "pp_secret variable in .pythonrc.py"
    print
    print "Please visit http://www.parallelpython.com for extended up-to-date"
    print "documentation, examples and support forums"
-
-
def create_network_server(argv):
    """Parse command-line *argv* and return a configured _NetworkServer."""
    try:
        opts, _positional = getopt.getopt(argv, "hdarn:c:b:i:p:w:s:t:f:", ["help"])
    except getopt.GetoptError:
        print_usage()
        raise

    server_kwargs = {}
    autodiscovery = False

    log_level = logging.WARNING
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    for opt, value in opts:
        if opt in ("-h", "--help"):
            print_usage()
            sys.exit()
        elif opt == "-c":
            # a config file replaces any kwargs gathered so far
            server_kwargs, autodiscovery = parse_config(value)
        elif opt == "-d":
            log_level = logging.DEBUG
            pp.SHOW_EXPECTED_EXCEPTIONS = True
        elif opt == "-f":
            log_format = value
        elif opt == "-i":
            server_kwargs["interface"] = value
        elif opt == "-s":
            server_kwargs["secret"] = value
        elif opt == "-p":
            server_kwargs["port"] = int(value)
        elif opt == "-w":
            server_kwargs["ncpus"] = int(value)
        elif opt == "-a":
            autodiscovery = True
        elif opt == "-r":
            server_kwargs["restart"] = True
        elif opt == "-b":
            server_kwargs["broadcast"] = value
        elif opt == "-n":
            server_kwargs["proto"] = int(value)
        elif opt == "-t":
            server_kwargs["timeout"] = int(value)

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(log_format))
    pp_logger = logging.getLogger("pp")
    pp_logger.setLevel(log_level)
    pp_logger.addHandler(handler)

    server = _NetworkServer(**server_kwargs)
    if autodiscovery:
        server.broadcast()
    return server
-    
-
if __name__ == "__main__":
    # build a server from the command line and serve until interrupted
    server = create_network_server(sys.argv[1:])
    server.listen()
    #have to destroy it here explicitly otherwise an exception
    #comes out in Python 2.4
    del server
-    
-
-# Parallel Python Software: http://www.parallelpython.com

pp/pptransport.py

-# Parallel Python Software: http://www.parallelpython.com
-# Copyright (c) 2005-2011, Vitalii Vanovschi
-# All rights reserved.
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#    * Redistributions of source code must retain the above copyright notice,
-#      this list of conditions and the following disclaimer.
-#    * Redistributions in binary form must reproduce the above copyright
-#      notice, this list of conditions and the following disclaimer in the
-#      documentation and/or other materials provided with the distribution.
-#    * Neither the name of the author nor the names of its contributors
-#      may be used to endorse or promote products derived from this software
-#      without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
-# THE POSSIBILITY OF SUCH DAMAGE.
-"""
-Parallel Python Software, PP Transport
-
-http://www.parallelpython.com - updates, documentation, examples and support
-forums
-"""
-import os
-import struct
-import socket
-import logging
-
-copyright = "Copyright (c) 2005-2011 Vitalii Vanovschi. All rights reserved"
-version = "1.6.1"
-
-TRANSPORT_SOCKET_TIMEOUT = 60
-
-# compartibility with Python 2.6
-try:
-    import hashlib
-    sha_new = hashlib.sha1
-    md5_new = hashlib.md5
-except ImportError:
-    import sha
-    import md5
-    sha_new = sha.new
-    md5_new = md5.new
-
-
class Transport(object):
    """Abstract message transport: send/receive plus peer authentication.

    Concrete subclasses (pipe- and socket-based) implement send/receive.
    """

    def send(self, msg):
        # bug fix: this previously did "raise NotImplemented(...)" --
        # NotImplemented is a singleton, not an exception class, so calling
        # it raised TypeError instead of the intended error
        raise NotImplementedError("abstact function 'send' must be implemented "\
                "in a subclass")

    def receive(self, preprocess=None):
        # bug fix: same NotImplemented -> NotImplementedError as send()
        raise NotImplementedError("abstact function 'receive' must be implemented "\
                "in a subclass")

    def authenticate(self, secret):
        """Challenge-response authentication; returns True on success.

        Protocol: the peer sends its PP version, then a random challenge;
        we reply with sha1(challenge + secret) and expect "OK" back.
        """
        remote_version = self.receive()
        if version != remote_version:
            logging.error("PP version mismatch (local: pp-%s, remote: pp-%s)"
                % (version, remote_version))
            logging.error("Please install the same version of PP on all nodes")
            return False
        srandom = self.receive()
        answer = sha_new(srandom+secret).hexdigest()
        self.send(answer)
        response = self.receive()
        if response == "OK":
            return True
        else:
            return False

    def close(self):
        # no-op by default; subclasses override to release resources
        pass

    def _connect(self, host, port):
        # no-op by default; socket transports override to open a connection
        pass
-
-
class CTransport(Transport):
    """Cached transport
    """
    # receive-side cache (md5 hex digest -> payload); class attribute, so
    # it is shared by every transport instance in the process
    rcache = {}

    def hash(self, msg):
        # md5 is used only as a cache key here, not for security
        return md5_new(msg).hexdigest()

    def csend(self, msg):
        # send either the full payload ("N" prefix) or, if the peer has
        # already seen it, just its hash ("H" prefix); assumes self.scache
        # was created by the subclass __init__ (e.g. PipeTransport)
        hash1 = self.hash(msg)
        if hash1 in self.scache:
            self.send("H" + hash1)
        else:
            self.send("N" + msg)
            self.scache[hash1] = True

    def creceive(self, preprocess=None):
        msg = self.receive()
        if msg[0] == 'H':
            # hash-only message: payload must already be in rcache
            hash1 = msg[1:]
        else:
            msg = msg[1:]
            hash1 = self.hash(msg)
            # Python 2 semantics: map(None, (msg, ))[0] is msg itself, so
            # preprocess=None stores the raw message unchanged
            self.rcache[hash1] = map(preprocess, (msg, ))[0]
        return self.rcache[hash1]
-
-
-class PipeTransport(Transport):
-
-    def __init__(self, r, w):
-        self.scache = {}
-        self.exiting = False
-        if isinstance(r, file) and isinstance(w, file):
-            self.r = r
-            self.w = w