@@ -222,7 +222,10 @@ public void testRecoveryAfterCorruptionMetadataLocation() throws Exception {
222222 DataFile dataFile = ((KahaDBPersistenceAdapter ) broker .getPersistenceAdapter ()).getStore ().getJournal ().getFileMap ().get (Integer .valueOf (location .getDataFileId ()));
223223 RecoverableRandomAccessFile randomAccessFile = dataFile .openRandomAccessFile ();
224224 randomAccessFile .seek (location .getOffset ());
225- randomAccessFile .writeInt (Integer .MAX_VALUE );
225+ // Use an invalid size well past the end of the data file to trigger corruption handling without large allocation.
226+ int bogusSize = ((KahaDBPersistenceAdapter ) broker .getPersistenceAdapter ()).getStore ().getJournal ()
227+ .getFileMap ().get (location .getDataFileId ()).getLength () * 10 ;
228+ randomAccessFile .writeInt (bogusSize );
226229 randomAccessFile .getChannel ().force (true );
227230
228231 ((KahaDBPersistenceAdapter ) broker .getPersistenceAdapter ()).getStore ().getJournal ().close ();
@@ -246,15 +249,26 @@ public void append(LogEvent event) {
246249 * throw new EOFException();
247250 */
248251 if (event != null
249- && event .getLevel () == Level .WARN
250- && event .getMessage () != null
251- && event .getMessage ().getFormattedMessage () != null
252- && event .getMessage ().getFormattedMessage ().contains ("Cannot recover message audit" )
252+ && event .getLevel () == Level .WARN
253+ && event .getMessage () != null
254+ && event .getMessage ().getFormattedMessage () != null ) {
255+
256+ final String msg = event .getMessage ().getFormattedMessage ();
257+
258+ boolean auditCorruption =
259+ msg .contains ("Cannot recover message audit" )
253260 && event .getThrown () != null
254- && event .getThrown () instanceof EOFException
255- && event .getThrown ().getMessage () == null ) {
261+ && (event .getThrown () instanceof EOFException
262+ || (event .getThrown () instanceof IOException
263+ && event .getThrown ().getMessage () != null
264+ && event .getThrown ().getMessage ().contains ("Invalid location size" )));
256265
257- trappedExpectedLogMessage .set (true );
266+ boolean dataFileCorruption =
267+ msg .contains ("DataFile:" ) && (msg .contains ("Invalid location size" ) || msg .contains ("larger than expected" ));
268+
269+ if (auditCorruption || dataFileCorruption ) {
270+ trappedExpectedLogMessage .set (true );
271+ }
258272 }
259273 }
260274 };
@@ -272,7 +286,7 @@ public void append(LogEvent event) {
272286 assertEquals ("no missing message" , 50 , broker .getAdminView ().getTotalMessageCount ());
273287 assertEquals ("Drain" , 50 , drainQueue (50 ));
274288 assertEquals ("no problem draining messages" , 0 , broker .getAdminView ().getTotalMessageCount ());
275- assertTrue ("Did replay records on invalid location size " , trappedExpectedLogMessage .get ());
289+ assertTrue ("Did not detect corruption via warning " , trappedExpectedLogMessage .get ());
276290 }
277291
278292 @ Test
@@ -419,7 +433,8 @@ private void corruptBatchEndEof(int id) throws Exception{
419433 int pos = batchPositions .get (batchPositions .size () - 3 );
420434 LOG .info ("corrupting checksum and size (to push it past eof) of batch record at:" + id + "-" + pos );
421435 randomAccessFile .seek (pos + Journal .BATCH_CONTROL_RECORD_HEADER .length );
422- randomAccessFile .writeInt (31 * 1024 * 1024 );
436+ // Use a bounded bogus size to trigger the corruption path without exhausting heap on read.
437+ randomAccessFile .writeInt (4 * 1024 * 1024 );
423438 randomAccessFile .writeLong (0l );
424439 randomAccessFile .getChannel ().force (true );
425440 }
0 commit comments