Newer
Older
XinYang_IOS / Pods / OpenVPNAdapter / Sources / OpenVPN3 / openvpn / ws / httpcliset.hpp
@zhangfeng zhangfeng on 7 Dec 2023 21 KB 1.8.0
//    OpenVPN -- An application to securely tunnel IP networks
//               over a single port, with support for SSL/TLS-based
//               session authentication and key exchange,
//               packet encryption, packet authentication, and
//               packet compression.
//
//    Copyright (C) 2012-2020 OpenVPN Inc.
//
//    This program is free software: you can redistribute it and/or modify
//    it under the terms of the GNU Affero General Public License Version 3
//    as published by the Free Software Foundation.
//
//    This program is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//    GNU Affero General Public License for more details.
//
//    You should have received a copy of the GNU Affero General Public License
//    along with this program in the COPYING file.
//    If not, see <http://www.gnu.org/licenses/>.

#pragma once

#include <string>
#include <sstream>
#include <ostream>
#include <vector>
#include <memory>
#include <utility>
#include <algorithm>
#include <limits>
#include <map>

#include <openvpn/asio/asiostop.hpp>
#include <openvpn/common/cleanup.hpp>
#include <openvpn/common/function.hpp>
#include <openvpn/common/complog.hpp>
#include <openvpn/time/asiotimersafe.hpp>
#include <openvpn/buffer/buflist.hpp>
#include <openvpn/buffer/bufstr.hpp>
#include <openvpn/buffer/zlib.hpp>
#include <openvpn/random/randapi.hpp>
#include <openvpn/http/urlparse.hpp>
#include <openvpn/http/headredact.hpp>
#include <openvpn/ws/httpcli.hpp>
#include <openvpn/ws/resolver_results.hpp>

#ifndef OPENVPN_HTTP_CLISET_RC
#define OPENVPN_HTTP_CLISET_RC RC<thread_unsafe_refcount>
#endif

namespace openvpn {
  namespace WS {

    class ClientSet : public RC<thread_unsafe_refcount>
    {
      class Client;

    public:
      typedef RCPtr<ClientSet> Ptr;

      typedef WS::Client::HTTPDelegate<Client> HTTPDelegate;

      struct SyncPersistState
      {
	std::unique_ptr<openvpn_io::io_context> io_context;
      };

      class HTTPStateContainer
      {
      public:
	void create_container()
	{
	  if (!c)
	    c.reset(new Container);
	}

	void stop(const bool shutdown)
	{
	  if (c && c->http)
	    c->http->stop(shutdown);
	}

	void reset()
	{
	  if (c)
	    c->http.reset();
	}

	void abort(const std::string& message)
	{
	  if (c && c->http)
	    c->http->abort(message);
	}

	bool alive() const
	{
	  return c && c->http && c->http->is_alive();
	}

	bool alive(const std::string& host) const
	{
	  return alive() && c->http->host_match(host);
	}

#ifdef ASIO_HAS_LOCAL_SOCKETS
	int unix_fd()
	{
	  if (!c || !c->http)
	    return -1;
	  AsioPolySock::Unix* us = dynamic_cast<AsioPolySock::Unix*>(c->http->get_socket());
	  if (!us)
	    return -1;
	  return us->socket.native_handle();
	}
#endif

      private:
	friend Client;

	struct Container : public RC<thread_unsafe_refcount>
	{
	  typedef RCPtr<Container> Ptr;
	  HTTPDelegate::Ptr http;
	};

	void attach(Client* parent)
	{
	  c->http->attach(parent);
	}

	void close(const bool keepalive, const bool shutdown)
	{
	  if (c && c->http)
	    {
	      c->http->detach(keepalive, shutdown);
	      if (!keepalive)
		stop(shutdown);
	    }
	}

	void construct(openvpn_io::io_context& io_context,
		       const WS::Client::Config::Ptr config)
	{
	  create_container();
	  close(false, false);
	  c->http.reset(new HTTPDelegate(io_context, std::move(config), nullptr));
	}

	void start_request()
	{
	  c->http->start_request();
	}

	Container::Ptr c;
      };

      class TransactionSet;
      struct Transaction;

