forked from mirrors/gecko-dev
		
	
		
			
				
	
	
		
			318 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			318 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # ***** BEGIN LICENSE BLOCK *****
 | |
| # Version: MPL 1.1/GPL 2.0/LGPL 2.1
 | |
| #
 | |
| # The contents of this file are subject to the Mozilla Public License Version
 | |
| # 1.1 (the "License"); you may not use this file except in compliance with
 | |
| # the License. You may obtain a copy of the License at
 | |
| # http://www.mozilla.org/MPL/
 | |
| #
 | |
| # Software distributed under the License is distributed on an "AS IS" basis,
 | |
| # WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 | |
| # for the specific language governing rights and limitations under the
 | |
| # License.
 | |
| #
 | |
| # The Original Code is the Python XPCOM language bindings.
 | |
| #
 | |
| # The Initial Developer of the Original Code is
 | |
| # ActiveState Tool Corp.
 | |
| # Portions created by the Initial Developer are Copyright (C) 2000, 2001
 | |
| # the Initial Developer. All Rights Reserved.
 | |
| #
 | |
| # Contributor(s):
 | |
| #   Mark Hammond <MarkH@ActiveState.com> (original author)
 | |
| #
 | |
| # Alternatively, the contents of this file may be used under the terms of
 | |
| # either the GNU General Public License Version 2 or later (the "GPL"), or
 | |
| # the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
 | |
| # in which case the provisions of the GPL or the LGPL are applicable instead
 | |
| # of those above. If you wish to allow use of your version of this file only
 | |
| # under the terms of either the GPL or the LGPL, and not to allow others to
 | |
| # use your version of this file under the terms of the MPL, indicate your
 | |
| # decision by deleting the provisions above and replace them with the notice
 | |
| # and other provisions required by the GPL or the LGPL. If you do not delete
 | |
| # the provisions above, a recipient may use your version of this file under
 | |
| # the terms of any one of the MPL, the GPL or the LGPL.
 | |
| #
 | |
| # ***** END LICENSE BLOCK *****
 | |
| 
 | |
| """Implementation of Python file objects for Mozilla/xpcom.
 | |
| 
 | |
| Introduction:
 | |
|   This module defines various class that are implemented using 
 | |
|   Mozilla streams.  This allows you to open Mozilla URI's, and
 | |
|   treat them as Python file object.
 | |
|   
 | |
| Example:
 | |
| >>> file = URIFile("chrome://whatever")
 | |
| >>> data = file.read(5) # Pass no arg to read everything.
 | |
| 
 | |
| Known Limitations:
 | |
|  * Not all URL schemes will work from "python.exe" - most notably
 | |
|    "chrome://" and "http://" URLs - this is because a simple initialization of
 | |
|    xpcom by Python does not load up the full set of Mozilla URL handlers.
 | |
|    If you can work out how to correctly initialize the chrome registry and
 | |
|    setup a message queue.
 | |
| 
 | |
| Known Bugs:
 | |
|   * Only read ("r") mode is supported.  Although write ("w") mode doesn't make
 | |
|     sense for HTTP type URLs, it potentially does for file:// etc type ones.
 | |
|   * No concept of text mode vs binary mode.  It appears Mozilla takes care of
 | |
|     this internally (ie, all "text/???" mime types are text, rest are binary)
 | |
| 
 | |
| """
 | |
| 
 | |
| from xpcom import components, Exception, _xpcom
 | |
| import os
 | |
| import threading # for locks.
 | |
| 
 | |
| NS_RDONLY        = 0x01
 | |
| NS_WRONLY        = 0x02
 | |
| NS_RDWR          = 0x04
 | |
| NS_CREATE_FILE   = 0x08
 | |
| NS_APPEND        = 0x10
 | |
| NS_TRUNCATE      = 0x20
 | |
| NS_SYNC          = 0x40
 | |
| NS_EXCL          = 0x80
 | |
| 
 | |
| # A helper function that may come in useful
 | |
| def LocalFileToURL(localFileName):
 | |
|     "Convert a filename to an XPCOM nsIFileURL object."
 | |
|     # Create an nsILocalFile
 | |
|     localFile = components.classes["@mozilla.org/file/local;1"] \
 | |
|           .createInstance(components.interfaces.nsILocalFile)
 | |
|     localFile.initWithPath(localFileName)
 | |
| 
 | |
|     # Use the IO Service to create the interface, then QI for a FileURL
 | |
|     io_service = components.classes["@mozilla.org/network/io-service;1"] \
 | |
|                     .getService(components.interfaces.nsIIOService)
 | |
|     url = io_service.newFileURI(localFile).queryInterface(components.interfaces.nsIFileURL)
 | |
|     # Setting the "file" attribute causes initialization...
 | |
|     url.file = localFile
 | |
|     return url
 | |
| 
 | |
| # A base class for file objects.
 | |
| class _File:
 | |
|     def __init__(self, name_thingy = None, mode="r"):
 | |
|         self.lockob = threading.Lock()
 | |
|         self.inputStream = self.outputStream = None
 | |
