[ Index ]

PHP Cross Reference of Unnamed Project

title

Body

[close]

/se3-unattended/var/se3/unattended/install/linuxaux/opt/perl/lib/site_perl/5.10.0/i586-linux-thread-multi/DBD/Gofer/Transport/ -> stream.pm (source)

   1  package DBD::Gofer::Transport::stream;
   2  
   3  #   $Id: stream.pm 10905 2008-03-10 22:01:04Z timbo $
   4  #
   5  #   Copyright (c) 2007, Tim Bunce, Ireland
   6  #
   7  #   You may distribute under the terms of either the GNU General Public
   8  #   License or the Artistic License, as specified in the Perl README file.
   9  
  10  use strict;
  11  use warnings;
  12  
  13  use Carp;
  14  
  15  use base qw(DBD::Gofer::Transport::pipeone);
  16  
  17  our $VERSION = sprintf("0.%06d", q$Revision: 10905 $ =~ /(\d+)/o);
  18  
  19  __PACKAGE__->mk_accessors(qw(
  20      go_persist
  21  )); 
  22  
  23  my $persist_all = 5;
  24  my %persist;
  25  
  26  
  27  sub _connection_key {
  28      my ($self) = @_;
  29      return join "~", $self->go_url||"", @{ $self->go_perl || [] };
  30  }
  31  
  32  
  33  sub _connection_get {
  34      my ($self) = @_;
  35  
  36      my $persist = $self->go_persist; # = 0 can force non-caching
  37      $persist = $persist_all if not defined $persist;
  38      my $key = ($persist) ? $self->_connection_key : '';
  39      if ($persist{$key} && $self->_connection_check($persist{$key})) {
  40          $self->trace_msg("reusing persistent connection $key\n",0) if $self->trace >= 1;
  41          return $persist{$key};
  42      }
  43  
  44      my $connection = $self->_make_connection;
  45  
  46      if ($key) {
  47          %persist = () if keys %persist > $persist_all; # XXX quick hack to limit subprocesses
  48          $persist{$key} = $connection;
  49      }
  50  
  51      return $connection;
  52  }
  53  
  54  
  55  sub _connection_check {
  56      my ($self, $connection) = @_;
  57      $connection ||= $self->connection_info;
  58      my $pid = $connection->{pid};
  59      my $ok = (kill 0, $pid);
  60      $self->trace_msg("_connection_check: $ok (pid $$)\n",0) if $self->trace;
  61      return $ok;
  62  }
  63  
  64  
  65  sub _connection_kill {
  66      my ($self) = @_;
  67      my $connection = $self->connection_info;
  68      my ($pid, $wfh, $rfh, $efh) = @{$connection}{qw(pid wfh rfh efh)};
  69      $self->trace_msg("_connection_kill: closing write handle\n",0) if $self->trace;
  70      # closing the write file handle should be enough, generally
  71      close $wfh;
  72      # in future we may want to be more aggressive
  73      #close $rfh; close $efh; kill 15, $pid
  74      # but deleting from the persist cache...
  75      delete $persist{ $self->_connection_key };
  76      # ... and removing the connection_info should suffice
  77      $self->connection_info( undef );
  78      return;
  79  }
  80  
  81  
  82  sub _make_connection {
  83      my ($self) = @_;
  84  
  85      my $go_perl = $self->go_perl;
  86      my $cmd = [ @$go_perl, qw(-MDBI::Gofer::Transport::stream -e run_stdio_hex)];
  87  
  88      #push @$cmd, "DBI_TRACE=2=/tmp/goferstream.log", "sh", "-c";
  89      if (my $url = $self->go_url) {
  90          die "Only 'ssh:user\@host' style url supported by this transport"
  91              unless $url =~ s/^ssh://;
  92          my $ssh = $url;
  93          my $setup_env = join "||", map { "source $_ 2>/dev/null" } qw(.bash_profile .bash_login .profile);
  94          my $setup = $setup_env.q{; exec "$@"};
  95          # don't use $^X on remote system by default as it's possibly wrong
  96          $cmd->[0] = 'perl' if "@$go_perl" eq $^X;
  97          # -x not only 'Disables X11 forwarding' but also makes connections *much* faster
  98          unshift @$cmd, qw(ssh -xq), split(' ', $ssh), qw(bash -c), $setup;
  99      }
 100  
 101      $self->trace_msg("new connection: @$cmd\n",0) if $self->trace;
 102  
 103      # XXX add a handshake - some message from DBI::Gofer::Transport::stream that's
 104      # sent as soon as it starts that we can wait for to report success - and soak up
 105      # and report useful warnings etc from ssh before we get it? Increases latency though.
 106      my $connection = $self->start_pipe_command($cmd);
 107      return $connection;
 108  }
 109  
 110  
 111  sub transmit_request_by_transport {
 112      my ($self, $request) = @_;
 113      my $trace = $self->trace;
 114  
 115      my $connection = $self->connection_info || do {
 116          my $con = $self->_connection_get;
 117          $self->connection_info( $con );
 118          $con;
 119      };
 120  
 121      my $encoded_request = unpack("H*", $self->freeze_request($request));
 122      $encoded_request .= "\015\012";
 123  
 124      my $wfh = $connection->{wfh};
 125      $self->trace_msg(sprintf("transmit_request_by_transport: to fh %s fd%d\n", $wfh, fileno($wfh)),0)
 126          if $trace >= 4;
 127  
 128      # send frozen request
 129      local $\;
 130      print $wfh $encoded_request # autoflush enabled
 131          or do {
 132              # XXX should make new connection and retry
 133              $self->_connection_kill;
 134              die "Error sending request: $!";
 135          };
 136      $self->trace_msg("Request sent: $encoded_request\n",0) if $trace >= 4;
 137  
 138      return;
 139  }
 140  
 141  
 142  sub receive_response_by_transport {
 143      my $self = shift;
 144      my $trace = $self->trace;
 145  
 146      $self->trace_msg("receive_response_by_transport: awaiting response\n",0) if $trace >= 4;
 147      my $connection = $self->connection_info || die;
 148      my ($pid, $rfh, $efh, $cmd) = @{$connection}{qw(pid rfh efh cmd)};
 149  
 150      my $errno = 0;
 151      my $encoded_response;
 152      my $stderr_msg;
 153  
 154      $self->read_response_from_fh( {
 155          $efh => {
 156              error => sub { warn "error reading response stderr: $!"; $errno||=$!; 1 },
 157              eof   => sub { warn "eof reading efh" if $trace >= 4; 1 },
 158              read  => sub { $stderr_msg .= $_; 0 },
 159          },
 160          $rfh => {
 161              error => sub { warn "error reading response: $!"; $errno||=$!; 1 },
 162              eof   => sub { warn "eof reading rfh" if $trace >= 4; 1 },
 163              read  => sub { $encoded_response .= $_; ($encoded_response=~s/\015\012$//) ? 1 : 0 },
 164          },
 165      });
 166  
 167      # if we got no output on stdout at all then the command has
 168      # probably exited, possibly with an error to stderr.
 169      # Turn this situation into a reasonably useful DBI error.
 170      if (not $encoded_response) {
 171          my @msg;
 172          push @msg, "error while reading response: $errno" if $errno;
 173          if ($stderr_msg) {
 174              chomp $stderr_msg;
 175              push @msg, sprintf "error reported by \"%s\" (pid %d%s): %s",
 176                  $self->cmd_as_string,
 177                  $pid, ((kill 0, $pid) ? "" : ", exited"),
 178                  $stderr_msg;
 179          }
 180          die join(", ", "No response received", @msg)."\n";
 181      }
 182  
 183      $self->trace_msg("Response received: $encoded_response\n",0)
 184          if $trace >= 4;
 185  
 186      $self->trace_msg("Gofer stream stderr message: $stderr_msg\n",0)
 187          if $stderr_msg && $trace;
 188  
 189      my $frozen_response = pack("H*", $encoded_response);
 190  
 191      # XXX need to be able to detect and deal with corruption
 192      my $response = $self->thaw_response($frozen_response);
 193  
 194      if ($stderr_msg) {
 195          # add stderr messages as warnings (for PrintWarn)
 196          $response->add_err(0, $stderr_msg, undef, $trace)
 197              # but ignore warning from old version of blib
 198              unless $stderr_msg =~ /^Using .*blib/ && "@$cmd" =~ /-Mblib/;
 199      }   
 200  
 201      return $response;
 202  }
 203  
 204  sub transport_timedout {
 205      my $self = shift;
 206      $self->_connection_kill;
 207      return $self->SUPER::transport_timedout(@_);
 208  }
 209  
 210  1;
 211  
 212  __END__
 213  
 214  =head1 NAME
 215  
 216  DBD::Gofer::Transport::stream - DBD::Gofer transport for stdio streaming
 217  
 218  =head1 SYNOPSIS
 219  
 220    DBI->connect('dbi:Gofer:transport=stream;url=ssh:username@host.example.com;dsn=dbi:...',...)
 221  
 222  or, enable by setting the DBI_AUTOPROXY environment variable:
 223  
 224    export DBI_AUTOPROXY='dbi:Gofer:transport=stream;url=ssh:username@host.example.com'
 225  
 226  =head1 DESCRIPTION
 227  
 228  Without the C<url=> parameter it launches a subprocess as
 229  
 230    perl -MDBI::Gofer::Transport::stream -e run_stdio_hex
 231  
 232  and feeds requests into it and reads responses from it. But that's not very useful.
 233  
 234  With a C<url=ssh:username@host.example.com> parameter it uses ssh to launch the subprocess
 235  on a remote system. That's much more useful!
 236  
 237  It gives you secure remote access to DBI databases on any system you can login to.
 238  Using ssh also gives you optional compression and many other features (see the
 239  ssh manual for how to configure that and many other options via ~/.ssh/config file).
 240  
 241  The actual command invoked is something like:
 242  
 243    ssh -xq ssh:username@host.example.com bash -c $setup $run
 244  
 245  where $run is the command shown above, and $command is
 246  
 247    . .bash_profile 2>/dev/null || . .bash_login 2>/dev/null || . .profile 2>/dev/null; exec "$@"
 248  
 249  which is trying (in a limited and fairly unportable way) to setup the environment
 250  (PATH, PERL5LIB etc) as it would be if you had logged in to that system.
 251  
 252  The "C<perl>" used in the command will default to the value of $^X when not using ssh.
 253  On most systems that's the full path to the perl that's currently executing.
 254  
 255  
 256  =head1 PERSISTENCE
 257  
 258  Currently gofer stream connections persist (remain connected) after all
 259  database handles have been disconnected. This makes later connections in the
 260  same process very fast.
 261  
 262  Currently up to 5 different gofer stream connections (based on url) can
 263  persist.  If more than 5 are in the cache when a new connection is made then
 264  the cache is cleared before adding the new connection. Simple but effective.
 265  
 266  =head1 TO DO
 267  
 268  Document go_perl attribute
 269  
 270  Automatically reconnect (within reason) if there's a transport error.
 271  
 272  Decide on default for persistent connection - on or off? limits? ttl?
 273  
 274  =head1 AUTHOR
 275  
 276  Tim Bunce, L<http://www.tim.bunce.name>
 277  
 278  =head1 LICENCE AND COPYRIGHT
 279  
 280  Copyright (c) 2007, Tim Bunce, Ireland. All rights reserved.
 281  
 282  This module is free software; you can redistribute it and/or
 283  modify it under the same terms as Perl itself. See L<perlartistic>.
 284  
 285  =head1 SEE ALSO
 286  
 287  L<DBD::Gofer::Transport::Base>
 288  
 289  L<DBD::Gofer>
 290  
 291  =cut


Generated: Tue Mar 17 22:47:18 2015 Cross-referenced by PHPXref 0.7.1