      struct ErrorRecovery : public RC<thread_unsafe_refcount>
      {
	typedef RCPtr<ErrorRecovery> Ptr;
	virtual void retry(TransactionSet& ts, Transaction& t) = 0;
      };

      struct Transaction
      {
	static constexpr int UNDEF = -1;

	// input
	WS::Client::Request req;
	WS::Client::ContentInfo ci;
	BufferList content_out;
	bool accept_gzip_in = false;
	bool randomize_resolver_results = false;
	IP::Addr::Version ip_version_preference = IP::Addr::UNSPEC;

	// output
	int status = UNDEF;
	std::string description;
	HTTP::Reply reply;
	BufferList content_in;

	std::string url(const TransactionSet& ts) const
	{
	  URL::Parse u = URL::Parse::from_components(bool(ts.http_config->ssl_factory),
						     ts.host.host,
						     ts.host.port,
						     req.uri);
	  return u.to_string();
	}

	std::string title(const TransactionSet& ts) const
	{
	  return req.method + ' ' + url(ts);
	}

	void compress_content_out(const unsigned int min_size=64,
				  const bool verbose=false)
	{
#ifdef HAVE_ZLIB
	  if (content_out.join_size() >= min_size)
	    {
	      BufferPtr co = content_out.join();
	      content_out.clear();
	      const size_t orig_size = co->size();
	      co = ZLib::compress_gzip(co, 0, 0, 1);
	      if (verbose)
		log_compress("HTTPClientSet: GZIP COMPRESS", orig_size, co->size());
	      ci.length = co->size();
	      content_out.push_back(std::move(co));
	      ci.content_encoding = "gzip";
	    }
#endif
	}

	// Return true if and only if HTTP transaction
	// succeeded AND HTTP status code was in the
	// successful range of 2xx.
	bool http_status_success() const
	{
	  return comm_status_success() && request_status_success();
	}

	// Return true if communication succeeded
	bool comm_status_success() const
	{
	  return status == WS::Client::Status::E_SUCCESS;
	}

	bool comm_status_timeout() const
	{
	  return status == WS::Client::Status::E_CONNECT_TIMEOUT;
	}

	// Return true if request succeeded, i.e. HTTP status
	// code was in the successful range of 2xx.
	bool request_status_success() const
	{
	  return reply.status_code >= 200 && reply.status_code < 300;
	}

	bool is_redirect() const
	{
	  return reply.status_code >= 300 && reply.status_code < 400 && reply.headers.get("location");
	}

	std::string get_redirect_location() const
	{
	  return reply.headers.get_value_trim("location");
	}

	void dump(std::ostream& os, const TransactionSet& ts) const
	{
	  os << "----- " << format_status(ts) << " -----\n";
	  BufferPtr in = content_in.join();
	  const std::string s = buf_to_string(*in);
	  os << s;
	  if (!s.empty() && !string::ends_with_newline(s))
	    os << '\n';
	}

	std::string content_in_string() const
	{
	  BufferPtr in = content_in.join();
	  return buf_to_string(*in);
	}

	BufferPtr content_in_buffer() const
	{
	  return content_in.join();
	}

	std::string format_status(const TransactionSet& ts) const
	{
	  std::string ret;
	  ret.reserve(256);
	  ret += title(ts);
	  ret += " : ";
	  ret += format_status();
	  return ret;
	}

	std::string format_status() const
	{
	  std::string ret;
	  ret.reserve(64);
	  if (status == WS::Client::Status::E_SUCCESS)
	    {
	      ret += openvpn::to_string(reply.status_code);
	      ret += ' ';
	      ret += reply.status_text;
	    }
	  else
	    {
	      ret += WS::Client::Status::error_str(status);
	      ret += ' ';
	      ret += description;
	    }
	  return ret;
	}
      };

      class TransactionSet : public RC<thread_unsafe_refcount>
      {
      private:
	// optionally contains an openvpn_io::io_context for
	// persistent synchronous operations
	friend class ClientSet;
	SyncPersistState sps;

      public:
	typedef RCPtr<TransactionSet> Ptr;
	typedef std::vector<std::unique_ptr<Transaction>> Vector;

	// Enable preserve_http_state to reuse HTTP session
	// across multiple completions.
	// hsc.stop() can be called to explicitly
	// close persistent state.
	bool preserve_http_state = false;
	HTTPStateContainer hsc;