|         if name_thingy is not None:
 | |
|                 self.init(name_thingy, mode)
 | |
| 
 | |
|     def __del__(self):
 | |
|         self.close()
 | |
| 
 | |
|     # The Moz file streams are not thread safe.
 | |
|     def _lock(self):
 | |
|         self.lockob.acquire()
 | |
|     def _release(self):
 | |
|         self.lockob.release()
 | |
|     def read(self, n = -1):
 | |
|         assert self.inputStream is not None, "Not setup for read!"
 | |
|         self._lock()
 | |
|         try:
 | |
|             return str(self.inputStream.read(n))
 | |
|         finally:
 | |
|             self._release()
 | |
| 
 | |
|     def readlines(self):
 | |
|         # Not part of the xpcom interface, but handy for direct Python users.
 | |
|         # Not 100% faithful, but near enough for now!
 | |
|         lines = self.read().split("\n")
 | |
|         if len(lines) and len(lines[-1]) == 0:
 | |
|             lines = lines[:-1]
 | |
|         return [s+"\n" for s in lines ]
 | |
| 
 | |
|     def write(self, data):
 | |
|         assert self.outputStream is not None, "Not setup for write!"
 | |
|         self._lock()
 | |
|         try:
 | |
|             self.outputStream.write(data, len(data))
 | |
|         finally:
 | |
|             self._release()
 | |
| 
 | |
|     def close(self):
 | |
|         self._lock()
 | |
|         try:
 | |
|             if self.inputStream is not None:
 | |
|                 self.inputStream.close()
 | |
|                 self.inputStream = None
 | |
|             if self.outputStream is not None:
 | |
|                 self.outputStream.close()
 | |
|                 self.outputStream = None
 | |
|             self.channel = None
 | |
|         finally:
 | |
|             self._release()
 | |
| 
 | |
|     def flush(self):
 | |
|         self._lock()
 | |
|         try:
 | |
|             if self.outputStream is not None: self.outputStream.flush()
 | |
|         finally:
 | |
|             self._release()
 | |
| 
 | |
| # A synchronous "file object" used to open a URI.
 | |
| class URIFile(_File):
 | |
|     def init(self, url, mode="r"):
 | |
|         self.close()
 | |
|         if mode != "r":
 | |
|             raise ValueError, "only 'r' mode supported'"
 | |
|         io_service = components.classes["@mozilla.org/network/io-service;1"] \
 | |
|                         .getService(components.interfaces.nsIIOService)
 | |
|         if hasattr(url, "queryInterface"):
 | |
|             url_ob = url
 | |
|         else:
 | |
|             url_ob = io_service.newURI(url, None, None)
 | |
|         # Mozilla asserts and starts saying "NULL POINTER" if this is wrong!
 | |
|         if not url_ob.scheme:
 | |
|             raise ValueError, ("The URI '%s' is invalid (no scheme)" 
 | |
|                                   % (url_ob.spec,))
 | |
|         self.channel = io_service.newChannelFromURI(url_ob)
 | |
|         self.inputStream = self.channel.open()
 | |
| 
 | |
| # A "file object" implemented using Netscape's native file support.
 | |
| # Based on io.js - http://lxr.mozilla.org/seamonkey/source/xpcom/tests/utils/io.js
 | |
| # You open this file using a local file name (as a string) so it really is pointless - 
 | |
| # you may as well be using a standard Python file object!
 | |
| class LocalFile(_File):
 | |
|     def __init__(self, *args):
 | |
|         self.fileIO = None
 | |
|         _File.__init__(self, *args)
 | |
| 
 | |
|     def init(self, name, mode = "r"):
 | |
|         name = os.path.abspath(name) # Moz libraries under Linux fail with relative paths.
 | |
|         self.close()
 | |
|         file = components.classes['@mozilla.org/file/local;1'].createInstance("nsILocalFile")
 | |
|         file.initWithPath(name)
 | |
|         if mode in ["w","a"]:
 | |
|             self.fileIO = components.classes["@mozilla.org/network/file-output-stream;1"].createInstance("nsIFileOutputStream")
 | |
|             if mode== "w":
 | |
|                 if file.exists():
 | |
|                     file.remove(0)
 | |
|                 moz_mode = NS_CREATE_FILE | NS_WRONLY
 | |
|             elif mode=="a":
 | |
|                 moz_mode = NS_APPEND
 | |
|             else:
 | |
|                 assert 0, "Can't happen!"
 | |
|             self.fileIO.init(file, moz_mode, -1,0)
 | |
|             self.outputStream = self.fileIO
 | |
|         elif mode == "r":
 | |
|             self.fileIO = components.classes["@mozilla.org/network/file-input-stream;1"].createInstance("nsIFileInputStream")
 | |
|             self.fileIO.init(file, NS_RDONLY, -1,0)
 | |
|             self.inputStream = components.classes["@mozilla.org/scriptableinputstream;1"].createInstance("nsIScriptableInputStream")
 | |
|             self.inputStream.init(self.fileIO)
 | |
|         else:
 | |
