# transfer.py
from django.conf import settings
import subprocess
import logging
import os,sys,traceback
# urlparse is a top-level module on Python 2 and lives in urllib.parse on Python 3
try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse

# temporary
import shutil
import tempfile


logger = logging.getLogger(__name__)

# - GridFTP implementation
GRIDFTP_PROTOCOL='gsiftp'


class GridFTPHandler:
   """Stage directories in and out with the GridFTP command-line tools.

   The executables are read from the Django settings
   ``GRIDFTP_PROXY_INFO``, ``GRIDFTP_PROXY_INIT`` and
   ``GRIDFTP_GLOBUS_URL_COPY``.  Every failure is reported by raising
   ``Exception`` (or a subclass) after logging the details.
   """

   @staticmethod
   def _to_text(data):
      """Return subprocess output as text.

      On Python 3 ``communicate()`` yields bytes, which cannot be
      concatenated with the str log messages; undecodable bytes are
      replaced rather than raising.  On Python 2 bytes is str and the
      value is returned unchanged.
      """
      if isinstance(data, bytes) and not isinstance(data, str):
         return data.decode('utf-8', 'replace')
      return data

   @staticmethod
   def _with_trailing_slash(location):
      """Return ``location`` terminated by exactly one added '/' if missing.

      Raises ValueError for an empty location instead of silently turning
      it into '/'.
      """
      if not location:
         raise ValueError('empty transfer location')
      if location.endswith('/'):
         return location
      return location + '/'

   @staticmethod
   def _run_command(args, context):
      """Run ``args`` and return ``(returncode, stdout, stderr)`` as text.

      ``context`` is the name of the calling operation and is only used
      in the error messages.  A failure to launch the process is logged
      and re-raised as a plain ``Exception``.
      """
      try:
         proc = subprocess.Popen(args,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
         out,err = proc.communicate()
      except OSError as e:
         logger.error('command failed with OSError, exception: ' + str(e))
         raise Exception('Error in %s, OSError raised' % context)
      except ValueError as e:
         logger.error('command failed with ValueError, exception: ' + str(e))
         raise Exception('Error in %s, ValueError raised' % context)
      except Exception:
         logger.error('command failed, exception traceback: \n' + traceback.format_exc() )
         raise Exception('Error in %s, unknown exception raised' % context)
      return proc.returncode, GridFTPHandler._to_text(out), GridFTPHandler._to_text(err)

   def _copy(self, options, source, destination, context):
      """Run globus-url-copy with ``options`` from ``source`` to ``destination``.

      Both locations get a trailing '/'.  They are passed as single
      arguments, so spaces inside them no longer split the command line.
      Raises Exception when the tool exits with a non-zero status.
      """
      # The setting itself is still split on whitespace so that a value
      # carrying extra arguments keeps working.
      args = str(settings.GRIDFTP_GLOBUS_URL_COPY).split() + list(options) + [
         self._with_trailing_slash(source),
         self._with_trailing_slash(destination),
      ]
      logger.debug('command=' + ' '.join(args))
      returncode,out,err = self._run_command(args, context)
      if returncode:
         logger.error('command failed with return value: ' + str(returncode) + '\n  stdout: \n' + out + '\n stderr: \n' + err)
         raise Exception("Error in %s: %s" % (context, str(returncode)))

   def pre_stage_hook(self):
      """Make sure a grid proxy exists, creating one if necessary.

      Raises Exception if the proxy cannot be created.
      """
      # check to see if proxy already exists
      if subprocess.call([settings.GRIDFTP_PROXY_INFO,'-exists']) == 0:
         return

      # valid proxy does not exist so create one
      args = str(settings.GRIDFTP_PROXY_INIT).split() + [
         '-verify', '-debug', '-bits', '2048', '-valid', '96:00']
      logger.debug('command=' + ' '.join(args))
      returncode,out,err = self._run_command(args, 'pre_stage_hook')
      if returncode:
         logger.error('command failed with return value: ' + str(returncode) + '\n  stdout: \n' + out + '\n stderr: \n' + err)
         raise Exception('Error in pre_stage_hook, return value = ' + str(returncode) )
      logger.debug('gridftp initialization completed, stdout:\n' + out + '\n stderr:\n' + err )

   def stage_in( self, source_url, destination_directory ):
      """Recursively copy ``source_url`` into ``destination_directory``.

      Raises Exception if the copy command cannot be run or fails.
      """
      self._copy(['-dbg', '-nodcau', '-r'], source_url, destination_directory, 'stage_in')

   def stage_out( self, source_directory, destination_url ):
      """Recursively copy ``source_directory`` to ``destination_url``.

      Raises Exception if the copy command cannot be run or fails.
      """
      self._copy(['-dbg', '-nodcau', '-cd', '-r'], source_directory, destination_url, 'stage_out')


# - Local implementation
LOCAL_PROTOCOL='local'


class LocalHandler:
   """Stage files between directories on the local filesystem with ``cp``."""

   @staticmethod
   def _quote(text):
      """Return ``text`` quoted for safe use as one shell word."""
      # shlex.quote on Python 3, pipes.quote on Python 2
      try:
         from shlex import quote
      except ImportError:
         from pipes import quote
      return quote(text)

   @staticmethod
   def _stage_in_command(source_url, destination_directory):
      """Build the shell command that copies the contents of ``source_url``.

      The URL's network location and path are joined into one absolute
      path.  The trailing ``*`` stays outside the quotes so the shell
      still expands it.
      """
      parts = urlparse.urlparse( source_url )
      source = '/%s%s' % (parts.netloc,parts.path)
      return 'cp -p -r %s* %s' % (LocalHandler._quote(source),
                                  LocalHandler._quote(destination_directory))

   @staticmethod
   def _stage_out_command(source_directory, destination_url):
      """Build the shell command that copies ``source_directory``'s contents."""
      parts = urlparse.urlparse( destination_url )
      destination = '/%s/%s' % (parts.netloc,parts.path)
      return 'cp -r %s/* %s' % (LocalHandler._quote(source_directory),
                                LocalHandler._quote(destination))

   @staticmethod
   def _run_shell(command, context):
      """Run ``command`` through the shell; raise Exception if it fails.

      stderr is merged into stdout and the combined output is included
      in the exception message.  ``context`` names the calling operation.
      """
      proc = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,
              shell=True)
      output,_ = proc.communicate()
      if proc.returncode != 0:
         # bytes on Python 3; decode so the message shows readable text
         if isinstance(output, bytes) and not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
         raise Exception("Error in %s: %d output:\n%s" % (context,proc.returncode,output))

   def pre_stage_hook(self):
      """Nothing to prepare for local copies."""
      pass

   def stage_in(self, source_url, destination_directory):
      """Copy everything under ``source_url`` into ``destination_directory``.

      Raises Exception with the exit status and output if ``cp`` fails.
      """
      command = self._stage_in_command(source_url, destination_directory)
      logger.debug('transfer.stage_in: command=' + command )
      self._run_shell(command, 'stage_in')

   def stage_out( self, source_directory, destination_url ):
      """Copy everything under ``source_directory`` to ``destination_url``.

      Raises Exception with the exit status and output if ``cp`` fails.
      """
      command = self._stage_out_command(source_directory, destination_url)
      logger.debug( 'transfer.stage_out: command=' + command )
      self._run_shell(command, 'stage_out')