	// configuration
	WS::Client::Config::Ptr http_config;
	WS::Client::Host host;
	unsigned int max_retries = 1;
	int debug_level = 2;
	Time::Duration delayed_start;
	Time::Duration retry_duration = Time::Duration::seconds(5);

	// request/response vector
	Vector transactions;

	// true if all requests were successful
	bool status = false;

	// completion method
	Function<void(TransactionSet& ts)> completion;

	// post-connect method, useful to validate server
	// on local sockets
	Function<void(TransactionSet& ts, AsioPolySock::Base& sock)> post_connect;

	// error recovery method, called before we retry a request
	// after an error to possibly modify connection parameters
	// such as the hostname.
	ErrorRecovery::Ptr error_recovery;

	void assign_http_state(HTTPStateContainer& http_state)
	{
	  http_state.create_container();
	  hsc = http_state;
	  preserve_http_state = true;
	}

	bool alive() const
	{
	  return hsc.alive(host.host);
	}

	WS::ClientSet::Transaction& first_transaction()
	{
	  if (transactions.empty())
	    throw Exception("TransactionSet::first_transaction: transaction list is empty");
	  return *transactions[0];
	}

	// Return true if and only if all HTTP transactions
	// succeeded AND each HTTP status code was in the
	// successful range of 2xx.
	bool http_status_success() const
	{
	  if (!status)
	    return false;
	  if (transactions.empty())
	    return false;
	  for (auto &t : transactions)
	    {
	      if (!t->http_status_success())
		return false;
	    }
	  return true;
	}

	void reset_callbacks()
	{
	  completion.reset();
	  post_connect.reset();
	}

	void stop(const bool shutdown)
	{
	  reset_callbacks();
	  hsc.stop(shutdown);
	}

	void dump(std::ostream& os, const bool content_only=false) const
	{
	  for (auto &t : transactions)
	    {
	      if (content_only)
		os << t->content_in_string();
	      else
		t->dump(os, *this);
	    }
	}
      };

      class HostRetry : public std::vector<std::string>,
			public ErrorRecovery
      {
      public:
	typedef RCPtr<HostRetry> Ptr;

	HostRetry() {}

	template<typename T, typename... Args>
	HostRetry(T first, Args... args)
	{
	  reserve(1 + sizeof...(args));
	  from_list(first, args...);
	}

	void shuffle(RandomAPI& prng)
	{
	  std::shuffle(begin(), end(), prng);
	  index = 0;
	}

	std::string next_host()
	{
	  if (empty())
	    throw Exception("HostRetry: empty host list");
	  if (index >= size())
	    index = 0;
	  return (*this)[index++];
	}

	virtual void retry(TransactionSet& ts, Transaction& t) override
	{
	  ts.host.host = next_host();
	}

      private:
	void from_list(std::string arg)
	{
	  push_back(std::move(arg));
	}

	void from_list(const char *arg)
	{
	  push_back(std::string(arg));
	}

	template<typename T, typename... Args>
	void from_list(T first, Args... args)
	{
	  from_list(first);
	  from_list(args...);
	}

	size_t index = 0;
      };

      ClientSet(openvpn_io::io_context& io_context_arg)
	: io_context(io_context_arg),
	  halt(false),
	  next_id(0)
      {
      }

      void set_random(RandomAPI::Ptr prng_arg)
      {
	prng = std::move(prng_arg);
      }

      void new_request(const TransactionSet::Ptr ts)
      {
	const client_t id = new_client_id();
	Client::Ptr cli = new Client(this, std::move(ts), id);
	clients[id] = cli;
	cli->start();
      }