|             raise ValueError, "Unknown mode"
 | |
| 
 | |
|     def close(self):
 | |
|         if self.fileIO is not None:
 | |
|             self.fileIO.close()
 | |
|             self.fileIO = None
 | |
|         _File.close(self)
 | |
| 
 | |
|     def read(self, n = -1):
 | |
|         return _File.read(self, n)
 | |
| 
 | |
| 
 | |
| ##########################################################
 | |
| ##
 | |
| ## Test Code
 | |
| ##
 | |
| ##########################################################
 | |
| def _DoTestRead(file, expected):
 | |
|     # read in a couple of chunks, just to test that our various arg combinations work.
 | |
|     got = file.read(3)
 | |
|     got = got + file.read(300)
 | |
|     got = got + file.read(0)
 | |
|     got = got + file.read()
 | |
|     if got != expected:
 | |
|         raise RuntimeError, "Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))
 | |
| 
 | |
| def _DoTestBufferRead(file, expected):
 | |
|     # read in a couple of chunks, just to test that our various arg combinations work.
 | |
|     buffer = _xpcom.AllocateBuffer(50)
 | |
|     got = ''
 | |
|     while 1:
 | |
|         # Note - we need to reach into the file object so we
 | |
|         # can get at the native buffer supported function.
 | |
|         num = file.inputStream.read(buffer)
 | |
|         if num == 0:
 | |
|             break
 | |
|         got = got + str(buffer[:num])
 | |
|     if got != expected:
 | |
|         raise RuntimeError, "Reading '%s' failed - got %d bytes, but expected %d bytes" % (file, len(got), len(expected))
 | |
| 
 | |
| def _TestLocalFile():
 | |
|     import tempfile, os
 | |
|     fname = tempfile.mktemp()
 | |
|     data = "Hello from Python"
 | |
|     test_file = LocalFile(fname, "w")
 | |
|     try:
 | |
|         test_file.write(data)
 | |
|         test_file.close()
 | |
|         # Make sure Python can read it OK.
 | |
|         f = open(fname, "r")
 | |
|         assert f.read() == data, "Eeek - Python could not read the data back correctly!"
 | |
|         f.close()
 | |
|         # For the sake of the test, try a re-init.
 | |
|         test_file.init(fname, "r")
 | |
|         got = str(test_file.read())
 | |
|         assert got == data, got
 | |
|         test_file.close()
 | |
|         # Try reading in chunks.
 | |
|         test_file = LocalFile(fname, "r")
 | |
|         got = test_file.read(10) + test_file.read()
 | |
|         assert got == data, got
 | |
|         test_file.close()
 | |
|         # Open the same file again for writing - this should delete the old one.
 | |
|         if not os.path.isfile(fname):
 | |
|             raise RuntimeError, "The file '%s' does not exist, but we are explicitly testing create semantics when it does" % (fname,)
 | |
|         test_file = LocalFile(fname, "w")
 | |
|         test_file.write(data)
 | |
|         test_file.close()
 | |
|         # Make sure Python can read it OK.
 | |
|         f = open(fname, "r")
 | |
|         assert f.read() == data, "Eeek - Python could not read the data back correctly after recreating an existing file!"
 | |
|         f.close()
 | |
| 
 | |
|         # XXX - todo - test "a" mode!
 | |
|     finally:
 | |
|         os.unlink(fname)
 | |
| 
 | |
| def _TestAll():
 | |
|     # A mini test suite.
 | |
|     # Get a test file, and convert it to a file:// URI.
 | |
|     # check what we read is the same as when
 | |
|     # we read this file "normally"
 | |
|     fname = components.__file__
 | |
|     if fname[-1] in "cCoO": # fix .pyc/.pyo
 | |
|             fname = fname[:-1]
 | |
|     expected = open(fname, "rb").read()
 | |
|     # convert the fname to a URI.
 | |
|     url = LocalFileToURL(fname)
 | |
|     # First try passing a URL as a string.
 | |
|     _DoTestRead( URIFile( url.spec), expected)
 | |
|     # Now with a URL object.
 | |
|     _DoTestRead( URIFile( url ), expected)
 | |
| 
 | |
|     _DoTestBufferRead( URIFile( url ), expected)
 | |
| 
 | |
|     # For the sake of testing, do our pointless, demo object!
 | |
|     _DoTestRead( LocalFile(fname), expected )
 | |
| 
 | |
|     # Now do the full test of our pointless, demo object!
 | |
|     _TestLocalFile()
 | |
| 
 | |
| def _TestURI(url):
 | |
|     test_file = URIFile(url)
 | |
|     print "Opened file is", test_file
 | |
|     got = test_file.read()
 | |
|     print "Read %d bytes of data from %r" % (len(got), url)
 | |
|     test_file.close()
 | |
| 
 | |
| if __name__=='__main__':
 | |
|     import sys
 | |
|     if len(sys.argv) < 2:
 | |
|         print "No URL specified on command line - performing self-test"
 | |
|         _TestAll()
 | |
|     else:
 | |
|         _TestURI(sys.argv[1])
 | 
