diff -ruN postgresql-jdbc-8.2-505.src/build.xml postgresql-jdbc-8.2-505.src-copyAPI/build.xml --- postgresql-jdbc-8.2-505.src/build.xml 2006-11-29 05:00:15.000000000 +0100 +++ postgresql-jdbc-8.2-505.src-copyAPI/build.xml 2007-07-03 20:26:57.000000000 +0200 @@ -115,6 +115,7 @@ + diff -ruN postgresql-jdbc-8.2-505.src/org/postgresql/copy/CopyManager.java postgresql-jdbc-8.2-505.src-copyAPI/org/postgresql/copy/CopyManager.java --- postgresql-jdbc-8.2-505.src/org/postgresql/copy/CopyManager.java 1970-01-01 01:00:00.000000000 +0100 +++ postgresql-jdbc-8.2-505.src-copyAPI/org/postgresql/copy/CopyManager.java 2007-07-03 20:17:36.000000000 +0200 @@ -0,0 +1,352 @@ +package org.postgresql.copy; + +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; + +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.util.ArrayList; +import java.util.Vector; + +import org.postgresql.core.BaseConnection; +import org.postgresql.core.PGStream; +import org.postgresql.core.Encoding; +import org.postgresql.core.Query; +import org.postgresql.core.Field; +import org.postgresql.core.Logger; +import org.postgresql.core.Notification; +import org.postgresql.core.ResultCursor; +import org.postgresql.core.ResultHandler; +import org.postgresql.util.GT; +import org.postgresql.util.PSQLState; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLWarning; +import org.postgresql.util.ServerErrorMessage; + +/** + * Implement COPY support in the JDBC driver. This requires a 7.4 server and + * a connection with the V3 protocol. Previous versions could not recover + * from errors and the connection had to be abandoned which was not acceptable. + */ + +public class CopyManager +{ + private static final Logger logger = new Logger(); + + // Helper handler that logs error status. + private static class ErrorLoggingResultHandler implements ResultHandler { + + ErrorLoggingResultHandler() { + } + + public void handleResultRows(Query fromQuery, Field[] fields, Vector tuples, ResultCursor cursor) { + throw new UnsupportedOperationException(); + } + + public void handleCommandStatus(String status, int updateCount, long insertOID) { + throw new UnsupportedOperationException(); + } + + public void handleWarning(SQLWarning warning) { + logger.log("Got warning while copying", warning); + throw new RuntimeException(warning); + } + + public void handleError(SQLException error) { + logger.log("Got error while copying", error); + throw new RuntimeException(error); + } + + public void handleCompletion() throws SQLException { + throw new UnsupportedOperationException(); + } + + } + + private BaseConnection pgConn; + private PGStream pgStream; + + private final ArrayList notifications = new ArrayList(); + private ResultHandler handler = new ErrorLoggingResultHandler(); + + public CopyManager(BaseConnection pgConn, PGStream pgStream) + { + this.pgConn = pgConn; + this.pgStream = pgStream; + } + + /** + * Copy data from the InputStream into the given table using the + * default copy parameters. + */ + public void copyIn(String table, InputStream is) throws SQLException + { + copyInQuery("COPY " + table + " FROM STDIN", is); + } + + /** + * Copy data from the InputStream using the given COPY query. This + * allows specification of additional copy parameters such as the + * delimiter or NULL marker. + */ + public void copyInQuery(String query, InputStream is) throws SQLException + { + + synchronized(pgStream) + { + + sendQuery(query); + try + { + copyResultLoop(is,null); + }catch (IOException ex ) + { + throw new PSQLException(GT.tr( "postgresql.copy.type" ), PSQLState.COMMUNICATION_ERROR, ex ); + + } + } + + } + + /** + * Copy data from the given table to the OutputStream using the + * default copy parameters. + */ + public void copyOut(String table, OutputStream os) throws SQLException + { + copyOutQuery("COPY " + table + " TO STDOUT", os); + } + + /** + * Copy data to the OutputStream using the given COPY query. This + * allows specification of additional copy parameters such as the + * delimiter or NULL marker. + */ + public void copyOutQuery(String query, OutputStream os) throws SQLException + { + synchronized(pgStream) + { + sendQuery(query); + try + { + copyResultLoop(null,os); + } + catch( IOException ex ) + { + throw new PSQLException(GT.tr( "postgresql.copy.type" ), PSQLState.COMMUNICATION_ERROR,ex ); + + } + } + } + + private String receiveCommandStatus() throws IOException + { + //TODO: better handle the msg len + int l_len = pgStream.ReceiveIntegerR(4); + + //read l_len -5 bytes (-4 for l_len and -1 for trailing \0) + String status = pgStream.ReceiveString(l_len - 5); + + //now read and discard the trailing \0 + pgStream.Receive(1); + + return status; + } + private SQLException receiveErrorResponse() throws IOException + { + // it's possible to get more than one error message for a query + // see libpq comments wrt backend closing a connection + // so, append messages to a string buffer and keep processing + // check at the bottom to see if we need to throw an exception + + int elen = pgStream.ReceiveIntegerR(4); + String totalMessage = pgStream.ReceiveString(elen - 4); + ServerErrorMessage errorMsg = new ServerErrorMessage(totalMessage, 0); + + return new PSQLException(errorMsg); + } + + private SQLWarning receiveNoticeResponse() throws IOException + { + int nlen = pgStream.ReceiveIntegerR(4); + ServerErrorMessage warnMsg = new ServerErrorMessage(pgStream.ReceiveString(nlen - 4), 0); + + return new PSQLWarning(warnMsg); + } + + /** + * After the copy query has been go through the possible responses. + * The flag which tells us whether we are doing copy in or out is + * simply where the InputStream or OutputStream is null. + * + * This is much like the loop in QueryExecutor, it could be merged + * into that, but it would require some generalization of its + * current specific tasks. Right now it has its query in m_binds[] + * form and expects to return a ResultSet. A more pluggable network + * layer would be nice so we could support the V2 and V3 protocols + * more cleanly and consider a SPI based layer for an in server + * pl/java. In general I think it's a bad idea for PGStream to + * be seen anywhere outside of the QueryExecutor. + */ + private void copyResultLoop(InputStream is, OutputStream os) throws SQLException, IOException + { + + + + PSQLException topLevelError = null; + boolean queryDone = false; + while (!queryDone) + { + int c = pgStream.ReceiveChar(); + switch (c) + { + case 'A': // Asynch Notify + int pid = pgStream.ReceiveIntegerR(4); + String msg = pgStream.ReceiveString(); + notifications.add(new Notification(msg, pid)); + break; + + case 'C': // Command Complete + receiveCommandStatus(); + break; + case 'E': // Error Message + SQLException error = receiveErrorResponse(); + handler.handleError(error); + + // keep processing + break; + case 'N': // Error Notification + SQLWarning warning = receiveNoticeResponse(); + handler.handleWarning(warning); + break; + case 'G': // CopyInResponse + if (is == null) + throw new PSQLException(GT.tr("postgresql.copy.type"), PSQLState.COMMUNICATION_ERROR ); + receiveCopyInOutResponse(); + sendCopyData(is); + break; + case 'H': // CopyOutResponse + if (os == null) + throw new PSQLException(GT.tr( "postgresql.copy.type" ), PSQLState.COMMUNICATION_ERROR ); + receiveCopyInOutResponse(); + break; + case 'd': // CopyData + if (os == null) + throw new PSQLException( GT.tr( "postgresql.copy.type"), PSQLState.COMMUNICATION_ERROR ); + receiveCopyData(os); + break; + case 'c': // CopyDone + int copyDoneLength = pgStream.ReceiveIntegerR(4); + break; + case 'Z': // ReadyForQuery + int messageLength = pgStream.ReceiveIntegerR(4); + char messageStatus = (char)pgStream.ReceiveChar(); + queryDone = true; + break; + default: + throw new PSQLException(GT.tr( "postgresql.copy.type" ), PSQLState.COMMUNICATION_ERROR); + } + } + + if (topLevelError != null) + throw topLevelError; + + } + + private void sendQuery(String query) throws SQLException + { + Encoding encoding = pgConn.getEncoding(); + try { + pgStream.SendChar('Q'); + byte message[] = encoding.encode(query); + int messageSize = 4 + message.length + 1; + pgStream.SendInteger4( messageSize ); + pgStream.Send(message); + pgStream.SendChar(0); + pgStream.flush(); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + + private void sendCopyData(InputStream is) throws SQLException + { + byte buf[] = new byte[8192]; + + int read = 0; + + while (read >= 0) { + try { + read = is.read(buf); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.inputsource", PSQLState.DATA_ERROR, ioe); + } + + if (read > 0) { + try { + pgStream.SendChar('d'); + int messageSize = read+4; + pgStream.SendInteger4(messageSize); + pgStream.Send(buf, read); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + } + + // Send the CopyDone message + try { + pgStream.SendChar('c'); + pgStream.SendInteger4(4); + pgStream.flush(); + } + catch (IOException ioe) + { + throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + + /** + * CopyInResponse and CopyOutResponse have the same field + * layouts and we simply discard the results. + */ + private void receiveCopyInOutResponse() throws SQLException + { + try + { + int messageLength = pgStream.ReceiveIntegerR(4); + int copyFormat = pgStream.ReceiveIntegerR(1); + + int numColumns = pgStream.ReceiveIntegerR(2); + for (int i=0; i