      static void new_request_synchronous(const TransactionSet::Ptr ts,
					  Stop* stop=nullptr,
					  RandomAPI* prng=nullptr,
					  const bool sps=false)
      {
	std::unique_ptr<openvpn_io::io_context> io_context;
	auto clean = Cleanup([&]() {
	    // ensure that TransactionSet reference to socket
	    // is reset before method returns (unless sps is true
	    // in which case we should retain it).
	    if (!sps)
	      ts->hsc.reset();
	  });
	ts->preserve_http_state = sps;
	if (sps)
	  io_context = std::move(ts->sps.io_context);
	if (!io_context)
	  io_context.reset(new openvpn_io::io_context(1));
	ClientSet::Ptr cs;
	try {
	  AsioStopScope scope(*io_context, stop, [&]() {
	      if (cs)
		cs->abort("stop message received");
	    });
	  cs.reset(new ClientSet(*io_context));
	  if (prng)
	    cs->set_random(RandomAPI::Ptr(prng));
	  cs->new_request(ts);
	  if (sps)
	    {
	      while (cs->clients.size())
		io_context->run_one();
	    }
	  else
	    io_context->run();
	}
	catch (...)
	  {
	    if (cs)
	      cs->stop();        // on exception, stop ClientSet
	    io_context->poll();   // execute completion handlers
	    throw;
	  }
	if (sps)
	  ts->sps.io_context = std::move(io_context);
      }

      static void run_synchronous(Function<void(ClientSet::Ptr)> job,
				  Stop* stop=nullptr,
				  RandomAPI* prng=nullptr)
      {
	std::unique_ptr<openvpn_io::io_context> io_context(new openvpn_io::io_context(1));
	ClientSet::Ptr cs;
	try {
	  AsioStopScope scope(*io_context, stop, [&]() {
	      if (cs)
		cs->abort("stop message received");
	    });
	  cs.reset(new ClientSet(*io_context));
	  cs->set_random(prng);
	  job(cs);
	  io_context->run();
	}
	catch (...)
	  {
	    if (cs)
	      cs->stop();         // on exception, stop ClientSet
	    io_context->poll();   // execute completion handlers
	    throw;
	  }
      }

      void stop()
      {
	if (halt)
	  return;
	halt = true;
	for (auto &c : clients)
	  {
	    c.second->stop(false, false);
	    c.second->reset_callbacks();
	  }
      }

      void abort(const std::string& message)
      {
	for (auto &c : clients)
	  c.second->abort(message);
      }

    private:
      typedef unsigned int client_t;

      class Client : public OPENVPN_HTTP_CLISET_RC
      {
      public:
	typedef RCPtr<Client> Ptr;
	friend HTTPDelegate;

	Client(ClientSet* parent_arg,
	       const TransactionSet::Ptr ts_arg,
	       client_t client_id_arg)
	  : parent(parent_arg),
	    ts(std::move(ts_arg)),
	    n_retries(0),
	    buf_tailroom((*ts->http_config->frame)[Frame::READ_HTTP].tailroom()),
	    reconnect_timer(parent_arg->io_context),
	    client_id(client_id_arg),
	    halt(false),
	    started(false)
	{
	}

	bool start()
	{
	  if (started || halt)
	    return false;
	  started = true;
	  ts->status = false;
	  ts_iter = ts->transactions.begin();
	  if (ts->delayed_start.defined())
	    {
	      retry_duration = ts->delayed_start;
	      reconnect_schedule(false);
	    }
	  else
	    {
	      next_request(false);
	    }
	  return true;
	}

	void stop(const bool keepalive, const bool shutdown)
	{
	  if (halt)
	    return;
	  halt = true;
	  reconnect_timer.cancel();
	  close_http(keepalive, shutdown);
	}

	void reset_callbacks()
	{
	  if (ts)
	    ts->reset_callbacks(); // break refcount cycles in callback closures
	}

	void abort(const std::string& message)
	{
	  if (ts)
	    ts->hsc.abort(message);
	}

      private:
	void close_http(const bool keepalive, const bool shutdown)
	{
	  ts->hsc.close(keepalive, shutdown);
	}

	void remove_self_from_map()
	{
	  openvpn_io::post(parent->io_context, [id=client_id, parent=ClientSet::Ptr(parent)]()
		     {
		       parent->remove_client_id(id);
		     });
	}

	bool check_if_done()
	{
	  if (ts_iter == ts->transactions.end())
	    {
	      done(true, true);
	      return true;
	    }
	  else
	    return false;
	}

	void done(const bool status, const bool shutdown)
	{
	  {
	    auto clean = Cleanup([this, shutdown]() {
		if (!ts->preserve_http_state)
		  ts->hsc.stop(shutdown);
	      });
	    stop(status, shutdown);
	    remove_self_from_map();
	    ts->status = status;
	  }
	  if (ts->completion)
	    ts->completion(*ts);
	}