jtchilders's avatar
jtchilders committed
121 122

# - SCP implementation
SCP_PROTOCOL='scp'


class SCPHandler:
   """Stage files to and from a remote host with ``scp``."""

   @staticmethod
   def _stage_in_args(source_url, destination_directory):
      """Build the scp argument list for copying ``source_url`` locally.

      The URL is rewritten to scp's ``host:path`` form.  A port in the
      URL is passed with ``-P`` because ``host:path`` cannot carry one.
      """
      parts = urlparse.urlparse( source_url )
      args = ['scp', '-p', '-r']
      host = parts.netloc
      if parts.port is not None:
         args += ['-P', str(parts.port)]
         host = host.rsplit(':', 1)[0]
      args += ['%s:%s' % (host, parts.path), destination_directory]
      return args

   @staticmethod
   def _stage_out_args(source_directory, destination_url):
      """Build the scp argument list for copying ``source_directory`` out.

      ``destination_url`` is handed to scp unchanged, as the original
      command did.
      """
      return ['scp', '-p', '-r', source_directory, destination_url]

   def pre_stage_hook(self):
      """Nothing to prepare for scp transfers."""
      pass

   def stage_in( self, source_url, destination_directory ):
      """Recursively copy ``source_url`` into ``destination_directory``.

      Raises Exception carrying scp's exit status if the copy fails.
      """
      args = self._stage_in_args(source_url, destination_directory)
      logger.debug('transfer.stage_in: command=' + ' '.join(args) )
      # argument list, no shell: the URL cannot inject shell syntax
      ret = subprocess.call(args)
      if ret:
         raise Exception("Error in stage_in: %d" % ret)

   def stage_out( self, source_directory, destination_url ):
      """Recursively copy ``source_directory`` to ``destination_url``.

      Raises Exception carrying scp's exit status if the copy fails.
      """
      args = self._stage_out_args(source_directory, destination_url)
      logger.debug('transfer.stage_out: command=' + ' '.join(args))
      ret = subprocess.call(args)
      if ret:
         raise Exception("Error in stage_out: %d" % ret)


# - Generic interface

def get_handler(url):
   """Return a new handler instance for the protocol that prefixes ``url``.

   The protocol is the text before the first ':'.  Raises Exception when
   no handler is registered for it.
   """
   registry = {
     GRIDFTP_PROTOCOL:GridFTPHandler,
     SCP_PROTOCOL    :SCPHandler,
     LOCAL_PROTOCOL  :LocalHandler
   }
   scheme = url.split(':')[0]
   handler_type = registry.get(scheme)
   if handler_type is None:
      raise Exception('Unknown transfer protocol: %s' % scheme)
   return handler_type()
jtchilders's avatar
jtchilders committed
160 161 162 163 164 165

# def pre_stage_hook(url):
#     handler = get_handler(url)
#     handler.pre_stage_hook()

def stage_in( source_url, destination_directory ):
   """Copy the data at ``source_url`` into ``destination_directory``.

   Picks the handler from the URL's protocol, runs its pre-stage hook and
   then its stage-in step.  Any exception is logged with its traceback
   and re-raised unchanged.
   """
   try:
      transfer = get_handler(source_url)
      logger.debug('pre-stage hook')
      transfer.pre_stage_hook()
      logger.debug('stage-in')
      transfer.stage_in(source_url, destination_directory)
   except Exception as error:
      logger.exception('Exception: ' + str(error))
      raise
jtchilders's avatar
jtchilders committed
175 176 177 178 179

def stage_out( source_directory, destination_url ):
   """Copy the contents of ``source_directory`` to ``destination_url``.

   Picks the handler from the URL's protocol, runs its pre-stage hook and
   then its stage-out step.  As in stage_in, any exception is logged with
   its traceback and re-raised unchanged.
   """
   try:
      handler = get_handler(destination_url)
      logger.debug('pre-stage hook')
      handler.pre_stage_hook()
      logger.debug('stage-out')
      handler.stage_out( source_directory, destination_url )
   except Exception as e:
      logger.exception('Exception: ' + str(e))
      raise