	Transaction& trans()
	{
	  return **ts_iter;
	}

	const Transaction& trans() const
	{
	  return **ts_iter;
	}

	std::string title() const
	{
	  return trans().title(*ts);
	}

	void next_request(const bool error_retry)
	{
	  if (check_if_done())
	    return;

	  retry_duration = ts->retry_duration;

	  // get current transaction
	  Transaction& t = trans();

	  // set up content out iterator
	  out_iter = t.content_out.begin();

	  // init buffer to receive content in
	  t.content_in.clear();

	  // if this is an error retry, allow user-defined recovery
	  if (error_retry && ts->error_recovery)
	    ts->error_recovery->retry(*ts, t);

	  // init and attach HTTPStateContainer
	  if (!ts->alive())
	    ts->hsc.construct(parent->io_context, ts->http_config);
	  ts->hsc.attach(this);

	  ts->hsc.start_request();
	}

	void reconnect_schedule(const bool error_retry)
	{
	  if (check_if_done())
	    return;
	  reconnect_timer.expires_after(retry_duration);
	  reconnect_timer.async_wait([self=Ptr(this), error_retry](const openvpn_io::error_code& error)
				     {
				       if (!error && !self->halt)
					 self->next_request(error_retry);
				     });
	}

	WS::Client::Host http_host(HTTPDelegate& hd) const
	{
	  return ts->host;
	}

	WS::Client::Request http_request(HTTPDelegate& hd) const
	{
	  return trans().req;
	}

	WS::Client::ContentInfo http_content_info(HTTPDelegate& hd) const
	{
	  const Transaction& t = trans();
	  WS::Client::ContentInfo ci = t.ci;
	  if (!ci.length)
	    ci.length = t.content_out.join_size();
#ifdef HAVE_ZLIB
	  if (t.accept_gzip_in)
	    ci.extra_headers.emplace_back("Accept-Encoding: gzip");
#endif
	  return ci;
	}

	void http_headers_received(HTTPDelegate& hd)
	{
	  if (ts->debug_level >= 2)
	    {
	      std::ostringstream os;
	      os << "----- HEADERS RECEIVED -----\n";
	      os << "    " << title() << '\n';
	      os << "    ENDPOINT: " << hd.remote_endpoint_str() << '\n';
	      os << "    HANDSHAKE_DETAILS: " << hd.ssl_handshake_details() << '\n';
	      os << "    CONTENT-LENGTH: " << hd.content_length() << '\n';
	      os << "    HEADERS: " << string::indent(HTTP::headers_redact(hd.reply().to_string()), 0, 13) << '\n';
	      OPENVPN_LOG_STRING(os.str());
	    }

	  Transaction& t = trans();

	  // save reply
	  t.reply = hd.reply();
	}

	BufferPtr http_content_out(HTTPDelegate& hd)
	{
	  if (out_iter != trans().content_out.end())
	    {
	      BufferPtr ret = new BufferAllocated(**out_iter);
	      ++out_iter;
	      return ret;
	    }
	  else
	    return BufferPtr();
	}

	void http_content_out_needed(HTTPDelegate& hd)
	{
	}

	void http_headers_sent(HTTPDelegate& hd, const Buffer& buf)
	{
	  if (ts->debug_level >= 2)
	    {
	      std::ostringstream os;
	      os << "----- HEADERS SENT -----\n";
	      os << "    " << title() << '\n';
	      os << "    ENDPOINT: " << hd.remote_endpoint_str() << '\n';
	      os << "    HEADERS: " << string::indent(HTTP::headers_redact(buf_to_string(buf)), 0, 13) << '\n';
	      OPENVPN_LOG_STRING(os.str());
	    }
	}

	void http_mutate_resolver_results(HTTPDelegate& hd, openvpn_io::ip::tcp::resolver::results_type& results)
	{
	  // filter results by IP version
	  if (trans().ip_version_preference != IP::Addr::UNSPEC)
	    filter_by_ip_version(results, trans().ip_version_preference);

	  // randomize results
	  if (parent->prng && trans().randomize_resolver_results)
	    randomize_results(results, *parent->prng);
	}

	void http_content_in(HTTPDelegate& hd, BufferAllocated& buf)
	{
	  trans().content_in.put_consume(buf, buf_tailroom);
	}

	void http_done(HTTPDelegate& hd, const int status, const std::string& description)
	{
	  Transaction& t = trans();
	  try {
	    // debug output
	    if (ts->debug_level >= 2)
	      {
		std::ostringstream os;
		os << "----- DONE -----\n";
		os << "    " << title() << '\n';
		os << "    STATUS: " << WS::Client::Status::error_str(status) << '\n';
		os << "    DESCRIPTION: " << description << '\n';
		OPENVPN_LOG_STRING(os.str());
	      }

	    // save status
	    t.status = status;
	    t.description = description;

	    if (status == WS::Client::Status::E_SUCCESS && !http_status_should_retry(hd.reply().status_code))
	      {
		// uncompress if server sent gzip-compressed data
		if (hd.reply().headers.get_value_trim("content-encoding") == "gzip")
		  {
#ifdef HAVE_ZLIB
		    BufferPtr bp = t.content_in.join();
		    t.content_in.clear();
		    bp = ZLib::decompress_gzip(std::move(bp), 0, 0, hd.http_config().max_content_bytes);
		    t.content_in.push_back(std::move(bp));
#else
		    throw Exception("gzip-compressed data returned from server but app not linked with zlib");
#endif
		  }

		// do next request
		++ts_iter;

		// Post a call to next_request() under a fresh stack.
		// Currently we may actually be under tcp_read_handler() and
		// next_request() can trigger destructors.
		post_next_request();
	      }
	    else
	      {
		// failed
		if (++n_retries >= ts->max_retries)
		  {
		    // fail -- no more retries
		    done(false, false);
		  }
		else
		  {
		    // fail -- retry
		    close_http(false, false);

		    // special case -- no delay after TCP EOF on first retry
		    if (status == WS::Client::Status::E_EOF_TCP && n_retries == 1)
		      post_next_request();
		    else
		      reconnect_schedule(true);
		  }
	      }
	  }
	  catch (const std::exception& e)
	    {
	      t.status = WS::Client::Status::E_EXCEPTION;
	      t.description = std::string("http_done: ") + e.what();
	      if (!halt)
		done(false, false);
	    }
	}

	void post_next_request()
	{
	  openvpn_io::post(parent->io_context, [self=Ptr(this)]()
			   {
			     self->next_request(false);
			   });
	}

	void http_keepalive_close(HTTPDelegate& hd, const int status, const std::string& description)
	{
	  // this may be a no-op because ts->hsc.alive() is always tested before construction
	  //OPENVPN_LOG("http_keepalive_close " << WS::Client::Status::error_str(status) << " description=" << description << " http_status=" << std::to_string(hd.reply().status_code) << " http_text=" << hd.reply().status_text);
	}

	void http_post_connect(HTTPDelegate& hd, AsioPolySock::Base& sock)
	{
	  if (ts->post_connect)
	    ts->post_connect(*ts, sock);
	}

	bool http_status_should_retry(const int status) const
	{
	  return status >= 500 && status < 600;
	}

	ClientSet* parent;
	TransactionSet::Ptr ts;
	TransactionSet::Vector::const_iterator ts_iter;
	BufferList content_out;
	BufferList::const_iterator out_iter;
	unsigned int n_retries;
	unsigned int buf_tailroom;
	Time::Duration retry_duration;
	AsioTimerSafe reconnect_timer;
	client_t client_id;
	bool halt;
	bool started;
      };

      void remove_client_id(const client_t client_id)
      {
	auto e = clients.find(client_id);
	if (e != clients.end())
	  clients.erase(e);
      }

      client_t new_client_id()
      {
	while (true)
	  {
	    // find an ID that's not already in use
	    const client_t id = next_id++;
	    if (clients.find(id) == clients.end())
	      return id;
	  }
      }

      openvpn_io::io_context& io_context;
      bool halt;
      client_t next_id;
      RandomAPI::Ptr prng;
      std::map<client_t, Client::Ptr> clients;
    };

  }
}